changeset 3220:56b297592c0a

Handle failing INSERTs gracefully during approved gauge measurements import
author Tom Gottfried <tom@intevation.de>
date Thu, 09 May 2019 14:21:50 +0200
parents 4acbee65275d
children 899914a18d7e
files pkg/imports/agm.go
diffstat 1 files changed, 21 insertions(+), 17 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/imports/agm.go	Thu May 09 12:49:53 2019 +0200
+++ b/pkg/imports/agm.go	Thu May 09 14:21:50 2019 +0200
@@ -291,31 +291,25 @@
 		return nil, fmt.Errorf("Missing columns: %s", strings.Join(missing, ", "))
 	}
 
-	tx, err := conn.BeginTx(ctx, nil)
-	if err != nil {
-		return nil, err
-	}
-	defer tx.Rollback()
-
-	gaugeCheckStmt, err := tx.PrepareContext(ctx, agmGaugeCheckSQL)
+	gaugeCheckStmt, err := conn.PrepareContext(ctx, agmGaugeCheckSQL)
 	if err != nil {
 		return nil, err
 	}
 	defer gaugeCheckStmt.Close()
 
-	selectStmt, err := tx.PrepareContext(ctx, agmSelectSQL)
+	selectStmt, err := conn.PrepareContext(ctx, agmSelectSQL)
 	if err != nil {
 		return nil, err
 	}
 	defer selectStmt.Close()
 
-	insertStmt, err := tx.PrepareContext(ctx, agmInsertSQL)
+	insertStmt, err := conn.PrepareContext(ctx, agmInsertSQL)
 	if err != nil {
 		return nil, err
 	}
 	defer insertStmt.Close()
 
-	trackStmt, err := tx.PrepareContext(ctx, trackImportSQL)
+	trackStmt, err := conn.PrepareContext(ctx, trackImportSQL)
 	if err != nil {
 		return nil, err
 	}
@@ -454,9 +448,15 @@
 
 		newSourceOrganization := newSender
 
+		tx, err := conn.BeginTx(ctx, nil)
+		if err != nil {
+			return nil, err
+		}
+		defer tx.Rollback()
+
 		var newID int64
 
-		if err := insertStmt.QueryRowContext(
+		if err := tx.StmtContext(ctx, insertStmt).QueryRowContext(
 			ctx,
 			gid.CountryCode,
 			gid.LoCode,
@@ -476,13 +476,21 @@
 			newDateInfo,
 			newSourceOrganization,
 		).Scan(&newID); err != nil {
-			return nil, err
+			feedback.Warn(handleError(err).Error())
+			if err := tx.Rollback(); err != nil {
+				return nil, err
+			}
+			ignored++
+			continue
 		}
-		if _, err := trackStmt.ExecContext(
+		if _, err := tx.StmtContext(ctx, trackStmt).ExecContext(
 			ctx, importID, "waterway.gauge_measurements", newID,
 		); err != nil {
 			return nil, err
 		}
+		if err := tx.Commit(); err != nil {
+			return nil, fmt.Errorf("Commit failed: %v", err)
+		}
 
 		n := newAGMLine(
 			newCountryCode,
@@ -528,10 +536,6 @@
 		entries = append(entries, ase)
 	}
 
-	if err := tx.Commit(); err != nil {
-		return nil, fmt.Errorf("Commit failed: %v", err)
-	}
-
 	feedback.Info("Importing approved gauge measurements took %s",
 		time.Since(start))