Mercurial > gemma
view pkg/imports/gm.go @ 3250:246754028bf4
client: cleanup code (fairwayprofile diagram)
author | Fadi Abbud <fadi.abbud@intevation.de> |
---|---|
date | Mon, 13 May 2019 16:31:35 +0200 |
parents | 4acbee65275d |
children | 9ae43313b463 |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018, 2019 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Raimund Renkert <raimund.renkert@intevation.de> // * Sascha L. Teichmann <sascha.teichmann@intevation.de> // * Tom Gottfried <tom.gottfried@intevation.de> package imports import ( "context" "database/sql" "fmt" "strings" "time" "gemma.intevation.de/gemma/pkg/models" "gemma.intevation.de/gemma/pkg/soap/nts" ) // GaugeMeasurement is an import job to import // gauges measurement data from a NtS SOAP service. type GaugeMeasurement struct { // URL is the URL of the SOAP service. URL string `json:"url"` // Insecure indicates if HTTPS traffic // should validate certificates or not. Insecure bool `json:"insecure"` } // GMJobKind is the import queue type identifier. const GMJobKind JobKind = "gm" const ( // Note: we do not expect corrections of data through this service. So // any constraint conflicts are triggered by actual redundat data which // can be dropped. insertGMSQL = ` INSERT INTO waterway.gauge_measurements ( fk_gauge_id, measure_date, sender, language_code, country_code, date_issue, reference_code, water_level, predicted, is_waterlevel, value_min, value_max, date_info, source_organization, staging_done ) VALUES( ($1, $2, $3, $4, $5), $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19 ) ON CONFLICT DO NOTHING RETURNING id ` ) type gmJobCreator struct{} func init() { RegisterJobCreator(GMJobKind, gmJobCreator{}) } func (gmJobCreator) Description() string { return "gauge measurements" } func (gmJobCreator) Create() Job { return new(GaugeMeasurement) } func (gmJobCreator) Depends() [2][]string { return [2][]string{ {"gauge_measurements"}, {"gauges"}, } } func (gmJobCreator) AutoAccept() bool { return true } // StageDone moves the imported gauge measurement out of the staging area. // Currently doing nothing. func (gmJobCreator) StageDone(context.Context, *sql.Tx, int64) error { return nil } // CleanUp of a gauge measurement import is a NOP. func (*GaugeMeasurement) CleanUp() error { return nil } // Do executes the actual bottleneck import. func (gm *GaugeMeasurement) Do( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, ) (interface{}, error) { fetch := func() ([]*nts.RIS_Message_Type, error) { client := nts.NewINtSMessageService(gm.URL, gm.Insecure, nil) mt := nts.Message_type_typeWRM req := &nts.Get_messages_query{ Message_type: &mt, } resp, err := client.Get_messages(req) if err != nil { return nil, err } result := resp.Result_message if result == nil { for _, e := range resp.Result_error { if e != nil { feedback.Error("Error code: %s", *e) } else { feedback.Error("Unknown error") } } } return result, nil } return storeGaugeMeasurements( ctx, importID, fetch, conn, feedback, ) } func storeGaugeMeasurements( ctx context.Context, importID int64, fetch func() ([]*nts.RIS_Message_Type, error), conn *sql.Conn, feedback Feedback, ) (interface{}, error) { start := time.Now() // TODO get date_issue for selected gauges gids, err := doForGM(ctx, fetch, conn, feedback) if err != nil { feedback.Error("Error processing gauges: %v", err) return nil, err } if len(gids) == 0 { return nil, UnchangedError("No new gauge measurements found") } feedback.Info( "Importing gauge measurements took %s", time.Since(start)) // TODO: needs to be filled more useful. summary := struct { GaugeMeasuremets []string `json:"gaugeMeasurements"` }{ GaugeMeasuremets: gids, } return &summary, err } // rescale returns a scaling function to bring the unit all to cm. func rescale(unit string) (func(float32) float32, error) { var scale float32 switch strings.ToLower(unit) { case "mm": scale = 0.1 case "cm": scale = 1.0 case "dm": scale = 10.0 case "m": scale = 100.0 case "hm": scale = 10000.0 case "km": scale = 100000.0 default: return nil, fmt.Errorf("unknown unit '%s'", unit) } fn := func(x float32) float32 { return scale * x } return fn, nil } func doForGM( ctx context.Context, fetch func() ([]*nts.RIS_Message_Type, error), conn *sql.Conn, feedback Feedback, ) ([]string, error) { insertStmt, err := conn.PrepareContext(ctx, insertGMSQL) if err != nil { return nil, err } defer insertStmt.Close() result, err := fetch() if err != nil { return nil, err } var gids []string for _, msg := range result { var gid int64 for _, wrm := range msg.Wrm { curr := string(*wrm.Geo_object.Id) currIsrs, err := models.IsrsFromString(curr) if err != nil { feedback.Warn("Invalid ISRS code %v", err) continue } feedback.Info("Found measurements for %s", curr) var referenceCode string if wrm.Reference_code == nil { feedback.Info("'Reference_code' not specified. Assuming 'ZPG'") referenceCode = "ZPG" } else { referenceCode = string(*wrm.Reference_code) } var newCnt int = 0 for _, measure := range wrm.Measure { var unit string if measure.Unit == nil { feedback.Info("'Unit' not specified. Assuming 'cm'") unit = "cm" } else { unit = string(*measure.Unit) } convert, err := rescale(unit) if err != nil { return nil, err } isWaterlevel := *measure.Measure_code == nts.Measure_code_enumWAL err = insertStmt.QueryRowContext( ctx, currIsrs.CountryCode, currIsrs.LoCode, currIsrs.FairwaySection, currIsrs.Orc, currIsrs.Hectometre, measure.Measuredate, msg.Identification.From, msg.Identification.Language_code, msg.Identification.Country_code, msg.Identification.Date_issue, referenceCode, convert(measure.Value), measure.Predicted, isWaterlevel, convert(measure.Value_min), convert(measure.Value_max), msg.Identification.Date_issue, msg.Identification.Originator, true, // staging_done ).Scan(&gid) switch { case err == sql.ErrNoRows: // thats expected, nothing to do case err != nil: feedback.Warn(handleError(err).Error()) default: newCnt++ } } feedback.Info("Inserted %d measurements for %s", newCnt, curr) gids = append(gids, curr) } } return gids, nil }