Mercurial > gemma
view pkg/imports/gm.go @ 2130:f3aabc05f9b2
Fix constraints on waterway profiles
staging_done in the UNIQUE constraint had no effect, because the
exclusion constraint prevented two rows with equal location and
validity anyhow. Adding staging_done to the exclusion constraint
makes the UNIQUE constraint checking only a corner case of what
the exclusion constraint checks. Thus, remove the UNIQUE constraint.
Casting staging_done to int is needed because there is no appropriate
operator class for booleans. Casting to smallint or even bit would have
been better (i.e. should result in smaller index size), but that would
have required creating such a CAST, in addition.
author | Tom Gottfried <tom@intevation.de> |
---|---|
date | Wed, 06 Feb 2019 15:42:32 +0100 |
parents | 8a62ce2a5c70 |
children | b868cb653c4d |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Raimund Renkert <raimund.renkert@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"fmt"
	"strings"
	"time"

	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/models"
	"gemma.intevation.de/gemma/pkg/soap/nts"
)

// GaugeMeasurement is an import job to import
// gauges measurement data from a NtS SOAP service.
// Its fields are filled from the JSON job configuration
// (see gmJobCreator.Create).
type GaugeMeasurement struct {
	// URL is the URL of the SOAP service.
	URL string `json:"url"`
	// Insecure indicates if HTTPS traffic
	// should validate certificates or not.
	Insecure bool `json:"insecure"`
}

// gaugeMeasurement holds information about a gauge and the date of issue
// of its latest measurement.
// NOTE(review): LatestDateIssue is not populated anywhere in this file
// yet (see the "TODO get date_issue" in Do).
type gaugeMeasurement struct {
	// Gauge is the ISRS code identifying the gauge.
	Gauge models.Isrs
	// LatestDateIssue is meant to hold the issue date of the latest
	// measurement known for the gauge.
	LatestDateIssue time.Time
}

// GMJobKind is the import queue type identifier.
const GMJobKind JobKind = "gm"

const (
	// listGaugesSQL selects the components of the location of all
	// gauges whose country code equals the country of the current user.
	listGaugesSQL = ` SELECT (location).country_code, (location).locode, (location).fairway_section, (location).orc, (location).hectometre FROM waterway.gauges WHERE (location).country_code = users.current_user_country()`

	// insertGMSQL stores one gauge measurement and returns the id of the
	// affected row. The parameters $1 to $5 form the composite key of
	// the gauge, $6 to $19 are the remaining columns in the listed order.
	// If the row conflicts with the named constraint the existing row
	// is updated with the new values instead.
	//
	// TODO: Currently this statement updates existing data sets. In case we want
	// 'historization' we need to develop an other mechanism to keep existing
	// data.
	insertGMSQL = ` INSERT INTO waterway.gauge_measurements ( fk_gauge_id, measure_date, sender, language_code, country_code, date_issue, reference_code, water_level, predicted, is_waterlevel, value_min, value_max, date_info, source_organization, staging_done ) VALUES( ($1, $2, $3, $4, $5), $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19 ) ON CONFLICT ON CONSTRAINT gauge_measurements_fk_gauge_id_measure_date_staging_done_key DO UPDATE SET country_code = EXCLUDED.country_code, sender = EXCLUDED.sender, language_code = EXCLUDED.language_code, date_issue = EXCLUDED.date_issue, reference_code= EXCLUDED.reference_code, water_level = EXCLUDED.water_level, predicted = EXCLUDED.predicted, is_waterlevel = EXCLUDED.is_waterlevel, value_min = EXCLUDED.value_min, value_max = EXCLUDED.value_max, date_info = EXCLUDED.date_info, source_organization = EXCLUDED.source_organization RETURNING id `
)

// gmJobCreator is the job creator registered for GMJobKind.
type gmJobCreator struct{}

// init registers the creator for gauge measurement imports
// under its job kind.
func init() {
	RegisterJobCreator(GMJobKind, gmJobCreator{})
}

// Description gives a short human readable description of the import kind.
func (gmJobCreator) Description() string {
	return "gauge measurements"
}

// Create builds a GaugeMeasurement job from its JSON serialized
// configuration. The job kind argument is ignored.
func (gmJobCreator) Create(_ JobKind, data string) (Job, error) {
	gm := new(GaugeMeasurement)
	if err := common.FromJSONString(data, gm); err != nil {
		return nil, err
	}
	return gm, nil
}

// Depends returns the names this import declares as its dependencies.
// NOTE(review): these look like the database tables touched by the
// import -- confirm against the import queue.
func (gmJobCreator) Depends() []string {
	return []string{
		"gauges",
		"gauge_measurements",
	}
}

// AutoAccept always reports true for gauge measurement imports.
// The measurements are inserted with staging_done set to true in doForGM.
func (gmJobCreator) AutoAccept() bool { return true }

// StageDone moves the imported gauge measurement out of the staging area.
// Currently doing nothing.
func (gmJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
	return nil
}

// CleanUp of a gauge measurement import is a NOP.
func (*GaugeMeasurement) CleanUp() error { return nil }

// Do executes the actual gauge measurement import.
func (gm *GaugeMeasurement) Do( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, ) (interface{}, error) { // Get available gauges from database for use as filter in SOAP request var rows *sql.Rows rows, err := conn.QueryContext(ctx, listGaugesSQL) if err != nil { return nil, err } defer rows.Close() gauges := []gaugeMeasurement{} for rows.Next() { var g gaugeMeasurement if err = rows.Scan( &g.Gauge.CountryCode, &g.Gauge.LoCode, &g.Gauge.FairwaySection, &g.Gauge.Orc, &g.Gauge.Hectometre, ); err != nil { return nil, err } gauges = append(gauges, g) } if err = rows.Err(); err != nil { return nil, err } // TODO get date_issue for selected gauges gids, err := gm.doForGM(ctx, gauges, conn, feedback) if err != nil { feedback.Error("Error processing %d gauges: %v", len(gauges), err) return nil, err } if len(gids) == 0 { feedback.Info("No new gauge measurements found") return nil, nil } // TODO: needs to be filled more useful. summary := struct { GaugeMeasuremets []string `json:"gaugeMeasurements"` }{ GaugeMeasuremets: gids, } return &summary, err } // rescale returns a scaling function to bring the unit all to cm. 
func rescale(unit string) (func(float32) float32, error) { var scale float32 switch strings.ToLower(unit) { case "mm": scale = 0.1 case "cm": scale = 1.0 case "dm": scale = 10.0 case "m": scale = 100.0 case "hm": scale = 10000.0 case "km": scale = 100000.0 default: return nil, fmt.Errorf("unknown unit '%s'", unit) } fn := func(x float32) float32 { return scale * x } return fn, nil } func (gm *GaugeMeasurement) doForGM( ctx context.Context, gauges []gaugeMeasurement, conn *sql.Conn, feedback Feedback, ) ([]string, error) { start := time.Now() client := nts.NewINtSMessageService(gm.URL, gm.Insecure, nil) tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } defer tx.Rollback() mt := nts.Message_type_typeWRM var gids []string for _, g := range gauges { var idPair []*nts.Id_pair isrs := g.Gauge.String() isrsID := nts.Isrs_code_type(isrs) idPair = append(idPair, &nts.Id_pair{ Id: &isrsID, }) req := &nts.Get_messages_query{ Message_type: &mt, Ids: idPair, } resp, err := client.Get_messages(req) if err != nil { feedback.Error("%v", err) return nil, err } if resp.Result_message == nil { for _, e := range resp.Result_error { if e != nil { feedback.Error("No gauge measurements found for %s", g.Gauge.String()) } else { feedback.Error("unknown") } } continue } result := resp.Result_message insertStmt, err := tx.PrepareContext(ctx, insertGMSQL) if err != nil { return nil, err } defer insertStmt.Close() for _, msg := range result { var gid int64 feedback.Info("Found measurements for %s", g.Gauge.String()) for _, wrm := range msg.Wrm { currIsrs, err := models.IsrsFromString(string(*wrm.Geo_object.Id)) if err != nil { feedback.Warn("Invalid ISRS code %v", err) continue } var referenceCode string if wrm.Reference_code == nil { feedback.Info("'Reference_code' not specified. 
Assuming 'ZPG'") referenceCode = "ZPG" } else { referenceCode = string(*wrm.Reference_code) } for _, measure := range wrm.Measure { var unit string if measure.Unit == nil { feedback.Info("'Unit' not specified. Assuming 'cm'") unit = "cm" } else { unit = string(*measure.Unit) } convert, err := rescale(unit) if err != nil { return nil, err } isWaterlevel := *measure.Measure_code == nts.Measure_code_enumWAL err = insertStmt.QueryRowContext( ctx, currIsrs.CountryCode, currIsrs.LoCode, currIsrs.FairwaySection, currIsrs.Orc, currIsrs.Hectometre, measure.Measuredate, msg.Identification.From, msg.Identification.Language_code, msg.Identification.Country_code, msg.Identification.Date_issue, referenceCode, convert(measure.Value), measure.Predicted, isWaterlevel, convert(measure.Value_min), convert(measure.Value_max), msg.Identification.Date_issue, msg.Identification.Originator, true, // staging_done ).Scan(&gid) if err != nil { return nil, err } } feedback.Info("Inserted %d measurements for %s", len(wrm.Measure), currIsrs) gids = append(gids, currIsrs.String()) } } } feedback.Info("Storing gauge measurements took %s", time.Since(start)) if err = tx.Commit(); err == nil { feedback.Info("Import of gauge measurements was successful") } return gids, nil }