Mercurial > gemma
view pkg/imports/gm.go @ 2104:c9af355d4a2c
staging: display stretch name
author | Thomas Junk <thomas.junk@intevation.de> |
---|---|
date | Mon, 04 Feb 2019 14:35:47 +0100 |
parents | 8a62ce2a5c70 |
children | b868cb653c4d |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Raimund Renkert <raimund.renkert@intevation.de> package imports import ( "context" "database/sql" "fmt" "strings" "time" "gemma.intevation.de/gemma/pkg/common" "gemma.intevation.de/gemma/pkg/models" "gemma.intevation.de/gemma/pkg/soap/nts" ) // GaugeMeasurement is an import job to import // gauges measurement data from a NtS SOAP service. type GaugeMeasurement struct { // URL is the URL of the SOAP service. URL string `json:"url"` // Insecure indicates if HTTPS traffic // should validate certificates or not. Insecure bool `json:"insecure"` } // gaugeMeasurement holds information about a gauge and the latest measurement type gaugeMeasurement struct { Gauge models.Isrs LatestDateIssue time.Time } // GMJobKind is the import queue type identifier. const GMJobKind JobKind = "gm" const ( listGaugesSQL = ` SELECT (location).country_code, (location).locode, (location).fairway_section, (location).orc, (location).hectometre FROM waterway.gauges WHERE (location).country_code = users.current_user_country()` // TODO: Currently this statement updates existing data sets. In case we want // 'historization' we need to develop an other mechanism to keep existing // data. insertGMSQL = ` INSERT INTO waterway.gauge_measurements ( fk_gauge_id, measure_date, sender, language_code, country_code, date_issue, reference_code, water_level, predicted, is_waterlevel, value_min, value_max, date_info, source_organization, staging_done ) VALUES( ($1, $2, $3, $4, $5), $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19 ) ON CONFLICT ON CONSTRAINT gauge_measurements_fk_gauge_id_measure_date_staging_done_key DO UPDATE SET country_code = EXCLUDED.country_code, sender = EXCLUDED.sender, language_code = EXCLUDED.language_code, date_issue = EXCLUDED.date_issue, reference_code= EXCLUDED.reference_code, water_level = EXCLUDED.water_level, predicted = EXCLUDED.predicted, is_waterlevel = EXCLUDED.is_waterlevel, value_min = EXCLUDED.value_min, value_max = EXCLUDED.value_max, date_info = EXCLUDED.date_info, source_organization = EXCLUDED.source_organization RETURNING id ` ) type gmJobCreator struct{} func init() { RegisterJobCreator(GMJobKind, gmJobCreator{}) } func (gmJobCreator) Description() string { return "gauge measurements" } func (gmJobCreator) Create(_ JobKind, data string) (Job, error) { gm := new(GaugeMeasurement) if err := common.FromJSONString(data, gm); err != nil { return nil, err } return gm, nil } func (gmJobCreator) Depends() []string { return []string{ "gauges", "gauge_measurements", } } func (gmJobCreator) AutoAccept() bool { return true } // StageDone moves the imported gauge measurement out of the staging area. // Currently doing nothing. func (gmJobCreator) StageDone(context.Context, *sql.Tx, int64) error { return nil } // CleanUp of a gauge measurement import is a NOP. func (*GaugeMeasurement) CleanUp() error { return nil } // Do executes the actual bottleneck import. func (gm *GaugeMeasurement) Do( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, ) (interface{}, error) { // Get available gauges from database for use as filter in SOAP request var rows *sql.Rows rows, err := conn.QueryContext(ctx, listGaugesSQL) if err != nil { return nil, err } defer rows.Close() gauges := []gaugeMeasurement{} for rows.Next() { var g gaugeMeasurement if err = rows.Scan( &g.Gauge.CountryCode, &g.Gauge.LoCode, &g.Gauge.FairwaySection, &g.Gauge.Orc, &g.Gauge.Hectometre, ); err != nil { return nil, err } gauges = append(gauges, g) } if err = rows.Err(); err != nil { return nil, err } // TODO get date_issue for selected gauges gids, err := gm.doForGM(ctx, gauges, conn, feedback) if err != nil { feedback.Error("Error processing %d gauges: %v", len(gauges), err) return nil, err } if len(gids) == 0 { feedback.Info("No new gauge measurements found") return nil, nil } // TODO: needs to be filled more useful. summary := struct { GaugeMeasuremets []string `json:"gaugeMeasurements"` }{ GaugeMeasuremets: gids, } return &summary, err } // rescale returns a scaling function to bring the unit all to cm. func rescale(unit string) (func(float32) float32, error) { var scale float32 switch strings.ToLower(unit) { case "mm": scale = 0.1 case "cm": scale = 1.0 case "dm": scale = 10.0 case "m": scale = 100.0 case "hm": scale = 10000.0 case "km": scale = 100000.0 default: return nil, fmt.Errorf("unknown unit '%s'", unit) } fn := func(x float32) float32 { return scale * x } return fn, nil } func (gm *GaugeMeasurement) doForGM( ctx context.Context, gauges []gaugeMeasurement, conn *sql.Conn, feedback Feedback, ) ([]string, error) { start := time.Now() client := nts.NewINtSMessageService(gm.URL, gm.Insecure, nil) tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } defer tx.Rollback() mt := nts.Message_type_typeWRM var gids []string for _, g := range gauges { var idPair []*nts.Id_pair isrs := g.Gauge.String() isrsID := nts.Isrs_code_type(isrs) idPair = append(idPair, &nts.Id_pair{ Id: &isrsID, }) req := &nts.Get_messages_query{ Message_type: &mt, Ids: idPair, } resp, err := client.Get_messages(req) if err != nil { feedback.Error("%v", err) return nil, err } if resp.Result_message == nil { for _, e := range resp.Result_error { if e != nil { feedback.Error("No gauge measurements found for %s", g.Gauge.String()) } else { feedback.Error("unknown") } } continue } result := resp.Result_message insertStmt, err := tx.PrepareContext(ctx, insertGMSQL) if err != nil { return nil, err } defer insertStmt.Close() for _, msg := range result { var gid int64 feedback.Info("Found measurements for %s", g.Gauge.String()) for _, wrm := range msg.Wrm { currIsrs, err := models.IsrsFromString(string(*wrm.Geo_object.Id)) if err != nil { feedback.Warn("Invalid ISRS code %v", err) continue } var referenceCode string if wrm.Reference_code == nil { feedback.Info("'Reference_code' not specified. Assuming 'ZPG'") referenceCode = "ZPG" } else { referenceCode = string(*wrm.Reference_code) } for _, measure := range wrm.Measure { var unit string if measure.Unit == nil { feedback.Info("'Unit' not specified. Assuming 'cm'") unit = "cm" } else { unit = string(*measure.Unit) } convert, err := rescale(unit) if err != nil { return nil, err } isWaterlevel := *measure.Measure_code == nts.Measure_code_enumWAL err = insertStmt.QueryRowContext( ctx, currIsrs.CountryCode, currIsrs.LoCode, currIsrs.FairwaySection, currIsrs.Orc, currIsrs.Hectometre, measure.Measuredate, msg.Identification.From, msg.Identification.Language_code, msg.Identification.Country_code, msg.Identification.Date_issue, referenceCode, convert(measure.Value), measure.Predicted, isWaterlevel, convert(measure.Value_min), convert(measure.Value_max), msg.Identification.Date_issue, msg.Identification.Originator, true, // staging_done ).Scan(&gid) if err != nil { return nil, err } } feedback.Info("Inserted %d measurements for %s", len(wrm.Measure), currIsrs) gids = append(gids, currIsrs.String()) } } } feedback.Info("Storing gauge measurements took %s", time.Since(start)) if err = tx.Commit(); err == nil { feedback.Info("Import of gauge measurements was successful") } return gids, nil }