changeset 3705:7006b92c0334

Handle updates (vs. historized and new versions) separately. We need this distinction as updated data currently can not be reviewed. More precisely: it can not be declined after review, as the old data is updated in place. The current exclusion from the review is a workaround and not meant to be the final solution. Note that there are additional minor problems, like the fact that the updated data is not counted as changed data for the import.
author Sascha Wilde <wilde@intevation.de>
date Wed, 19 Jun 2019 17:00:08 +0200
parents b07511ff859e
children c3b0f2912b6e
files pkg/imports/bn.go
diffstat 1 files changed, 151 insertions(+), 43 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/imports/bn.go	Wed Jun 19 12:34:48 2019 +0200
+++ b/pkg/imports/bn.go	Wed Jun 19 17:00:08 2019 +0200
@@ -11,6 +11,7 @@
 // Author(s):
 //  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
 //  * Tom Gottfried <tom.gottfried@intevation.de>
+//  * Sascha Wilde <sascha.wilde@intevation.de>
 
 package imports
 
@@ -87,20 +88,54 @@
   $14
   FROM waterway.gauges
   WHERE location = isrs_fromText($3) AND validity && $2
-ON CONFLICT (bottleneck_id, validity) DO UPDATE SET
-    gauge_location = EXCLUDED.gauge_location,
-    gauge_validity = EXCLUDED.gauge_validity,
-    objnam = EXCLUDED.objnam,
-    nobjnm = EXCLUDED.nobjnm,
-    stretch = EXCLUDED.stretch,
-    area = EXCLUDED.area,
-    rb = EXCLUDED.rb,
-    lb = EXCLUDED.lb,
-    responsible_country = EXCLUDED.responsible_country,
-    revisiting_time = EXCLUDED.revisiting_time,
-    limiting = EXCLUDED.limiting,
-    date_info = EXCLUDED.date_info,
-    source_organization = EXCLUDED.source_organization
+RETURNING id
+`
+
+	updateBottleneckSQL = `
+WITH
+bounds (b) AS (VALUES (isrs_fromText($7)), (isrs_fromText($8))),
+r AS (SELECT isrsrange(
+    (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY),
+    (SELECT b FROM bounds ORDER BY b USING >~ FETCH FIRST ROW ONLY)) AS r)
+UPDATE waterway.bottlenecks SET (
+  bottleneck_id,
+  validity,
+  gauge_location,
+  gauge_validity,
+  objnam,
+  nobjnm,
+  stretch,
+  area,
+  rb,
+  lb,
+  responsible_country,
+  revisiting_time,
+  limiting,
+  date_info,
+  source_organization
+) = ( SELECT
+  $2,
+  validity * $3, -- intersections with gauge validity ranges
+  location,
+  validity,
+  $5,
+  $6,
+  (SELECT r FROM r),
+  ISRSrange_area(
+    ISRSrange_axis((SELECT r FROM r),
+                   $16),
+    (SELECT ST_Collect(CAST(area AS geometry))
+        FROM waterway.waterway_area)),
+  $9,
+  $10,
+  $11,
+  $12::smallint,
+  $13,
+  $14::timestamptz,
+  $15
+  FROM waterway.gauges
+  WHERE location = isrs_fromText($4) AND validity && $3 )
+WHERE id=$1
 RETURNING id
 `
 
@@ -148,6 +183,20 @@
 )
 `
 
+	findMatchBottleneckSQL = `
+SELECT id FROM waterway.bottlenecks
+WHERE (
+  bottleneck_id,
+  validity,
+  staging_done
+) = ( SELECT
+  $1,
+  validity * $2, -- intersections with gauge validity ranges
+  true
+  FROM waterway.gauges
+  WHERE location = isrs_fromText($3) AND validity && $2
+)
+`
 	// Alignment with gauge validity might have generated new entries
 	// for the same time range. Thus, remove the old ones
 	deleteObsoleteBNSQL = `
@@ -280,16 +329,18 @@
 
 	feedback.Info("Found %d bottlenecks for import", len(bns))
 
-	var insertStmt, findExactMatchingBNStmt, deleteObsoleteBNStmt,
-		fixValidityStmt, deleteMaterialStmt, insertMaterialStmt,
-		trackStmt *sql.Stmt
+	var insertStmt, updateStmt, findExactMatchingBNStmt, findMatchingBNStmt,
+		deleteObsoleteBNStmt, fixValidityStmt, deleteMaterialStmt,
+		insertMaterialStmt, trackStmt *sql.Stmt
 
 	for _, x := range []struct {
 		sql  string
 		stmt **sql.Stmt
 	}{
 		{insertBottleneckSQL, &insertStmt},
+		{updateBottleneckSQL, &updateStmt},
 		{findExactMatchBottleneckSQL, &findExactMatchingBNStmt},
+		{findMatchBottleneckSQL, &findMatchingBNStmt},
 		{deleteObsoleteBNSQL, &deleteObsoleteBNStmt},
 		{fixBNValiditySQL, &fixValidityStmt},
 		{deleteBottleneckMaterialSQL, &deleteMaterialStmt},
@@ -310,8 +361,10 @@
 	for _, bn := range bns {
 		if err := storeBottleneck(
 			ctx, importID, conn, feedback, bn, &nids, tolerance,
-			insertStmt, findExactMatchingBNStmt, deleteObsoleteBNStmt,
-			fixValidityStmt, deleteMaterialStmt, insertMaterialStmt,
+			insertStmt, updateStmt,
+			findExactMatchingBNStmt, findMatchingBNStmt,
+			deleteObsoleteBNStmt, fixValidityStmt,
+			deleteMaterialStmt, insertMaterialStmt,
 			trackStmt); err != nil {
 			return nil, err
 		}
@@ -338,8 +391,10 @@
 	bn *ifbn.BottleNeckType,
 	nids *[]string,
 	tolerance float64,
-	insertStmt, findExactMatchingBNStmt, deleteObsoleteBNStmt,
-	fixValidityStmt, deleteMaterialStmt, insertMaterialStmt,
+	insertStmt, updateStmt,
+	findExactMatchingBNStmt, findMatchingBNStmt,
+	deleteObsoleteBNStmt, fixValidityStmt,
+	deleteMaterialStmt, insertMaterialStmt,
 	trackStmt *sql.Stmt,
 ) error {
 	feedback.Info("Processing %s (%s)", bn.OBJNAM, bn.Bottleneck_id)
@@ -464,6 +519,24 @@
 		return nil
 	}
 
+	// Check if an bottleneck with the same identity
+	// (bottleneck_id,validity) already exists:
+	// Check if an bottleneck identical to the one we would insert already
+	// exists:
+	var existing_bn_id *int64
+	err = findMatchingBNStmt.QueryRowContext(ctx,
+		bn.Bottleneck_id,
+		&validity,
+		bn.Fk_g_fid,
+	).Scan(&existing_bn_id)
+	switch {
+	case err == sql.ErrNoRows:
+		existing_bn_id = nil
+	case err != nil:
+		// This is unexpected and propably a serious error
+		return err
+	}
+
 	tx, err := conn.BeginTx(ctx, nil)
 	if err != nil {
 		return err
@@ -471,22 +544,47 @@
 	defer tx.Rollback()
 
 	var bnIds []int64
-	bns, err = tx.StmtContext(ctx, insertStmt).QueryContext(ctx,
-		bn.Bottleneck_id,
-		&validity,
-		bn.Fk_g_fid,
-		bn.OBJNAM,
-		bn.NOBJNM,
-		bn.From_ISRS, bn.To_ISRS,
-		rb,
-		lb,
-		country,
-		revisitingTime,
-		limiting,
-		bn.Date_Info,
-		bn.Source,
-		tolerance,
-	)
+	if existing_bn_id != nil {
+		feedback.Info("Bottelneck '%s' "+
+			"with matching validity already exists:"+
+			"UPDATING", bn.Bottleneck_id)
+		// Updating existnig BN data:
+		bns, err = tx.StmtContext(ctx, updateStmt).QueryContext(ctx,
+			existing_bn_id,
+			bn.Bottleneck_id,
+			&validity,
+			bn.Fk_g_fid,
+			bn.OBJNAM,
+			bn.NOBJNM,
+			bn.From_ISRS, bn.To_ISRS,
+			rb,
+			lb,
+			country,
+			revisitingTime,
+			limiting,
+			bn.Date_Info,
+			bn.Source,
+			tolerance,
+		)
+	} else {
+		// New BN data:
+		bns, err = tx.StmtContext(ctx, insertStmt).QueryContext(ctx,
+			bn.Bottleneck_id,
+			&validity,
+			bn.Fk_g_fid,
+			bn.OBJNAM,
+			bn.NOBJNM,
+			bn.From_ISRS, bn.To_ISRS,
+			rb,
+			lb,
+			country,
+			revisitingTime,
+			limiting,
+			bn.Date_Info,
+			bn.Source,
+			tolerance,
+		)
+	}
 	if err != nil {
 		feedback.Warn(handleError(err).Error())
 		return nil
@@ -572,16 +670,26 @@
 		}
 	}
 
-	for _, nid := range bnIds {
-		if _, err := tx.StmtContext(ctx, trackStmt).ExecContext(
-			ctx, importID, "waterway.bottlenecks", nid,
-		); err != nil {
-			return err
+	// Only add new BN data to tracking for staging review.
+	//
+	// FIXME: Review for updated bottlenecks is currently not possible, as
+	// the update is done instantly in place.
+	if existing_bn_id == nil {
+		for _, nid := range bnIds {
+			if _, err := tx.StmtContext(ctx, trackStmt).ExecContext(
+				ctx, importID, "waterway.bottlenecks", nid,
+			); err != nil {
+				return err
+			}
 		}
 	}
+
 	if err = tx.Commit(); err != nil {
 		return err
 	}
-	*nids = append(*nids, bn.Bottleneck_id)
+	// See above...
+	if existing_bn_id == nil {
+		*nids = append(*nids, bn.Bottleneck_id)
+	}
 	return nil
 }