Mercurial > gemma
changeset 1981:e89368aec538
Import: Make one request per gauge for gauge measurement data.
author | Raimund Renkert <raimund.renkert@intevation.de> |
---|---|
date | Wed, 23 Jan 2019 15:02:50 +0100 |
parents | c8e2f6838eaf |
children | a7e47a9d890b |
files | pkg/imports/gm.go |
diffstat | 1 files changed, 95 insertions(+), 94 deletions(-) [+] |
line wrap: on
line diff
--- a/pkg/imports/gm.go Wed Jan 23 14:17:14 2019 +0100 +++ b/pkg/imports/gm.go Wed Jan 23 15:02:50 2019 +0100 @@ -16,7 +16,6 @@ import ( "context" "database/sql" - "errors" "fmt" "strings" "time" @@ -233,110 +232,112 @@ client := nts.NewINtSMessageService(gm.URL, gm.Insecure, nil) - var idPairs []*nts.Id_pair - for _, g := range gauges { - isrs := g.Gauge.String() - isrsID := nts.Isrs_code_type(isrs) - idPairs = append(idPairs, &nts.Id_pair{ - Id: &isrsID, - }) - } - mt := nts.Message_type_typeWRM - req := &nts.Get_messages_query{ - Message_type: &mt, - Ids: idPairs, - } - resp, err := client.Get_messages(req) - if err != nil { - feedback.Error("%v", err) - return nil, err - } - - if resp.Result_message == nil { - err := errors.New("no gauge measurements found") - for _, e := range resp.Result_error { - if e != nil { - feedback.Error("%v", *e) - } else { - feedback.Error("unknown") - } - } - return nil, err - } - - result := resp.Result_message - tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } defer tx.Rollback() - insertStmt, err := tx.PrepareContext(ctx, insertGMSQL) - if err != nil { - return nil, err - } - defer insertStmt.Close() + mt := nts.Message_type_typeWRM - var gid int64 var gids []string - for _, msg := range result { - feedback.Info("Found %d gauges with measurements", len(msg.Wrm)) - for _, wrm := range msg.Wrm { - currIsrs, err := models.IsrsFromString(string(*wrm.Geo_object.Id)) - if err != nil { - feedback.Warn("Invalid ISRS code %v", err) - continue - } - var referenceCode string - if wrm.Reference_code == nil { - feedback.Info("'Reference_code' not specified. Assuming 'ZPG'") - referenceCode = "ZPG" - } else { - referenceCode = string(*wrm.Reference_code) - } - for _, measure := range wrm.Measure { - var unit string - if measure.Unit == nil { - feedback.Info("'Unit' not specified. Assuming 'cm'") - unit = "cm" + for _, g := range gauges { + + var idPair []*nts.Id_pair + isrs := g.Gauge.String() + isrsID := nts.Isrs_code_type(isrs) + idPair = append(idPair, &nts.Id_pair{ + Id: &isrsID, + }) + + req := &nts.Get_messages_query{ + Message_type: &mt, + Ids: idPair, + } + + resp, err := client.Get_messages(req) + if err != nil { + feedback.Error("%v", err) + return nil, err + } + + if resp.Result_message == nil { + for _, e := range resp.Result_error { + if e != nil { + feedback.Error("No gauge measurements found for %s", g.Gauge.String()) } else { - unit = string(*measure.Unit) - } - convert, err := rescale(unit) - if err != nil { - return nil, err - } - isWaterlevel := *measure.Measure_code == nts.Measure_code_enumWAL - err = insertStmt.QueryRowContext( - ctx, - currIsrs.CountryCode, - currIsrs.LoCode, - currIsrs.FairwaySection, - currIsrs.Orc, - currIsrs.Hectometre, - measure.Measuredate, - msg.Identification.From, - msg.Identification.Language_code, - msg.Identification.Country_code, - msg.Identification.Date_issue, - referenceCode, - convert(measure.Value), - measure.Predicted, - isWaterlevel, - convert(measure.Value_min), - convert(measure.Value_max), - msg.Identification.Date_issue, - msg.Identification.Originator, - true, // staging_done - ).Scan(&gid) - if err != nil { - return nil, err + feedback.Error("unknown") } } - feedback.Info("Inserted %d measurements for %s", - len(wrm.Measure), currIsrs) - gids = append(gids, currIsrs.String()) + continue + } + result := resp.Result_message + + insertStmt, err := tx.PrepareContext(ctx, insertGMSQL) + if err != nil { + return nil, err + } + defer insertStmt.Close() + + for _, msg := range result { + var gid int64 + feedback.Info("Found measurements for %s", g.Gauge.String()) + for _, wrm := range msg.Wrm { + currIsrs, err := models.IsrsFromString(string(*wrm.Geo_object.Id)) + if err != nil { + feedback.Warn("Invalid ISRS code %v", err) + continue + } + var referenceCode string + if wrm.Reference_code == nil { + feedback.Info("'Reference_code' not specified. Assuming 'ZPG'") + referenceCode = "ZPG" + } else { + referenceCode = string(*wrm.Reference_code) + } + for _, measure := range wrm.Measure { + var unit string + if measure.Unit == nil { + feedback.Info("'Unit' not specified. Assuming 'cm'") + unit = "cm" + } else { + unit = string(*measure.Unit) + } + convert, err := rescale(unit) + if err != nil { + return nil, err + } + isWaterlevel := *measure.Measure_code == nts.Measure_code_enumWAL + err = insertStmt.QueryRowContext( + ctx, + currIsrs.CountryCode, + currIsrs.LoCode, + currIsrs.FairwaySection, + currIsrs.Orc, + currIsrs.Hectometre, + measure.Measuredate, + msg.Identification.From, + msg.Identification.Language_code, + msg.Identification.Country_code, + msg.Identification.Date_issue, + referenceCode, + convert(measure.Value), + measure.Predicted, + isWaterlevel, + convert(measure.Value_min), + convert(measure.Value_max), + msg.Identification.Date_issue, + msg.Identification.Originator, + true, // staging_done + ).Scan(&gid) + if err != nil { + return nil, err + } + } + feedback.Info("Inserted %d measurements for %s", + len(wrm.Measure), currIsrs) + gids = append(gids, currIsrs.String()) + } } } feedback.Info("Storing gauge measurements took %s", time.Since(start))