Mercurial > gemma
changeset 3402:c04b1409a596
Fix adaptation of gauge temporal validity
The necessity to update validity implies the new value has to cascade
through to referencing columns.
Because measurements falling into the validity of a new version might
have been already imported, a deadlock situation might occur with
the CHECK constraint on gauge_measurements preventing an update
of validity and the exclusion constraint on gauges preventing the
insertion of a new version before the update. Thus, the exclusion
constraint is now deferred and cannot longer be used with ON CONFLICT
in the INSERT statement. Gauge measurements matching the validity of
a new gauge version are now 'moved' to that new version before the
validity of the old version is adapted.
author | Tom Gottfried <tom@intevation.de> |
---|---|
date | Thu, 23 May 2019 12:27:14 +0200 |
parents | 9f4308edc70a |
children | d7cc5cda82a9 |
files | pkg/imports/wg.go schema/gemma.sql |
diffstat | 2 files changed, 105 insertions(+), 53 deletions(-) [+] |
line wrap: on
line diff
--- a/pkg/imports/wg.go Thu May 23 11:34:34 2019 +0200 +++ b/pkg/imports/wg.go Thu May 23 12:27:14 2019 +0200 @@ -72,14 +72,18 @@ ` eraseGaugeSQL = ` -UPDATE waterway.gauges SET - erased = true, - -- Set enddate of old entry to new startdate in case of overlap: - validity = validity - $2 -WHERE isrs_astext(location) = $1 - AND NOT erased - -- Don't touch old entry if validity did not change: will be updated - AND validity <> $2 +WITH upd AS ( + UPDATE waterway.gauges SET + erased = true + WHERE isrs_astext(location) = $1 + AND NOT erased + -- Don't touch old entry if validity did not change: will be updated + AND validity <> $2 + RETURNING 1 +) +-- Decide whether a new version will be INSERTed +SELECT EXISTS(SELECT 1 FROM upd) + OR NOT EXISTS(SELECT 1 FROM waterway.gauges WHERE isrs_astext(location) = $1) ` insertGaugeSQL = ` @@ -107,11 +111,26 @@ $14, $15, $16 --- Exclusion constraints are not supported as arbiters. --- Thus we need to DO NOTHING here and use an extra UPDATE statement -) ON CONFLICT DO NOTHING -RETURNING 1 +) +` + + moveGMSQL = ` +UPDATE waterway.gauge_measurements +-- Associate measurements to matching gauge version +SET validity = $2 +WHERE isrs_astext(location) = $1 + AND measure_date <@ CAST($2 AS tstzrange) ` + + fixValiditySQL = ` +UPDATE waterway.gauges SET + -- Set enddate of old entry to new startdate in case of overlap: + validity = validity - $2 +WHERE isrs_astext(location) = $1 + AND validity && $2 + AND erased +` + updateGaugeSQL = ` UPDATE waterway.gauges SET objname = $6, @@ -178,7 +197,7 @@ return nil, err } - var eraseGaugeStmt, insertStmt, updateStmt, + var eraseGaugeStmt, insertStmt, moveGMStmt, fixValidityStmt, updateStmt, deleteReferenceWaterLevelsStmt, isNtSDepthRefStmt, insertWaterLevelStmt *sql.Stmt for _, x := range []struct { @@ -187,6 +206,8 @@ }{ {eraseGaugeSQL, &eraseGaugeStmt}, {insertGaugeSQL, &insertStmt}, + {moveGMSQL, &moveGMStmt}, + {fixValiditySQL, &fixValidityStmt}, {updateGaugeSQL, &updateStmt}, {deleteReferenceWaterLevelsSQL, &deleteReferenceWaterLevelsStmt}, {isNtSDepthRefSQL, &isNtSDepthRefStmt}, @@ -294,41 +315,77 @@ } defer tx.Rollback() - // Mark old entries of gauge as erased, if applicable - if _, err := tx.StmtContext(ctx, eraseGaugeStmt).ExecContext(ctx, + // Mark old entry of gauge as erased, if applicable + var isNew bool + err = tx.StmtContext(ctx, eraseGaugeStmt).QueryRowContext(ctx, code.String(), validity, - ); err != nil { + ).Scan(&isNew) + switch { + case err != nil: feedback.Warn(handleError(err).Error()) if err2 := tx.Rollback(); err2 != nil { return nil, err2 } unchanged++ continue - } - - // Try to insert gauge entry - var dummy int - err = tx.StmtContext(ctx, insertStmt).QueryRowContext(ctx, - code.CountryCode, - code.LoCode, - code.FairwaySection, - code.Orc, - code.Hectometre, - dr.Objname.Loc, - dr.Lon, dr.Lat, - from, - to, - &validity, - dr.Zeropoint, - geodref, - &dateInfo, - source, - time.Time(*dr.Lastupdate), - ).Scan(&dummy) - switch { - case err == sql.ErrNoRows: - // Assume constraint conflict, try to update + case isNew: + // insert gauge version entry + if _, err = tx.StmtContext(ctx, insertStmt).ExecContext(ctx, + code.CountryCode, + code.LoCode, + code.FairwaySection, + code.Orc, + code.Hectometre, + dr.Objname.Loc, + dr.Lon, dr.Lat, + from, + to, + &validity, + dr.Zeropoint, + geodref, + &dateInfo, + source, + time.Time(*dr.Lastupdate), + ); err != nil { + feedback.Warn(handleError(err).Error()) + if err2 := tx.Rollback(); err2 != nil { + return nil, err2 + } + unchanged++ + continue + } + // Move gauge measurements to new matching gauge version, + // if applicable + if _, err = tx.StmtContext(ctx, moveGMStmt).ExecContext(ctx, + code.String(), + &validity, + ); err != nil { + feedback.Warn(handleError(err).Error()) + if err2 := tx.Rollback(); err2 != nil { + return nil, err2 + } + unchanged++ + continue + } + // Set end of validity of old version to start of new version + // in case of overlap + if _, err = tx.StmtContext(ctx, fixValidityStmt).ExecContext( + ctx, + code.String(), + &validity, + ); err != nil { + feedback.Warn(handleError(err).Error()) + if err2 := tx.Rollback(); err2 != nil { + return nil, err2 + } + unchanged++ + continue + } + feedback.Info("insert new version") + case !isNew: + // try to update + var dummy int err2 := tx.StmtContext(ctx, updateStmt).QueryRowContext(ctx, code.CountryCode, code.LoCode, @@ -392,15 +449,6 @@ if err := rwls.Err(); err != nil { return nil, err } - case err != nil: - feedback.Warn(handleError(err).Error()) - if err2 := tx.Rollback(); err2 != nil { - return nil, err2 - } - unchanged++ - continue - default: - feedback.Info("insert new version") } // "Upsert" reference water levels
--- a/schema/gemma.sql Thu May 23 11:34:34 2019 +0200 +++ b/schema/gemma.sql Thu May 23 12:27:14 2019 +0200 @@ -290,6 +290,7 @@ erased boolean NOT NULL DEFAULT false, PRIMARY KEY (location, validity), EXCLUDE USING GiST (isrs_astext(location) WITH =, validity WITH &&) + DEFERRABLE INITIALLY DEFERRED ) -- Allow only one non-erased entry per location CREATE UNIQUE INDEX gauges_erased_unique_constraint @@ -299,7 +300,7 @@ CREATE TABLE gauges_reference_water_levels ( location isrs NOT NULL, validity tstzrange NOT NULL, - FOREIGN KEY (location, validity) REFERENCES gauges, + FOREIGN KEY (location, validity) REFERENCES gauges ON UPDATE CASCADE, -- Omit foreign key constraint to be able to store not NtS-compliant -- names, too: depth_reference varchar NOT NULL, -- REFERENCES depth_references, @@ -312,7 +313,8 @@ location isrs NOT NULL, validity tstzrange NOT NULL, CONSTRAINT gauge_key - FOREIGN KEY (location, validity) REFERENCES gauges, + FOREIGN KEY (location, validity) REFERENCES gauges + ON UPDATE CASCADE, measure_date timestamp with time zone NOT NULL, CHECK (measure_date <@ validity), country_code char(2) NOT NULL REFERENCES countries, @@ -331,7 +333,8 @@ location isrs NOT NULL, validity tstzrange NOT NULL, CONSTRAINT gauge_key - FOREIGN KEY (location, validity) REFERENCES gauges, + FOREIGN KEY (location, validity) REFERENCES gauges + ON UPDATE CASCADE, measure_date timestamp with time zone NOT NULL, CHECK (measure_date >= lower(validity)), country_code char(2) NOT NULL REFERENCES countries, @@ -490,7 +493,8 @@ bottleneck_id varchar UNIQUE NOT NULL, gauge_location isrs NOT NULL, gauge_validity tstzrange NOT NULL, - FOREIGN KEY (gauge_location, gauge_validity) REFERENCES gauges, + FOREIGN KEY (gauge_location, gauge_validity) REFERENCES gauges + ON UPDATE CASCADE, -- XXX: DRC references "ch. 3.1.1", which does not exist in document. objnam varchar, nobjnm varchar,