diff pkg/imports/wg.go @ 3302:ec6163c6687d

'Historicise' gauges on import Gauge data sets will be updated or a new version will be inserted depending on temporal validity and a timestamp marking the last update in the RIS-Index of a data set. The trigger on date_info is removed because the value is actually an attribut coming from the RIS-Index. Gauge measurements and predictions are associated to the version with matching temporal validity. Bottlenecks are always associated to the actual version of the gauge, although this might change as soon as bottlenecks are 'historicised', too.
author Tom Gottfried <tom@intevation.de>
date Thu, 16 May 2019 18:41:43 +0200
parents e640f51b5a4e
children 5932f9574493
line wrap: on
line diff
--- a/pkg/imports/wg.go	Thu May 16 17:22:33 2019 +0200
+++ b/pkg/imports/wg.go	Thu May 16 18:41:43 2019 +0200
@@ -65,11 +65,15 @@
 func (*WaterwayGauge) CleanUp() error { return nil }
 
 const (
-	deleteReferenceWaterLevelsSQL = `
-DELETE FROM waterway.gauges_reference_water_levels
-WHERE gauge_id = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)
-  AND depth_reference <> ALL($6)
-RETURNING depth_reference
+	eraseGaugeSQL = `
+UPDATE waterway.gauges SET
+  erased = true,
+   -- Set enddate of old entry to new startdate in case of overlap:
+  validity = validity - $2
+WHERE isrs_astext(location) = $1
+  AND NOT erased
+  -- Don't touch old entry if validity did not change: will be updated
+  AND validity <> $2
 `
 
 	insertGaugeSQL = `
@@ -83,7 +87,8 @@
   zero_point,
   geodref,
   date_info,
-  source_organization
+  source_organization,
+  lastupdate
 ) VALUES (
   ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int),
   $6,
@@ -94,17 +99,36 @@
   $12,
   $13,
   $14,
-  $15
-) ON CONFLICT (location) DO UPDATE SET
-    objname = EXCLUDED.objname,
-    geom = EXCLUDED.geom,
-    applicability_from_km = EXCLUDED.applicability_from_km,
-    applicability_to_km = EXCLUDED.applicability_to_km,
-    validity = EXCLUDED.validity,
-    zero_point = EXCLUDED.zero_point,
-    geodref = EXCLUDED.geodref,
-    date_info = EXCLUDED.date_info,
-    source_organization = EXCLUDED.source_organization
+  $15,
+  $16
+-- Exclusion constraints are not supported as arbiters.
+-- Thus we need to DO NOTHING here and use an extra UPDATE statement
+) ON CONFLICT DO NOTHING
+RETURNING 1
+`
+	updateGaugeSQL = `
+UPDATE waterway.gauges SET
+  objname = $6,
+  geom = ST_SetSRID(ST_MakePoint($7, $8), 4326),
+  applicability_from_km = $9,
+  applicability_to_km = $10,
+  zero_point = $11,
+  geodref = $12,
+  date_info = $13,
+  source_organization = $14,
+  lastupdate = $15
+WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)
+  AND NOT erased
+  AND $15 > lastupdate
+RETURNING 1
+`
+
+	deleteReferenceWaterLevelsSQL = `
+DELETE FROM waterway.gauges_reference_water_levels
+WHERE isrs_astext(location) = $1
+  AND validity = $2
+  AND depth_reference <> ALL($3)
+RETURNING depth_reference
 `
 
 	isNtSDepthRefSQL = `
@@ -112,14 +136,16 @@
 
 	insertReferenceWaterLevelsSQL = `
 INSERT INTO waterway.gauges_reference_water_levels (
-  gauge_id,
+  location,
+  validity,
   depth_reference,
   value
 ) VALUES (
   ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int),
   $6,
-  $7
-) ON CONFLICT (gauge_id, depth_reference) DO UPDATE SET
+  $7,
+  $8
+) ON CONFLICT (location, validity, depth_reference) DO UPDATE SET
     value = EXCLUDED.value
 `
 )
@@ -190,21 +216,24 @@
 			gauges = append(gauges, idxCode{jdx: j, idx: i, code: code})
 		}
 	}
-	feedback.Info("ignored gauges: %d", ignored)
-	feedback.Info("insert/update gauges: %d", len(gauges))
+	feedback.Info("Ignored gauges: %d", ignored)
+	feedback.Info("Further process %d gauges", len(gauges))
 
 	if len(gauges) == 0 {
-		return nil, UnchangedError("nothing to do")
+		return nil, UnchangedError("Nothing to do")
 	}
 
 	// insert/update the gauges
-	var insertStmt, deleteReferenceWaterLevelsStmt,
+	var eraseGaugeStmt, insertStmt, updateStmt,
+		deleteReferenceWaterLevelsStmt,
 		isNtSDepthRefStmt, insertWaterLevelStmt *sql.Stmt
 	for _, x := range []struct {
 		sql  string
 		stmt **sql.Stmt
 	}{
+		{eraseGaugeSQL, &eraseGaugeStmt},
 		{insertGaugeSQL, &insertStmt},
+		{updateGaugeSQL, &updateStmt},
 		{deleteReferenceWaterLevelsSQL, &deleteReferenceWaterLevelsStmt},
 		{isNtSDepthRefSQL, &isNtSDepthRefStmt},
 		{insertReferenceWaterLevelsSQL, &insertWaterLevelStmt},
@@ -216,11 +245,13 @@
 		defer (*x.stmt).Close()
 	}
 
+	var unchanged int
+
 	for i := range gauges {
 		ic := &gauges[i]
 		dr := responseData[ic.jdx].RisdataReturn[ic.idx]
 
-		feedback.Info("insert/update %s", ic.code)
+		feedback.Info("Processing %s", ic.code)
 
 		var from, to sql.NullInt64
 
@@ -265,7 +296,7 @@
 			Lower:     tfrom,
 			Upper:     tto,
 			LowerType: pgtype.Inclusive,
-			UpperType: pgtype.Inclusive,
+			UpperType: pgtype.Exclusive,
 			Status:    pgtype.Present,
 		}
 
@@ -302,7 +333,22 @@
 		}
 		defer tx.Rollback()
 
-		if _, err := tx.StmtContext(ctx, insertStmt).ExecContext(ctx,
+		// Mark old entries of gauge as erased, if applicable
+		if _, err := tx.StmtContext(ctx, eraseGaugeStmt).ExecContext(ctx,
+			ic.code.String(),
+			validity,
+		); err != nil {
+			feedback.Warn(handleError(err).Error())
+			if err2 := tx.Rollback(); err2 != nil {
+				return nil, err2
+			}
+			unchanged++
+			continue
+		}
+
+		// Try to insert gauge entry
+		var dummy int
+		err = tx.StmtContext(ctx, insertStmt).QueryRowContext(ctx,
 			ic.code.CountryCode,
 			ic.code.LoCode,
 			ic.code.FairwaySection,
@@ -317,42 +363,83 @@
 			geodref,
 			&dateInfo,
 			source,
-		); err != nil {
+			time.Time(*dr.Lastupdate),
+		).Scan(&dummy)
+		switch {
+		case err == sql.ErrNoRows:
+			// Assume constraint conflict, try to update
+			err2 := tx.StmtContext(ctx, updateStmt).QueryRowContext(ctx,
+				ic.code.CountryCode,
+				ic.code.LoCode,
+				ic.code.FairwaySection,
+				ic.code.Orc,
+				ic.code.Hectometre,
+				string(*dr.Objname.Loc),
+				float64(*dr.Lon), float64(*dr.Lat),
+				from,
+				to,
+				float64(*dr.Zeropoint),
+				geodref,
+				&dateInfo,
+				source,
+				time.Time(*dr.Lastupdate),
+			).Scan(&dummy)
+			switch {
+			case err2 == sql.ErrNoRows:
+				feedback.Info("unchanged")
+				if err3 := tx.Rollback(); err3 != nil {
+					return nil, err3
+				}
+				unchanged++
+				continue
+			case err2 != nil:
+				feedback.Warn(handleError(err2).Error())
+				if err3 := tx.Rollback(); err3 != nil {
+					return nil, err3
+				}
+				unchanged++
+				continue
+			default:
+				feedback.Info("update")
+			}
+
+			// Remove obsolete reference water levels
+			var currLevels pgtype.VarcharArray
+			currLevels.Set([]string{
+				string(*dr.Reflevel1code),
+				string(*dr.Reflevel2code),
+				string(*dr.Reflevel3code),
+			})
+			rwls, err2 := tx.StmtContext(ctx,
+				deleteReferenceWaterLevelsStmt).QueryContext(ctx,
+				ic.code.String(),
+				&validity,
+				&currLevels,
+			)
+			if err2 != nil {
+				return nil, err2
+			}
+			defer rwls.Close()
+			for rwls.Next() {
+				var delRef string
+				if err2 = rwls.Scan(&delRef); err2 != nil {
+					return nil, err2
+				}
+				feedback.Warn("Removed reference water level %s from %s",
+					delRef, ic.code)
+			}
+		case err != nil:
 			feedback.Warn(handleError(err).Error())
-			tx.Rollback()
+			if err2 := tx.Rollback(); err2 != nil {
+				return nil, err2
+			}
+			unchanged++
 			continue
+		default:
+			feedback.Info("insert new version")
 		}
 
-		// Remove obsolete reference water levels
-		var currLevels pgtype.VarcharArray
-		currLevels.Set([]string{
-			string(*dr.Reflevel1code),
-			string(*dr.Reflevel2code),
-			string(*dr.Reflevel3code),
-		})
-		rwls, err := tx.StmtContext(
-			ctx, deleteReferenceWaterLevelsStmt).QueryContext(ctx,
-			ic.code.CountryCode,
-			ic.code.LoCode,
-			ic.code.FairwaySection,
-			ic.code.Orc,
-			ic.code.Hectometre,
-			&currLevels,
-		)
-		if err != nil {
-			return nil, err
-		}
-		defer rwls.Close()
-		for rwls.Next() {
-			var delRef string
-			if err = rwls.Scan(&delRef); err != nil {
-				return nil, err
-			}
-			feedback.Warn("Removed reference water level %s from %s",
-				delRef, ic.code)
-		}
-
-		// Insert/update reference water levels
+		// "Upsert" reference water levels
 		for _, wl := range []struct {
 			level **erdms.RisreflevelcodeType
 			value **erdms.RisreflevelvalueType
@@ -388,6 +475,7 @@
 				ic.code.FairwaySection,
 				ic.code.Orc,
 				ic.code.Hectometre,
+				&validity,
 				string(**wl.level),
 				int64(**wl.value),
 			); err != nil {
@@ -402,8 +490,12 @@
 		}
 	}
 
-	feedback.Info("Refreshing gauges took %s.",
+	feedback.Info("Importing gauges took %s",
 		time.Since(start))
 
+	if unchanged == len(gauges) {
+		return nil, UnchangedError("All gauges unchanged")
+	}
+
 	return nil, err
 }