Mercurial > gemma
view pkg/imports/gm.go @ 1676:4407ecaa2192
Imports: Cosmetics.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Wed, 26 Dec 2018 10:30:19 +0100 |
parents | 819f67c31dfb |
children | 774174d09d30 |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Raimund Renkert <raimund.renkert@intevation.de> package imports import ( "context" "database/sql" "errors" "time" "gemma.intevation.de/gemma/pkg/common" "gemma.intevation.de/gemma/pkg/models" "gemma.intevation.de/gemma/pkg/soap/nts" ) type GaugeMeasurement struct { URL string `json:"url"` Insecure bool `json:"insecure"` } const GMJobKind JobKind = "gm" const ( listGaugesSQL = ` SELECT (location).country_code, (location).locode, (location).fairway_section, (location).orc, (location).hectometre FROM waterway.gauges WHERE (location).country_code = users.current_user_country()` hasGaugeMeasurementSQL = ` SELECT true FROM waterway.gauge_measurements WHERE fk_gauge_id = $1` insertGMSQL = ` INSERT INTO waterway.gauge_measurements ( fk_gauge_id, measure_date, sender, language_code, date_issue, water_level, predicted, is_waterlevel, value_min, value_max, date_info, source_organization ) VALUES( ($1, $2, $3, $4, $5), $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16 ) RETURNING id` ) type gmJobCreator struct{} func init() { RegisterJobCreator(GMJobKind, gmJobCreator{}) } func (gmJobCreator) Description() string { return "gauge measurements" } func (gmJobCreator) Create(_ JobKind, data string) (Job, error) { gm := new(GaugeMeasurement) if err := common.FromJSONString(data, gm); err != nil { return nil, err } return gm, nil } func (gmJobCreator) Depends() []string { return []string{ "gauges", "gauge_measurements", } } // StageDone moves the imported gauge measurement out of the staging area. // Currently doing nothing. func (gmJobCreator) StageDone(context.Context, *sql.Tx, int64) error { return nil } // CleanUp of a gauge measurement import is a NOP. func (*GaugeMeasurement) CleanUp() error { return nil } // Do executes the actual bottleneck import. func (gm *GaugeMeasurement) Do( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, ) (interface{}, error) { // Get available gauges from database for use as filter in SOAP request var rows *sql.Rows rows, err := conn.QueryContext(ctx, listGaugesSQL) if err != nil { return nil, err } defer rows.Close() gauges := []models.GaugeMeasurement{} for rows.Next() { var g models.GaugeMeasurement if err = rows.Scan( &g.Gauge.CountryCode, &g.Gauge.LoCode, &g.Gauge.FairwaySection, &g.Gauge.Orc, &g.Gauge.Hectometre, ); err != nil { return nil, err } gauges = append(gauges, g) } if err = rows.Err(); err != nil { return nil, err } // TODO get date_issue for selected gauges gids, err := gm.doForGM(ctx, gauges, conn, feedback) if err != nil { feedback.Error("Error processing %d gauges: %s", len(gauges), err) } if len(gids) == 0 { feedback.Info("No new gauge measurements found") return nil, nil } // TODO: needs to be filled more useful. summary := struct { GaugeMeasuremets []string `json:"gaugeMeasurements"` }{ GaugeMeasuremets: gids, } return &summary, err } func (gm *GaugeMeasurement) doForGM( ctx context.Context, gauges []models.GaugeMeasurement, conn *sql.Conn, feedback Feedback, ) ([]string, error) { start := time.Now() client := nts.NewINtSMessageService(gm.URL, gm.Insecure, nil) var idPairs []*nts.Id_pair for _, g := range gauges { isrs := g.Gauge.String() isrsID := nts.Isrs_code_type(isrs) idPairs = append(idPairs, &nts.Id_pair{ Id: &isrsID, }) } mt := nts.Message_type_typeWRM req := &nts.Get_messages_query{ Message_type: &mt, Ids: idPairs, } resp, err := client.Get_messages(req) if err != nil { feedback.Error("%v", err) return nil, err } if resp.Result_message == nil { err := errors.New("no gauge measurements found") for i, e := range resp.Result_error { feedback.Error("%d: %v", i, e) } return nil, err } result := resp.Result_message tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } defer tx.Rollback() insertStmt, err := tx.PrepareContext(ctx, insertGMSQL) if err != nil { return nil, err } defer insertStmt.Close() var gid int64 var gids []string for _, msg := range result { feedback.Info("Found %d gauges with measurements", len(msg.Wrm)) for _, wrm := range msg.Wrm { currIsrs, err := models.IsrsFromString(string(*wrm.Geo_object.Id)) if err != nil { feedback.Warn("Invalid ISRS code %v", err) continue } for _, measure := range wrm.Measure { isWaterlevel := *measure.Measure_code == nts.Measure_code_enumWAL err = insertStmt.QueryRowContext( ctx, currIsrs.CountryCode, currIsrs.LoCode, currIsrs.FairwaySection, currIsrs.Orc, currIsrs.Hectometre, measure.Measuredate, msg.Identification.From, msg.Identification.Language_code, msg.Identification.Date_issue, measure.Value, measure.Predicted, isWaterlevel, measure.Value_min, measure.Value_max, msg.Identification.Date_issue, msg.Identification.Originator, ).Scan(&gid) if err != nil { return nil, err } } feedback.Info("Inserted %d measurements for %s", len(wrm.Measure), currIsrs) gids = append(gids, currIsrs.String()) } } feedback.Info("Storing gauge measurements took %s", time.Since(start)) if err = tx.Commit(); err == nil { feedback.Info("Import of gauge measurements was successful") } return gids, nil }