Mercurial > gemma
changeset 2242:786c3fb7efe1
Gauge measurements import: Re-factored to be re-usable for upcoming uploaded gauge measurements.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Wed, 13 Feb 2019 15:38:24 +0100 |
parents | 5529e1f08dba |
children | 783c1454ab47 |
files | pkg/imports/gm.go |
diffstat | 1 files changed, 133 insertions(+), 100 deletions(-) [+] |
line wrap: on
line diff
--- a/pkg/imports/gm.go Wed Feb 13 15:16:12 2019 +0100 +++ b/pkg/imports/gm.go Wed Feb 13 15:38:24 2019 +0100 @@ -17,6 +17,7 @@ "context" "database/sql" "fmt" + "sort" "strings" "time" @@ -137,7 +138,51 @@ // CleanUp of a gauge measurement import is a NOP. func (*GaugeMeasurement) CleanUp() error { return nil } -func loadGauges(ctx context.Context, tx *sql.Tx) ([]*gauge, error) { +// Do executes the actual bottleneck import. +func (gm *GaugeMeasurement) Do( + ctx context.Context, + importID int64, + conn *sql.Conn, + feedback Feedback, +) (interface{}, error) { + + fetch := func() ([]*nts.RIS_Message_Type, error) { + client := nts.NewINtSMessageService(gm.URL, gm.Insecure, nil) + + mt := nts.Message_type_typeWRM + + req := &nts.Get_messages_query{ + Message_type: &mt, + } + + resp, err := client.Get_messages(req) + if err != nil { + return nil, err + } + + result := resp.Result_message + if result == nil { + for _, e := range resp.Result_error { + if e != nil { + feedback.Error("Error code: %s", *e) + } else { + feedback.Error("Unknown error") + } + } + } + return result, nil + } + + return storeGaugeMeasurements( + ctx, + importID, + fetch, + conn, + feedback, + ) +} + +func loadGauges(ctx context.Context, tx *sql.Tx) ([]string, error) { rows, err := tx.QueryContext(ctx, listGaugesSQL) if err != nil { @@ -145,33 +190,35 @@ } defer rows.Close() - var gauges []*gauge + var gauges []string for rows.Next() { - var g gauge + var g models.Isrs if err = rows.Scan( - &g.location.CountryCode, - &g.location.LoCode, - &g.location.FairwaySection, - &g.location.Orc, - &g.location.Hectometre, + &g.CountryCode, + &g.LoCode, + &g.FairwaySection, + &g.Orc, + &g.Hectometre, ); err != nil { return nil, err } - gauges = append(gauges, &g) + gauges = append(gauges, g.String()) } if err = rows.Err(); err != nil { return nil, err } + sort.Strings(gauges) + return gauges, nil } -// Do executes the actual bottleneck import. -func (gm *GaugeMeasurement) Do( +func storeGaugeMeasurements( ctx context.Context, importID int64, + fetch func() ([]*nts.RIS_Message_Type, error), conn *sql.Conn, feedback Feedback, ) (interface{}, error) { @@ -191,7 +238,7 @@ } // TODO get date_issue for selected gauges - gids, err := gm.doForGM(ctx, gauges, tx, feedback) + gids, err := doForGM(ctx, gauges, fetch, tx, feedback) if err != nil { feedback.Error("Error processing %d gauges: %v", len(gauges), err) return nil, err @@ -245,110 +292,96 @@ return fn, nil } -func (gm *GaugeMeasurement) doForGM( +func doForGM( ctx context.Context, - gauges []*gauge, + gauges []string, + fetch func() ([]*nts.RIS_Message_Type, error), tx *sql.Tx, feedback Feedback, ) ([]string, error) { - client := nts.NewINtSMessageService(gm.URL, gm.Insecure, nil) - - mt := nts.Message_type_typeWRM - insertStmt, err := tx.PrepareContext(ctx, insertGMSQL) if err != nil { return nil, err } defer insertStmt.Close() - var gids []string - for _, g := range gauges { + // lookup to see if we have gauges in the database. + isKnown := func(s string) bool { + idx := sort.SearchStrings(gauges, s) + return idx < len(gauges) && gauges[idx] == s + } + + result, err := fetch() + if err != nil { + return nil, err + } - isrsID := nts.Isrs_code_type(g.location.String()) - idPair := []*nts.Id_pair{{Id: &isrsID}} - - req := &nts.Get_messages_query{ - Message_type: &mt, - Ids: idPair, - } + var gids []string + for _, msg := range result { + var gid int64 + for _, wrm := range msg.Wrm { + curr := string(*wrm.Geo_object.Id) + currIsrs, err := models.IsrsFromString(curr) + if err != nil { + feedback.Warn("Invalid ISRS code %v", err) + continue + } + feedback.Info("Found measurements for %s", curr) + if !isKnown(curr) { + feedback.Warn("Gauge '%s' is not in database.", curr) + continue + } - resp, err := client.Get_messages(req) - if err != nil { - return nil, err - } - - if resp.Result_message == nil { - for _, e := range resp.Result_error { - if e != nil { - feedback.Error("No gauge measurements found for %s", g.location) + var referenceCode string + if wrm.Reference_code == nil { + feedback.Info("'Reference_code' not specified. Assuming 'ZPG'") + referenceCode = "ZPG" + } else { + referenceCode = string(*wrm.Reference_code) + } + for _, measure := range wrm.Measure { + var unit string + if measure.Unit == nil { + feedback.Info("'Unit' not specified. Assuming 'cm'") + unit = "cm" } else { - feedback.Error("unknown") + unit = string(*measure.Unit) + } + convert, err := rescale(unit) + if err != nil { + return nil, err + } + isWaterlevel := *measure.Measure_code == nts.Measure_code_enumWAL + err = insertStmt.QueryRowContext( + ctx, + currIsrs.CountryCode, + currIsrs.LoCode, + currIsrs.FairwaySection, + currIsrs.Orc, + currIsrs.Hectometre, + measure.Measuredate, + msg.Identification.From, + msg.Identification.Language_code, + msg.Identification.Country_code, + msg.Identification.Date_issue, + referenceCode, + convert(measure.Value), + measure.Predicted, + isWaterlevel, + convert(measure.Value_min), + convert(measure.Value_max), + msg.Identification.Date_issue, + msg.Identification.Originator, + true, // staging_done + ).Scan(&gid) + if err != nil { + return nil, err } } - continue - } - result := resp.Result_message - - for _, msg := range result { - var gid int64 - feedback.Info("Found measurements for %s", g.location) - for _, wrm := range msg.Wrm { - currIsrs, err := models.IsrsFromString(string(*wrm.Geo_object.Id)) - if err != nil { - feedback.Warn("Invalid ISRS code %v", err) - continue - } - var referenceCode string - if wrm.Reference_code == nil { - feedback.Info("'Reference_code' not specified. Assuming 'ZPG'") - referenceCode = "ZPG" - } else { - referenceCode = string(*wrm.Reference_code) - } - for _, measure := range wrm.Measure { - var unit string - if measure.Unit == nil { - feedback.Info("'Unit' not specified. Assuming 'cm'") - unit = "cm" - } else { - unit = string(*measure.Unit) - } - convert, err := rescale(unit) - if err != nil { - return nil, err - } - isWaterlevel := *measure.Measure_code == nts.Measure_code_enumWAL - err = insertStmt.QueryRowContext( - ctx, - currIsrs.CountryCode, - currIsrs.LoCode, - currIsrs.FairwaySection, - currIsrs.Orc, - currIsrs.Hectometre, - measure.Measuredate, - msg.Identification.From, - msg.Identification.Language_code, - msg.Identification.Country_code, - msg.Identification.Date_issue, - referenceCode, - convert(measure.Value), - measure.Predicted, - isWaterlevel, - convert(measure.Value_min), - convert(measure.Value_max), - msg.Identification.Date_issue, - msg.Identification.Originator, - true, // staging_done - ).Scan(&gid) - if err != nil { - return nil, err - } - } - feedback.Info("Inserted %d measurements for %s", - len(wrm.Measure), currIsrs) - gids = append(gids, currIsrs.String()) - } + feedback.Info("Inserted %d measurements for %s", + len(wrm.Measure), curr) + gids = append(gids, curr) } } return gids, nil