Mercurial > gemma
view pkg/imports/gm.go @ 3954:cb4fda122321
Completing: Don't create extra inbetween class breaks.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Mon, 15 Jul 2019 17:54:12 +0200 |
parents | d7b9d5c0ebad |
children | 0b65915757c8 6c5c15b2fb64 |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018, 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Raimund Renkert <raimund.renkert@intevation.de>
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
//  * Tom Gottfried <tom.gottfried@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"fmt"
	"log"
	"sort"
	"strings"
	"time"

	"gemma.intevation.de/gemma/pkg/models"
	"gemma.intevation.de/gemma/pkg/soap/nts"
	"github.com/jackc/pgx/pgtype"
)

// GaugeMeasurement is an import job to import
// gauge measurement data from a NtS SOAP service.
type GaugeMeasurement struct {
	// URL is the URL of the SOAP service.
	URL string `json:"url"`
	// Insecure is handed unchanged to the NtS SOAP client constructor.
	// NOTE(review): presumably true disables the validation of
	// HTTPS certificates; confirm against the nts package.
	Insecure bool `json:"insecure"`
}

// GMJobKind is the import queue type identifier.
const GMJobKind JobKind = "gm"

const (
	// listGaugesSQL selects the ISRS location components of all gauges
	// which belong to the country of the current user. Members of the
	// role 'sys_admin' get the gauges of all countries.
	listGaugesSQL = `
SELECT
  (location).country_code,
  (location).locode,
  (location).fairway_section,
  (location).orc,
  (location).hectometre
FROM waterway.gauges
WHERE (location).country_code = users.current_user_country()
  OR pg_has_role('sys_admin', 'MEMBER')
`

	// Note: we do not expect corrections of data through this service. So
	// any constraint conflicts are triggered by redundant data which
	// can be dropped.

	// insertGMSQL inserts a single measurement with staging_done set to
	// true. Positional parameters:
	//   $1-$5  ISRS location (country code, locode, fairway section,
	//          orc, hectometre)
	//   $6     measure_date
	//   $7     sender
	//   $8     language_code
	//   $9     country_code
	//   $10    date_issue
	//   $11    reference_code
	//   $12    water_level
	//   $13    date_info
	//   $14    source_organization
	// The validity is taken from the gauge version containing the
	// measure date and falls back to an unbounded range if there is none.
	// A row is only returned if a new data set was actually inserted.
	insertGMSQL = `
INSERT INTO waterway.gauge_measurements (
  location,
  validity,
  measure_date,
  sender,
  language_code,
  country_code,
  date_issue,
  reference_code,
  water_level,
  date_info,
  source_organization,
  staging_done
) VALUES (
  ($1, $2, $3, $4, $5),
  COALESCE(
    (SELECT validity FROM waterway.gauges
       WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)
         AND validity @> CAST($6 AS timestamp with time zone)),
    tstzrange(NULL, NULL)),
  $6,
  $7,
  $8,
  $9,
  $10,
  $11,
  $12,
  $13,
  $14,
  true
)
ON CONFLICT DO NOTHING
RETURNING 1
`

	// insertGPSQL inserts a single prediction. The positional parameters
	// are the same as in insertGMSQL up to $12 (water_level), followed by
	//   $13    conf_interval
	//   $14    date_info
	//   $15    source_organization
	// A row is only returned if a new data set was actually inserted.
	insertGPSQL = `
INSERT INTO waterway.gauge_predictions (
  location,
  validity,
  measure_date,
  sender,
  language_code,
  country_code,
  date_issue,
  reference_code,
  water_level,
  conf_interval,
  date_info,
  source_organization
) VALUES(
  ($1, $2, $3, $4, $5),
  COALESCE(
    (SELECT validity FROM waterway.gauges
       WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)
         AND validity @> CAST($6 AS timestamp with time zone)),
    tstzrange(NULL, NULL)),
  $6,
  $7,
  $8,
  $9,
  $10,
  $11,
  $12,
  $13,
  $14,
  $15
)
ON CONFLICT DO NOTHING
RETURNING 1
`
)

// gmJobCreator is the job creator registered for GMJobKind.
type gmJobCreator struct{}

// init registers the gauge measurement job creator under GMJobKind.
func init() {
	RegisterJobCreator(GMJobKind, gmJobCreator{})
}

// Description returns a short human readable name of this kind of import.
func (gmJobCreator) Description() string { return "gauge measurements" }

// Create returns a new, empty gauge measurement import job.
func (gmJobCreator) Create() Job { return new(GaugeMeasurement) }

// Depends returns the names of the database relations this import
// is related to.
// NOTE(review): presumably the first list are the relations written to
// and the second the ones only read; confirm against the JobCreator
// interface.
func (gmJobCreator) Depends() [2][]string {
	return [2][]string{
		{"gauge_measurements"},
		{"gauges"},
	}
}

// AutoAccept always reports true for gauge measurement imports.
// The measurements are inserted with staging_done already set.
func (gmJobCreator) AutoAccept() bool { return true }

// StageDone moves the imported gauge measurement out of the staging area.
// Currently doing nothing.
func (gmJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
	return nil
}

// CleanUp of a gauge measurement import is a NOP.
func (*GaugeMeasurement) CleanUp() error { return nil }

// Do executes the actual gauge measurement import.
func (gm *GaugeMeasurement) Do( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, ) (interface{}, error) { fetch := func() ([]*nts.RIS_Message_Type, error) { client := nts.NewINtSMessageService(gm.URL, gm.Insecure, nil) mt := nts.Message_type_typeWRM var dis []*nts.Date_pair dis = append(dis, &nts.Date_pair{ Date_start: nts.Date{time.Now().Add(time.Duration(-24) * time.Hour)}, Date_end: nts.Date{time.Now()}, }) req := &nts.Get_messages_query{ Message_type: &mt, Dates_issue: dis, } const maxTries = 3 tries := 0 again: resp, err := client.Get_messages(req) if err != nil { if t, ok := err.(interface{ Timeout() bool }); ok && t.Timeout() && tries < maxTries { log.Println("warn: NtS SOAP request timed out. Trying again.") tries++ goto again } return nil, fmt.Errorf( "Error requesting NtS service: %v", err) } result := resp.Result_message if result == nil { for _, e := range resp.Result_error { if e != nil { feedback.Error("Error code: %s", *e) } else { feedback.Error("Unknown error") } } } return result, nil } return storeGaugeMeasurements( ctx, importID, fetch, conn, feedback, ) } func loadGauges(ctx context.Context, conn *sql.Conn) ([]string, error) { rows, err := conn.QueryContext(ctx, listGaugesSQL) if err != nil { return nil, err } defer rows.Close() var gauges []string for rows.Next() { var g models.Isrs if err = rows.Scan( &g.CountryCode, &g.LoCode, &g.FairwaySection, &g.Orc, &g.Hectometre, ); err != nil { return nil, err } gauges = append(gauges, g.String()) } if err = rows.Err(); err != nil { return nil, err } if len(gauges) == 0 { return nil, UnchangedError( "No gauges for which measurements can be imported in database") } sort.Strings(gauges) return gauges, nil } func storeGaugeMeasurements( ctx context.Context, importID int64, fetch func() ([]*nts.RIS_Message_Type, error), conn *sql.Conn, feedback Feedback, ) (interface{}, error) { start := time.Now() // Get gauges from database, for which user is allowed to import data gauges, err := 
loadGauges(ctx, conn) if err != nil { return nil, err } gids, err := doForGM(ctx, gauges, fetch, conn, feedback) if err != nil { return nil, err } if len(gids) == 0 { return nil, UnchangedError("No new gauge measurements found") } feedback.Info( "Importing gauge measurements took %s", time.Since(start)) // TODO: needs to be filled more useful. summary := struct { GaugeMeasuremets []string `json:"gaugeMeasurements"` }{ GaugeMeasuremets: gids, } return &summary, err } // rescale returns a scaling function to bring the unit all to cm. func rescale(unit string) (func(*float32), error) { var scale float32 switch strings.ToLower(unit) { case "mm": scale = 0.1 case "cm": scale = 1.0 case "dm": scale = 10.0 case "m": scale = 100.0 case "hm": scale = 10000.0 case "km": scale = 100000.0 default: return nil, fmt.Errorf("unknown unit '%s'", unit) } fn := func(x *float32) { if x != nil { *x *= scale } } return fn, nil } func doForGM( ctx context.Context, gauges []string, fetch func() ([]*nts.RIS_Message_Type, error), conn *sql.Conn, feedback Feedback, ) ([]string, error) { insertGPStmt, err := conn.PrepareContext(ctx, insertGPSQL) if err != nil { return nil, err } defer insertGPStmt.Close() insertGMStmt, err := conn.PrepareContext(ctx, insertGMSQL) if err != nil { return nil, err } defer insertGMStmt.Close() // lookup to see if data can be imported for gauge isKnown := func(s string) bool { idx := sort.SearchStrings(gauges, s) return idx < len(gauges) && gauges[idx] == s } result, err := fetch() if err != nil { return nil, err } var gids []string for _, msg := range result { for _, wrm := range msg.Wrm { curr := string(*wrm.Geo_object.Id) curr = strings.TrimSpace(curr) currIsrs, err := models.IsrsFromString(curr) if err != nil { feedback.Warn("Invalid ISRS code %v", err) continue } feedback.Info("Found measurements/predictions for %s", curr) if !isKnown(curr) { feedback.Warn("Cannot find gauge %q for import", curr) continue } var referenceCode string if wrm.Reference_code == 
nil { feedback.Info("'Reference_code' not specified. Assuming 'ZPG'") referenceCode = "ZPG" } else { referenceCode = string(*wrm.Reference_code) } badValue := 0 newM, newP := 0, 0 for _, measure := range wrm.Measure { var unit string if *measure.Measure_code != nts.Measure_code_enumWAL { feedback.Warn("Ignored message with measure_code %s", *measure.Measure_code) continue } if measure.Unit == nil { feedback.Info("'Unit' not specified. Assuming 'cm'") unit = "cm" } else { unit = string(*measure.Unit) } convert, err := rescale(unit) if err != nil { return nil, err } convert(measure.Value) convert(measure.Value_min) convert(measure.Value_max) // -99999 is used by some gauges to signal an error if *measure.Value == -99999 { badValue++ continue } var dummy int if measure.Predicted { confInterval := pgtype.Numrange{ Lower: pgtype.Numeric{Status: pgtype.Null}, Upper: pgtype.Numeric{Status: pgtype.Null}, LowerType: pgtype.Inclusive, UpperType: pgtype.Inclusive, Status: pgtype.Null, } if measure.Value_min != nil && measure.Value_max != nil { valueMin := pgtype.Numeric{Status: pgtype.Null} valueMax := pgtype.Numeric{Status: pgtype.Null} valueMin.Set(measure.Value_min) valueMax.Set(measure.Value_max) confInterval = pgtype.Numrange{ Lower: valueMin, Upper: valueMax, LowerType: pgtype.Inclusive, UpperType: pgtype.Inclusive, Status: pgtype.Present, } } err = insertGPStmt.QueryRowContext( ctx, currIsrs.CountryCode, currIsrs.LoCode, currIsrs.FairwaySection, currIsrs.Orc, currIsrs.Hectometre, measure.Measuredate.Time, msg.Identification.From, msg.Identification.Language_code, msg.Identification.Country_code, msg.Identification.Date_issue.Time, referenceCode, measure.Value, &confInterval, msg.Identification.Date_issue.Time, msg.Identification.Originator, ).Scan(&dummy) switch { case err == sql.ErrNoRows: // thats expected, nothing to do case err != nil: feedback.Warn(handleError(err).Error()) default: newP++ } } else { if measure.Value == nil { feedback.Info("Missing value at %s. 
Ignored", measure.Measuredate.Format(time.RFC3339)) continue } err = insertGMStmt.QueryRowContext( ctx, currIsrs.CountryCode, currIsrs.LoCode, currIsrs.FairwaySection, currIsrs.Orc, currIsrs.Hectometre, measure.Measuredate.Time, msg.Identification.From, msg.Identification.Language_code, msg.Identification.Country_code, msg.Identification.Date_issue.Time, referenceCode, measure.Value, msg.Identification.Date_issue.Time, msg.Identification.Originator, ).Scan(&dummy) switch { case err == sql.ErrNoRows: // thats expected, nothing to do case err != nil: feedback.Warn(handleError(err).Error()) default: newM++ } } } if badValue > 0 { feedback.Warn("Ignored %d measurements with value -99999", badValue) } feedback.Info("Inserted %d measurements for %s", newM, curr) feedback.Info("Inserted %d predictions for %s", newP, curr) gids = append(gids, curr) } } return gids, nil }