diff pkg/imports/bn.go @ 4113:f39d20427e89 request_hist_bns

WIP: Reworked bottleneck import: - all available bottlenecks from the service are loaded (including historic data) - kudos to Tom Gottfried - Bottlenecks which are identical to an exiting (non staged) one are recognized as "unchanged" and ignored. - All remaining Bottlenecks are inserted into Staging area - On "staging done" all existing bottlenecks with conflicting (bottleneck_id,validity) are deleted (replaced by the new bottlenecks). Known Issues: - Sometimes the identification of unchanged bottlenecks does not work, needs debugging. - The front end does not handle historic data correctly (we need to define what "correctly" should be). - Save points should be used to distinguish ignored errors from fatal ones.
author Sascha Wilde <wilde@intevation.de>
date Wed, 31 Jul 2019 17:04:31 +0200
parents 861760675497
children 0cf0008070db
line wrap: on
line diff
--- a/pkg/imports/bn.go	Tue Jul 30 13:14:47 2019 +0200
+++ b/pkg/imports/bn.go	Wed Jul 31 17:04:31 2019 +0200
@@ -90,51 +90,6 @@
 RETURNING id
 `
 
-	updateBottleneckSQL = `
-WITH
-bounds (b) AS (VALUES (isrs_fromText($7)), (isrs_fromText($8))),
-r AS (SELECT isrsrange(
-    (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY),
-    (SELECT b FROM bounds ORDER BY b USING >~ FETCH FIRST ROW ONLY)) AS r)
-UPDATE waterway.bottlenecks SET (
-  bottleneck_id,
-  validity,
-  gauge_location,
-  objnam,
-  nobjnm,
-  stretch,
-  area,
-  rb,
-  lb,
-  responsible_country,
-  revisiting_time,
-  limiting,
-  date_info,
-  source_organization
-) = (
-  $2,
-  $3::tstzrange,
-  isrs_fromText($4),
-  $5,
-  $6,
-  (SELECT r FROM r),
-  ISRSrange_area(
-    ISRSrange_axis((SELECT r FROM r),
-                   $16),
-    (SELECT ST_Collect(CAST(area AS geometry))
-        FROM waterway.waterway_area)),
-  $9,
-  $10,
-  $11,
-  $12::smallint,
-  $13,
-  $14::timestamptz,
-  $15
-)
-WHERE id=$1
-RETURNING id
-`
-
 	findExactMatchBottleneckSQL = `
 WITH
 bounds (b) AS (VALUES (isrs_fromText($6)), (isrs_fromText($7))),
@@ -175,44 +130,10 @@
 )
 `
 
-	findMatchBottleneckSQL = `
+	findIntersectingBottleneckSQL = `
 SELECT id FROM waterway.bottlenecks
-WHERE (
-  bottleneck_id,
-  validity,
-  staging_done
-) = (
-  $1,
-  $2::tstzrange,
-  true
-)
-`
-	// FIXME: Is this still neede wtih the new simplified historization
-	// model?  My intuition is: no it isn't and should be removed, but we
-	// should double check before doing so... [sw]
-	//
-	// Alignment with gauge validity might have generated new entries
-	// for the same time range. Thus, remove the old ones
-	deleteObsoleteBNSQL = `
-DELETE FROM waterway.bottlenecks
-WHERE bottleneck_id = $1 AND validity <@ $2 AND id <> ALL($3)
-`
-
-	fixBNValiditySQL = `
-UPDATE waterway.bottlenecks SET
-   -- Set enddate of old entry to new startdate in case of overlap:
-  validity = validity - $2
-WHERE bottleneck_id = $1
-  AND validity && $2 AND NOT validity <@ $2
-`
-
-	deleteBottleneckMaterialSQL = `
-WITH del AS (
-  DELETE FROM waterway.bottlenecks_riverbed_materials
-  WHERE bottleneck_id = ANY($1)
-    AND riverbed <> ALL($2)
-  RETURNING riverbed)
-SELECT DISTINCT riverbed FROM del
+WHERE (bottleneck_id, staging_done) = ($1, true)
+  AND $2::tstzrange && validity
 `
 
 	insertBottleneckMaterialSQL = `
@@ -224,6 +145,25 @@
   unnest(CAST($2 AS varchar[])) AS materials
 ON CONFLICT (bottleneck_id, riverbed) DO NOTHING
 `
+
+	bnStageDoneDeleteSQL = `
+DELETE FROM waterway.bottlenecks WHERE id IN (
+  SELECT key
+  FROM import.track_imports
+  WHERE import_id = $1
+        AND relation = 'waterway.bottlenecks'::regclass
+	AND deletion
+)`
+
+	bnStageDoneSQL = `
+UPDATE waterway.bottlenecks SET staging_done = true
+WHERE id IN (
+  SELECT key
+  FROM import.track_imports
+  WHERE import_id = $1
+        AND relation = 'waterway.bottlenecks'::regclass
+	AND NOT deletion
+)`
 )
 
 type bnJobCreator struct{}
@@ -245,22 +185,16 @@
 	}
 }
 
-const (
-	bnStageDoneSQL = `
-UPDATE waterway.bottlenecks SET staging_done = true
-WHERE id IN (
-  SELECT key from import.track_imports
-  WHERE import_id = $1 AND
-        relation = 'waterway.bottlenecks'::regclass)`
-)
-
 // StageDone moves the imported bottleneck out of the staging area.
 func (bnJobCreator) StageDone(
 	ctx context.Context,
 	tx *sql.Tx,
 	id int64,
 ) error {
-	_, err := tx.ExecContext(ctx, bnStageDoneSQL, id)
+	_, err := tx.ExecContext(ctx, bnStageDoneDeleteSQL, id)
+	if err == nil {
+		_, err = tx.ExecContext(ctx, bnStageDoneSQL, id)
+	}
 	return err
 }
 
@@ -325,8 +259,7 @@
 
 	feedback.Info("Found %d bottlenecks for import", len(bns))
 
-	var insertStmt, updateStmt, findExactMatchingBNStmt, findMatchingBNStmt,
-		deleteObsoleteBNStmt, fixValidityStmt, deleteMaterialStmt,
+	var insertStmt, findExactMatchingBNStmt, findIntersectingBNStmt,
 		insertMaterialStmt, trackStmt *sql.Stmt
 
 	for _, x := range []struct {
@@ -334,14 +267,10 @@
 		stmt **sql.Stmt
 	}{
 		{insertBottleneckSQL, &insertStmt},
-		{updateBottleneckSQL, &updateStmt},
 		{findExactMatchBottleneckSQL, &findExactMatchingBNStmt},
-		{findMatchBottleneckSQL, &findMatchingBNStmt},
-		{deleteObsoleteBNSQL, &deleteObsoleteBNStmt},
-		{fixBNValiditySQL, &fixValidityStmt},
-		{deleteBottleneckMaterialSQL, &deleteMaterialStmt},
+		{findIntersectingBottleneckSQL, &findIntersectingBNStmt},
 		{insertBottleneckMaterialSQL, &insertMaterialStmt},
-		{trackImportSQL, &trackStmt},
+		{trackImportDeletionSQL, &trackStmt},
 	} {
 		var err error
 		if *x.stmt, err = conn.PrepareContext(ctx, x.sql); err != nil {
@@ -351,16 +280,18 @@
 	}
 
 	var nids []string
+	seenOldBnIds := make(map[int64]bool)
 
 	feedback.Info("Tolerance used to snap waterway axis: %g", tolerance)
 
 	for _, bn := range bns {
 		if err := storeBottleneck(
-			ctx, importID, conn, feedback, bn, &nids, tolerance,
-			insertStmt, updateStmt,
-			findExactMatchingBNStmt, findMatchingBNStmt,
-			deleteObsoleteBNStmt, fixValidityStmt,
-			deleteMaterialStmt, insertMaterialStmt,
+			ctx, importID, conn, feedback, bn,
+			&nids, &seenOldBnIds, tolerance,
+			insertStmt,
+			findExactMatchingBNStmt,
+			findIntersectingBNStmt,
+			insertMaterialStmt,
 			trackStmt); err != nil {
 			return nil, err
 		}
@@ -386,11 +317,11 @@
 	feedback Feedback,
 	bn *ifbn.BottleNeckType,
 	nids *[]string,
+	seenOldBnIds *map[int64]bool,
 	tolerance float64,
-	insertStmt, updateStmt,
-	findExactMatchingBNStmt, findMatchingBNStmt,
-	deleteObsoleteBNStmt, fixValidityStmt,
-	deleteMaterialStmt, insertMaterialStmt,
+	insertStmt,
+	findExactMatchingBNStmt, findIntersectingBNStmt,
+	insertMaterialStmt,
 	trackStmt *sql.Stmt,
 ) error {
 	feedback.Info("Processing %s (%s)", bn.OBJNAM, bn.Bottleneck_id)
@@ -410,7 +341,7 @@
 		// version is advisable.
 		//
 		// Never the less, the current solution "just works" for the
-		// rtime being...  --  sw
+		// time being and reflects the upstream systems...  --  sw
 		feedback.Warn("No validity information, assuming infinite validity.")
 		tfrom.Set(time.Date(1970, time.January, 1, 0, 0, 0, 0, time.UTC))
 		uBound = pgtype.Unbounded
@@ -510,11 +441,14 @@
 		bn.Date_Info,
 		bn.Source,
 	)
-
 	if err != nil {
 		return err
 	}
 	defer bns.Close()
+	// We could check if the materials are also matching -- but per
+	// specification the Date_Info would hvae to change on that kind of
+	// change anyway.  So actualy we ar alreayd checking more in dpth than
+	// required.
 	if bns.Next() {
 		feedback.Info("unchanged")
 		return nil
@@ -524,71 +458,80 @@
 	// it can be used for debugging if something goes wrong...
 	feedback.Info("Range: from %s to %s", bn.From_ISRS, bn.To_ISRS)
 
-	// Check if an bottleneck with the same identity
-	// (bottleneck_id,validity) already exists:
-	// Check if an bottleneck identical to the one we would insert already
-	// exists:
-	var existing_bn_id *int64
-	err = findMatchingBNStmt.QueryRowContext(ctx,
-		bn.Bottleneck_id,
-		&validity,
-	).Scan(&existing_bn_id)
-	switch {
-	case err == sql.ErrNoRows:
-		existing_bn_id = nil
-	case err != nil:
-		// This is unexpected and propably a serious error
-		return err
-	}
-
 	tx, err := conn.BeginTx(ctx, nil)
 	if err != nil {
 		return err
 	}
 	defer tx.Rollback()
 
-	var bnIds []int64
-	if existing_bn_id != nil {
+	// Check if the new bottleneck intersects with the validity of existing
+	// for the same bottleneck_id we consider this an update and mark the
+	// old data for deletion.
+	bns, err = tx.StmtContext(ctx, findIntersectingBNStmt).QueryContext(
+		ctx, bn.Bottleneck_id, &validity,
+	)
+	if err != nil {
+		return err
+	}
+	defer bns.Close()
+
+	// Mark old intersecting bottleneck data for deletion.  Don't worry about
+	// materials, they will be deleted via cascading.
+	var oldBnIds []int64
+	for bns.Next() {
+		var oldID int64
+		err := bns.Scan(&oldID)
+		if err != nil {
+			return err
+		}
+		oldBnIds = append(oldBnIds, oldID)
+	}
+
+	switch {
+	case len(oldBnIds) == 1:
 		feedback.Info("Bottelneck '%s' "+
-			"with matching validity already exists:"+
+			"with intersecting validity already exists: "+
 			"UPDATING", bn.Bottleneck_id)
-		// Updating existnig BN data:
-		bns, err = tx.StmtContext(ctx, updateStmt).QueryContext(ctx,
-			existing_bn_id,
-			bn.Bottleneck_id,
-			&validity,
-			bn.Fk_g_fid,
-			bn.OBJNAM,
-			bn.NOBJNM,
-			bn.From_ISRS, bn.To_ISRS,
-			rb,
-			lb,
-			country,
-			revisitingTime,
-			limiting,
-			bn.Date_Info,
-			bn.Source,
-			tolerance,
-		)
-	} else {
-		// New BN data:
-		bns, err = tx.StmtContext(ctx, insertStmt).QueryContext(ctx,
-			bn.Bottleneck_id,
-			&validity,
-			bn.Fk_g_fid,
-			bn.OBJNAM,
-			bn.NOBJNM,
-			bn.From_ISRS, bn.To_ISRS,
-			rb,
-			lb,
-			country,
-			revisitingTime,
-			limiting,
-			bn.Date_Info,
-			bn.Source,
-			tolerance,
-		)
+	case len(oldBnIds) > 1:
+		// This case is unexpected and should only happen when historic
+		// data in the bottleneck service was changed subsequently...
+		// We handle it gracefully anyway, but warn.
+		feedback.Warn("More than one Bottelneck '%s' "+
+			"with intersecting validity already exists: "+
+			"REPLACING all of them!", bn.Bottleneck_id)
 	}
+
+	for _, oldID := range oldBnIds {
+		// It is possible, that two new bottlenecks intersect with the
+		// same old noe, therefor we have to handle duplicates in
+		// oldBnIds.
+		if !(*seenOldBnIds)[oldID] {
+			if _, err := tx.StmtContext(ctx, trackStmt).ExecContext(
+				ctx, importID, "waterway.bottlenecks", oldID, true,
+			); err != nil {
+				return err
+			}
+			(*seenOldBnIds)[oldID] = true
+		}
+	}
+
+	var bnIds []int64
+	// Add new BN data:
+	bns, err = tx.StmtContext(ctx, insertStmt).QueryContext(ctx,
+		bn.Bottleneck_id,
+		&validity,
+		bn.Fk_g_fid,
+		bn.OBJNAM,
+		bn.NOBJNM,
+		bn.From_ISRS, bn.To_ISRS,
+		rb,
+		lb,
+		country,
+		revisitingTime,
+		limiting,
+		bn.Date_Info,
+		bn.Source,
+		tolerance)
 	if err != nil {
 		feedback.Warn(pgxutils.ReadableError{err}.Error())
 		return nil
@@ -611,58 +554,15 @@
 		return nil
 	}
 
-	// Remove obsolete bottleneck version entries
-	var pgBnIds pgtype.Int8Array
+	// Add new materials
+	var (
+		pgBnIds     pgtype.Int8Array
+		pgMaterials pgtype.VarcharArray
+	)
 	pgBnIds.Set(bnIds)
-	if _, err = tx.StmtContext(ctx, deleteObsoleteBNStmt).ExecContext(ctx,
-		bn.Bottleneck_id,
-		&validity,
-		&pgBnIds,
-	); err != nil {
-		feedback.Warn(pgxutils.ReadableError{err}.Error())
-		if err2 := tx.Rollback(); err2 != nil {
-			return err2
-		}
-		return nil
-	}
-
-	// Set end of validity of old version to start of new version
-	// in case of overlap
-	if _, err = tx.StmtContext(ctx, fixValidityStmt).ExecContext(ctx,
-		bn.Bottleneck_id,
-		validity,
-	); err != nil {
-		feedback.Warn(pgxutils.ReadableError{err}.Error())
-		if err2 := tx.Rollback(); err2 != nil {
-			return err2
-		}
-		return nil
-	}
+	pgMaterials.Set(materials)
 
 	if materials != nil {
-		// Remove obsolete riverbed materials
-		var pgMaterials pgtype.VarcharArray
-		pgMaterials.Set(materials)
-		mtls, err := tx.StmtContext(ctx,
-			deleteMaterialStmt).QueryContext(ctx,
-			&pgBnIds,
-			&pgMaterials,
-		)
-		if err != nil {
-			return err
-		}
-		defer mtls.Close()
-		for mtls.Next() {
-			var delMat string
-			if err := mtls.Scan(&delMat); err != nil {
-				return err
-			}
-			feedback.Warn("Removed riverbed material %s", delMat)
-		}
-		if err := mtls.Err(); err != nil {
-			return err
-		}
-
 		// Insert riverbed materials
 		if _, err := tx.StmtContext(ctx, insertMaterialStmt).ExecContext(ctx,
 			&pgBnIds,
@@ -675,25 +575,18 @@
 	}
 
 	// Only add new BN data to tracking for staging review.
-	//
-	// FIXME: Review for updated bottlenecks is currently not possible, as
-	// the update is done instantly in place.
-	if existing_bn_id == nil {
-		for _, nid := range bnIds {
-			if _, err := tx.StmtContext(ctx, trackStmt).ExecContext(
-				ctx, importID, "waterway.bottlenecks", nid,
-			); err != nil {
-				return err
-			}
+	for _, nid := range bnIds {
+		if _, err := tx.StmtContext(ctx, trackStmt).ExecContext(
+			ctx, importID, "waterway.bottlenecks", nid, false,
+		); err != nil {
+			return err
 		}
 	}
 
 	if err = tx.Commit(); err != nil {
 		return err
 	}
-	// See above...
-	if existing_bn_id == nil {
-		*nids = append(*nids, bn.Bottleneck_id)
-	}
+
+	*nids = append(*nids, bn.Bottleneck_id)
 	return nil
 }