diff pkg/imports/gm.go @ 1879:9a2fbeaabd52 dev-pdf-generation

merging in from branch default
author Bernhard Reiter <bernhard@intevation.de>
date Tue, 15 Jan 2019 10:07:10 +0100
parents 48791416bea5
children 15af36e41f27
line wrap: on
line diff
--- a/pkg/imports/gm.go	Tue Jan 15 09:54:46 2019 +0100
+++ b/pkg/imports/gm.go	Tue Jan 15 10:07:10 2019 +0100
@@ -10,12 +10,15 @@
 //
 // Author(s):
 //  * Raimund Renkert <raimund.renkert@intevation.de>
+
 package imports
 
 import (
 	"context"
 	"database/sql"
 	"errors"
+	"fmt"
+	"strings"
 	"time"
 
 	"gemma.intevation.de/gemma/pkg/common"
@@ -23,11 +26,17 @@
 	"gemma.intevation.de/gemma/pkg/soap/nts"
 )
 
+// GaugeMeasurement is an import job to import
+// gauges measurement data from a NtS SOAP service.
 type GaugeMeasurement struct {
-	URL      string `json:"url"`
-	Insecure bool   `json:"insecure"`
+	// URL is the URL of the SOAP service.
+	URL string `json:"url"`
+	// Insecure indicates if HTTPS traffic
+	// should validate certificates or not.
+	Insecure bool `json:"insecure"`
 }
 
+// GMJobKind is the import queue type identifier.
 const GMJobKind JobKind = "gm"
 
 const (
@@ -41,23 +50,23 @@
 FROM waterway.gauges
 WHERE (location).country_code = users.current_user_country()`
 
-	hasGaugeMeasurementSQL = `
-SELECT true FROM waterway.gauge_measurements WHERE fk_gauge_id = $1`
-
 	insertGMSQL = `
 INSERT INTO waterway.gauge_measurements (
   fk_gauge_id,
   measure_date,
   sender,
   language_code,
+  country_code,
   date_issue,
+  reference_code,
   water_level,
   predicted,
   is_waterlevel,
   value_min,
   value_max,
   date_info,
-  source_organization
+  source_organization,
+  staging_done
 ) VALUES(
   ($1, $2, $3, $4, $5),
   $6,
@@ -70,7 +79,10 @@
   $13,
   $14,
   $15,
-  $16
+  $16,
+  $17,
+  $18,
+  $19
 )
 RETURNING id`
 )
@@ -81,6 +93,10 @@
 	RegisterJobCreator(GMJobKind, gmJobCreator{})
 }
 
+func (gmJobCreator) Description() string {
+	return "gauge measurements"
+}
+
 func (gmJobCreator) Create(_ JobKind, data string) (Job, error) {
 	gm := new(GaugeMeasurement)
 	if err := common.FromJSONString(data, gm); err != nil {
@@ -91,23 +107,21 @@
 
 func (gmJobCreator) Depends() []string {
 	return []string{
-		"waterway.gauges",
-		"waterway.gauge_measurements",
+		"gauges",
+		"gauge_measurements",
 	}
 }
 
+func (gmJobCreator) AutoAccept() bool { return true }
+
 // StageDone moves the imported gauge measurement out of the staging area.
 // Currently doing nothing.
-func (gmJobCreator) StageDone(
-	ctx context.Context,
-	tx *sql.Tx,
-	id int64,
-) error {
+func (gmJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
 	return nil
 }
 
 // CleanUp of a gauge measurement import is a NOP.
-func (gm *GaugeMeasurement) CleanUp() error { return nil }
+func (*GaugeMeasurement) CleanUp() error { return nil }
 
 // Do executes the actual bottleneck import.
 func (gm *GaugeMeasurement) Do(
@@ -149,7 +163,8 @@
 	// TODO get date_issue for selected gauges
 	gids, err := gm.doForGM(ctx, gauges, conn, feedback)
 	if err != nil {
-		feedback.Error("Error processing %d gauges: %s", len(gauges), err)
+		feedback.Error("Error processing %d gauges: %v", len(gauges), err)
+		return nil, err
 	}
 	if len(gids) == 0 {
 		feedback.Info("No new gauge measurements found")
@@ -164,6 +179,32 @@
 	return &summary, err
 }
 
+// rescale returns a scaling function to bring the unit all to cm.
+func rescale(unit string) (func(float32) float32, error) {
+
+	var scale float32
+
+	switch strings.ToLower(unit) {
+	case "mm":
+		scale = 0.1
+	case "cm":
+		scale = 1.0
+	case "dm":
+		scale = 10.0
+	case "m":
+		scale = 100.0
+	case "hm":
+		scale = 10000.0
+	case "km":
+		scale = 100000.0
+	default:
+		return nil, fmt.Errorf("unknown unit '%s'", unit)
+	}
+
+	fn := func(x float32) float32 { return scale * x }
+	return fn, nil
+}
+
 func (gm *GaugeMeasurement) doForGM(
 	ctx context.Context,
 	gauges []models.GaugeMeasurement,
@@ -225,7 +266,25 @@
 				feedback.Warn("Invalid ISRS code %v", err)
 				continue
 			}
+			var referenceCode string
+			if wrm.Reference_code == nil {
+				feedback.Info("'Reference_code' not specified. Assuming 'ZPG'")
+				referenceCode = "ZPG"
+			} else {
+				referenceCode = string(*wrm.Reference_code)
+			}
 			for _, measure := range wrm.Measure {
+				var unit string
+				if measure.Unit == nil {
+					feedback.Info("'Unit' not specified. Assuming 'cm'")
+					unit = "cm"
+				} else {
+					unit = string(*measure.Unit)
+				}
+				convert, err := rescale(unit)
+				if err != nil {
+					return nil, err
+				}
 				isWaterlevel := *measure.Measure_code == nts.Measure_code_enumWAL
 				err = insertStmt.QueryRowContext(
 					ctx,
@@ -237,14 +296,17 @@
 					measure.Measuredate,
 					msg.Identification.From,
 					msg.Identification.Language_code,
+					msg.Identification.Country_code,
 					msg.Identification.Date_issue,
-					measure.Value,
+					referenceCode,
+					convert(measure.Value),
 					measure.Predicted,
 					isWaterlevel,
-					measure.Value_min,
-					measure.Value_max,
+					convert(measure.Value_min),
+					convert(measure.Value_max),
 					msg.Identification.Date_issue,
 					msg.Identification.Originator,
+					true, // staging_done
 				).Scan(&gid)
 				if err != nil {
 					return nil, err