changeset 5144:f11b9b50fcc9

Keep historic data of fairway dimensions ... and accordingly configure the respective layer as WMS-T.
author Tom Gottfried <tom@intevation.de>
date Tue, 31 Mar 2020 18:59:28 +0200
parents 733f7136a30e
children 1cb5fca140e2 1c5c9fdaf730
files pkg/imports/fd.go schema/default_sysconfig.sql schema/gemma.sql schema/updates/1437/01.historicise_fairway_dimensions.sql schema/updates/1437/02.configure_wms-t.sql schema/version.sql
diffstat 6 files changed, 243 insertions(+), 33 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/imports/fd.go	Tue Mar 31 15:30:23 2020 +0200
+++ b/pkg/imports/fd.go	Tue Mar 31 18:59:28 2020 +0200
@@ -136,21 +136,54 @@
         relation = 'waterway.fairway_dimensions'::regclass)`
 
 	deleteFairwayDimensionSQL = `
+-- Delete entries to be replaced by those in staging area
 DELETE FROM waterway.fairway_dimensions
-WHERE staging_done
-  AND level_of_service = (
-    SELECT DISTINCT level_of_service FROM waterway.fairway_dimensions
-      WHERE id IN (
-        SELECT key FROM import.track_imports
-          WHERE import_id = $1
-            AND relation = 'waterway.fairway_dimensions'::regclass))
+  WHERE id IN (
+    SELECT key FROM import.track_imports
+      WHERE import_id = $1
+        AND relation = 'waterway.fairway_dimensions'::regclass
+        AND deletion)
 `
-
+	// Temporary table to collect IDs of unchanged entries
+	tmpTableSQL = `
+CREATE TEMP TABLE unchanged (id int PRIMARY KEY) ON COMMIT DROP
+`
 	// The ST_MakeValid and ST_Buffer below are a workarround to
 	// avoid errors due to reprojection.
 	insertFairwayDimensionSQL = `
 WITH resp AS (
   SELECT users.current_user_area_utm() AS a
+),
+g AS (
+  SELECT ST_Multi(ST_CollectionExtract(ST_MakeValid(ST_Transform(
+    CASE WHEN pg_has_role('sys_admin', 'MEMBER')
+        OR ST_Covers((SELECT a FROM resp),
+          ST_Transform(new_fd, (SELECT ST_SRID(a) FROM resp)))
+      THEN new_fd
+      ELSE ST_Intersection(
+          (SELECT ST_Buffer(a, -0.0001) FROM resp),
+          ST_MakeValid(ST_Transform(new_fd, (SELECT ST_SRID(a) FROM resp))))
+      END,
+    4326)), 3)) AS new_fd
+  FROM ST_GeomFromWKB($1, $2::integer) AS new_fd (new_fd)
+  WHERE pg_has_role('sys_admin', 'MEMBER')
+    OR ST_Intersects((SELECT a FROM resp),
+      ST_MakeValid(ST_Transform(new_fd, (SELECT ST_SRID(a) FROM resp))))
+),
+not_new AS (
+  -- Collect IDs of unchanged entries in temp table
+  INSERT INTO unchanged
+    SELECT id
+      FROM g, waterway.fairway_dimensions
+      WHERE staging_done
+        AND validity @> current_timestamp
+        AND (area, level_of_service,
+            min_width, max_width, min_depth, source_organization
+          ) IS NOT DISTINCT FROM (
+            new_fd, $3, $4, $5, $6, $8)
+    -- Return something if a duplicate in the data source is encountered
+    ON CONFLICT (id) DO UPDATE SET id = EXCLUDED.id
+    RETURNING 1
 )
 INSERT INTO waterway.fairway_dimensions (
   area,
@@ -161,25 +194,58 @@
   date_info,
   source_organization)
 SELECT
-    ST_Multi(ST_CollectionExtract(ST_MakeValid(ST_Transform(
-      CASE WHEN pg_has_role('sys_admin', 'MEMBER')
-          OR ST_Covers((SELECT a FROM resp),
-            ST_Transform(new_fd, (SELECT ST_SRID(a) FROM resp)))
-        THEN new_fd
-        ELSE ST_Intersection(
-            (SELECT ST_Buffer(a, -0.0001) FROM resp),
-            ST_MakeValid(ST_Transform(new_fd, (SELECT ST_SRID(a) FROM resp))))
-        END,
-      4326)), 3)),
-    $3, $4, $5, $6, $7, $8
-  FROM ST_GeomFromWKB($1, $2::integer) AS new_fd (new_fd)
-  WHERE pg_has_role('sys_admin', 'MEMBER')
-    OR ST_Intersects((SELECT a FROM resp),
-      ST_MakeValid(ST_Transform(new_fd, (SELECT ST_SRID(a) FROM resp))))
+    new_fd, $3, $4, $5, $6, $7, $8
+  FROM g
+  WHERE NOT EXISTS(SELECT 1 FROM not_new)
 RETURNING id,
   ST_X(ST_Centroid(area::geometry)),
   ST_Y(ST_Centroid(area::geometry))
  `
+	// Fetch IDs of entries removed from data source
+	selectOldSQL = `
+WITH resp AS (
+  SELECT users.current_user_area_utm() AS a
+)
+SELECT id FROM waterway.fairway_dimensions
+  WHERE staging_done
+    AND validity @> current_timestamp
+    AND level_of_service = $1
+    AND (pg_has_role('sys_admin', 'MEMBER')
+      OR ST_Covers((SELECT a FROM resp),
+        ST_Transform(CAST(area AS geometry), (SELECT ST_SRID(a) FROM resp))))
+    AND id NOT IN (SELECT id FROM unchanged)
+`
+	invalidateFairwayDimensionSQL = `
+WITH track AS (
+  -- Mark entry for deletion that has been removed from the data source
+  INSERT INTO import.track_imports (import_id, deletion, relation, key)
+    VALUES($1, true, 'waterway.fairway_dimensions', $2)
+)
+-- Insert historic version with respective validity
+INSERT INTO waterway.fairway_dimensions (
+  area,
+  validity,
+  level_of_service,
+  min_width,
+  max_width,
+  min_depth,
+  date_info,
+  source_organization)
+SELECT
+    area,
+    tstzrange(lower(validity), current_timestamp),
+    level_of_service,
+    min_width,
+    max_width,
+    min_depth,
+    date_info,
+    source_organization
+  FROM waterway.fairway_dimensions
+  WHERE id = $2
+RETURNING id,
+  ST_X(ST_Centroid(area::geometry)),
+  ST_Y(ST_Centroid(area::geometry))
+`
 )
 
 // Do executes the actual fairway dimension import.
@@ -226,12 +292,25 @@
 	}
 	defer tx.Rollback()
 
+	if _, err := tx.ExecContext(ctx, tmpTableSQL); err != nil {
+		return nil, err
+	}
+
 	insertStmt, err := tx.PrepareContext(ctx, insertFairwayDimensionSQL)
 	if err != nil {
 		return nil, err
 	}
 	defer insertStmt.Close()
 
+	invalidateStmt, err := tx.PrepareContext(
+		ctx, invalidateFairwayDimensionSQL)
+	if err != nil {
+		return nil, err
+	}
+	defer invalidateStmt.Close()
+
+	savepoint := Savepoint(ctx, tx, "feature")
+
 	var (
 		unsupported       = stringCounter{}
 		missingProperties int
@@ -265,8 +344,6 @@
 		feedback.Info(
 			"Found %d features in data source", len(rfc.Features))
 
-		savepoint := Savepoint(ctx, tx, "feature")
-
 	features:
 		for _, feature := range rfc.Features {
 			if feature.Geometry.Coordinates == nil {
@@ -347,11 +424,73 @@
 	}
 
 	if outside > 0 {
-		feedback.Info("Features outside responsibility area: %d", outside)
+		feedback.Info(
+			"Features outside responsibility area or unchanged: %d", outside)
 	}
 
 	if features == 0 {
-		return nil, UnchangedError("No features found")
+		feedback.Info("No new features found")
+	}
+
+	// Invalidate features that have been removed from data source
+	res, err := tx.QueryContext(ctx, selectOldSQL, fd.LOS)
+	if err != nil {
+		return nil, err
+	}
+	defer res.Close()
+	var oldIDs []int64
+	for res.Next() {
+		var oldID int64
+		if err := res.Scan(&oldID); err != nil {
+			return nil, err
+		}
+		oldIDs = append(oldIDs, oldID)
+	}
+	if err := res.Err(); err != nil {
+		return nil, err
+	}
+
+	if features == 0 && len(oldIDs) == 0 {
+		return nil, UnchangedError("Nothing changed")
+	}
+
+	if len(oldIDs) > 0 {
+		feedback.Info(
+			"Number of features removed from data source: %d", len(oldIDs))
+
+		var old int
+		for _, oldID := range oldIDs {
+			var fdid int64
+			var lat, lon float64
+			if err := savepoint(func() error {
+				return invalidateStmt.QueryRowContext(
+					ctx,
+					importID,
+					oldID,
+				).Scan(&fdid, &lat, &lon)
+			}); err != nil {
+				feedback.Error(pgxutils.ReadableError{Err: err}.Error())
+				continue
+			}
+			fds = append(fds, fdSummary{ID: fdid, Lat: lat, Lon: lon})
+
+			if err := track(
+				ctx, tx, importID, "waterway.fairway_dimensions", fdid,
+			); err != nil {
+				return nil, err
+			}
+
+			old++
+		}
+
+		// Do not fail if features > 0 because otherwise new features are lost
+		if features == 0 && old == 0 {
+			return nil, fmt.Errorf("Invalidating features failed")
+		}
+
+		if old > 0 {
+			feedback.Info("Number of features invalidated: %d", old)
+		}
 	}
 
 	if err = tx.Commit(); err == nil {
--- a/schema/default_sysconfig.sql	Tue Mar 31 15:30:23 2020 +0200
+++ b/schema/default_sysconfig.sql	Tue Mar 31 18:59:28 2020 +0200
@@ -112,8 +112,7 @@
 -- Directly accessed tables
 INSERT INTO sys_admin.published_services (schema, name) VALUES
     ('waterway', 'waterway_area'),
-    ('waterway', 'waterway_profiles'),
-    ('waterway', 'fairway_dimensions');
+    ('waterway', 'waterway_profiles');
 
 -- GeoServer SQL views without time support
 INSERT INTO sys_admin.published_services (
@@ -306,6 +305,21 @@
                     ORDER BY bottleneck_id DESC) AS srl
                 ON b.bottleneck_id = srl.bottleneck_id
     $$),
+    ('waterway', 'fairway_dimensions', 4326, 'id',
+        'valid_from', 'valid_to', $$
+            SELECT id,
+                lower(validity) AS valid_from,
+                COALESCE(upper(validity), current_timestamp) AS valid_to,
+                area,
+                level_of_service,
+                min_width,
+                max_width,
+                min_depth,
+                date_info,
+                source_organization,
+                staging_done
+            FROM waterway.fairway_dimensions
+        $$),
     ('waterway', 'waterway_axis', 4326, 'id',
         'valid_from', 'valid_to', $$
         SELECT id,
--- a/schema/gemma.sql	Tue Mar 31 15:30:23 2020 +0200
+++ b/schema/gemma.sql	Tue Mar 31 18:59:28 2020 +0200
@@ -726,14 +726,20 @@
         min_depth smallint NOT NULL,
         date_info timestamp with time zone NOT NULL DEFAULT CURRENT_TIMESTAMP,
         source_organization varchar NOT NULL,
-        staging_done boolean NOT NULL DEFAULT false
+        staging_done boolean NOT NULL DEFAULT false,
+        validity tstzrange NOT NULL DEFAULT tstzrange(current_timestamp, NULL)
+            CHECK (NOT isempty(validity))
     )
     CREATE TRIGGER fairway_dimensions_date_info
         BEFORE UPDATE ON fairway_dimensions
         FOR EACH ROW EXECUTE PROCEDURE update_date_info()
     CREATE CONSTRAINT TRIGGER fairway_dimensions_area_unique
-        AFTER INSERT OR UPDATE OF area, staging_done ON fairway_dimensions
-        FOR EACH ROW EXECUTE FUNCTION prevent_st_equals('area', 'staging_done')
+        AFTER INSERT OR UPDATE OF area, validity, staging_done
+        ON fairway_dimensions
+        FOR EACH ROW EXECUTE FUNCTION prevent_st_equals(
+            'area', 'validity WITH &&', 'staging_done')
+    CREATE INDEX fairway_dimensions_validity
+        ON fairway_dimensions USING GiST (validity)
 
     --
     -- Bottlenecks
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/schema/updates/1437/01.historicise_fairway_dimensions.sql	Tue Mar 31 18:59:28 2020 +0200
@@ -0,0 +1,30 @@
+ALTER TABLE waterway.fairway_dimensions
+    ADD validity tstzrange NOT NULL DEFAULT tstzrange(current_timestamp, NULL)
+        CHECK (NOT isempty(validity));
+
+-- Assume existing entries have been valid since last accepted import
+WITH imps AS (
+    SELECT changed, CAST(summary AS jsonb)->'fd-area' AS fd_area
+        FROM import.imports
+        WHERE kind = 'fd' AND state = 'accepted'
+)
+UPDATE waterway.fairway_dimensions fd SET validity = tstzrange(
+    COALESCE(
+        (SELECT max(changed)
+            FROM imps
+            WHERE fd.id IN(
+                SELECT id
+                FROM jsonb_to_recordset(fd_area)
+                    AS fd_area (id bigint, lat numeric, lon numeric))),
+        current_timestamp),
+    NULL);
+
+DROP TRIGGER fairway_dimensions_area_unique ON waterway.fairway_dimensions;
+CREATE CONSTRAINT TRIGGER fairway_dimensions_area_unique
+    AFTER INSERT OR UPDATE OF area, validity, staging_done
+    ON waterway.fairway_dimensions
+    FOR EACH ROW EXECUTE FUNCTION prevent_st_equals(
+        'area', 'validity WITH &&', 'staging_done');
+
+CREATE INDEX fairway_dimensions_validity
+    ON waterway.fairway_dimensions USING GiST (validity);
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/schema/updates/1437/02.configure_wms-t.sql	Tue Mar 31 18:59:28 2020 +0200
@@ -0,0 +1,21 @@
+UPDATE sys_admin.published_services
+    SET
+        srid = 4326,
+        key_column = 'id',
+        wmst_attribute = 'valid_from',
+        wmst_end_attribute = 'valid_to',
+        view_def = $$
+            SELECT id,
+                lower(validity) AS valid_from,
+                COALESCE(upper(validity), current_timestamp) AS valid_to,
+                area,
+                level_of_service,
+                min_width,
+                max_width,
+                min_depth,
+                date_info,
+                source_organization,
+                staging_done
+            FROM waterway.fairway_dimensions
+            $$
+    WHERE schema = 'waterway' AND name = 'fairway_dimensions';
--- a/schema/version.sql	Tue Mar 31 15:30:23 2020 +0200
+++ b/schema/version.sql	Tue Mar 31 18:59:28 2020 +0200
@@ -1,1 +1,1 @@
-INSERT INTO gemma_schema_version(version) VALUES (1436);
+INSERT INTO gemma_schema_version(version) VALUES (1437);