diff pkg/imports/wg.go @ 3402:c04b1409a596

Fix adaptation of gauge temporal validity The necessity to update validity implies the new value has to cascade through to referencing columns. Because measurements falling into the validity of a new version might have been already imported, a deadlock situation might occur with the CHECK constraint on gauge_measurements preventing an update of validity and the exclusion constraint on gauges preventing the insertion of a new version before the update. Thus, the exclusion constraint is now deferred and cannot longer be used with ON CONFLICT in the INSERT statement. Gauge measurements matching the validity of a new gauge version are now 'moved' to that new version before the validity of the old version is adapted.
author Tom Gottfried <tom@intevation.de>
date Thu, 23 May 2019 12:27:14 +0200
parents 0f6b156cff55
children aa3c83fb7018
line wrap: on
line diff
--- a/pkg/imports/wg.go	Thu May 23 11:34:34 2019 +0200
+++ b/pkg/imports/wg.go	Thu May 23 12:27:14 2019 +0200
@@ -72,14 +72,18 @@
 `
 
 	eraseGaugeSQL = `
-UPDATE waterway.gauges SET
-  erased = true,
-   -- Set enddate of old entry to new startdate in case of overlap:
-  validity = validity - $2
-WHERE isrs_astext(location) = $1
-  AND NOT erased
-  -- Don't touch old entry if validity did not change: will be updated
-  AND validity <> $2
+WITH upd AS (
+  UPDATE waterway.gauges SET
+    erased = true
+  WHERE isrs_astext(location) = $1
+    AND NOT erased
+    -- Don't touch old entry if validity did not change: will be updated
+    AND validity <> $2
+  RETURNING 1
+)
+-- Decide whether a new version will be INSERTed
+SELECT EXISTS(SELECT 1 FROM upd)
+  OR NOT EXISTS(SELECT 1 FROM waterway.gauges WHERE isrs_astext(location) = $1)
 `
 
 	insertGaugeSQL = `
@@ -107,11 +111,26 @@
   $14,
   $15,
   $16
--- Exclusion constraints are not supported as arbiters.
--- Thus we need to DO NOTHING here and use an extra UPDATE statement
-) ON CONFLICT DO NOTHING
-RETURNING 1
+)
+`
+
+	moveGMSQL = `
+UPDATE waterway.gauge_measurements
+-- Associate measurements to matching gauge version
+SET validity = $2
+WHERE isrs_astext(location) = $1
+  AND measure_date <@ CAST($2 AS tstzrange)
 `
+
+	fixValiditySQL = `
+UPDATE waterway.gauges SET
+   -- Set enddate of old entry to new startdate in case of overlap:
+  validity = validity - $2
+WHERE isrs_astext(location) = $1
+  AND validity && $2
+  AND erased
+`
+
 	updateGaugeSQL = `
 UPDATE waterway.gauges SET
   objname = $6,
@@ -178,7 +197,7 @@
 		return nil, err
 	}
 
-	var eraseGaugeStmt, insertStmt, updateStmt,
+	var eraseGaugeStmt, insertStmt, moveGMStmt, fixValidityStmt, updateStmt,
 		deleteReferenceWaterLevelsStmt,
 		isNtSDepthRefStmt, insertWaterLevelStmt *sql.Stmt
 	for _, x := range []struct {
@@ -187,6 +206,8 @@
 	}{
 		{eraseGaugeSQL, &eraseGaugeStmt},
 		{insertGaugeSQL, &insertStmt},
+		{moveGMSQL, &moveGMStmt},
+		{fixValiditySQL, &fixValidityStmt},
 		{updateGaugeSQL, &updateStmt},
 		{deleteReferenceWaterLevelsSQL, &deleteReferenceWaterLevelsStmt},
 		{isNtSDepthRefSQL, &isNtSDepthRefStmt},
@@ -294,41 +315,77 @@
 			}
 			defer tx.Rollback()
 
-			// Mark old entries of gauge as erased, if applicable
-			if _, err := tx.StmtContext(ctx, eraseGaugeStmt).ExecContext(ctx,
+			// Mark old entry of gauge as erased, if applicable
+			var isNew bool
+			err = tx.StmtContext(ctx, eraseGaugeStmt).QueryRowContext(ctx,
 				code.String(),
 				validity,
-			); err != nil {
+			).Scan(&isNew)
+			switch {
+			case err != nil:
 				feedback.Warn(handleError(err).Error())
 				if err2 := tx.Rollback(); err2 != nil {
 					return nil, err2
 				}
 				unchanged++
 				continue
-			}
-
-			// Try to insert gauge entry
-			var dummy int
-			err = tx.StmtContext(ctx, insertStmt).QueryRowContext(ctx,
-				code.CountryCode,
-				code.LoCode,
-				code.FairwaySection,
-				code.Orc,
-				code.Hectometre,
-				dr.Objname.Loc,
-				dr.Lon, dr.Lat,
-				from,
-				to,
-				&validity,
-				dr.Zeropoint,
-				geodref,
-				&dateInfo,
-				source,
-				time.Time(*dr.Lastupdate),
-			).Scan(&dummy)
-			switch {
-			case err == sql.ErrNoRows:
-				// Assume constraint conflict, try to update
+			case isNew:
+				// insert gauge version entry
+				if _, err = tx.StmtContext(ctx, insertStmt).ExecContext(ctx,
+					code.CountryCode,
+					code.LoCode,
+					code.FairwaySection,
+					code.Orc,
+					code.Hectometre,
+					dr.Objname.Loc,
+					dr.Lon, dr.Lat,
+					from,
+					to,
+					&validity,
+					dr.Zeropoint,
+					geodref,
+					&dateInfo,
+					source,
+					time.Time(*dr.Lastupdate),
+				); err != nil {
+					feedback.Warn(handleError(err).Error())
+					if err2 := tx.Rollback(); err2 != nil {
+						return nil, err2
+					}
+					unchanged++
+					continue
+				}
+				// Move gauge measurements to new matching gauge version,
+				// if applicable
+				if _, err = tx.StmtContext(ctx, moveGMStmt).ExecContext(ctx,
+					code.String(),
+					&validity,
+				); err != nil {
+					feedback.Warn(handleError(err).Error())
+					if err2 := tx.Rollback(); err2 != nil {
+						return nil, err2
+					}
+					unchanged++
+					continue
+				}
+				// Set end of validity of old version to start of new version
+				// in case of overlap
+				if _, err = tx.StmtContext(ctx, fixValidityStmt).ExecContext(
+					ctx,
+					code.String(),
+					&validity,
+				); err != nil {
+					feedback.Warn(handleError(err).Error())
+					if err2 := tx.Rollback(); err2 != nil {
+						return nil, err2
+					}
+					unchanged++
+					continue
+				}
+				feedback.Info("insert new version")
+			case !isNew:
+				// try to update
+				var dummy int
 				err2 := tx.StmtContext(ctx, updateStmt).QueryRowContext(ctx,
 					code.CountryCode,
 					code.LoCode,
@@ -392,15 +449,6 @@
 				if err := rwls.Err(); err != nil {
 					return nil, err
 				}
-			case err != nil:
-				feedback.Warn(handleError(err).Error())
-				if err2 := tx.Rollback(); err2 != nil {
-					return nil, err2
-				}
-				unchanged++
-				continue
-			default:
-				feedback.Info("insert new version")
 			}
 
 			// "Upsert" reference water levels