changeset 3277:232fc90e6ee2

Disentangle gauge measurements and predictions. Representing both in one table has led to the necessity to make the distinction in many places, such as statements, definitions of partial indexes and application code. In at least one place in the AGM import, the distinction in application code was made too late, and measurements matching an approved measurement could have been missed.
author Tom Gottfried <tom@intevation.de>
date Wed, 15 May 2019 19:08:49 +0200
parents 75db3199f76e
children 831193935739
files pkg/controllers/gauges.go pkg/imports/agm.go pkg/imports/gm.go schema/gemma.sql schema/geoserver_views.sql
diffstat 5 files changed, 187 insertions(+), 147 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/controllers/gauges.go	Wed May 15 17:55:38 2019 +0200
+++ b/pkg/controllers/gauges.go	Wed May 15 19:08:49 2019 +0200
@@ -42,7 +42,23 @@
   date_issue,
   predicted,
   water_level
-FROM waterway.gauge_measurements
+FROM (
+  SELECT
+    fk_gauge_id,
+    measure_date,
+    date_issue,
+    false AS predicted,
+    water_level
+  FROM waterway.gauge_measurements
+  UNION ALL
+  SELECT
+    fk_gauge_id,
+    measure_date,
+    date_issue,
+    true AS predicted,
+    water_level
+  FROM waterway.gauge_predictions
+) AS gmp
 WHERE
   fk_gauge_id = (
     $1::char(2),
@@ -62,7 +78,27 @@
   value_min,
   value_max,
   predicted
-FROM waterway.gauge_measurements
+FROM (
+  SELECT
+    fk_gauge_id,
+    measure_date,
+    date_issue,
+    water_level,
+    NULL AS value_min,
+    NULL AS value_max,
+    false AS predicted
+  FROM waterway.gauge_measurements
+  UNION ALL
+  SELECT
+    fk_gauge_id,
+    measure_date,
+    date_issue,
+    water_level,
+    lower(conf_interval) AS value_min,
+    upper(conf_interval) AS value_max,
+    true AS predicted
+  FROM waterway.gauge_predictions
+) AS gmp
 WHERE
 `
 
@@ -78,7 +114,6 @@
     $4::char(5),
     $5::int
    )::isrs
-   AND NOT predicted
    AND staging_done
 `
 
@@ -100,7 +135,6 @@
     $4::char(5),
     $5::int
    )::isrs
-   AND NOT predicted
    AND staging_done
 GROUP BY extract(day from measure_date)::varchar || ':' ||
          extract(month from measure_date)::varchar;
@@ -111,7 +145,6 @@
   water_level
 FROM waterway.gauge_measurements
 WHERE
-  NOT predicted AND
   staging_done AND
   fk_gauge_id = (
     $1::char(2),
--- a/pkg/imports/agm.go	Wed May 15 17:55:38 2019 +0200
+++ b/pkg/imports/agm.go	Wed May 15 19:08:49 2019 +0200
@@ -78,7 +78,6 @@
     ON n.fk_gauge_id = o.fk_gauge_id AND n.measure_date = o.measure_date
     WHERE n.id IN (SELECT key FROM staged)
 	  AND o.id NOT IN (SELECT key FROM staged)
-          AND NOT o.predicted
 )
 DELETE FROM waterway.gauge_measurements WHERE id IN (SELECT id from to_delete)`
 
@@ -120,38 +119,23 @@
 }
 
 type agmLine struct {
-	CountryCode        string   `json:"country-code"`
-	Sender             string   `json:"sender"`
-	LanguageCode       string   `json:"language-code"`
-	DateIssue          timetz   `json:"date-issue"`
-	ReferenceCode      string   `json:"reference-code"`
-	WaterLevel         float64  `json:"water-level"`
-	Predicted          bool     `json:"predicted"`
-	ValueMin           *float64 `json:"value-min"`
-	ValueMax           *float64 `json:"value-max"`
-	DateInfo           timetz   `json:"date-info"`
-	SourceOrganization string   `json:"source-organization"`
+	CountryCode        string  `json:"country-code"`
+	Sender             string  `json:"sender"`
+	LanguageCode       string  `json:"language-code"`
+	DateIssue          timetz  `json:"date-issue"`
+	ReferenceCode      string  `json:"reference-code"`
+	WaterLevel         float64 `json:"water-level"`
+	DateInfo           timetz  `json:"date-info"`
+	SourceOrganization string  `json:"source-organization"`
 }
 
 func (a *agmLine) hasDiff(b *agmLine) bool {
 	const eps = 0.00001
-	fdiff := func(x, y *float64) bool {
-		if x == nil && y == nil {
-			return false
-		}
-		if (x == nil && y != nil) || (x != nil && y == nil) {
-			return true
-		}
-		return math.Abs(*x-*y) > eps
-	}
 	return a.CountryCode != b.CountryCode ||
 		a.Sender != b.Sender ||
 		a.LanguageCode != b.LanguageCode ||
 		a.ReferenceCode != b.ReferenceCode ||
 		math.Abs(a.WaterLevel-b.WaterLevel) > eps ||
-		a.Predicted != b.Predicted ||
-		fdiff(a.ValueMin, b.ValueMin) ||
-		fdiff(a.ValueMax, b.ValueMax) ||
 		a.SourceOrganization != b.SourceOrganization
 }
 
@@ -171,9 +155,6 @@
   date_issue,
   reference_code,
   water_level,
-  predicted,
-  value_min,
-  value_max,
   date_info,
   source_organization
 FROM waterway.gauge_measurements
@@ -191,9 +172,6 @@
   date_issue,
   reference_code,
   water_level,
-  predicted,
-  value_min,
-  value_max,
   date_info,
   source_organization,
   is_waterlevel,
@@ -209,9 +187,6 @@
   $12,
   $13,
   $14,
-  $15,
-  $16,
-  $17,
   true,
   false
 )
@@ -382,9 +357,6 @@
 			oldDateIssue          time.Time
 			oldReferenceCode      string
 			oldValue              float64
-			oldPredicted          bool
-			oldValueMin           sql.NullFloat64
-			oldValueMax           sql.NullFloat64
 			oldDateInfo           time.Time
 			oldSourceOrganization string
 		)
@@ -405,9 +377,6 @@
 			&oldDateIssue,
 			&oldReferenceCode,
 			&oldValue,
-			&oldPredicted,
-			&oldValueMin,
-			&oldValueMax,
 			&oldDateInfo,
 			&oldSourceOrganization,
 		)
@@ -433,17 +402,6 @@
 		}
 		newValue := value
 
-		newPredicted := false
-
-		newValueMin := sql.NullFloat64{
-			Float64: 0,
-			Valid:   true,
-		}
-		newValueMax := sql.NullFloat64{
-			Float64: 0,
-			Valid:   true,
-		}
-
 		newDateInfo := newDateIssue
 
 		newSourceOrganization := newSender
@@ -470,9 +428,6 @@
 			newDateIssue,
 			newReferenceCode,
 			newValue,
-			newPredicted,
-			newValueMin,
-			newValueMax,
 			newDateInfo,
 			newSourceOrganization,
 		).Scan(&newID); err != nil {
@@ -499,9 +454,6 @@
 			newDateIssue,
 			newReferenceCode,
 			newValue,
-			newPredicted,
-			newValueMin,
-			newValueMax,
 			newDateInfo,
 			newSourceOrganization,
 		)
@@ -521,14 +473,11 @@
 				oldDateIssue,
 				oldReferenceCode,
 				oldValue,
-				oldPredicted,
-				oldValueMin,
-				oldValueMax,
 				oldDateInfo,
 				oldSourceOrganization,
 			)
 			// Ignore if there is no diff.
-			if o.Predicted || !n.hasDiff(o) {
+			if !n.hasDiff(o) {
 				continue
 			}
 			ase.Versions = []*agmLine{o, n}
@@ -549,19 +498,9 @@
 	dateIssue time.Time,
 	referenceCode string,
 	waterLevel float64,
-	predicted bool,
-	valueMin sql.NullFloat64,
-	valueMax sql.NullFloat64,
 	dateInfo time.Time,
 	sourceOrganization string,
 ) *agmLine {
-	nilFloat := func(v sql.NullFloat64) *float64 {
-		var p *float64
-		if v.Valid {
-			p = &v.Float64
-		}
-		return p
-	}
 	return &agmLine{
 		CountryCode:        countryCode,
 		Sender:             sender,
@@ -569,9 +508,6 @@
 		DateIssue:          timetz{dateIssue},
 		ReferenceCode:      referenceCode,
 		WaterLevel:         waterLevel,
-		Predicted:          predicted,
-		ValueMin:           nilFloat(valueMin),
-		ValueMax:           nilFloat(valueMax),
 		DateInfo:           timetz{dateInfo},
 		SourceOrganization: sourceOrganization,
 	}
--- a/pkg/imports/gm.go	Wed May 15 17:55:38 2019 +0200
+++ b/pkg/imports/gm.go	Wed May 15 19:08:49 2019 +0200
@@ -24,6 +24,7 @@
 
 	"gemma.intevation.de/gemma/pkg/models"
 	"gemma.intevation.de/gemma/pkg/soap/nts"
+	"github.com/jackc/pgx/pgtype"
 )
 
 // GaugeMeasurement is an import job to import
@@ -53,10 +54,7 @@
   date_issue,
   reference_code,
   water_level,
-  predicted,
   is_waterlevel,
-  value_min,
-  value_max,
   date_info,
   source_organization,
   staging_done
@@ -72,13 +70,42 @@
   $13,
   $14,
   $15,
-  $16,
-  $17,
-  $18,
-  $19
+  true
 )
 ON CONFLICT DO NOTHING
-RETURNING id
+RETURNING 1
+`
+
+	insertGPSQL = `
+INSERT INTO waterway.gauge_predictions (
+  fk_gauge_id,
+  measure_date,
+  sender,
+  language_code,
+  country_code,
+  date_issue,
+  reference_code,
+  water_level,
+  is_waterlevel,
+  conf_interval,
+  date_info,
+  source_organization
+) VALUES(
+  ($1, $2, $3, $4, $5),
+  $6,
+  $7,
+  $8,
+  $9,
+  $10,
+  $11,
+  $12,
+  $13,
+  $14,
+  $15,
+  $16
+)
+ON CONFLICT DO NOTHING
+RETURNING 1
 `
 )
 
@@ -224,11 +251,17 @@
 	feedback Feedback,
 ) ([]string, error) {
 
-	insertStmt, err := conn.PrepareContext(ctx, insertGMSQL)
+	insertGPStmt, err := conn.PrepareContext(ctx, insertGPSQL)
 	if err != nil {
 		return nil, err
 	}
-	defer insertStmt.Close()
+	defer insertGPStmt.Close()
+
+	insertGMStmt, err := conn.PrepareContext(ctx, insertGMSQL)
+	if err != nil {
+		return nil, err
+	}
+	defer insertGMStmt.Close()
 
 	result, err := fetch()
 	if err != nil {
@@ -237,7 +270,7 @@
 
 	var gids []string
 	for _, msg := range result {
-		var gid int64
+		var dummy int
 		for _, wrm := range msg.Wrm {
 			curr := string(*wrm.Geo_object.Id)
 			currIsrs, err := models.IsrsFromString(curr)
@@ -245,7 +278,7 @@
 				feedback.Warn("Invalid ISRS code %v", err)
 				continue
 			}
-			feedback.Info("Found measurements for %s", curr)
+			feedback.Info("Found measurements/predictions for %s", curr)
 
 			var referenceCode string
 			if wrm.Reference_code == nil {
@@ -255,7 +288,7 @@
 				referenceCode = string(*wrm.Reference_code)
 			}
 
-			var newCnt int = 0
+			newM, newP := 0, 0
 			for _, measure := range wrm.Measure {
 				var unit string
 				if measure.Unit == nil {
@@ -273,39 +306,81 @@
 				convert(measure.Value_max)
 
 				isWaterlevel := *measure.Measure_code == nts.Measure_code_enumWAL
-				err = insertStmt.QueryRowContext(
-					ctx,
-					currIsrs.CountryCode,
-					currIsrs.LoCode,
-					currIsrs.FairwaySection,
-					currIsrs.Orc,
-					currIsrs.Hectometre,
-					measure.Measuredate,
-					msg.Identification.From,
-					msg.Identification.Language_code,
-					msg.Identification.Country_code,
-					msg.Identification.Date_issue,
-					referenceCode,
-					measure.Value,
-					measure.Predicted,
-					isWaterlevel,
-					measure.Value_min,
-					measure.Value_max,
-					msg.Identification.Date_issue,
-					msg.Identification.Originator,
-					true, // staging_done
-				).Scan(&gid)
-				switch {
-				case err == sql.ErrNoRows:
-					// thats expected, nothing to do
-				case err != nil:
-					feedback.Warn(handleError(err).Error())
-				default:
-					newCnt++
+
+				if measure.Predicted {
+					var confInterval pgtype.Numrange
+					if measure.Value_min != nil && measure.Value_max != nil {
+						var valueMin, valueMax pgtype.Numeric
+						valueMin.Set(measure.Value_min)
+						valueMax.Set(measure.Value_max)
+						confInterval = pgtype.Numrange{
+							Lower:     valueMin,
+							Upper:     valueMax,
+							LowerType: pgtype.Inclusive,
+							UpperType: pgtype.Inclusive,
+							Status:    pgtype.Present,
+						}
+					}
+					err = insertGPStmt.QueryRowContext(
+						ctx,
+						currIsrs.CountryCode,
+						currIsrs.LoCode,
+						currIsrs.FairwaySection,
+						currIsrs.Orc,
+						currIsrs.Hectometre,
+						measure.Measuredate,
+						msg.Identification.From,
+						msg.Identification.Language_code,
+						msg.Identification.Country_code,
+						msg.Identification.Date_issue,
+						referenceCode,
+						measure.Value,
+						isWaterlevel,
+						&confInterval,
+						msg.Identification.Date_issue,
+						msg.Identification.Originator,
+					).Scan(&dummy)
+					switch {
+					case err == sql.ErrNoRows:
+						// thats expected, nothing to do
+					case err != nil:
+						feedback.Warn(handleError(err).Error())
+					default:
+						newP++
+					}
+				} else {
+					err = insertGMStmt.QueryRowContext(
+						ctx,
+						currIsrs.CountryCode,
+						currIsrs.LoCode,
+						currIsrs.FairwaySection,
+						currIsrs.Orc,
+						currIsrs.Hectometre,
+						measure.Measuredate,
+						msg.Identification.From,
+						msg.Identification.Language_code,
+						msg.Identification.Country_code,
+						msg.Identification.Date_issue,
+						referenceCode,
+						measure.Value,
+						isWaterlevel,
+						msg.Identification.Date_issue,
+						msg.Identification.Originator,
+					).Scan(&dummy)
+					switch {
+					case err == sql.ErrNoRows:
+						// thats expected, nothing to do
+					case err != nil:
+						feedback.Warn(handleError(err).Error())
+					default:
+						newM++
+					}
 				}
 			}
 			feedback.Info("Inserted %d measurements for %s",
-				newCnt, curr)
+				newM, curr)
+			feedback.Info("Inserted %d predictions for %s",
+				newP, curr)
 			gids = append(gids, curr)
 		}
 	}
--- a/schema/gemma.sql	Wed May 15 17:55:38 2019 +0200
+++ b/schema/gemma.sql	Wed May 15 19:08:49 2019 +0200
@@ -306,38 +306,34 @@
         fk_gauge_id isrs NOT NULL CONSTRAINT gauge_key REFERENCES gauges,
         measure_date timestamp with time zone NOT NULL,
         country_code char(2) NOT NULL REFERENCES countries,
-        -- TODO: add relations to stuff provided as enumerations
-        sender varchar NOT NULL, -- "from" attribute from DRC
+        sender varchar NOT NULL, -- "from" element from NtS response
         language_code varchar NOT NULL REFERENCES language_codes,
         date_issue timestamp with time zone NOT NULL,
         reference_code varchar(4) NOT NULL REFERENCES depth_references,
         water_level double precision NOT NULL,
-        predicted boolean NOT NULL,
         is_waterlevel boolean NOT NULL,
-        -- XXX: "measure_code" if really only W or Q
-        -- XXX: Do we need "unit" attribute or can we normalise on import?
-        value_min double precision, -- XXX: NOT NULL if predicted?
-        value_max double precision, -- XXX: NOT NULL if predicted?
-        --- TODO: Add a double range type for checking?
         date_info timestamp with time zone NOT NULL DEFAULT CURRENT_TIMESTAMP,
-        source_organization varchar NOT NULL, -- "originator"
-        staging_done boolean NOT NULL DEFAULT false
+        source_organization varchar NOT NULL, -- "originator" from NtS response
+        staging_done boolean NOT NULL DEFAULT false,
+        UNIQUE (fk_gauge_id, measure_date, staging_done)
     )
-    -- Constraints are conditional for gauge_measurements, as they
-    -- differ between predicted values and measured ones.  PG does not
-    -- have real conditional unique constraints, but we can use unique
-    -- indeces for that.
-    --
-    -- So we can have a staged and a non-staged
-    -- fk_gauge_id/measure_date pairs in measured values.
-    CREATE UNIQUE INDEX gm_measured_unique_constraint
-        ON gauge_measurements (fk_gauge_id, measure_date, staging_done)
-        WHERE NOT predicted
-    -- And we can have multiple predictions for one point in time
-    -- (but they are never staged).
-    CREATE UNIQUE INDEX gm_predicted_unique_constraint
-        ON gauge_measurements (fk_gauge_id, measure_date, date_issue)
-        WHERE predicted
+
+    CREATE TABLE gauge_predictions (
+        fk_gauge_id isrs NOT NULL CONSTRAINT gauge_key REFERENCES gauges,
+        measure_date timestamp with time zone NOT NULL,
+        country_code char(2) NOT NULL REFERENCES countries,
+        sender varchar NOT NULL, -- "from" element from NtS response
+        language_code varchar NOT NULL REFERENCES language_codes,
+        date_issue timestamp with time zone NOT NULL,
+        reference_code varchar(4) NOT NULL REFERENCES depth_references,
+        water_level double precision NOT NULL,
+        is_waterlevel boolean NOT NULL,
+        conf_interval numrange
+            CHECK (conf_interval @> CAST(water_level AS numeric)),
+        date_info timestamp with time zone NOT NULL DEFAULT CURRENT_TIMESTAMP,
+        source_organization varchar NOT NULL, -- "originator" from NtS response
+        PRIMARY KEY (fk_gauge_id, measure_date, date_issue)
+    )
 
     CREATE TABLE waterway_axis (
         id int PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
--- a/schema/geoserver_views.sql	Wed May 15 17:55:38 2019 +0200
+++ b/schema/geoserver_views.sql	Wed May 15 19:08:49 2019 +0200
@@ -46,7 +46,7 @@
     waterlevel_latest AS (
         SELECT DISTINCT ON (fk_gauge_id) fk_gauge_id, water_level
             FROM waterway.gauge_measurements
-            WHERE is_waterlevel AND NOT predicted
+            WHERE is_waterlevel
             ORDER BY fk_gauge_id, measure_date DESC)
     SELECT
         b.id,