Mercurial > gemma
view pkg/imports/gm.go @ 2549:9bf6b767a56a
client: refactored and improved splitscreen for diagrams
To make different diagrams possible, the splitscreen view needed to be decoupled from the cross profiles.
Also the style has changed to make it more consistent with the rest of the app. The standard box header
is now used and there are collapse and expand animations.
author | Markus Kottlaender <markus@intevation.de> |
---|---|
date | Fri, 08 Mar 2019 08:50:47 +0100 |
parents | 7d784840a9a7 |
children | b8972e4671fa |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Raimund Renkert <raimund.renkert@intevation.de>
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"fmt"
	"sort"
	"strings"
	"time"

	"gemma.intevation.de/gemma/pkg/models"
	"gemma.intevation.de/gemma/pkg/soap/nts"
)

// GaugeMeasurement is an import job to import
// gauges measurement data from a NtS SOAP service.
type GaugeMeasurement struct {
	// URL is the URL of the SOAP service.
	URL string `json:"url"`
	// Insecure indicates if HTTPS traffic
	// should validate certificates or not.
	// NOTE(review): presumably true disables certificate validation;
	// confirm against the NtS SOAP client.
	Insecure bool `json:"insecure"`
}

// GMJobKind is the import queue type identifier.
const GMJobKind JobKind = "gm"

const (
	// listGaugesSQL selects the five components of the location
	// (country_code, locode, fairway_section, orc, hectometre) of every
	// gauge whose country code equals users.current_user_country().
	listGaugesSQL = ` SELECT (location).country_code, (location).locode, (location).fairway_section, (location).orc, (location).hectometre FROM waterway.gauges WHERE (location).country_code = users.current_user_country()`

	// insertGMSQL inserts one row into waterway.gauge_measurements and
	// returns its id. The parameters $1 to $5 form the composite value
	// stored in fk_gauge_id, $6 to $19 fill the remaining columns in the
	// order of the column list. If the row conflicts on the constraint
	// gauge_measurements_fk_gauge_id_measure_date_staging_done_key the
	// existing row is overwritten with the new values.
	//
	// TODO: Currently this statement updates existing data sets. In case we want
	// 'historization' we need to develop another mechanism to keep existing
	// data.
	insertGMSQL = ` INSERT INTO waterway.gauge_measurements ( fk_gauge_id, measure_date, sender, language_code, country_code, date_issue, reference_code, water_level, predicted, is_waterlevel, value_min, value_max, date_info, source_organization, staging_done ) VALUES( ($1, $2, $3, $4, $5), $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19 ) ON CONFLICT ON CONSTRAINT gauge_measurements_fk_gauge_id_measure_date_staging_done_key DO UPDATE SET country_code = EXCLUDED.country_code, sender = EXCLUDED.sender, language_code = EXCLUDED.language_code, date_issue = EXCLUDED.date_issue, reference_code= EXCLUDED.reference_code, water_level = EXCLUDED.water_level, predicted = EXCLUDED.predicted, is_waterlevel = EXCLUDED.is_waterlevel, value_min = EXCLUDED.value_min, value_max = EXCLUDED.value_max, date_info = EXCLUDED.date_info, source_organization = EXCLUDED.source_organization RETURNING id `
)

// gmJobCreator is the job creator for gauge measurement imports.
// It carries no state and is registered under GMJobKind in init.
type gmJobCreator struct{}

// init registers the gauge measurement job creator for GMJobKind.
func init() {
	RegisterJobCreator(GMJobKind, gmJobCreator{})
}

// Description returns a short human readable name of this kind of import.
func (gmJobCreator) Description() string { return "gauge measurements" }

// Create returns a new, zero valued GaugeMeasurement job.
func (gmJobCreator) Create() Job { return new(GaugeMeasurement) }

// Depends returns the names "gauges" and "gauge_measurements".
// NOTE(review): presumably the database relations this import depends on;
// confirm against the job creator interface.
func (gmJobCreator) Depends() []string {
	return []string{
		"gauges",
		"gauge_measurements",
	}
}

// AutoAccept always reports true for gauge measurement imports.
// NOTE(review): presumably this means the import needs no manual review;
// confirm against the import queue.
func (gmJobCreator) AutoAccept() bool { return true }

// StageDone moves the imported gauge measurement out of the staging area.
// Currently doing nothing.
func (gmJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
	return nil
}

// CleanUp of a gauge measurement import is a NOP.
func (*GaugeMeasurement) CleanUp() error { return nil }

// Do executes the actual gauge measurement import.
func (gm *GaugeMeasurement) Do( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, ) (interface{}, error) { fetch := func() ([]*nts.RIS_Message_Type, error) { client := nts.NewINtSMessageService(gm.URL, gm.Insecure, nil) mt := nts.Message_type_typeWRM req := &nts.Get_messages_query{ Message_type: &mt, } resp, err := client.Get_messages(req) if err != nil { return nil, err } result := resp.Result_message if result == nil { for _, e := range resp.Result_error { if e != nil { feedback.Error("Error code: %s", *e) } else { feedback.Error("Unknown error") } } } return result, nil } return storeGaugeMeasurements( ctx, importID, fetch, conn, feedback, ) } func loadGauges(ctx context.Context, tx *sql.Tx) ([]string, error) { rows, err := tx.QueryContext(ctx, listGaugesSQL) if err != nil { return nil, err } defer rows.Close() var gauges []string for rows.Next() { var g models.Isrs if err = rows.Scan( &g.CountryCode, &g.LoCode, &g.FairwaySection, &g.Orc, &g.Hectometre, ); err != nil { return nil, err } gauges = append(gauges, g.String()) } if err = rows.Err(); err != nil { return nil, err } sort.Strings(gauges) return gauges, nil } func storeGaugeMeasurements( ctx context.Context, importID int64, fetch func() ([]*nts.RIS_Message_Type, error), conn *sql.Conn, feedback Feedback, ) (interface{}, error) { start := time.Now() tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } defer tx.Rollback() // Get available gauges from database for use as filter in SOAP request gauges, err := loadGauges(ctx, tx) if err != nil { return nil, err } // TODO get date_issue for selected gauges gids, err := doForGM(ctx, gauges, fetch, tx, feedback) if err != nil { feedback.Error("Error processing %d gauges: %v", len(gauges), err) return nil, err } if len(gids) == 0 { feedback.Info("No new gauge measurements found") return nil, UnchangedError("No new gauge measurements found") } if err = tx.Commit(); err != nil { feedback.Info( "Importing gauge measurements failed 
after %s", time.Since(start)) return nil, err } feedback.Info( "Importing gauge measurements successfully took %s", time.Since(start)) // TODO: needs to be filled more useful. summary := struct { GaugeMeasuremets []string `json:"gaugeMeasurements"` }{ GaugeMeasuremets: gids, } return &summary, err } // rescale returns a scaling function to bring the unit all to cm. func rescale(unit string) (func(float32) float32, error) { var scale float32 switch strings.ToLower(unit) { case "mm": scale = 0.1 case "cm": scale = 1.0 case "dm": scale = 10.0 case "m": scale = 100.0 case "hm": scale = 10000.0 case "km": scale = 100000.0 default: return nil, fmt.Errorf("unknown unit '%s'", unit) } fn := func(x float32) float32 { return scale * x } return fn, nil } func doForGM( ctx context.Context, gauges []string, fetch func() ([]*nts.RIS_Message_Type, error), tx *sql.Tx, feedback Feedback, ) ([]string, error) { insertStmt, err := tx.PrepareContext(ctx, insertGMSQL) if err != nil { return nil, err } defer insertStmt.Close() // lookup to see if we have gauges in the database. isKnown := func(s string) bool { idx := sort.SearchStrings(gauges, s) return idx < len(gauges) && gauges[idx] == s } result, err := fetch() if err != nil { return nil, err } var gids []string for _, msg := range result { var gid int64 for _, wrm := range msg.Wrm { curr := string(*wrm.Geo_object.Id) currIsrs, err := models.IsrsFromString(curr) if err != nil { feedback.Warn("Invalid ISRS code %v", err) continue } feedback.Info("Found measurements for %s", curr) if !isKnown(curr) { feedback.Warn("Gauge '%s' is not in database.", curr) continue } var referenceCode string if wrm.Reference_code == nil { feedback.Info("'Reference_code' not specified. Assuming 'ZPG'") referenceCode = "ZPG" } else { referenceCode = string(*wrm.Reference_code) } for _, measure := range wrm.Measure { var unit string if measure.Unit == nil { feedback.Info("'Unit' not specified. 
Assuming 'cm'") unit = "cm" } else { unit = string(*measure.Unit) } convert, err := rescale(unit) if err != nil { return nil, err } isWaterlevel := *measure.Measure_code == nts.Measure_code_enumWAL err = insertStmt.QueryRowContext( ctx, currIsrs.CountryCode, currIsrs.LoCode, currIsrs.FairwaySection, currIsrs.Orc, currIsrs.Hectometre, measure.Measuredate, msg.Identification.From, msg.Identification.Language_code, msg.Identification.Country_code, msg.Identification.Date_issue, referenceCode, convert(measure.Value), measure.Predicted, isWaterlevel, convert(measure.Value_min), convert(measure.Value_max), msg.Identification.Date_issue, msg.Identification.Originator, true, // staging_done ).Scan(&gid) if err != nil { return nil, err } } feedback.Info("Inserted %d measurements for %s", len(wrm.Measure), curr) gids = append(gids, curr) } } return gids, nil }