# HG changeset patch # User Sascha Wilde # Date 1564592915 -7200 # Node ID 0cf0008070db73010cbdfb53ee4d531ea3deb793 # Parent b9ddc6cdc8716a1e0cabe7a70fafcc410f597bda bottleneck import: Use a global transaction with savepoints. diff -r b9ddc6cdc871 -r 0cf0008070db pkg/imports/bn.go --- a/pkg/imports/bn.go Wed Jul 31 17:05:24 2019 +0200 +++ b/pkg/imports/bn.go Wed Jul 31 19:08:35 2019 +0200 @@ -284,9 +284,15 @@ feedback.Info("Tolerance used to snap waterway axis: %g", tolerance) + tx, err := conn.BeginTx(ctx, nil) + if err != nil { + return nil, err + } + defer tx.Rollback() + for _, bn := range bns { if err := storeBottleneck( - ctx, importID, conn, feedback, bn, + tx, ctx, importID, feedback, bn, &nids, &seenOldBnIds, tolerance, insertStmt, findExactMatchingBNStmt, @@ -296,6 +302,10 @@ return nil, err } } + if err = tx.Commit(); err != nil { + return nil, err + } + if len(nids) == 0 { return nil, UnchangedError("No new bottlenecks inserted") } @@ -311,9 +321,9 @@ } func storeBottleneck( + tx *sql.Tx, ctx context.Context, importID int64, - conn *sql.Conn, feedback Feedback, bn *ifbn.BottleNeckType, nids *[]string, @@ -426,7 +436,8 @@ // Check if an bottleneck identical to the one we would insert already // exists: - bns, err := findExactMatchingBNStmt.QueryContext(ctx, + bns, err := tx.StmtContext(ctx, findExactMatchingBNStmt).QueryContext( + ctx, bn.Bottleneck_id, &validity, bn.Fk_g_fid, @@ -458,12 +469,6 @@ // it can be used for debugging if something goes wrong... feedback.Info("Range: from %s to %s", bn.From_ISRS, bn.To_ISRS) - tx, err := conn.BeginTx(ctx, nil) - if err != nil { - return err - } - defer tx.Rollback() - // Check if the new bottleneck intersects with the validity of existing // for the same bottleneck_id we consider this an update and mark the // old data for deletion. @@ -517,62 +522,63 @@ var bnIds []int64 // Add new BN data: - bns, err = tx.StmtContext(ctx, insertStmt).QueryContext(ctx, - bn.Bottleneck_id, - &validity, - bn.Fk_g_fid, - bn.OBJNAM, - bn.NOBJNM, - bn.From_ISRS, bn.To_ISRS, - rb, - lb, - country, - revisitingTime, - limiting, - bn.Date_Info, - bn.Source, - tolerance) + savepoint := Savepoint(ctx, tx, "insert_bottlenck") + + err = savepoint(func() error { + bns, err := tx.StmtContext(ctx, insertStmt).QueryContext(ctx, + bn.Bottleneck_id, + &validity, + bn.Fk_g_fid, + bn.OBJNAM, + bn.NOBJNM, + bn.From_ISRS, bn.To_ISRS, + rb, + lb, + country, + revisitingTime, + limiting, + bn.Date_Info, + bn.Source, + tolerance) + if err != nil { + return err + } + defer bns.Close() + for bns.Next() { + var nid int64 + if err := bns.Scan(&nid); err != nil { + return err + } + bnIds = append(bnIds, nid) + } + if err := bns.Err(); err != nil { + return err + } + + // Add new materials + if len(bnIds) > 0 && materials != nil { + var ( + pgBnIds pgtype.Int8Array + pgMaterials pgtype.VarcharArray + ) + pgBnIds.Set(bnIds) + pgMaterials.Set(materials) + + // Insert riverbed materials + if _, err := tx.StmtContext(ctx, insertMaterialStmt).ExecContext(ctx, + + &pgBnIds, + &pgMaterials, + ); err != nil { + return err + } + } + return nil + }) if err != nil { feedback.Warn(pgxutils.ReadableError{err}.Error()) return nil } - defer bns.Close() - for bns.Next() { - var nid int64 - if err := bns.Scan(&nid); err != nil { - return err - } - bnIds = append(bnIds, nid) - } - if err := bns.Err(); err != nil { - feedback.Warn(pgxutils.ReadableError{err}.Error()) - return nil - } - if len(bnIds) == 0 { - feedback.Warn( - "No gauge matching '%s' or given time available", bn.Fk_g_fid) - return nil - } - - // Add new materials - var ( - pgBnIds pgtype.Int8Array - pgMaterials pgtype.VarcharArray - ) - pgBnIds.Set(bnIds) - pgMaterials.Set(materials) - - if materials != nil { - // Insert riverbed materials - if _, err := tx.StmtContext(ctx, insertMaterialStmt).ExecContext(ctx, - &pgBnIds, - &pgMaterials, - ); err != nil { - feedback.Warn("Failed to insert riverbed materials") - feedback.Warn(pgxutils.ReadableError{err}.Error()) - return nil - } - } // Only add new BN data to tracking for staging review. for _, nid := range bnIds { @@ -583,10 +589,6 @@ } } - if err = tx.Commit(); err != nil { - return err - } - *nids = append(*nids, bn.Bottleneck_id) return nil }