changeset 3302:ec6163c6687d

'Historicise' gauges on import Gauge data sets will be updated or a new version will be inserted depending on temporal validity and a timestamp marking the last update in the RIS-Index of a data set. The trigger on date_info is removed because the value is actually an attribut coming from the RIS-Index. Gauge measurements and predictions are associated to the version with matching temporal validity. Bottlenecks are always associated to the actual version of the gauge, although this might change as soon as bottlenecks are 'historicised', too.
author Tom Gottfried <tom@intevation.de>
date Thu, 16 May 2019 18:41:43 +0200
parents 6514b943654e
children d6405c6769a3 9dc7d803e51f
files pkg/controllers/bottlenecks.go pkg/controllers/gauges.go pkg/controllers/surveys.go pkg/imports/agm.go pkg/imports/bn.go pkg/imports/gm.go pkg/imports/wg.go pkg/models/sr.go schema/auth.sql schema/auth_tests.sql schema/gemma.sql schema/geoserver_views.sql schema/tap_tests_data.sql
diffstat 13 files changed, 297 insertions(+), 147 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/controllers/bottlenecks.go	Thu May 16 17:22:33 2019 +0200
+++ b/pkg/controllers/bottlenecks.go	Thu May 16 18:41:43 2019 +0200
@@ -73,9 +73,10 @@
 SELECT
   grwl.depth_reference,
   grwl.value
-FROM waterway.gauges_reference_water_levels grwl JOIN 
-     waterway.bottlenecks bns
-	 ON bns.fk_g_fid = grwl.gauge_id
+FROM waterway.gauges_reference_water_levels grwl
+  JOIN waterway.bottlenecks bns
+    ON grwl.location = bns.gauge_location
+      AND grwl.validity = bns.gauge_validity
 WHERE bns.objnam = $1 AND (
   grwl.depth_reference like 'HDC%' OR
   grwl.depth_reference like 'LDC%' OR
@@ -85,9 +86,10 @@
 	selectGaugeLDCSQL = `
 SELECT
   grwl.value
-FROM waterway.gauges_reference_water_levels grwl JOIN
-     waterway.bottlenecks bns
-	 ON bns.fk_g_fid = grwl.gauge_id
+FROM waterway.gauges_reference_water_levels grwl
+  JOIN waterway.bottlenecks bns
+    ON grwl.location = bns.gauge_location
+      AND grwl.validity = bns.gauge_validity
 WHERE bns.objnam = $1 AND grwl.depth_reference like 'LDC%'
 `
 )
--- a/pkg/controllers/gauges.go	Thu May 16 17:22:33 2019 +0200
+++ b/pkg/controllers/gauges.go	Thu May 16 18:41:43 2019 +0200
@@ -44,7 +44,7 @@
   water_level
 FROM (
   SELECT
-    fk_gauge_id,
+    location,
     measure_date,
     date_issue,
     false AS predicted,
@@ -52,7 +52,7 @@
   FROM waterway.gauge_measurements
   UNION ALL
   SELECT
-    fk_gauge_id,
+    location,
     measure_date,
     date_issue,
     true AS predicted,
@@ -60,7 +60,7 @@
   FROM waterway.gauge_predictions
 ) AS gmp
 WHERE
-  fk_gauge_id = (
+  location = (
     $1::char(2),
     $2::char(3),
     $3::char(5),
@@ -80,7 +80,7 @@
   predicted
 FROM (
   SELECT
-    fk_gauge_id,
+    location,
     measure_date,
     date_issue,
     water_level,
@@ -90,7 +90,7 @@
   FROM waterway.gauge_measurements
   UNION ALL
   SELECT
-    fk_gauge_id,
+    location,
     measure_date,
     date_issue,
     water_level,
@@ -106,8 +106,9 @@
 SELECT
   min(measure_date),
   max(measure_date)
-FROM waterway.gauge_measurements WHERE
-  fk_gauge_id = (
+FROM waterway.gauge_measurements
+WHERE
+  location = (
     $1::char(2),
     $2::char(3),
     $3::char(5),
@@ -127,8 +128,9 @@
   avg(water_level) AS mean,
   min(water_level) AS min,
   max(water_level) AS max
-FROM waterway.gauge_measurements WHERE
-  fk_gauge_id = (
+FROM waterway.gauge_measurements
+WHERE
+  location = (
     $1::char(2),
     $2::char(3),
     $3::char(5),
@@ -145,14 +147,14 @@
   water_level
 FROM waterway.gauge_measurements
 WHERE
-  staging_done AND
-  fk_gauge_id = (
+  location = (
     $1::char(2),
     $2::char(3),
     $3::char(5),
     $4::char(5),
     $5::int
-  )
+   )::isrs
+  AND staging_done
   AND measure_date BETWEEN $6 AND $7
 ORDER BY measure_date
 `
@@ -663,7 +665,7 @@
 
 	filters := filterAnd{
 		buildFilterTerm(
-			"fk_gauge_id = ($%d::char(2), $%d::char(3), $%d::char(5), $%d::char(5), $%d::int)",
+			"location = ($%d::char(2), $%d::char(3), $%d::char(5), $%d::char(5), $%d::int)",
 			isrs.CountryCode,
 			isrs.LoCode,
 			isrs.FairwaySection,
@@ -674,8 +676,9 @@
 			&filterNot{&filterTerm{format: "predicted"}},
 			buildFilterTerm(
 				`date_issue = (
-                 SELECT max(date_issue) FROM waterway.gauge_measurements
-                 WHERE fk_gauge_id = ($%d::char(2), $%d::char(3), $%d::char(5), $%d::char(5), $%d::int))`,
+                 SELECT max(date_issue)
+                 FROM waterway.gauge_measurements gm
+                 WHERE location = ($%d::char(2), $%d::char(3), $%d::char(5), $%d::char(5), $%d::int))`,
 				isrs.CountryCode,
 				isrs.LoCode,
 				isrs.FairwaySection,
--- a/pkg/controllers/surveys.go	Thu May 16 17:22:33 2019 +0200
+++ b/pkg/controllers/surveys.go	Thu May 16 18:41:43 2019 +0200
@@ -29,19 +29,15 @@
   s.bottleneck_id,
   s.date_info::text,
   s.depth_reference,
-  bg.objname AS gauge_objname,
+  g.objname AS gauge_objname,
   r.value AS waterlevel_value
-FROM
-  (
-	( SELECT * FROM waterway.bottlenecks AS b, waterway.gauges AS g
-		WHERE b.fk_g_fid = g.location
-	) AS bg
-	JOIN waterway.sounding_results AS s
-	ON bg.id = s.bottleneck_id
-  )
-LEFT JOIN waterway.gauges_reference_water_levels AS r
-ON s.depth_reference = r.depth_reference AND bg.location = r.gauge_id
-WHERE bg.objnam=$1`
+FROM waterway.bottlenecks AS b
+  JOIN waterway.gauges AS g
+    ON b.gauge_location = g.location AND b.gauge_validity = g.validity
+  JOIN waterway.sounding_results AS s ON b.id = s.bottleneck_id
+  LEFT JOIN waterway.gauges_reference_water_levels AS r
+    USING (depth_reference, location, validity)
+WHERE b.objnam = $1`
 )
 
 func listSurveys(
--- a/pkg/imports/agm.go	Thu May 16 17:22:33 2019 +0200
+++ b/pkg/imports/agm.go	Thu May 16 18:41:43 2019 +0200
@@ -75,7 +75,7 @@
   SELECT o.id AS id
   FROM waterway.gauge_measurements o
   JOIN waterway.gauge_measurements n
-    ON n.fk_gauge_id = o.fk_gauge_id AND n.measure_date = o.measure_date
+    USING (location, validity, measure_date)
     WHERE n.id IN (SELECT key FROM staged)
 	  AND o.id NOT IN (SELECT key FROM staged)
 )
@@ -159,12 +159,16 @@
   source_organization
 FROM waterway.gauge_measurements
 WHERE
-  fk_gauge_id = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int) AND
-  measure_date = $6 AND staging_done`
+  location
+    = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)
+  AND measure_date = $6
+  AND staging_done
+`
 
 	agmInsertSQL = `
 INSERT INTO waterway.gauge_measurements (
-  fk_gauge_id,
+  location,
+  validity,
   measure_date,
   country_code,
   sender,
@@ -175,8 +179,12 @@
   date_info,
   source_organization,
   staging_done
-) VALUES(
+) VALUES (
   ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int),
+  (SELECT validity FROM waterway.gauges
+     WHERE location
+          = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)
+       AND validity @> CAST($6 AS timestamp with time zone)),
   $6,
   $7,
   $8,
--- a/pkg/imports/bn.go	Thu May 16 17:22:33 2019 +0200
+++ b/pkg/imports/bn.go	Thu May 16 18:41:43 2019 +0200
@@ -47,7 +47,8 @@
 	insertBottleneckSQL = `
 INSERT INTO waterway.bottlenecks (
   bottleneck_id,
-  fk_g_fid,
+  gauge_location,
+  gauge_validity,
   objnam,
   nobjnm,
   stretch,
@@ -59,9 +60,11 @@
   limiting,
   date_info,
   source_organization
-) VALUES(
+) VALUES (
   $1,
   isrs_fromText($2),
+  (SELECT validity FROM waterway.gauges
+     WHERE location = isrs_fromText($2) AND NOT erased),
   $3,
   $4,
   isrsrange(least(isrs_fromText($5), isrs_fromText($6)),
--- a/pkg/imports/gm.go	Thu May 16 17:22:33 2019 +0200
+++ b/pkg/imports/gm.go	Thu May 16 18:41:43 2019 +0200
@@ -55,11 +55,12 @@
 `
 
 	// Note: we do not expect corrections of data through this service.  So
-	// any constraint conflicts are triggered by actual redundat data which
+	// any constraint conflicts are triggered by redundant data which
 	// can be dropped.
 	insertGMSQL = `
 INSERT INTO waterway.gauge_measurements (
-  fk_gauge_id,
+  location,
+  validity,
   measure_date,
   sender,
   language_code,
@@ -70,8 +71,12 @@
   date_info,
   source_organization,
   staging_done
-) VALUES(
+) VALUES (
   ($1, $2, $3, $4, $5),
+  (SELECT validity FROM waterway.gauges
+     WHERE location
+          = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)
+       AND validity @> CAST($6 AS timestamp with time zone)),
   $6,
   $7,
   $8,
@@ -89,7 +94,8 @@
 
 	insertGPSQL = `
 INSERT INTO waterway.gauge_predictions (
-  fk_gauge_id,
+  location,
+  validity,
   measure_date,
   sender,
   language_code,
@@ -102,6 +108,10 @@
   source_organization
 ) VALUES(
   ($1, $2, $3, $4, $5),
+  (SELECT validity FROM waterway.gauges
+     WHERE location
+          = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)
+       AND validity @> CAST($6 AS timestamp with time zone)),
   $6,
   $7,
   $8,
--- a/pkg/imports/wg.go	Thu May 16 17:22:33 2019 +0200
+++ b/pkg/imports/wg.go	Thu May 16 18:41:43 2019 +0200
@@ -65,11 +65,15 @@
 func (*WaterwayGauge) CleanUp() error { return nil }
 
 const (
-	deleteReferenceWaterLevelsSQL = `
-DELETE FROM waterway.gauges_reference_water_levels
-WHERE gauge_id = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)
-  AND depth_reference <> ALL($6)
-RETURNING depth_reference
+	eraseGaugeSQL = `
+UPDATE waterway.gauges SET
+  erased = true,
+   -- Set enddate of old entry to new startdate in case of overlap:
+  validity = validity - $2
+WHERE isrs_astext(location) = $1
+  AND NOT erased
+  -- Don't touch old entry if validity did not change: will be updated
+  AND validity <> $2
 `
 
 	insertGaugeSQL = `
@@ -83,7 +87,8 @@
   zero_point,
   geodref,
   date_info,
-  source_organization
+  source_organization,
+  lastupdate
 ) VALUES (
   ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int),
   $6,
@@ -94,17 +99,36 @@
   $12,
   $13,
   $14,
-  $15
-) ON CONFLICT (location) DO UPDATE SET
-    objname = EXCLUDED.objname,
-    geom = EXCLUDED.geom,
-    applicability_from_km = EXCLUDED.applicability_from_km,
-    applicability_to_km = EXCLUDED.applicability_to_km,
-    validity = EXCLUDED.validity,
-    zero_point = EXCLUDED.zero_point,
-    geodref = EXCLUDED.geodref,
-    date_info = EXCLUDED.date_info,
-    source_organization = EXCLUDED.source_organization
+  $15,
+  $16
+-- Exclusion constraints are not supported as arbiters.
+-- Thus we need to DO NOTHING here and use an extra UPDATE statement
+) ON CONFLICT DO NOTHING
+RETURNING 1
+`
+	updateGaugeSQL = `
+UPDATE waterway.gauges SET
+  objname = $6,
+  geom = ST_SetSRID(ST_MakePoint($7, $8), 4326),
+  applicability_from_km = $9,
+  applicability_to_km = $10,
+  zero_point = $11,
+  geodref = $12,
+  date_info = $13,
+  source_organization = $14,
+  lastupdate = $15
+WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)
+  AND NOT erased
+  AND $15 > lastupdate
+RETURNING 1
+`
+
+	deleteReferenceWaterLevelsSQL = `
+DELETE FROM waterway.gauges_reference_water_levels
+WHERE isrs_astext(location) = $1
+  AND validity = $2
+  AND depth_reference <> ALL($3)
+RETURNING depth_reference
 `
 
 	isNtSDepthRefSQL = `
@@ -112,14 +136,16 @@
 
 	insertReferenceWaterLevelsSQL = `
 INSERT INTO waterway.gauges_reference_water_levels (
-  gauge_id,
+  location,
+  validity,
   depth_reference,
   value
 ) VALUES (
   ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int),
   $6,
-  $7
-) ON CONFLICT (gauge_id, depth_reference) DO UPDATE SET
+  $7,
+  $8
+) ON CONFLICT (location, validity, depth_reference) DO UPDATE SET
     value = EXCLUDED.value
 `
 )
@@ -190,21 +216,24 @@
 			gauges = append(gauges, idxCode{jdx: j, idx: i, code: code})
 		}
 	}
-	feedback.Info("ignored gauges: %d", ignored)
-	feedback.Info("insert/update gauges: %d", len(gauges))
+	feedback.Info("Ignored gauges: %d", ignored)
+	feedback.Info("Further process %d gauges", len(gauges))
 
 	if len(gauges) == 0 {
-		return nil, UnchangedError("nothing to do")
+		return nil, UnchangedError("Nothing to do")
 	}
 
 	// insert/update the gauges
-	var insertStmt, deleteReferenceWaterLevelsStmt,
+	var eraseGaugeStmt, insertStmt, updateStmt,
+		deleteReferenceWaterLevelsStmt,
 		isNtSDepthRefStmt, insertWaterLevelStmt *sql.Stmt
 	for _, x := range []struct {
 		sql  string
 		stmt **sql.Stmt
 	}{
+		{eraseGaugeSQL, &eraseGaugeStmt},
 		{insertGaugeSQL, &insertStmt},
+		{updateGaugeSQL, &updateStmt},
 		{deleteReferenceWaterLevelsSQL, &deleteReferenceWaterLevelsStmt},
 		{isNtSDepthRefSQL, &isNtSDepthRefStmt},
 		{insertReferenceWaterLevelsSQL, &insertWaterLevelStmt},
@@ -216,11 +245,13 @@
 		defer (*x.stmt).Close()
 	}
 
+	var unchanged int
+
 	for i := range gauges {
 		ic := &gauges[i]
 		dr := responseData[ic.jdx].RisdataReturn[ic.idx]
 
-		feedback.Info("insert/update %s", ic.code)
+		feedback.Info("Processing %s", ic.code)
 
 		var from, to sql.NullInt64
 
@@ -265,7 +296,7 @@
 			Lower:     tfrom,
 			Upper:     tto,
 			LowerType: pgtype.Inclusive,
-			UpperType: pgtype.Inclusive,
+			UpperType: pgtype.Exclusive,
 			Status:    pgtype.Present,
 		}
 
@@ -302,7 +333,22 @@
 		}
 		defer tx.Rollback()
 
-		if _, err := tx.StmtContext(ctx, insertStmt).ExecContext(ctx,
+		// Mark old entries of gauge as erased, if applicable
+		if _, err := tx.StmtContext(ctx, eraseGaugeStmt).ExecContext(ctx,
+			ic.code.String(),
+			validity,
+		); err != nil {
+			feedback.Warn(handleError(err).Error())
+			if err2 := tx.Rollback(); err2 != nil {
+				return nil, err2
+			}
+			unchanged++
+			continue
+		}
+
+		// Try to insert gauge entry
+		var dummy int
+		err = tx.StmtContext(ctx, insertStmt).QueryRowContext(ctx,
 			ic.code.CountryCode,
 			ic.code.LoCode,
 			ic.code.FairwaySection,
@@ -317,42 +363,83 @@
 			geodref,
 			&dateInfo,
 			source,
-		); err != nil {
+			time.Time(*dr.Lastupdate),
+		).Scan(&dummy)
+		switch {
+		case err == sql.ErrNoRows:
+			// Assume constraint conflict, try to update
+			err2 := tx.StmtContext(ctx, updateStmt).QueryRowContext(ctx,
+				ic.code.CountryCode,
+				ic.code.LoCode,
+				ic.code.FairwaySection,
+				ic.code.Orc,
+				ic.code.Hectometre,
+				string(*dr.Objname.Loc),
+				float64(*dr.Lon), float64(*dr.Lat),
+				from,
+				to,
+				float64(*dr.Zeropoint),
+				geodref,
+				&dateInfo,
+				source,
+				time.Time(*dr.Lastupdate),
+			).Scan(&dummy)
+			switch {
+			case err2 == sql.ErrNoRows:
+				feedback.Info("unchanged")
+				if err3 := tx.Rollback(); err3 != nil {
+					return nil, err3
+				}
+				unchanged++
+				continue
+			case err2 != nil:
+				feedback.Warn(handleError(err2).Error())
+				if err3 := tx.Rollback(); err3 != nil {
+					return nil, err3
+				}
+				unchanged++
+				continue
+			default:
+				feedback.Info("update")
+			}
+
+			// Remove obsolete reference water levels
+			var currLevels pgtype.VarcharArray
+			currLevels.Set([]string{
+				string(*dr.Reflevel1code),
+				string(*dr.Reflevel2code),
+				string(*dr.Reflevel3code),
+			})
+			rwls, err2 := tx.StmtContext(ctx,
+				deleteReferenceWaterLevelsStmt).QueryContext(ctx,
+				ic.code.String(),
+				&validity,
+				&currLevels,
+			)
+			if err2 != nil {
+				return nil, err2
+			}
+			defer rwls.Close()
+			for rwls.Next() {
+				var delRef string
+				if err2 = rwls.Scan(&delRef); err2 != nil {
+					return nil, err2
+				}
+				feedback.Warn("Removed reference water level %s from %s",
+					delRef, ic.code)
+			}
+		case err != nil:
 			feedback.Warn(handleError(err).Error())
-			tx.Rollback()
+			if err2 := tx.Rollback(); err2 != nil {
+				return nil, err2
+			}
+			unchanged++
 			continue
+		default:
+			feedback.Info("insert new version")
 		}
 
-		// Remove obsolete reference water levels
-		var currLevels pgtype.VarcharArray
-		currLevels.Set([]string{
-			string(*dr.Reflevel1code),
-			string(*dr.Reflevel2code),
-			string(*dr.Reflevel3code),
-		})
-		rwls, err := tx.StmtContext(
-			ctx, deleteReferenceWaterLevelsStmt).QueryContext(ctx,
-			ic.code.CountryCode,
-			ic.code.LoCode,
-			ic.code.FairwaySection,
-			ic.code.Orc,
-			ic.code.Hectometre,
-			&currLevels,
-		)
-		if err != nil {
-			return nil, err
-		}
-		defer rwls.Close()
-		for rwls.Next() {
-			var delRef string
-			if err = rwls.Scan(&delRef); err != nil {
-				return nil, err
-			}
-			feedback.Warn("Removed reference water level %s from %s",
-				delRef, ic.code)
-		}
-
-		// Insert/update reference water levels
+		// "Upsert" reference water levels
 		for _, wl := range []struct {
 			level **erdms.RisreflevelcodeType
 			value **erdms.RisreflevelvalueType
@@ -388,6 +475,7 @@
 				ic.code.FairwaySection,
 				ic.code.Orc,
 				ic.code.Hectometre,
+				&validity,
 				string(**wl.level),
 				int64(**wl.value),
 			); err != nil {
@@ -402,8 +490,12 @@
 		}
 	}
 
-	feedback.Info("Refreshing gauges took %s.",
+	feedback.Info("Importing gauges took %s",
 		time.Since(start))
 
+	if unchanged == len(gauges) {
+		return nil, UnchangedError("All gauges unchanged")
+	}
+
 	return nil, err
 }
--- a/pkg/models/sr.go	Thu May 16 17:22:33 2019 +0200
+++ b/pkg/models/sr.go	Thu May 16 18:41:43 2019 +0200
@@ -38,8 +38,9 @@
 	checkDepthReferenceSQL = `
 SELECT EXISTS(SELECT 1
   FROM waterway.bottlenecks bn
-    JOIN waterway.gauges g ON g.location = bn.fk_g_fid
-    JOIN waterway.gauges_reference_water_levels rl ON rl.gauge_id = g.location
+    JOIN waterway.gauges g
+      ON bn.gauge_location = g.location AND bn.gauge_validity = g.validity
+    JOIN waterway.gauges_reference_water_levels rl USING (location, validity)
   WHERE bn.objnam = $1
     AND rl.depth_reference = $2)`
 
--- a/schema/auth.sql	Thu May 16 17:22:33 2019 +0200
+++ b/schema/auth.sql	Thu May 16 18:41:43 2019 +0200
@@ -133,7 +133,9 @@
 
 CREATE POLICY same_country ON waterway.gauge_measurements
     FOR ALL TO waterway_admin
-    USING ((fk_gauge_id).country_code = (SELECT country FROM users.list_users WHERE username = current_user));
+    USING ((location).country_code
+        = (SELECT country FROM users.list_users WHERE username = current_user)
+    );
 
 CREATE POLICY same_country ON waterway.waterway_profiles
     FOR ALL TO waterway_admin
--- a/schema/auth_tests.sql	Thu May 16 17:22:33 2019 +0200
+++ b/schema/auth_tests.sql	Thu May 16 18:41:43 2019 +0200
@@ -75,16 +75,18 @@
 
 PREPARE bn_insert (varchar, geometry(MULTIPOLYGON, 4326)) AS
     INSERT INTO waterway.bottlenecks (
-        bottleneck_id, fk_g_fid, stretch, area, rb, lb, responsible_country,
+        gauge_location, gauge_validity,
+        bottleneck_id, stretch, area, rb, lb, responsible_country,
         revisiting_time, limiting, source_organization)
-        VALUES (
+        SELECT
+            location, validity,
             $1,
-            ('AT', 'XXX', '00001', 'G0001', 1)::isrs,
             isrsrange(('AT', 'XXX', '00001', '00000', 0)::isrs,
                 ('AT', 'XXX', '00001', '00000', 2)::isrs),
             $2, 'AT', 'AT', 'AT',
             1, 'depth', 'testorganization'
-        );
+        FROM waterway.gauges
+        WHERE location = ('AT', 'XXX', '00001', 'G0001', 1)::isrs;
 SELECT lives_ok($$
     EXECUTE bn_insert(
         'test1',
--- a/schema/gemma.sql	Thu May 16 17:22:33 2019 +0200
+++ b/schema/gemma.sql	Thu May 16 18:41:43 2019 +0200
@@ -273,7 +273,7 @@
 
 
     CREATE TABLE gauges (
-        location isrs PRIMARY KEY CHECK(
+        location isrs CHECK(
             (location).orc SIMILAR TO 'G[[:digit:]]{4}'
             AND CAST(substring((location).orc from 2 for 4) AS int) < 2048),
         objname varchar NOT NULL,
@@ -281,56 +281,70 @@
         applicability_from_km int8,
         applicability_to_km int8,
         validity tstzrange,
-        -- pasted text from a more general specification is given
-        -- (a gauge is not a berth!)
-        -- TODO: Ranges need a joint exclusion constaint to prevent overlaps?
         zero_point double precision NOT NULL,
         geodref varchar,
-        date_info timestamp with time zone NOT NULL DEFAULT CURRENT_TIMESTAMP,
-        source_organization varchar
+        date_info timestamp with time zone NOT NULL,
+        source_organization varchar,
+        lastupdate timestamp with time zone NOT NULL,
+        -- entry removed from external data source (RIS-Index)/historicised:
+        erased boolean NOT NULL DEFAULT false,
+        PRIMARY KEY (location, validity),
+        EXCLUDE USING GiST (isrs_astext(location) WITH =, validity WITH &&)
     )
-    CREATE TRIGGER gauges_date_info BEFORE UPDATE ON gauges
-        FOR EACH ROW EXECUTE PROCEDURE update_date_info()
+    -- Allow only one non-erased entry per location
+    CREATE UNIQUE INDEX gauges_erased_unique_constraint
+        ON gauges (location)
+        WHERE NOT erased
 
     CREATE TABLE gauges_reference_water_levels (
-        gauge_id isrs NOT NULL REFERENCES gauges,
+        location isrs NOT NULL,
+        validity tstzrange NOT NULL,
+        FOREIGN KEY (location, validity) REFERENCES gauges,
         -- Omit foreign key constraint to be able to store not NtS-compliant
         -- names, too:
         depth_reference varchar NOT NULL, -- REFERENCES depth_references,
-        PRIMARY KEY (gauge_id, depth_reference),
+        PRIMARY KEY (location, validity, depth_reference),
         value int NOT NULL
     )
 
     CREATE TABLE gauge_measurements (
         id int PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
-        fk_gauge_id isrs NOT NULL CONSTRAINT gauge_key REFERENCES gauges,
+        location isrs NOT NULL,
+        validity tstzrange NOT NULL,
+        CONSTRAINT gauge_key
+            FOREIGN KEY (location, validity) REFERENCES gauges,
         measure_date timestamp with time zone NOT NULL,
         country_code char(2) NOT NULL REFERENCES countries,
         sender varchar NOT NULL, -- "from" element from NtS response
         language_code varchar NOT NULL REFERENCES language_codes,
-        date_issue timestamp with time zone NOT NULL,
+        date_issue timestamp with time zone
+            NOT NULL CHECK (date_issue <@ validity),
         reference_code varchar(4) NOT NULL REFERENCES depth_references,
         water_level double precision NOT NULL,
         date_info timestamp with time zone NOT NULL DEFAULT CURRENT_TIMESTAMP,
         source_organization varchar NOT NULL, -- "originator" from NtS response
         staging_done boolean NOT NULL DEFAULT false,
-        UNIQUE (fk_gauge_id, measure_date, staging_done)
+        UNIQUE (location, validity, measure_date, staging_done)
     )
 
     CREATE TABLE gauge_predictions (
-        fk_gauge_id isrs NOT NULL CONSTRAINT gauge_key REFERENCES gauges,
+        location isrs NOT NULL,
+        validity tstzrange NOT NULL,
+        CONSTRAINT gauge_key
+            FOREIGN KEY (location, validity) REFERENCES gauges,
         measure_date timestamp with time zone NOT NULL,
         country_code char(2) NOT NULL REFERENCES countries,
         sender varchar NOT NULL, -- "from" element from NtS response
         language_code varchar NOT NULL REFERENCES language_codes,
-        date_issue timestamp with time zone NOT NULL,
+        date_issue timestamp with time zone
+            NOT NULL CHECK (date_issue <@ validity),
         reference_code varchar(4) NOT NULL REFERENCES depth_references,
         water_level double precision NOT NULL,
         conf_interval numrange
             CHECK (conf_interval @> CAST(water_level AS numeric)),
         date_info timestamp with time zone NOT NULL DEFAULT CURRENT_TIMESTAMP,
         source_organization varchar NOT NULL, -- "originator" from NtS response
-        PRIMARY KEY (fk_gauge_id, measure_date, date_issue)
+        PRIMARY KEY (location, validity, measure_date, date_issue)
     )
 
     CREATE TABLE waterway_axis (
@@ -474,7 +488,9 @@
     CREATE TABLE bottlenecks (
         id int PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
         bottleneck_id varchar UNIQUE NOT NULL,
-        fk_g_fid isrs NOT NULL REFERENCES gauges,
+        gauge_location isrs NOT NULL,
+        gauge_validity tstzrange NOT NULL,
+        FOREIGN KEY (gauge_location, gauge_validity) REFERENCES gauges,
         -- XXX: DRC references "ch. 3.1.1", which does not exist in document.
         objnam varchar,
         nobjnm varchar,
--- a/schema/geoserver_views.sql	Thu May 16 17:22:33 2019 +0200
+++ b/schema/geoserver_views.sql	Thu May 16 18:41:43 2019 +0200
@@ -16,8 +16,9 @@
             AS reference_water_levels
     FROM waterway.gauges g
         LEFT JOIN waterway.gauges_reference_water_levels r
-            ON r.gauge_id = g.location
-    GROUP BY g.location;
+            USING (location, validity)
+    WHERE NOT g.erased
+    GROUP BY g.location, g.validity;
 
 CREATE OR REPLACE VIEW waterway.distance_marks_geoserver AS
     SELECT location_code,
@@ -44,9 +45,9 @@
             FROM waterway.fairway_availability
             ORDER BY bottleneck_id, date_info DESC),
     waterlevel_latest AS (
-        SELECT DISTINCT ON (fk_gauge_id) fk_gauge_id, water_level
+        SELECT DISTINCT ON (location) location, water_level
             FROM waterway.gauge_measurements
-            ORDER BY fk_gauge_id, measure_date DESC)
+            ORDER BY location, measure_date DESC)
     SELECT
         b.id,
         b.bottleneck_id,
@@ -71,14 +72,16 @@
         wl.water_level AS gm_waterlevel
     FROM waterway.bottlenecks b
         LEFT JOIN waterway.gauges g
-            ON b.fk_g_fid = g.location
+            ON b.gauge_location = g.location AND b.gauge_validity = g.validity
         LEFT JOIN waterway.gauges_reference_water_levels r
-            ON g.location = r.gauge_id
+            USING (location, validity)
         LEFT JOIN fairway_availability_latest fal
             ON b.id = fal.bottleneck_id
         LEFT JOIN waterlevel_latest wl
-            ON b.fk_g_fid = wl.fk_gauge_id
-    GROUP BY b.id, g.location, fal.date_info, fal.critical, wl.water_level;
+            USING (location)
+    WHERE NOT g.erased
+    GROUP BY b.id, g.location, g.validity,
+        fal.date_info, fal.critical, wl.water_level;
 
 CREATE OR REPLACE VIEW waterway.stretches_geoserver AS
     SELECT
--- a/schema/tap_tests_data.sql	Thu May 16 17:22:33 2019 +0200
+++ b/schema/tap_tests_data.sql	Thu May 16 18:41:43 2019 +0200
@@ -37,22 +37,30 @@
 
 INSERT INTO limiting_factors VALUES ('depth'), ('width');
 
-INSERT INTO waterway.gauges (
-    location, objname, geom, zero_point, source_organization)
+WITH
+gs AS (
+    INSERT INTO waterway.gauges (
+        location,
+        validity,
+        objname,
+        geom,
+        zero_point,
+        date_info,
+        source_organization,
+        lastupdate)
     VALUES (
         ('AT', 'XXX', '00001', 'G0001', 1)::isrs,
+        tstzrange(current_timestamp - '1 day'::interval, current_timestamp),
         'testgauge',
         ST_geomfromtext('POINT(0 0)', 4326),
         0,
-        'testorganization'
-    );
-
-INSERT INTO waterway.bottlenecks (
-    bottleneck_id, fk_g_fid, stretch, area, rb, lb, responsible_country,
-    revisiting_time, limiting, source_organization, staging_done)
+        current_timestamp,
+        'testorganization',
+        current_timestamp)
+    RETURNING location, validity),
+bns AS (
     VALUES (
         'testbottleneck1',
-        ('AT', 'XXX', '00001', 'G0001', 1)::isrs,
         isrsrange(('AT', 'XXX', '00001', '00000', 0)::isrs,
             ('AT', 'XXX', '00001', '00000', 2)::isrs),
         ST_geomfromtext('MULTIPOLYGON(((0 0, 0 1, 1 1, 1 0, 0 0)))', 4326),
@@ -60,13 +68,17 @@
         1, 'depth', 'testorganization', false
     ), (
         'testbottleneck2',
-        ('AT', 'XXX', '00001', 'G0001', 1)::isrs,
         isrsrange(('AT', 'XXX', '00001', '00000', 0)::isrs,
             ('AT', 'XXX', '00001', '00000', 2)::isrs),
         ST_geomfromtext('MULTIPOLYGON(((0 0, 0 1, 1 1, 1 0, 0 0)))', 4326),
         'AT', 'AT', 'AT',
         1, 'depth', 'testorganization', true
-    );
+    ))
+INSERT INTO waterway.bottlenecks (
+    gauge_location, gauge_validity,
+    bottleneck_id, stretch, area, rb, lb, responsible_country,
+    revisiting_time, limiting, source_organization, staging_done)
+    SELECT * FROM gs, bns;
 
 INSERT INTO waterway.distance_marks_virtual VALUES (
     ('AT', 'XXX', '00001', '00000', 0)::isrs,