Mercurial > gemma
comparison pkg/imports/agm.go @ 3220:56b297592c0a
Handle failing INSERTs gracefully during approved gauge measurements import
author | Tom Gottfried <tom@intevation.de> |
---|---|
date | Thu, 09 May 2019 14:21:50 +0200 |
parents | 4acbee65275d |
children | 232fc90e6ee2 |
comparison legend (how each row of the side-by-side table below is classified):
- equal
- deleted
- inserted
- replaced
3219:4acbee65275d | 3220:56b297592c0a |
---|---|
289 } | 289 } |
290 if len(missing) > 0 { | 290 if len(missing) > 0 { |
291 return nil, fmt.Errorf("Missing columns: %s", strings.Join(missing, ", ")) | 291 return nil, fmt.Errorf("Missing columns: %s", strings.Join(missing, ", ")) |
292 } | 292 } |
293 | 293 |
294 tx, err := conn.BeginTx(ctx, nil) | 294 gaugeCheckStmt, err := conn.PrepareContext(ctx, agmGaugeCheckSQL) |
295 if err != nil { | 295 if err != nil { |
296 return nil, err | 296 return nil, err |
297 } | 297 } |
298 defer tx.Rollback() | 298 defer gaugeCheckStmt.Close() |
299 | 299 |
300 gaugeCheckStmt, err := tx.PrepareContext(ctx, agmGaugeCheckSQL) | 300 selectStmt, err := conn.PrepareContext(ctx, agmSelectSQL) |
301 if err != nil { | 301 if err != nil { |
302 return nil, err | 302 return nil, err |
303 } | 303 } |
304 defer gaugeCheckStmt.Close() | 304 defer selectStmt.Close() |
305 | 305 |
306 selectStmt, err := tx.PrepareContext(ctx, agmSelectSQL) | 306 insertStmt, err := conn.PrepareContext(ctx, agmInsertSQL) |
307 if err != nil { | 307 if err != nil { |
308 return nil, err | 308 return nil, err |
309 } | 309 } |
310 defer selectStmt.Close() | |
311 | |
312 insertStmt, err := tx.PrepareContext(ctx, agmInsertSQL) | |
313 if err != nil { | |
314 return nil, err | |
315 } | |
316 defer insertStmt.Close() | 310 defer insertStmt.Close() |
317 | 311 |
318 trackStmt, err := tx.PrepareContext(ctx, trackImportSQL) | 312 trackStmt, err := conn.PrepareContext(ctx, trackImportSQL) |
319 if err != nil { | 313 if err != nil { |
320 return nil, err | 314 return nil, err |
321 } | 315 } |
322 defer trackStmt.Close() | 316 defer trackStmt.Close() |
323 | 317 |
452 | 446 |
453 newDateInfo := newDateIssue | 447 newDateInfo := newDateIssue |
454 | 448 |
455 newSourceOrganization := newSender | 449 newSourceOrganization := newSender |
456 | 450 |
451 tx, err := conn.BeginTx(ctx, nil) | |
452 if err != nil { | |
453 return nil, err | |
454 } | |
455 defer tx.Rollback() | |
456 | |
457 var newID int64 | 457 var newID int64 |
458 | 458 |
459 if err := insertStmt.QueryRowContext( | 459 if err := tx.StmtContext(ctx, insertStmt).QueryRowContext( |
460 ctx, | 460 ctx, |
461 gid.CountryCode, | 461 gid.CountryCode, |
462 gid.LoCode, | 462 gid.LoCode, |
463 gid.FairwaySection, | 463 gid.FairwaySection, |
464 gid.Orc, | 464 gid.Orc, |
474 newValueMin, | 474 newValueMin, |
475 newValueMax, | 475 newValueMax, |
476 newDateInfo, | 476 newDateInfo, |
477 newSourceOrganization, | 477 newSourceOrganization, |
478 ).Scan(&newID); err != nil { | 478 ).Scan(&newID); err != nil { |
479 return nil, err | 479 feedback.Warn(handleError(err).Error()) |
480 } | 480 if err := tx.Rollback(); err != nil { |
481 if _, err := trackStmt.ExecContext( | 481 return nil, err |
482 } | |
483 ignored++ | |
484 continue | |
485 } | |
486 if _, err := tx.StmtContext(ctx, trackStmt).ExecContext( | |
482 ctx, importID, "waterway.gauge_measurements", newID, | 487 ctx, importID, "waterway.gauge_measurements", newID, |
483 ); err != nil { | 488 ); err != nil { |
484 return nil, err | 489 return nil, err |
490 } | |
491 if err := tx.Commit(); err != nil { | |
492 return nil, fmt.Errorf("Commit failed: %v", err) | |
485 } | 493 } |
486 | 494 |
487 n := newAGMLine( | 495 n := newAGMLine( |
488 newCountryCode, | 496 newCountryCode, |
489 newSender, | 497 newSender, |
526 ase.Versions = []*agmLine{o, n} | 534 ase.Versions = []*agmLine{o, n} |
527 } | 535 } |
528 entries = append(entries, ase) | 536 entries = append(entries, ase) |
529 } | 537 } |
530 | 538 |
531 if err := tx.Commit(); err != nil { | |
532 return nil, fmt.Errorf("Commit failed: %v", err) | |
533 } | |
534 | |
535 feedback.Info("Importing approved gauge measurements took %s", | 539 feedback.Info("Importing approved gauge measurements took %s", |
536 time.Since(start)) | 540 time.Since(start)) |
537 | 541 |
538 return entries, nil | 542 return entries, nil |
539 } | 543 } |