Mercurial > gemma
diff pkg/imports/wg.go @ 3402:c04b1409a596
Fix adaptation of gauge temporal validity
The necessity to update validity implies the new value has to cascade
through to referencing columns.
Because measurements falling into the validity of a new version might
have been already imported, a deadlock situation might occur with
the CHECK constraint on gauge_measurements preventing an update
of validity and the exclusion constraint on gauges preventing the
insertion of a new version before the update. Thus, the exclusion
constraint is now deferred and cannot longer be used with ON CONFLICT
in the INSERT statement. Gauge measurements matching the validity of
a new gauge version are now 'moved' to that new version before the
validity of the old version is adapted.
author | Tom Gottfried <tom@intevation.de> |
---|---|
date | Thu, 23 May 2019 12:27:14 +0200 |
parents | 0f6b156cff55 |
children | aa3c83fb7018 |
line wrap: on
line diff
--- a/pkg/imports/wg.go Thu May 23 11:34:34 2019 +0200 +++ b/pkg/imports/wg.go Thu May 23 12:27:14 2019 +0200 @@ -72,14 +72,18 @@ ` eraseGaugeSQL = ` -UPDATE waterway.gauges SET - erased = true, - -- Set enddate of old entry to new startdate in case of overlap: - validity = validity - $2 -WHERE isrs_astext(location) = $1 - AND NOT erased - -- Don't touch old entry if validity did not change: will be updated - AND validity <> $2 +WITH upd AS ( + UPDATE waterway.gauges SET + erased = true + WHERE isrs_astext(location) = $1 + AND NOT erased + -- Don't touch old entry if validity did not change: will be updated + AND validity <> $2 + RETURNING 1 +) +-- Decide whether a new version will be INSERTed +SELECT EXISTS(SELECT 1 FROM upd) + OR NOT EXISTS(SELECT 1 FROM waterway.gauges WHERE isrs_astext(location) = $1) ` insertGaugeSQL = ` @@ -107,11 +111,26 @@ $14, $15, $16 --- Exclusion constraints are not supported as arbiters. --- Thus we need to DO NOTHING here and use an extra UPDATE statement -) ON CONFLICT DO NOTHING -RETURNING 1 +) +` + + moveGMSQL = ` +UPDATE waterway.gauge_measurements +-- Associate measurements to matching gauge version +SET validity = $2 +WHERE isrs_astext(location) = $1 + AND measure_date <@ CAST($2 AS tstzrange) ` + + fixValiditySQL = ` +UPDATE waterway.gauges SET + -- Set enddate of old entry to new startdate in case of overlap: + validity = validity - $2 +WHERE isrs_astext(location) = $1 + AND validity && $2 + AND erased +` + updateGaugeSQL = ` UPDATE waterway.gauges SET objname = $6, @@ -178,7 +197,7 @@ return nil, err } - var eraseGaugeStmt, insertStmt, updateStmt, + var eraseGaugeStmt, insertStmt, moveGMStmt, fixValidityStmt, updateStmt, deleteReferenceWaterLevelsStmt, isNtSDepthRefStmt, insertWaterLevelStmt *sql.Stmt for _, x := range []struct { @@ -187,6 +206,8 @@ }{ {eraseGaugeSQL, &eraseGaugeStmt}, {insertGaugeSQL, &insertStmt}, + {moveGMSQL, &moveGMStmt}, + {fixValiditySQL, &fixValidityStmt}, {updateGaugeSQL, &updateStmt}, {deleteReferenceWaterLevelsSQL, &deleteReferenceWaterLevelsStmt}, {isNtSDepthRefSQL, &isNtSDepthRefStmt}, @@ -294,41 +315,77 @@ } defer tx.Rollback() - // Mark old entries of gauge as erased, if applicable - if _, err := tx.StmtContext(ctx, eraseGaugeStmt).ExecContext(ctx, + // Mark old entry of gauge as erased, if applicable + var isNew bool + err = tx.StmtContext(ctx, eraseGaugeStmt).QueryRowContext(ctx, code.String(), validity, - ); err != nil { + ).Scan(&isNew) + switch { + case err != nil: feedback.Warn(handleError(err).Error()) if err2 := tx.Rollback(); err2 != nil { return nil, err2 } unchanged++ continue - } - - // Try to insert gauge entry - var dummy int - err = tx.StmtContext(ctx, insertStmt).QueryRowContext(ctx, - code.CountryCode, - code.LoCode, - code.FairwaySection, - code.Orc, - code.Hectometre, - dr.Objname.Loc, - dr.Lon, dr.Lat, - from, - to, - &validity, - dr.Zeropoint, - geodref, - &dateInfo, - source, - time.Time(*dr.Lastupdate), - ).Scan(&dummy) - switch { - case err == sql.ErrNoRows: - // Assume constraint conflict, try to update + case isNew: + // insert gauge version entry + if _, err = tx.StmtContext(ctx, insertStmt).ExecContext(ctx, + code.CountryCode, + code.LoCode, + code.FairwaySection, + code.Orc, + code.Hectometre, + dr.Objname.Loc, + dr.Lon, dr.Lat, + from, + to, + &validity, + dr.Zeropoint, + geodref, + &dateInfo, + source, + time.Time(*dr.Lastupdate), + ); err != nil { + feedback.Warn(handleError(err).Error()) + if err2 := tx.Rollback(); err2 != nil { + return nil, err2 + } + unchanged++ + continue + } + // Move gauge measurements to new matching gauge version, + // if applicable + if _, err = tx.StmtContext(ctx, moveGMStmt).ExecContext(ctx, + code.String(), + &validity, + ); err != nil { + feedback.Warn(handleError(err).Error()) + if err2 := tx.Rollback(); err2 != nil { + return nil, err2 + } + unchanged++ + continue + } + // Set end of validity of old version to start of new version + // in case of overlap + if _, err = tx.StmtContext(ctx, fixValidityStmt).ExecContext( + ctx, + code.String(), + &validity, + ); err != nil { + feedback.Warn(handleError(err).Error()) + if err2 := tx.Rollback(); err2 != nil { + return nil, err2 + } + unchanged++ + continue + } + feedback.Info("insert new version") + case !isNew: + // try to update + var dummy int err2 := tx.StmtContext(ctx, updateStmt).QueryRowContext(ctx, code.CountryCode, code.LoCode, @@ -392,15 +449,6 @@ if err := rwls.Err(); err != nil { return nil, err } - case err != nil: - feedback.Warn(handleError(err).Error()) - if err2 := tx.Rollback(); err2 != nil { - return nil, err2 - } - unchanged++ - continue - default: - feedback.Info("insert new version") } // "Upsert" reference water levels