changeset 3666:db87f34805fb

Align bottleneck validity at gauges Ensuring the validity of a bottleneck version is always contained by the validity of the referenced gauge version allows to reliably determine matching reference values of the gauge at a point in time. Since this implies that a bottleneck version might be cut into more than one time ranges, the concept of having only one non-erased version is no longer applicable and replaced by using the 'current' version of a bottleneck. Fairway availability data are always kept with the 'current' bottleneck version to have them at hand alltogether for analyses over longer time ranges.
author Tom Gottfried <tom@intevation.de>
date Sat, 15 Jun 2019 14:36:50 +0200
parents 29ef6d41e4af
children c91bcb92e0b7
files pkg/controllers/bottlenecks.go pkg/controllers/search.go pkg/controllers/stretches.go pkg/imports/bn.go pkg/imports/fa.go schema/auth.sql schema/gemma.sql schema/gemma_tests.sql schema/geoserver_views.sql
diffstat 9 files changed, 221 insertions(+), 199 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/controllers/bottlenecks.go	Sat Jun 15 09:24:28 2019 +0200
+++ b/pkg/controllers/bottlenecks.go	Sat Jun 15 14:36:50 2019 +0200
@@ -33,7 +33,9 @@
 
 const (
 	selectLimitingSQL = `
-SELECT limiting from waterway.bottlenecks WHERE NOT erased AND objnam = $1`
+SELECT limiting from waterway.bottlenecks
+  WHERE bn.validity @> current_timestamp AND objnam = $1
+`
 
 	selectAvailableDepthSQL = `
 WITH data AS (
@@ -48,7 +50,7 @@
   JOIN waterway.bottlenecks bn
     ON fa.bottleneck_id = bn.id
   WHERE
-    NOT bn.erased AND
+    bn.validity @> current_timestamp AND
     bn.objnam = $1 AND
     efa.level_of_service = $2 AND
     efa.measure_type = 'Measured' AND
@@ -82,7 +84,9 @@
   JOIN waterway.bottlenecks bns
     ON grwl.location = bns.gauge_location
       AND grwl.validity = bns.gauge_validity
-WHERE NOT bns.erased AND bns.objnam = $1 AND grwl.depth_reference like 'LDC%'
+WHERE bns.validity @> current_timestamp
+  AND bns.objnam = $1
+  AND grwl.depth_reference like 'LDC%'
 `
 )
 
--- a/pkg/controllers/search.go	Sat Jun 15 09:24:28 2019 +0200
+++ b/pkg/controllers/search.go	Sat Jun 15 14:36:50 2019 +0200
@@ -42,7 +42,7 @@
     ST_AsGeoJSON(ST_Centroid(area))::json AS geom,
     'bottleneck' AS type
   FROM waterway.bottlenecks
-  WHERE NOT erased
+  WHERE validity @> current_timestamp
 ORDER BY objnam) r
 `
 )
--- a/pkg/controllers/stretches.go	Sat Jun 15 09:24:28 2019 +0200
+++ b/pkg/controllers/stretches.go	Sat Jun 15 14:36:50 2019 +0200
@@ -35,14 +35,18 @@
   distinct(b.objnam),
   b.limiting
 FROM waterway.sections s, waterway.bottlenecks b
-WHERE NOT b.erased AND ST_Intersects(b.area, s.area) AND s.name = $1`
+WHERE b.validity @> current_timestamp
+  AND ST_Intersects(b.area, s.area)
+  AND s.name = $1`
 
 	selectStretchBottlenecks = `
 SELECT
   distinct(b.objnam),
   b.limiting
 FROM waterway.stretches s, waterway.bottlenecks b
-WHERE NOT b.erased AND ST_Intersects(b.area, s.area) AND s.name = $1`
+WHERE b.validity @> current_timestamp
+  AND ST_Intersects(b.area, s.area)
+  AND s.name = $1`
 )
 
 type (
--- a/pkg/imports/bn.go	Sat Jun 15 09:24:28 2019 +0200
+++ b/pkg/imports/bn.go	Sat Jun 15 14:36:50 2019 +0200
@@ -43,21 +43,6 @@
 const BNJobKind JobKind = "bn"
 
 const (
-	hasBottleneckSQL = `
-WITH upd AS (
-  UPDATE waterway.bottlenecks SET
-    erased = true
-  WHERE bottleneck_id = $1
-    AND NOT erased
-    -- Don't touch old entry if new validity contains old: will be updated
-    AND NOT validity <@ $2
-  RETURNING 1
-)
--- Decide whether a new version will be INSERTed
-SELECT EXISTS(SELECT 1 FROM upd)
-  OR NOT EXISTS(SELECT 1 FROM waterway.bottlenecks WHERE bottleneck_id = $1)
-`
-
 	insertBottleneckSQL = `
 WITH
 bounds (b) AS (VALUES (isrs_fromText($6)), (isrs_fromText($7))),
@@ -80,15 +65,11 @@
   limiting,
   date_info,
   source_organization
-) VALUES (
+) SELECT
   $1,
-  $2,
-  isrs_fromText($3),
-  COALESCE(
-    (SELECT validity FROM waterway.gauges
-       WHERE location = isrs_fromText($3)
-         AND validity @> lower(CAST($2 AS tstzrange))),
-    tstzrange(NULL, NULL)),
+  validity * $2, -- intersections with gauge validity ranges
+  location,
+  validity,
   $4,
   $5,
   (SELECT r FROM r),
@@ -104,67 +85,57 @@
   $12,
   $13,
   $14
-)
-RETURNING id`
+  FROM waterway.gauges
+  WHERE location = isrs_fromText($3) AND validity && $2
+ON CONFLICT (bottleneck_id, validity) DO UPDATE SET
+    gauge_location = EXCLUDED.gauge_location,
+    gauge_validity = EXCLUDED.gauge_validity,
+    objnam = EXCLUDED.objnam,
+    nobjnm = EXCLUDED.nobjnm,
+    stretch = EXCLUDED.stretch,
+    area = EXCLUDED.area,
+    rb = EXCLUDED.rb,
+    lb = EXCLUDED.lb,
+    responsible_country = EXCLUDED.responsible_country,
+    revisiting_time = EXCLUDED.revisiting_time,
+    limiting = EXCLUDED.limiting,
+    date_info = EXCLUDED.date_info,
+    source_organization = EXCLUDED.source_organization
+RETURNING id
+`
+
+	// Alignment with gauge validity might have generated new entries
+	// for the same time range. Thus, remove the old ones
+	deleteObsoleteBNSQL = `
+DELETE FROM waterway.bottlenecks
+WHERE bottleneck_id = $1 AND validity <@ $2 AND id <> ALL($3)
+`
 
 	fixBNValiditySQL = `
 UPDATE waterway.bottlenecks SET
    -- Set enddate of old entry to new startdate in case of overlap:
   validity = validity - $2
 WHERE bottleneck_id = $1
-  AND validity && $2
-  AND erased
-`
-
-	updateBottleneckSQL = `
-WITH
-bounds (b) AS (VALUES (isrs_fromText($5)), (isrs_fromText($6))),
-r AS (SELECT isrsrange(
-    (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY),
-    (SELECT b FROM bounds ORDER BY b USING >~ FETCH FIRST ROW ONLY)) AS r)
-UPDATE waterway.bottlenecks b SET
-  gauge_location = isrs_fromtext($2),
-  gauge_validity = COALESCE(
-    (SELECT validity FROM waterway.gauges g
-       WHERE g.location = isrs_fromText($2)
-         AND g.validity @> lower(b.validity)),
-    tstzrange(NULL, NULL)),
-  objnam = $3,
-  nobjnm = $4,
-  stretch = (SELECT r FROM r),
-  area = ISRSrange_area(
-    ISRSrange_axis((SELECT r FROM r), $14),
-    (SELECT ST_Collect(CAST(area AS geometry))
-        FROM waterway.waterway_area)),
-  rb = $7,
-  lb = $8,
-  responsible_country = $9,
-  revisiting_time = $10,
-  limiting = $11,
-  date_info = $12,
-  source_organization = $13,
-  validity = $15
-WHERE bottleneck_id = $1
-  AND NOT erased
-  AND $12 > date_info
-RETURNING id
+  AND validity && $2 AND NOT validity <@ $2
 `
 
 	deleteBottleneckMaterialSQL = `
-DELETE FROM waterway.bottlenecks_riverbed_materials
-WHERE bottleneck_id = $1
-  AND riverbed <> ALL($2)
-RETURNING riverbed
+WITH del AS (
+  DELETE FROM waterway.bottlenecks_riverbed_materials
+  WHERE bottleneck_id = ANY($1)
+    AND riverbed <> ALL($2)
+  RETURNING riverbed)
+SELECT DISTINCT riverbed FROM del
 `
 
 	insertBottleneckMaterialSQL = `
 INSERT INTO waterway.bottlenecks_riverbed_materials (
    bottleneck_id,
    riverbed
-) VALUES (
-   $1,
-   $2
-) ON CONFLICT (bottleneck_id, riverbed) DO NOTHING
+) SELECT *
+FROM unnest(CAST($1 AS int[])) AS bns,
+  unnest(CAST($2 AS varchar[])) AS materials
+ON CONFLICT (bottleneck_id, riverbed) DO NOTHING
 `
 )
 
@@ -265,17 +236,16 @@
 
 	feedback.Info("Found %d bottlenecks for import", len(bns))
 
-	var hasStmt, insertStmt, fixValidityStmt, updateStmt,
+	var insertStmt, deleteObsoleteBNStmt, fixValidityStmt,
 		deleteMaterialStmt, insertMaterialStmt, trackStmt *sql.Stmt
 
 	for _, x := range []struct {
 		sql  string
 		stmt **sql.Stmt
 	}{
-		{hasBottleneckSQL, &hasStmt},
 		{insertBottleneckSQL, &insertStmt},
+		{deleteObsoleteBNSQL, &deleteObsoleteBNStmt},
 		{fixBNValiditySQL, &fixValidityStmt},
-		{updateBottleneckSQL, &updateStmt},
 		{deleteBottleneckMaterialSQL, &deleteMaterialStmt},
 		{insertBottleneckMaterialSQL, &insertMaterialStmt},
 		{trackImportSQL, &trackStmt},
@@ -294,7 +264,7 @@
 	for _, bn := range bns {
 		if err := storeBottleneck(
 			ctx, importID, conn, feedback, bn, &nids, tolerance,
-			hasStmt, insertStmt, fixValidityStmt, updateStmt,
+			insertStmt, deleteObsoleteBNStmt, fixValidityStmt,
 			deleteMaterialStmt, insertMaterialStmt, trackStmt); err != nil {
 			return nil, err
 		}
@@ -321,7 +291,7 @@
 	bn *ifbn.BottleNeckType,
 	nids *[]string,
 	tolerance float64,
-	hasStmt, insertStmt, fixValidityStmt, updateStmt,
+	insertStmt, deleteObsoleteBNStmt, fixValidityStmt,
 	deleteMaterialStmt, insertMaterialStmt, trackStmt *sql.Stmt,
 ) error {
 	feedback.Info("Processing %s (%s)", bn.OBJNAM, bn.Bottleneck_id)
@@ -408,99 +378,57 @@
 	}
 	defer tx.Rollback()
 
-	var isNew bool
-	var nid int64
-	err = tx.StmtContext(ctx, hasStmt).QueryRowContext(ctx,
+	var bnIds []int64
+	bns, err := tx.StmtContext(ctx, insertStmt).QueryContext(ctx,
 		bn.Bottleneck_id,
-		validity,
-	).Scan(&isNew)
-	switch {
-	case err != nil:
+		&validity,
+		bn.Fk_g_fid,
+		bn.OBJNAM,
+		bn.NOBJNM,
+		bn.From_ISRS, bn.To_ISRS,
+		rb,
+		lb,
+		country,
+		revisitingTime,
+		limiting,
+		bn.Date_Info,
+		bn.Source,
+		tolerance,
+	)
+	if err != nil {
+		feedback.Warn(handleError(err).Error())
+		return nil
+	}
+	defer bns.Close()
+	for bns.Next() {
+		var nid int64
+		if err := bns.Scan(&nid); err != nil {
+			return err
+		}
+		bnIds = append(bnIds, nid)
+	}
+	if err := bns.Err(); err != nil {
+		return err
+	}
+	if len(bnIds) == 0 {
+		feedback.Warn(
+			"No gauge matching '%s' or given time available", bn.Fk_g_fid)
+		return nil
+	}
+
+	// Remove obsolete bottleneck version entries
+	var pgBnIds pgtype.Int8Array
+	pgBnIds.Set(bnIds)
+	if _, err = tx.StmtContext(ctx, deleteObsoleteBNStmt).ExecContext(ctx,
+		bn.Bottleneck_id,
+		&validity,
+		&pgBnIds,
+	); err != nil {
 		feedback.Warn(handleError(err).Error())
 		if err2 := tx.Rollback(); err2 != nil {
 			return err2
 		}
 		return nil
-	case isNew:
-		err = tx.StmtContext(ctx, insertStmt).QueryRowContext(
-			ctx,
-			bn.Bottleneck_id,
-			&validity,
-			bn.Fk_g_fid,
-			bn.OBJNAM,
-			bn.NOBJNM,
-			bn.From_ISRS, bn.To_ISRS,
-			rb,
-			lb,
-			country,
-			revisitingTime,
-			limiting,
-			bn.Date_Info,
-			bn.Source,
-			tolerance,
-		).Scan(&nid)
-		if err != nil {
-			feedback.Warn(handleError(err).Error())
-			return nil
-		}
-		feedback.Info("insert new version")
-	case !isNew:
-		// try to update
-		err := tx.StmtContext(ctx, updateStmt).QueryRowContext(ctx,
-			bn.Bottleneck_id,
-			bn.Fk_g_fid,
-			bn.OBJNAM,
-			bn.NOBJNM,
-			bn.From_ISRS, bn.To_ISRS,
-			rb,
-			lb,
-			country,
-			revisitingTime,
-			limiting,
-			bn.Date_Info,
-			bn.Source,
-			tolerance,
-			&validity,
-		).Scan(&nid)
-		switch {
-		case err == sql.ErrNoRows:
-			feedback.Info("unchanged")
-			if err := tx.Rollback(); err != nil {
-				return err
-			}
-			return nil
-		case err != nil:
-			feedback.Warn(handleError(err).Error())
-			if err := tx.Rollback(); err != nil {
-				return err
-			}
-			return nil
-		default:
-			feedback.Info("update")
-
-			// Remove obsolete riverbed materials
-			var pgMaterials pgtype.VarcharArray
-			pgMaterials.Set(materials)
-			mtls, err := tx.StmtContext(ctx,
-				deleteMaterialStmt).QueryContext(ctx,
-				nid,
-				&pgMaterials,
-			)
-			if err != nil {
-				return err
-			}
-			defer mtls.Close()
-			for mtls.Next() {
-				var delMat string
-				if err := mtls.Scan(&delMat); err != nil {
-					return err
-				}
-				feedback.Warn("Removed riverbed material %s", delMat)
-			}
-			if err := mtls.Err(); err != nil {
-				return err
-			}
-		}
 	}
 
 	// Set end of validity of old version to start of new version
@@ -516,22 +444,47 @@
 		return nil
 	}
 
-	// Insert riverbed materials
 	if materials != nil {
-		for _, mat := range materials {
-			if _, err := tx.StmtContext(ctx,
-				insertMaterialStmt).ExecContext(
-				ctx, nid, mat); err != nil {
-				feedback.Warn("Failed to insert riverbed material '%s'", mat)
-				feedback.Warn(handleError(err).Error())
+		// Remove obsolete riverbed materials
+		var pgMaterials pgtype.VarcharArray
+		pgMaterials.Set(materials)
+		mtls, err := tx.StmtContext(ctx,
+			deleteMaterialStmt).QueryContext(ctx,
+			&pgBnIds,
+			&pgMaterials,
+		)
+		if err != nil {
+			return err
+		}
+		defer mtls.Close()
+		for mtls.Next() {
+			var delMat string
+			if err := mtls.Scan(&delMat); err != nil {
+				return err
 			}
+			feedback.Warn("Removed riverbed material %s", delMat)
+		}
+		if err := mtls.Err(); err != nil {
+			return err
+		}
+
+		// Insert riverbed materials
+		if _, err := tx.StmtContext(ctx, insertMaterialStmt).ExecContext(ctx,
+			&pgBnIds,
+			&pgMaterials,
+		); err != nil {
+			feedback.Warn("Failed to insert riverbed materials")
+			feedback.Warn(handleError(err).Error())
+			return nil
 		}
 	}
 
-	if _, err := tx.StmtContext(ctx, trackStmt).ExecContext(
-		ctx, importID, "waterway.bottlenecks", nid,
-	); err != nil {
-		return err
+	for _, nid := range bnIds {
+		if _, err := tx.StmtContext(ctx, trackStmt).ExecContext(
+			ctx, importID, "waterway.bottlenecks", nid,
+		); err != nil {
+			return err
+		}
 	}
 	if err = tx.Commit(); err != nil {
 		return err
--- a/pkg/imports/fa.go	Sat Jun 15 09:24:28 2019 +0200
+++ b/pkg/imports/fa.go	Sat Jun 15 14:36:50 2019 +0200
@@ -79,8 +79,10 @@
   source_organization
 ) VALUES (
   $1,
-  (SELECT id FROM waterway.bottlenecks
-     WHERE NOT erased AND bottleneck_id = $2),
+  -- Always associate fairway availability data to newest bottleneck
+  -- version to prevent problems in analysis over longer time periods
+  (SELECT id FROM waterway.bottlenecks WHERE bottleneck_id = $2
+     ORDER BY validity DESC FETCH FIRST ROW ONLY),
   $3,
   $4,
   $5,
--- a/schema/auth.sql	Sat Jun 15 09:24:28 2019 +0200
+++ b/schema/auth.sql	Sat Jun 15 14:36:50 2019 +0200
@@ -38,9 +38,6 @@
 --
 GRANT INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA waterway
     TO waterway_admin;
--- TODO: will there ever be UPDATEs and DELETEs or can we drop that for
--- imported data due to historicisation? Special tables like
--- import_configuration will further need UPDATE and DELETE privileges.
 
 GRANT INSERT, UPDATE, DELETE ON
     users.templates TO waterway_admin;
--- a/schema/gemma.sql	Sat Jun 15 09:24:28 2019 +0200
+++ b/schema/gemma.sql	Sat Jun 15 14:36:50 2019 +0200
@@ -76,6 +76,9 @@
 -- associating time-based referencing objects to matching version
 CREATE OR REPLACE FUNCTION move_gauge_referencing() RETURNS trigger AS
 $$
+DECLARE
+    new_bn int;
+    new_bns int[];
 BEGIN
     -- Avoid unnecessary execution ON UPDATE if validity did not change
     IF OLD IS NULL OR NEW.validity <> OLD.validity THEN
@@ -84,10 +87,63 @@
         WHERE location = NEW.location
             AND measure_date <@ NEW.validity;
 
-        UPDATE waterway.bottlenecks
-        SET gauge_validity = NEW.validity
+        -- build bottleneck validities from intersections with gauge validities
+        FOR new_bn IN
+            INSERT INTO waterway.bottlenecks (
+                    bottleneck_id,
+                    validity,
+                    gauge_location,
+                    gauge_validity,
+                    objnam,
+                    nobjnm,
+                    stretch,
+                    area,
+                    rb,
+                    lb,
+                    responsible_country,
+                    revisiting_time,
+                    limiting,
+                    date_info,
+                    source_organization,
+                    staging_done
+                ) SELECT
+                    b.bottleneck_id,
+                    -- Anticipate non-intersecting gauge validities:
+                    b.validity * CASE WHEN g.validity = NEW.validity
+                        THEN NEW.validity ELSE g.validity - NEW.validity END,
+                    b.gauge_location,
+                    g.validity,
+                    b.objnam,
+                    b.nobjnm,
+                    b.stretch,
+                    b.area,
+                    b.rb,
+                    b.lb,
+                    b.responsible_country,
+                    b.revisiting_time,
+                    b.limiting,
+                    b.date_info,
+                    b.source_organization,
+                    b.staging_done
+                FROM waterway.bottlenecks b JOIN waterway.gauges g
+                    ON b.gauge_location = g.location
+                WHERE b.gauge_location = NEW.location
+                    AND b.validity && NEW.validity
+                    -- Avoid duplicate intersection results:
+                    AND NOT (b.validity <@ NEW.validity
+                        AND g.validity <> NEW.validity)
+            ON CONFLICT (bottleneck_id, validity) DO UPDATE SET
+                -- Associate to new matching gauge version
+                gauge_validity = EXCLUDED.gauge_validity
+            RETURNING id
+        LOOP
+            new_bns = new_bns || new_bn;
+        END LOOP;
+        -- Delete bottleneck versions superseded by new intersections:
+        DELETE FROM waterway.bottlenecks
         WHERE gauge_location = NEW.location
-            AND lower(validity) <@ NEW.validity;
+            AND validity && NEW.validity
+            AND id <> ALL(new_bns);
     END IF;
     RETURN NULL; -- ignored
 END;
@@ -103,6 +159,19 @@
         SET bottleneck_validity = NEW.validity
         WHERE bottleneck_id = NEW.bottleneck_id
           AND CAST(date_info AS timestamptz) <@ NEW.validity;
+
+        -- Always associate fairway availability data to newest bottleneck
+        -- version to prevent problems in analysis over longer time periods
+        WITH
+        bn AS (SELECT id, validity FROM waterway.bottlenecks
+            WHERE bottleneck_id = NEW.bottleneck_id),
+        latest AS (SELECT id FROM bn
+            -- Candidates are past new validity or just inserted/updated
+            WHERE NOT validity &< NEW.validity OR id = NEW.id
+            ORDER BY upper(validity) DESC FETCH FIRST ROW ONLY)
+        UPDATE waterway.fairway_availability
+        SET bottleneck_id = (SELECT id FROM latest)
+        WHERE bottleneck_id IN(SELECT id FROM bn EXCEPT SELECT id FROM latest);
     END IF;
     RETURN NULL; -- ignored
 END;
@@ -534,7 +603,7 @@
             DEFERRABLE INITIALLY DEFERRED,
         gauge_location isrs NOT NULL,
         gauge_validity tstzrange NOT NULL,
-        CHECK(lower(validity) <@ gauge_validity),
+        CHECK(validity <@ gauge_validity),
         CONSTRAINT gauge_key
             FOREIGN KEY (gauge_location, gauge_validity) REFERENCES gauges
                 ON UPDATE CASCADE,
@@ -558,13 +627,8 @@
         -- XXX: Also an attribut of sounding result?
         date_info timestamp with time zone NOT NULL,
         source_organization varchar NOT NULL,
-        erased boolean NOT NULL DEFAULT false,
         staging_done boolean NOT NULL DEFAULT false
     )
-    -- Allow only one non-erased entry per bottleneck
-    CREATE UNIQUE INDEX bottlenecks_erased_unique_constraint
-        ON bottlenecks (bottleneck_id)
-        WHERE NOT erased
     -- Associate referencing objects to matching bottleneck version
     CREATE TRIGGER move_referencing AFTER INSERT OR UPDATE OF validity
         ON bottlenecks FOR EACH ROW
--- a/schema/gemma_tests.sql	Sat Jun 15 09:24:28 2019 +0200
+++ b/schema/gemma_tests.sql	Sat Jun 15 14:36:50 2019 +0200
@@ -63,9 +63,7 @@
     AND NOT erased;
 COMMIT;
 SELECT results_eq($$
-    SELECT DISTINCT gauge_validity FROM waterway.bottlenecks
+    SELECT count(*) FROM waterway.bottlenecks GROUP BY bottleneck_id;
     $$,
-    $$
-    SELECT v FROM new_v
-    $$,
-    'Bottlenecks have been associated to new matching gauge version');
+    CAST(ARRAY[2,2] AS bigint[]),
+    'Bottlenecks have been split to two new matching gauge versions');
--- a/schema/geoserver_views.sql	Sat Jun 15 09:24:28 2019 +0200
+++ b/schema/geoserver_views.sql	Sat Jun 15 14:36:50 2019 +0200
@@ -136,7 +136,7 @@
             ON b.id = fal.bottleneck_id
         LEFT JOIN sounding_result_latest srl
             ON b.bottleneck_id = srl.bottleneck_id
-    WHERE NOT b.erased;
+    WHERE b.validity @> current_timestamp;
 
 CREATE OR REPLACE VIEW waterway.stretches_geoserver AS
     SELECT
@@ -201,7 +201,7 @@
         SELECT bottleneck_id, max(date_info) AS current
             FROM waterway.sounding_results
             GROUP BY bottleneck_id) sr ON sr.bottleneck_id = bn.bottleneck_id
-    WHERE NOT bn.erased
+    WHERE bn.validity @> current_timestamp
     ORDER BY objnam;
 
 CREATE OR REPLACE VIEW waterway.sounding_differences AS