# HG changeset patch # User Tom Gottfried # Date 1557404510 -7200 # Node ID 56b297592c0a178037be784690c036b422950b74 # Parent 4acbee65275ddc03139068c9836dd8c221f4ab7b Handle failing INSERTs gracefully during approved gauge measurements import diff -r 4acbee65275d -r 56b297592c0a pkg/imports/agm.go --- a/pkg/imports/agm.go Thu May 09 12:49:53 2019 +0200 +++ b/pkg/imports/agm.go Thu May 09 14:21:50 2019 +0200 @@ -291,31 +291,25 @@ return nil, fmt.Errorf("Missing columns: %s", strings.Join(missing, ", ")) } - tx, err := conn.BeginTx(ctx, nil) - if err != nil { - return nil, err - } - defer tx.Rollback() - - gaugeCheckStmt, err := tx.PrepareContext(ctx, agmGaugeCheckSQL) + gaugeCheckStmt, err := conn.PrepareContext(ctx, agmGaugeCheckSQL) if err != nil { return nil, err } defer gaugeCheckStmt.Close() - selectStmt, err := tx.PrepareContext(ctx, agmSelectSQL) + selectStmt, err := conn.PrepareContext(ctx, agmSelectSQL) if err != nil { return nil, err } defer selectStmt.Close() - insertStmt, err := tx.PrepareContext(ctx, agmInsertSQL) + insertStmt, err := conn.PrepareContext(ctx, agmInsertSQL) if err != nil { return nil, err } defer insertStmt.Close() - trackStmt, err := tx.PrepareContext(ctx, trackImportSQL) + trackStmt, err := conn.PrepareContext(ctx, trackImportSQL) if err != nil { return nil, err } @@ -454,9 +448,15 @@ newSourceOrganization := newSender + tx, err := conn.BeginTx(ctx, nil) + if err != nil { + return nil, err + } + defer tx.Rollback() + var newID int64 - if err := insertStmt.QueryRowContext( + if err := tx.StmtContext(ctx, insertStmt).QueryRowContext( ctx, gid.CountryCode, gid.LoCode, @@ -476,13 +476,21 @@ newDateInfo, newSourceOrganization, ).Scan(&newID); err != nil { - return nil, err + feedback.Warn(handleError(err).Error()) + if err := tx.Rollback(); err != nil { + return nil, err + } + ignored++ + continue } - if _, err := trackStmt.ExecContext( + if _, err := tx.StmtContext(ctx, trackStmt).ExecContext( ctx, importID, "waterway.gauge_measurements", newID, ); err != nil { return nil, err } + if err := tx.Commit(); err != nil { + return nil, fmt.Errorf("Commit failed: %v", err) + } n := newAGMLine( newCountryCode, @@ -528,10 +536,6 @@ entries = append(entries, ase) } - if err := tx.Commit(); err != nil { - return nil, fmt.Errorf("Commit failed: %v", err) - } - feedback.Info("Importing approved gauge measurements took %s", time.Since(start))