changeset 4119:0cf0008070db request_hist_bns

bottleneck import: Use a global transaction with savepoints.
author Sascha Wilde <wilde@intevation.de>
date Wed, 31 Jul 2019 19:08:35 +0200
parents b9ddc6cdc871
children ad0c373dff6a
files pkg/imports/bn.go
diffstat 1 files changed, 67 insertions(+), 65 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/imports/bn.go	Wed Jul 31 17:05:24 2019 +0200
+++ b/pkg/imports/bn.go	Wed Jul 31 19:08:35 2019 +0200
@@ -284,9 +284,15 @@
 
 	feedback.Info("Tolerance used to snap waterway axis: %g", tolerance)
 
+	tx, err := conn.BeginTx(ctx, nil)
+	if err != nil {
+		return nil, err
+	}
+	defer tx.Rollback()
+
 	for _, bn := range bns {
 		if err := storeBottleneck(
-			ctx, importID, conn, feedback, bn,
+			tx, ctx, importID, feedback, bn,
 			&nids, &seenOldBnIds, tolerance,
 			insertStmt,
 			findExactMatchingBNStmt,
@@ -296,6 +302,10 @@
 			return nil, err
 		}
 	}
+	if err = tx.Commit(); err != nil {
+		return nil, err
+	}
+
 	if len(nids) == 0 {
 		return nil, UnchangedError("No new bottlenecks inserted")
 	}
@@ -311,9 +321,9 @@
 }
 
 func storeBottleneck(
+	tx *sql.Tx,
 	ctx context.Context,
 	importID int64,
-	conn *sql.Conn,
 	feedback Feedback,
 	bn *ifbn.BottleNeckType,
 	nids *[]string,
@@ -426,7 +436,8 @@
 
 	// Check if an bottleneck identical to the one we would insert already
 	// exists:
-	bns, err := findExactMatchingBNStmt.QueryContext(ctx,
+	bns, err := tx.StmtContext(ctx, findExactMatchingBNStmt).QueryContext(
+		ctx,
 		bn.Bottleneck_id,
 		&validity,
 		bn.Fk_g_fid,
@@ -458,12 +469,6 @@
 	// it can be used for debugging if something goes wrong...
 	feedback.Info("Range: from %s to %s", bn.From_ISRS, bn.To_ISRS)
 
-	tx, err := conn.BeginTx(ctx, nil)
-	if err != nil {
-		return err
-	}
-	defer tx.Rollback()
-
 	// Check if the new bottleneck intersects with the validity of existing
 	// for the same bottleneck_id we consider this an update and mark the
 	// old data for deletion.
@@ -517,62 +522,63 @@
 
 	var bnIds []int64
 	// Add new BN data:
-	bns, err = tx.StmtContext(ctx, insertStmt).QueryContext(ctx,
-		bn.Bottleneck_id,
-		&validity,
-		bn.Fk_g_fid,
-		bn.OBJNAM,
-		bn.NOBJNM,
-		bn.From_ISRS, bn.To_ISRS,
-		rb,
-		lb,
-		country,
-		revisitingTime,
-		limiting,
-		bn.Date_Info,
-		bn.Source,
-		tolerance)
+	savepoint := Savepoint(ctx, tx, "insert_bottlenck")
+
+	err = savepoint(func() error {
+		bns, err := tx.StmtContext(ctx, insertStmt).QueryContext(ctx,
+			bn.Bottleneck_id,
+			&validity,
+			bn.Fk_g_fid,
+			bn.OBJNAM,
+			bn.NOBJNM,
+			bn.From_ISRS, bn.To_ISRS,
+			rb,
+			lb,
+			country,
+			revisitingTime,
+			limiting,
+			bn.Date_Info,
+			bn.Source,
+			tolerance)
+		if err != nil {
+			return err
+		}
+		defer bns.Close()
+		for bns.Next() {
+			var nid int64
+			if err := bns.Scan(&nid); err != nil {
+				return err
+			}
+			bnIds = append(bnIds, nid)
+		}
+		if err := bns.Err(); err != nil {
+			return err
+		}
+
+		// Add new materials
+		if len(bnIds) > 0 && materials != nil {
+			var (
+				pgBnIds     pgtype.Int8Array
+				pgMaterials pgtype.VarcharArray
+			)
+			pgBnIds.Set(bnIds)
+			pgMaterials.Set(materials)
+
+			// Insert riverbed materials
+			if _, err := tx.StmtContext(ctx, insertMaterialStmt).ExecContext(ctx,
+
+				&pgBnIds,
+				&pgMaterials,
+			); err != nil {
+				return err
+			}
+		}
+		return nil
+	})
 	if err != nil {
 		feedback.Warn(pgxutils.ReadableError{err}.Error())
 		return nil
 	}
-	defer bns.Close()
-	for bns.Next() {
-		var nid int64
-		if err := bns.Scan(&nid); err != nil {
-			return err
-		}
-		bnIds = append(bnIds, nid)
-	}
-	if err := bns.Err(); err != nil {
-		feedback.Warn(pgxutils.ReadableError{err}.Error())
-		return nil
-	}
-	if len(bnIds) == 0 {
-		feedback.Warn(
-			"No gauge matching '%s' or given time available", bn.Fk_g_fid)
-		return nil
-	}
-
-	// Add new materials
-	var (
-		pgBnIds     pgtype.Int8Array
-		pgMaterials pgtype.VarcharArray
-	)
-	pgBnIds.Set(bnIds)
-	pgMaterials.Set(materials)
-
-	if materials != nil {
-		// Insert riverbed materials
-		if _, err := tx.StmtContext(ctx, insertMaterialStmt).ExecContext(ctx,
-			&pgBnIds,
-			&pgMaterials,
-		); err != nil {
-			feedback.Warn("Failed to insert riverbed materials")
-			feedback.Warn(pgxutils.ReadableError{err}.Error())
-			return nil
-		}
-	}
 
 	// Only add new BN data to tracking for staging review.
 	for _, nid := range bnIds {
@@ -583,10 +589,6 @@
 		}
 	}
 
-	if err = tx.Commit(); err != nil {
-		return err
-	}
-
 	*nids = append(*nids, bn.Bottleneck_id)
 	return nil
 }