diff pkg/imports/gm.go @ 1637:dd31be75ce6d

Implemented gauge measurement import.
author Raimund Renkert <raimund.renkert@intevation.de>
date Thu, 20 Dec 2018 12:06:37 +0100
parents
children 2a35bbbb4d93
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pkg/imports/gm.go	Thu Dec 20 12:06:37 2018 +0100
@@ -0,0 +1,262 @@
+// This is Free Software under GNU Affero General Public License v >= 3.0
+// without warranty, see README.md and license for details.
+//
+// SPDX-License-Identifier: AGPL-3.0-or-later
+// License-Filename: LICENSES/AGPL-3.0.txt
+//
+// Copyright (C) 2018 by via donau
+//   – Österreichische Wasserstraßen-Gesellschaft mbH
+// Software engineering by Intevation GmbH
+//
+// Author(s):
+//  * Raimund Renkert <raimund.renkert@intevation.de>
+package imports
+
+import (
+	"context"
+	"database/sql"
+	"errors"
+	"time"
+
+	"gemma.intevation.de/gemma/pkg/common"
+	"gemma.intevation.de/gemma/pkg/models"
+	"gemma.intevation.de/gemma/pkg/soap/nts"
+)
+
+type GaugeMeasurement struct {
+	URL      string `json:"url"`
+	Insecure bool   `json:"insecure"`
+}
+
+const GMJobKind JobKind = "gm"
+
+const (
+	listGaugesSQL = `
+SELECT
+  (location).country_code,
+  (location).locode,
+  (location).fairway_section,
+  (location).orc,
+  (location).hectometre
+FROM waterway.gauges
+WHERE (location).country_code = users.current_user_country()`
+
+	hasGaugeMeasurementSQL = `
+SELECT true FROM waterway.gauge_measurements WHERE fk_gauge_id = $1`
+
+	insertGMSQL = `
+INSERT INTO waterway.gauge_measurements (
+  fk_gauge_id,
+  measure_date,
+  sender,
+  language_code,
+  date_issue,
+  water_level,
+  predicted,
+  is_waterlevel,
+  value_min,
+  value_max,
+  date_info,
+  source_organization
+) VALUES(
+  ($1, $2, $3, $4, $5),
+  $6,
+  $7,
+  $8,
+  $9,
+  $10,
+  $11,
+  $12,
+  $13,
+  $14,
+  $15,
+  $16
+)
+RETURNING id`
+)
+
+type gmJobCreator struct{}
+
+func init() {
+	RegisterJobCreator(GMJobKind, gmJobCreator{})
+}
+
+func (gmJobCreator) Create(_ JobKind, data string) (Job, error) {
+	gm := new(GaugeMeasurement)
+	if err := common.FromJSONString(data, gm); err != nil {
+		return nil, err
+	}
+	return gm, nil
+}
+
+func (gmJobCreator) Depends() []string {
+	return []string{
+		"waterway.gauges",
+		"waterway.gauge_measurements",
+	}
+}
+
+// StageDone moves the imported gauge measurement out of the staging area.
+// Currently doing nothing.
+func (gmJobCreator) StageDone(
+	ctx context.Context,
+	tx *sql.Tx,
+	id int64,
+) error {
+	return nil
+}
+
+// CleanUp of a gauge measurement import is a NOP.
+func (gm *GaugeMeasurement) CleanUp() error { return nil }
+
+// Do executes the actual bottleneck import.
+func (gm *GaugeMeasurement) Do(
+	ctx context.Context,
+	importID int64,
+	conn *sql.Conn,
+	feedback Feedback,
+) (interface{}, error) {
+
+	// Get available gauges from database for use as filter in SOAP request
+	var rows *sql.Rows
+
+	rows, err := conn.QueryContext(ctx, listGaugesSQL)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+
+	gauges := []models.GaugeMeasurement{}
+
+	for rows.Next() {
+		var g models.GaugeMeasurement
+		if err = rows.Scan(
+			&g.Gauge.CountryCode,
+			&g.Gauge.LoCode,
+			&g.Gauge.FairwaySection,
+			&g.Gauge.Orc,
+			&g.Gauge.Hectometre,
+		); err != nil {
+			return nil, err
+		}
+		gauges = append(gauges, g)
+	}
+
+	if err = rows.Err(); err != nil {
+		return nil, err
+	}
+
+	// TODO get date_issue for selected gauges
+	gids, err := gm.doForGM(ctx, gauges, conn, feedback)
+	if err != nil {
+		feedback.Error("Error processing %d gauges: %s", len(gauges), err)
+	}
+	if len(gids) == 0 {
+		feedback.Error("No new gauge measurements found")
+		return nil, errors.New("No new gauge measurements found")
+	}
+	// TODO: needs to be filled more useful.
+	summary := struct {
+		GaugeMeasuremets []string `json:"gaugeMeasurements"`
+	}{
+		GaugeMeasuremets: gids,
+	}
+	return &summary, err
+}
+
+func (gm *GaugeMeasurement) doForGM(
+	ctx context.Context,
+	gauges []models.GaugeMeasurement,
+	conn *sql.Conn,
+	feedback Feedback,
+) ([]string, error) {
+	start := time.Now()
+
+	client := nts.NewINtSMessageService(gm.URL, gm.Insecure, nil)
+
+	var idPairs []*nts.Id_pair
+	for _, g := range gauges {
+		isrs := g.Gauge.String()
+		isrsID := nts.Isrs_code_type(isrs)
+		idPairs = append(idPairs, &nts.Id_pair{
+			Id: &isrsID,
+		})
+	}
+	mt := nts.Message_type_typeWRM
+	req := &nts.Get_messages_query{
+		Message_type: &mt,
+		Ids:          idPairs,
+	}
+	resp, err := client.Get_messages(req)
+	if err != nil {
+		feedback.Error("%v", err)
+		return nil, err
+	}
+
+	if resp.Result_message == nil {
+		err := errors.New("no Gauge Measurements found")
+		for i, e := range resp.Result_error {
+			feedback.Error("%d: %v", i, string(*e))
+		}
+		return nil, err
+	}
+
+	result := resp.Result_message
+
+	tx, err := conn.BeginTx(ctx, nil)
+	if err != nil {
+		return nil, err
+	}
+	defer tx.Rollback()
+
+	insertStmt, err := tx.PrepareContext(ctx, insertGMSQL)
+	if err != nil {
+		return nil, err
+	}
+	defer insertStmt.Close()
+
+	var gid int64
+	var gids []string
+	for _, msg := range result {
+		feedback.Info("Found %d gauges with measurements", len(msg.Wrm))
+		for _, wrm := range msg.Wrm {
+			currIsrs, err := models.IsrsFromString(string(*wrm.Geo_object.Id))
+			for _, measure := range wrm.Measure {
+				isWaterlevel := false
+				if *measure.Measure_code == nts.Measure_code_enumWAL {
+					isWaterlevel = true
+				}
+				err = insertStmt.QueryRowContext(
+					ctx,
+					currIsrs.CountryCode,
+					currIsrs.LoCode,
+					currIsrs.FairwaySection,
+					currIsrs.Orc,
+					currIsrs.Hectometre,
+					measure.Measuredate,
+					msg.Identification.From,
+					msg.Identification.Language_code,
+					msg.Identification.Date_issue,
+					measure.Value,
+					measure.Predicted,
+					isWaterlevel,
+					measure.Value_min,
+					measure.Value_max,
+					msg.Identification.Date_issue,
+					msg.Identification.Originator,
+				).Scan(&gid)
+				if err != nil {
+					return nil, err
+				}
+				feedback.Info("Inserted '%s'", currIsrs)
+			}
+			gids = append(gids, currIsrs.String())
+		}
+	}
+	feedback.Info("Storing gauge measurements took %s", time.Since(start))
+	if err = tx.Commit(); err == nil {
+		feedback.Info("Import of gauge measurements was successful")
+	}
+
+	return gids, nil
+}