diff pkg/imports/bn.go @ 3666:db87f34805fb

Align bottleneck validity at gauges Ensuring the validity of a bottleneck version is always contained by the validity of the referenced gauge version allows to reliably determine matching reference values of the gauge at a point in time. Since this implies that a bottleneck version might be cut into more than one time ranges, the concept of having only one non-erased version is no longer applicable and replaced by using the 'current' version of a bottleneck. Fairway availability data are always kept with the 'current' bottleneck version to have them at hand alltogether for analyses over longer time ranges.
author Tom Gottfried <tom@intevation.de>
date Sat, 15 Jun 2019 14:36:50 +0200
parents 29ef6d41e4af
children 0227670dedd5
line wrap: on
line diff
--- a/pkg/imports/bn.go	Sat Jun 15 09:24:28 2019 +0200
+++ b/pkg/imports/bn.go	Sat Jun 15 14:36:50 2019 +0200
@@ -43,21 +43,6 @@
 const BNJobKind JobKind = "bn"
 
 const (
-	hasBottleneckSQL = `
-WITH upd AS (
-  UPDATE waterway.bottlenecks SET
-    erased = true
-  WHERE bottleneck_id = $1
-    AND NOT erased
-    -- Don't touch old entry if new validity contains old: will be updated
-    AND NOT validity <@ $2
-  RETURNING 1
-)
--- Decide whether a new version will be INSERTed
-SELECT EXISTS(SELECT 1 FROM upd)
-  OR NOT EXISTS(SELECT 1 FROM waterway.bottlenecks WHERE bottleneck_id = $1)
-`
-
 	insertBottleneckSQL = `
 WITH
 bounds (b) AS (VALUES (isrs_fromText($6)), (isrs_fromText($7))),
@@ -80,15 +65,11 @@
   limiting,
   date_info,
   source_organization
-) VALUES (
+) SELECT
   $1,
-  $2,
-  isrs_fromText($3),
-  COALESCE(
-    (SELECT validity FROM waterway.gauges
-       WHERE location = isrs_fromText($3)
-         AND validity @> lower(CAST($2 AS tstzrange))),
-    tstzrange(NULL, NULL)),
+  validity * $2, -- intersections with gauge validity ranges
+  location,
+  validity,
   $4,
   $5,
   (SELECT r FROM r),
@@ -104,67 +85,57 @@
   $12,
   $13,
   $14
-)
-RETURNING id`
+  FROM waterway.gauges
+  WHERE location = isrs_fromText($3) AND validity && $2
+ON CONFLICT (bottleneck_id, validity) DO UPDATE SET
+    gauge_location = EXCLUDED.gauge_location,
+    gauge_validity = EXCLUDED.gauge_validity,
+    objnam = EXCLUDED.objnam,
+    nobjnm = EXCLUDED.nobjnm,
+    stretch = EXCLUDED.stretch,
+    area = EXCLUDED.area,
+    rb = EXCLUDED.rb,
+    lb = EXCLUDED.lb,
+    responsible_country = EXCLUDED.responsible_country,
+    revisiting_time = EXCLUDED.revisiting_time,
+    limiting = EXCLUDED.limiting,
+    date_info = EXCLUDED.date_info,
+    source_organization = EXCLUDED.source_organization
+RETURNING id
+`
+
+	// Alignment with gauge validity might have generated new entries
+	// for the same time range. Thus, remove the old ones
+	deleteObsoleteBNSQL = `
+DELETE FROM waterway.bottlenecks
+WHERE bottleneck_id = $1 AND validity <@ $2 AND id <> ALL($3)
+`
 
 	fixBNValiditySQL = `
 UPDATE waterway.bottlenecks SET
    -- Set enddate of old entry to new startdate in case of overlap:
   validity = validity - $2
 WHERE bottleneck_id = $1
-  AND validity && $2
-  AND erased
-`
-
-	updateBottleneckSQL = `
-WITH
-bounds (b) AS (VALUES (isrs_fromText($5)), (isrs_fromText($6))),
-r AS (SELECT isrsrange(
-    (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY),
-    (SELECT b FROM bounds ORDER BY b USING >~ FETCH FIRST ROW ONLY)) AS r)
-UPDATE waterway.bottlenecks b SET
-  gauge_location = isrs_fromtext($2),
-  gauge_validity = COALESCE(
-    (SELECT validity FROM waterway.gauges g
-       WHERE g.location = isrs_fromText($2)
-         AND g.validity @> lower(b.validity)),
-    tstzrange(NULL, NULL)),
-  objnam = $3,
-  nobjnm = $4,
-  stretch = (SELECT r FROM r),
-  area = ISRSrange_area(
-    ISRSrange_axis((SELECT r FROM r), $14),
-    (SELECT ST_Collect(CAST(area AS geometry))
-        FROM waterway.waterway_area)),
-  rb = $7,
-  lb = $8,
-  responsible_country = $9,
-  revisiting_time = $10,
-  limiting = $11,
-  date_info = $12,
-  source_organization = $13,
-  validity = $15
-WHERE bottleneck_id = $1
-  AND NOT erased
-  AND $12 > date_info
-RETURNING id
+  AND validity && $2 AND NOT validity <@ $2
 `
 
 	deleteBottleneckMaterialSQL = `
-DELETE FROM waterway.bottlenecks_riverbed_materials
-WHERE bottleneck_id = $1
-  AND riverbed <> ALL($2)
-RETURNING riverbed
+WITH del AS (
+  DELETE FROM waterway.bottlenecks_riverbed_materials
+  WHERE bottleneck_id = ANY($1)
+    AND riverbed <> ALL($2)
+  RETURNING riverbed)
+SELECT DISTINCT riverbed FROM del
 `
 
 	insertBottleneckMaterialSQL = `
 INSERT INTO waterway.bottlenecks_riverbed_materials (
    bottleneck_id,
    riverbed
-) VALUES (
-   $1,
-   $2
-) ON CONFLICT (bottleneck_id, riverbed) DO NOTHING
+) SELECT *
+FROM unnest(CAST($1 AS int[])) AS bns,
+  unnest(CAST($2 AS varchar[])) AS materials
+ON CONFLICT (bottleneck_id, riverbed) DO NOTHING
 `
 )
 
@@ -265,17 +236,16 @@
 
 	feedback.Info("Found %d bottlenecks for import", len(bns))
 
-	var hasStmt, insertStmt, fixValidityStmt, updateStmt,
+	var insertStmt, deleteObsoleteBNStmt, fixValidityStmt,
 		deleteMaterialStmt, insertMaterialStmt, trackStmt *sql.Stmt
 
 	for _, x := range []struct {
 		sql  string
 		stmt **sql.Stmt
 	}{
-		{hasBottleneckSQL, &hasStmt},
 		{insertBottleneckSQL, &insertStmt},
+		{deleteObsoleteBNSQL, &deleteObsoleteBNStmt},
 		{fixBNValiditySQL, &fixValidityStmt},
-		{updateBottleneckSQL, &updateStmt},
 		{deleteBottleneckMaterialSQL, &deleteMaterialStmt},
 		{insertBottleneckMaterialSQL, &insertMaterialStmt},
 		{trackImportSQL, &trackStmt},
@@ -294,7 +264,7 @@
 	for _, bn := range bns {
 		if err := storeBottleneck(
 			ctx, importID, conn, feedback, bn, &nids, tolerance,
-			hasStmt, insertStmt, fixValidityStmt, updateStmt,
+			insertStmt, deleteObsoleteBNStmt, fixValidityStmt,
 			deleteMaterialStmt, insertMaterialStmt, trackStmt); err != nil {
 			return nil, err
 		}
@@ -321,7 +291,7 @@
 	bn *ifbn.BottleNeckType,
 	nids *[]string,
 	tolerance float64,
-	hasStmt, insertStmt, fixValidityStmt, updateStmt,
+	insertStmt, deleteObsoleteBNStmt, fixValidityStmt,
 	deleteMaterialStmt, insertMaterialStmt, trackStmt *sql.Stmt,
 ) error {
 	feedback.Info("Processing %s (%s)", bn.OBJNAM, bn.Bottleneck_id)
@@ -408,99 +378,57 @@
 	}
 	defer tx.Rollback()
 
-	var isNew bool
-	var nid int64
-	err = tx.StmtContext(ctx, hasStmt).QueryRowContext(ctx,
+	var bnIds []int64
+	bns, err := tx.StmtContext(ctx, insertStmt).QueryContext(ctx,
 		bn.Bottleneck_id,
-		validity,
-	).Scan(&isNew)
-	switch {
-	case err != nil:
+		&validity,
+		bn.Fk_g_fid,
+		bn.OBJNAM,
+		bn.NOBJNM,
+		bn.From_ISRS, bn.To_ISRS,
+		rb,
+		lb,
+		country,
+		revisitingTime,
+		limiting,
+		bn.Date_Info,
+		bn.Source,
+		tolerance,
+	)
+	if err != nil {
+		feedback.Warn(handleError(err).Error())
+		return nil
+	}
+	defer bns.Close()
+	for bns.Next() {
+		var nid int64
+		if err := bns.Scan(&nid); err != nil {
+			return err
+		}
+		bnIds = append(bnIds, nid)
+	}
+	if err := bns.Err(); err != nil {
+		return err
+	}
+	if len(bnIds) == 0 {
+		feedback.Warn(
+			"No gauge matching '%s' or given time available", bn.Fk_g_fid)
+		return nil
+	}
+
+	// Remove obsolete bottleneck version entries
+	var pgBnIds pgtype.Int8Array
+	pgBnIds.Set(bnIds)
+	if _, err = tx.StmtContext(ctx, deleteObsoleteBNStmt).ExecContext(ctx,
+		bn.Bottleneck_id,
+		&validity,
+		&pgBnIds,
+	); err != nil {
 		feedback.Warn(handleError(err).Error())
 		if err2 := tx.Rollback(); err2 != nil {
 			return err2
 		}
 		return nil
-	case isNew:
-		err = tx.StmtContext(ctx, insertStmt).QueryRowContext(
-			ctx,
-			bn.Bottleneck_id,
-			&validity,
-			bn.Fk_g_fid,
-			bn.OBJNAM,
-			bn.NOBJNM,
-			bn.From_ISRS, bn.To_ISRS,
-			rb,
-			lb,
-			country,
-			revisitingTime,
-			limiting,
-			bn.Date_Info,
-			bn.Source,
-			tolerance,
-		).Scan(&nid)
-		if err != nil {
-			feedback.Warn(handleError(err).Error())
-			return nil
-		}
-		feedback.Info("insert new version")
-	case !isNew:
-		// try to update
-		err := tx.StmtContext(ctx, updateStmt).QueryRowContext(ctx,
-			bn.Bottleneck_id,
-			bn.Fk_g_fid,
-			bn.OBJNAM,
-			bn.NOBJNM,
-			bn.From_ISRS, bn.To_ISRS,
-			rb,
-			lb,
-			country,
-			revisitingTime,
-			limiting,
-			bn.Date_Info,
-			bn.Source,
-			tolerance,
-			&validity,
-		).Scan(&nid)
-		switch {
-		case err == sql.ErrNoRows:
-			feedback.Info("unchanged")
-			if err := tx.Rollback(); err != nil {
-				return err
-			}
-			return nil
-		case err != nil:
-			feedback.Warn(handleError(err).Error())
-			if err := tx.Rollback(); err != nil {
-				return err
-			}
-			return nil
-		default:
-			feedback.Info("update")
-
-			// Remove obsolete riverbed materials
-			var pgMaterials pgtype.VarcharArray
-			pgMaterials.Set(materials)
-			mtls, err := tx.StmtContext(ctx,
-				deleteMaterialStmt).QueryContext(ctx,
-				nid,
-				&pgMaterials,
-			)
-			if err != nil {
-				return err
-			}
-			defer mtls.Close()
-			for mtls.Next() {
-				var delMat string
-				if err := mtls.Scan(&delMat); err != nil {
-					return err
-				}
-				feedback.Warn("Removed riverbed material %s", delMat)
-			}
-			if err := mtls.Err(); err != nil {
-				return err
-			}
-		}
 	}
 
 	// Set end of validity of old version to start of new version
@@ -516,22 +444,47 @@
 		return nil
 	}
 
-	// Insert riverbed materials
 	if materials != nil {
-		for _, mat := range materials {
-			if _, err := tx.StmtContext(ctx,
-				insertMaterialStmt).ExecContext(
-				ctx, nid, mat); err != nil {
-				feedback.Warn("Failed to insert riverbed material '%s'", mat)
-				feedback.Warn(handleError(err).Error())
+		// Remove obsolete riverbed materials
+		var pgMaterials pgtype.VarcharArray
+		pgMaterials.Set(materials)
+		mtls, err := tx.StmtContext(ctx,
+			deleteMaterialStmt).QueryContext(ctx,
+			&pgBnIds,
+			&pgMaterials,
+		)
+		if err != nil {
+			return err
+		}
+		defer mtls.Close()
+		for mtls.Next() {
+			var delMat string
+			if err := mtls.Scan(&delMat); err != nil {
+				return err
 			}
+			feedback.Warn("Removed riverbed material %s", delMat)
+		}
+		if err := mtls.Err(); err != nil {
+			return err
+		}
+
+		// Insert riverbed materials
+		if _, err := tx.StmtContext(ctx, insertMaterialStmt).ExecContext(ctx,
+			&pgBnIds,
+			&pgMaterials,
+		); err != nil {
+			feedback.Warn("Failed to insert riverbed materials")
+			feedback.Warn(handleError(err).Error())
+			return nil
 		}
 	}
 
-	if _, err := tx.StmtContext(ctx, trackStmt).ExecContext(
-		ctx, importID, "waterway.bottlenecks", nid,
-	); err != nil {
-		return err
+	for _, nid := range bnIds {
+		if _, err := tx.StmtContext(ctx, trackStmt).ExecContext(
+			ctx, importID, "waterway.bottlenecks", nid,
+		); err != nil {
+			return err
+		}
 	}
 	if err = tx.Commit(); err != nil {
 		return err