# HG changeset patch # User Tom Gottfried # Date 1560602210 -7200 # Node ID db87f34805fb17d806d4a372f68ad1402fa5e1b4 # Parent 29ef6d41e4af2904e5918b809ce522b8d40eb5da Align bottleneck validity at gauges Ensuring the validity of a bottleneck version is always contained by the validity of the referenced gauge version allows to reliably determine matching reference values of the gauge at a point in time. Since this implies that a bottleneck version might be cut into more than one time ranges, the concept of having only one non-erased version is no longer applicable and replaced by using the 'current' version of a bottleneck. Fairway availability data are always kept with the 'current' bottleneck version to have them at hand alltogether for analyses over longer time ranges. diff -r 29ef6d41e4af -r db87f34805fb pkg/controllers/bottlenecks.go --- a/pkg/controllers/bottlenecks.go Sat Jun 15 09:24:28 2019 +0200 +++ b/pkg/controllers/bottlenecks.go Sat Jun 15 14:36:50 2019 +0200 @@ -33,7 +33,9 @@ const ( selectLimitingSQL = ` -SELECT limiting from waterway.bottlenecks WHERE NOT erased AND objnam = $1` +SELECT limiting from waterway.bottlenecks + WHERE bn.validity @> current_timestamp AND objnam = $1 +` selectAvailableDepthSQL = ` WITH data AS ( @@ -48,7 +50,7 @@ JOIN waterway.bottlenecks bn ON fa.bottleneck_id = bn.id WHERE - NOT bn.erased AND + bn.validity @> current_timestamp AND bn.objnam = $1 AND efa.level_of_service = $2 AND efa.measure_type = 'Measured' AND @@ -82,7 +84,9 @@ JOIN waterway.bottlenecks bns ON grwl.location = bns.gauge_location AND grwl.validity = bns.gauge_validity -WHERE NOT bns.erased AND bns.objnam = $1 AND grwl.depth_reference like 'LDC%' +WHERE bns.validity @> current_timestamp + AND bns.objnam = $1 + AND grwl.depth_reference like 'LDC%' ` ) diff -r 29ef6d41e4af -r db87f34805fb pkg/controllers/search.go --- a/pkg/controllers/search.go Sat Jun 15 09:24:28 2019 +0200 +++ b/pkg/controllers/search.go Sat Jun 15 14:36:50 2019 +0200 @@ -42,7 +42,7 @@ ST_AsGeoJSON(ST_Centroid(area))::json AS geom, 'bottleneck' AS type FROM waterway.bottlenecks - WHERE NOT erased + WHERE validity @> current_timestamp ORDER BY objnam) r ` ) diff -r 29ef6d41e4af -r db87f34805fb pkg/controllers/stretches.go --- a/pkg/controllers/stretches.go Sat Jun 15 09:24:28 2019 +0200 +++ b/pkg/controllers/stretches.go Sat Jun 15 14:36:50 2019 +0200 @@ -35,14 +35,18 @@ distinct(b.objnam), b.limiting FROM waterway.sections s, waterway.bottlenecks b -WHERE NOT b.erased AND ST_Intersects(b.area, s.area) AND s.name = $1` +WHERE b.validity @> current_timestamp + AND ST_Intersects(b.area, s.area) + AND s.name = $1` selectStretchBottlenecks = ` SELECT distinct(b.objnam), b.limiting FROM waterway.stretches s, waterway.bottlenecks b -WHERE NOT b.erased AND ST_Intersects(b.area, s.area) AND s.name = $1` +WHERE b.validity @> current_timestamp + AND ST_Intersects(b.area, s.area) + AND s.name = $1` ) type ( diff -r 29ef6d41e4af -r db87f34805fb pkg/imports/bn.go --- a/pkg/imports/bn.go Sat Jun 15 09:24:28 2019 +0200 +++ b/pkg/imports/bn.go Sat Jun 15 14:36:50 2019 +0200 @@ -43,21 +43,6 @@ const BNJobKind JobKind = "bn" const ( - hasBottleneckSQL = ` -WITH upd AS ( - UPDATE waterway.bottlenecks SET - erased = true - WHERE bottleneck_id = $1 - AND NOT erased - -- Don't touch old entry if new validity contains old: will be updated - AND NOT validity <@ $2 - RETURNING 1 -) --- Decide whether a new version will be INSERTed -SELECT EXISTS(SELECT 1 FROM upd) - OR NOT EXISTS(SELECT 1 FROM waterway.bottlenecks WHERE bottleneck_id = $1) -` - insertBottleneckSQL = ` WITH bounds (b) AS (VALUES (isrs_fromText($6)), (isrs_fromText($7))), @@ -80,15 +65,11 @@ limiting, date_info, source_organization -) VALUES ( +) SELECT $1, - $2, - isrs_fromText($3), - COALESCE( - (SELECT validity FROM waterway.gauges - WHERE location = isrs_fromText($3) - AND validity @> lower(CAST($2 AS tstzrange))), - tstzrange(NULL, NULL)), + validity * $2, -- intersections with gauge validity ranges + location, + validity, $4, $5, (SELECT r FROM r), @@ -104,67 +85,57 @@ $12, $13, $14 -) -RETURNING id` + FROM waterway.gauges + WHERE location = isrs_fromText($3) AND validity && $2 +ON CONFLICT (bottleneck_id, validity) DO UPDATE SET + gauge_location = EXCLUDED.gauge_location, + gauge_validity = EXCLUDED.gauge_validity, + objnam = EXCLUDED.objnam, + nobjnm = EXCLUDED.nobjnm, + stretch = EXCLUDED.stretch, + area = EXCLUDED.area, + rb = EXCLUDED.rb, + lb = EXCLUDED.lb, + responsible_country = EXCLUDED.responsible_country, + revisiting_time = EXCLUDED.revisiting_time, + limiting = EXCLUDED.limiting, + date_info = EXCLUDED.date_info, + source_organization = EXCLUDED.source_organization +RETURNING id +` + + // Alignment with gauge validity might have generated new entries + // for the same time range. Thus, remove the old ones + deleteObsoleteBNSQL = ` +DELETE FROM waterway.bottlenecks +WHERE bottleneck_id = $1 AND validity <@ $2 AND id <> ALL($3) +` fixBNValiditySQL = ` UPDATE waterway.bottlenecks SET -- Set enddate of old entry to new startdate in case of overlap: validity = validity - $2 WHERE bottleneck_id = $1 - AND validity && $2 - AND erased -` - - updateBottleneckSQL = ` -WITH -bounds (b) AS (VALUES (isrs_fromText($5)), (isrs_fromText($6))), -r AS (SELECT isrsrange( - (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY), - (SELECT b FROM bounds ORDER BY b USING >~ FETCH FIRST ROW ONLY)) AS r) -UPDATE waterway.bottlenecks b SET - gauge_location = isrs_fromtext($2), - gauge_validity = COALESCE( - (SELECT validity FROM waterway.gauges g - WHERE g.location = isrs_fromText($2) - AND g.validity @> lower(b.validity)), - tstzrange(NULL, NULL)), - objnam = $3, - nobjnm = $4, - stretch = (SELECT r FROM r), - area = ISRSrange_area( - ISRSrange_axis((SELECT r FROM r), $14), - (SELECT ST_Collect(CAST(area AS geometry)) - FROM waterway.waterway_area)), - rb = $7, - lb = $8, - responsible_country = $9, - revisiting_time = $10, - limiting = $11, - date_info = $12, - source_organization = $13, - validity = $15 -WHERE bottleneck_id = $1 - AND NOT erased - AND $12 > date_info -RETURNING id + AND validity && $2 AND NOT validity <@ $2 ` deleteBottleneckMaterialSQL = ` -DELETE FROM waterway.bottlenecks_riverbed_materials -WHERE bottleneck_id = $1 - AND riverbed <> ALL($2) -RETURNING riverbed +WITH del AS ( + DELETE FROM waterway.bottlenecks_riverbed_materials + WHERE bottleneck_id = ANY($1) + AND riverbed <> ALL($2) + RETURNING riverbed) +SELECT DISTINCT riverbed FROM del ` insertBottleneckMaterialSQL = ` INSERT INTO waterway.bottlenecks_riverbed_materials ( bottleneck_id, riverbed -) VALUES ( - $1, - $2 -) ON CONFLICT (bottleneck_id, riverbed) DO NOTHING +) SELECT * +FROM unnest(CAST($1 AS int[])) AS bns, + unnest(CAST($2 AS varchar[])) AS materials +ON CONFLICT (bottleneck_id, riverbed) DO NOTHING ` ) @@ -265,17 +236,16 @@ feedback.Info("Found %d bottlenecks for import", len(bns)) - var hasStmt, insertStmt, fixValidityStmt, updateStmt, + var insertStmt, deleteObsoleteBNStmt, fixValidityStmt, deleteMaterialStmt, insertMaterialStmt, trackStmt *sql.Stmt for _, x := range []struct { sql string stmt **sql.Stmt }{ - {hasBottleneckSQL, &hasStmt}, {insertBottleneckSQL, &insertStmt}, + {deleteObsoleteBNSQL, &deleteObsoleteBNStmt}, {fixBNValiditySQL, &fixValidityStmt}, - {updateBottleneckSQL, &updateStmt}, {deleteBottleneckMaterialSQL, &deleteMaterialStmt}, {insertBottleneckMaterialSQL, &insertMaterialStmt}, {trackImportSQL, &trackStmt}, @@ -294,7 +264,7 @@ for _, bn := range bns { if err := storeBottleneck( ctx, importID, conn, feedback, bn, &nids, tolerance, - hasStmt, insertStmt, fixValidityStmt, updateStmt, + insertStmt, deleteObsoleteBNStmt, fixValidityStmt, deleteMaterialStmt, insertMaterialStmt, trackStmt); err != nil { return nil, err } @@ -321,7 +291,7 @@ bn *ifbn.BottleNeckType, nids *[]string, tolerance float64, - hasStmt, insertStmt, fixValidityStmt, updateStmt, + insertStmt, deleteObsoleteBNStmt, fixValidityStmt, deleteMaterialStmt, insertMaterialStmt, trackStmt *sql.Stmt, ) error { feedback.Info("Processing %s (%s)", bn.OBJNAM, bn.Bottleneck_id) @@ -408,99 +378,57 @@ } defer tx.Rollback() - var isNew bool - var nid int64 - err = tx.StmtContext(ctx, hasStmt).QueryRowContext(ctx, + var bnIds []int64 + bns, err := tx.StmtContext(ctx, insertStmt).QueryContext(ctx, bn.Bottleneck_id, - validity, - ).Scan(&isNew) - switch { - case err != nil: + &validity, + bn.Fk_g_fid, + bn.OBJNAM, + bn.NOBJNM, + bn.From_ISRS, bn.To_ISRS, + rb, + lb, + country, + revisitingTime, + limiting, + bn.Date_Info, + bn.Source, + tolerance, + ) + if err != nil { + feedback.Warn(handleError(err).Error()) + return nil + } + defer bns.Close() + for bns.Next() { + var nid int64 + if err := bns.Scan(&nid); err != nil { + return err + } + bnIds = append(bnIds, nid) + } + if err := bns.Err(); err != nil { + return err + } + if len(bnIds) == 0 { + feedback.Warn( + "No gauge matching '%s' or given time available", bn.Fk_g_fid) + return nil + } + + // Remove obsolete bottleneck version entries + var pgBnIds pgtype.Int8Array + pgBnIds.Set(bnIds) + if _, err = tx.StmtContext(ctx, deleteObsoleteBNStmt).ExecContext(ctx, + bn.Bottleneck_id, + &validity, + &pgBnIds, + ); err != nil { feedback.Warn(handleError(err).Error()) if err2 := tx.Rollback(); err2 != nil { return err2 } return nil - case isNew: - err = tx.StmtContext(ctx, insertStmt).QueryRowContext( - ctx, - bn.Bottleneck_id, - &validity, - bn.Fk_g_fid, - bn.OBJNAM, - bn.NOBJNM, - bn.From_ISRS, bn.To_ISRS, - rb, - lb, - country, - revisitingTime, - limiting, - bn.Date_Info, - bn.Source, - tolerance, - ).Scan(&nid) - if err != nil { - feedback.Warn(handleError(err).Error()) - return nil - } - feedback.Info("insert new version") - case !isNew: - // try to update - err := tx.StmtContext(ctx, updateStmt).QueryRowContext(ctx, - bn.Bottleneck_id, - bn.Fk_g_fid, - bn.OBJNAM, - bn.NOBJNM, - bn.From_ISRS, bn.To_ISRS, - rb, - lb, - country, - revisitingTime, - limiting, - bn.Date_Info, - bn.Source, - tolerance, - &validity, - ).Scan(&nid) - switch { - case err == sql.ErrNoRows: - feedback.Info("unchanged") - if err := tx.Rollback(); err != nil { - return err - } - return nil - case err != nil: - feedback.Warn(handleError(err).Error()) - if err := tx.Rollback(); err != nil { - return err - } - return nil - default: - feedback.Info("update") - - // Remove obsolete riverbed materials - var pgMaterials pgtype.VarcharArray - pgMaterials.Set(materials) - mtls, err := tx.StmtContext(ctx, - deleteMaterialStmt).QueryContext(ctx, - nid, - &pgMaterials, - ) - if err != nil { - return err - } - defer mtls.Close() - for mtls.Next() { - var delMat string - if err := mtls.Scan(&delMat); err != nil { - return err - } - feedback.Warn("Removed riverbed material %s", delMat) - } - if err := mtls.Err(); err != nil { - return err - } - } } // Set end of validity of old version to start of new version @@ -516,22 +444,47 @@ return nil } - // Insert riverbed materials if materials != nil { - for _, mat := range materials { - if _, err := tx.StmtContext(ctx, - insertMaterialStmt).ExecContext( - ctx, nid, mat); err != nil { - feedback.Warn("Failed to insert riverbed material '%s'", mat) - feedback.Warn(handleError(err).Error()) + // Remove obsolete riverbed materials + var pgMaterials pgtype.VarcharArray + pgMaterials.Set(materials) + mtls, err := tx.StmtContext(ctx, + deleteMaterialStmt).QueryContext(ctx, + &pgBnIds, + &pgMaterials, + ) + if err != nil { + return err + } + defer mtls.Close() + for mtls.Next() { + var delMat string + if err := mtls.Scan(&delMat); err != nil { + return err } + feedback.Warn("Removed riverbed material %s", delMat) + } + if err := mtls.Err(); err != nil { + return err + } + + // Insert riverbed materials + if _, err := tx.StmtContext(ctx, insertMaterialStmt).ExecContext(ctx, + &pgBnIds, + &pgMaterials, + ); err != nil { + feedback.Warn("Failed to insert riverbed materials") + feedback.Warn(handleError(err).Error()) + return nil } } - if _, err := tx.StmtContext(ctx, trackStmt).ExecContext( - ctx, importID, "waterway.bottlenecks", nid, - ); err != nil { - return err + for _, nid := range bnIds { + if _, err := tx.StmtContext(ctx, trackStmt).ExecContext( + ctx, importID, "waterway.bottlenecks", nid, + ); err != nil { + return err + } } if err = tx.Commit(); err != nil { return err diff -r 29ef6d41e4af -r db87f34805fb pkg/imports/fa.go --- a/pkg/imports/fa.go Sat Jun 15 09:24:28 2019 +0200 +++ b/pkg/imports/fa.go Sat Jun 15 14:36:50 2019 +0200 @@ -79,8 +79,10 @@ source_organization ) VALUES ( $1, - (SELECT id FROM waterway.bottlenecks - WHERE NOT erased AND bottleneck_id = $2), + -- Always associate fairway availability data to newest bottleneck + -- version to prevent problems in analysis over longer time periods + (SELECT id FROM waterway.bottlenecks WHERE bottleneck_id = $2 + ORDER BY validity DESC FETCH FIRST ROW ONLY), $3, $4, $5, diff -r 29ef6d41e4af -r db87f34805fb schema/auth.sql --- a/schema/auth.sql Sat Jun 15 09:24:28 2019 +0200 +++ b/schema/auth.sql Sat Jun 15 14:36:50 2019 +0200 @@ -38,9 +38,6 @@ -- GRANT INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA waterway TO waterway_admin; --- TODO: will there ever be UPDATEs and DELETEs or can we drop that for --- imported data due to historicisation? Special tables like --- import_configuration will further need UPDATE and DELETE privileges. GRANT INSERT, UPDATE, DELETE ON users.templates TO waterway_admin; diff -r 29ef6d41e4af -r db87f34805fb schema/gemma.sql --- a/schema/gemma.sql Sat Jun 15 09:24:28 2019 +0200 +++ b/schema/gemma.sql Sat Jun 15 14:36:50 2019 +0200 @@ -76,6 +76,9 @@ -- associating time-based referencing objects to matching version CREATE OR REPLACE FUNCTION move_gauge_referencing() RETURNS trigger AS $$ +DECLARE + new_bn int; + new_bns int[]; BEGIN -- Avoid unnecessary execution ON UPDATE if validity did not change IF OLD IS NULL OR NEW.validity <> OLD.validity THEN @@ -84,10 +87,63 @@ WHERE location = NEW.location AND measure_date <@ NEW.validity; - UPDATE waterway.bottlenecks - SET gauge_validity = NEW.validity + -- build bottleneck validities from intersections with gauge validities + FOR new_bn IN + INSERT INTO waterway.bottlenecks ( + bottleneck_id, + validity, + gauge_location, + gauge_validity, + objnam, + nobjnm, + stretch, + area, + rb, + lb, + responsible_country, + revisiting_time, + limiting, + date_info, + source_organization, + staging_done + ) SELECT + b.bottleneck_id, + -- Anticipate non-intersecting gauge validities: + b.validity * CASE WHEN g.validity = NEW.validity + THEN NEW.validity ELSE g.validity - NEW.validity END, + b.gauge_location, + g.validity, + b.objnam, + b.nobjnm, + b.stretch, + b.area, + b.rb, + b.lb, + b.responsible_country, + b.revisiting_time, + b.limiting, + b.date_info, + b.source_organization, + b.staging_done + FROM waterway.bottlenecks b JOIN waterway.gauges g + ON b.gauge_location = g.location + WHERE b.gauge_location = NEW.location + AND b.validity && NEW.validity + -- Avoid duplicate intersection results: + AND NOT (b.validity <@ NEW.validity + AND g.validity <> NEW.validity) + ON CONFLICT (bottleneck_id, validity) DO UPDATE SET + -- Associate to new matching gauge version + gauge_validity = EXCLUDED.gauge_validity + RETURNING id + LOOP + new_bns = new_bns || new_bn; + END LOOP; + -- Delete bottleneck versions superseded by new intersections: + DELETE FROM waterway.bottlenecks WHERE gauge_location = NEW.location - AND lower(validity) <@ NEW.validity; + AND validity && NEW.validity + AND id <> ALL(new_bns); END IF; RETURN NULL; -- ignored END; @@ -103,6 +159,19 @@ SET bottleneck_validity = NEW.validity WHERE bottleneck_id = NEW.bottleneck_id AND CAST(date_info AS timestamptz) <@ NEW.validity; + + -- Always associate fairway availability data to newest bottleneck + -- version to prevent problems in analysis over longer time periods + WITH + bn AS (SELECT id, validity FROM waterway.bottlenecks + WHERE bottleneck_id = NEW.bottleneck_id), + latest AS (SELECT id FROM bn + -- Candidates are past new validity or just inserted/updated + WHERE NOT validity &< NEW.validity OR id = NEW.id + ORDER BY upper(validity) DESC FETCH FIRST ROW ONLY) + UPDATE waterway.fairway_availability + SET bottleneck_id = (SELECT id FROM latest) + WHERE bottleneck_id IN(SELECT id FROM bn EXCEPT SELECT id FROM latest); END IF; RETURN NULL; -- ignored END; @@ -534,7 +603,7 @@ DEFERRABLE INITIALLY DEFERRED, gauge_location isrs NOT NULL, gauge_validity tstzrange NOT NULL, - CHECK(lower(validity) <@ gauge_validity), + CHECK(validity <@ gauge_validity), CONSTRAINT gauge_key FOREIGN KEY (gauge_location, gauge_validity) REFERENCES gauges ON UPDATE CASCADE, @@ -558,13 +627,8 @@ -- XXX: Also an attribut of sounding result? date_info timestamp with time zone NOT NULL, source_organization varchar NOT NULL, - erased boolean NOT NULL DEFAULT false, staging_done boolean NOT NULL DEFAULT false ) - -- Allow only one non-erased entry per bottleneck - CREATE UNIQUE INDEX bottlenecks_erased_unique_constraint - ON bottlenecks (bottleneck_id) - WHERE NOT erased -- Associate referencing objects to matching bottleneck version CREATE TRIGGER move_referencing AFTER INSERT OR UPDATE OF validity ON bottlenecks FOR EACH ROW diff -r 29ef6d41e4af -r db87f34805fb schema/gemma_tests.sql --- a/schema/gemma_tests.sql Sat Jun 15 09:24:28 2019 +0200 +++ b/schema/gemma_tests.sql Sat Jun 15 14:36:50 2019 +0200 @@ -63,9 +63,7 @@ AND NOT erased; COMMIT; SELECT results_eq($$ - SELECT DISTINCT gauge_validity FROM waterway.bottlenecks + SELECT count(*) FROM waterway.bottlenecks GROUP BY bottleneck_id; $$, - $$ - SELECT v FROM new_v - $$, - 'Bottlenecks have been associated to new matching gauge version'); + CAST(ARRAY[2,2] AS bigint[]), + 'Bottlenecks have been split to two new matching gauge versions'); diff -r 29ef6d41e4af -r db87f34805fb schema/geoserver_views.sql --- a/schema/geoserver_views.sql Sat Jun 15 09:24:28 2019 +0200 +++ b/schema/geoserver_views.sql Sat Jun 15 14:36:50 2019 +0200 @@ -136,7 +136,7 @@ ON b.id = fal.bottleneck_id LEFT JOIN sounding_result_latest srl ON b.bottleneck_id = srl.bottleneck_id - WHERE NOT b.erased; + WHERE b.validity @> current_timestamp; CREATE OR REPLACE VIEW waterway.stretches_geoserver AS SELECT @@ -201,7 +201,7 @@ SELECT bottleneck_id, max(date_info) AS current FROM waterway.sounding_results GROUP BY bottleneck_id) sr ON sr.bottleneck_id = bn.bottleneck_id - WHERE NOT bn.erased + WHERE bn.validity @> current_timestamp ORDER BY objnam; CREATE OR REPLACE VIEW waterway.sounding_differences AS