comparison pkg/imports/agm.go @ 3220:56b297592c0a

Handle failing INSERTs gracefully during approved gauge measurements import
author Tom Gottfried <tom@intevation.de>
date Thu, 09 May 2019 14:21:50 +0200
parents 4acbee65275d
children 232fc90e6ee2
comparison
Legend: equal | deleted | inserted | replaced
3219:4acbee65275d 3220:56b297592c0a
289 } 289 }
290 if len(missing) > 0 { 290 if len(missing) > 0 {
291 return nil, fmt.Errorf("Missing columns: %s", strings.Join(missing, ", ")) 291 return nil, fmt.Errorf("Missing columns: %s", strings.Join(missing, ", "))
292 } 292 }
293 293
294 tx, err := conn.BeginTx(ctx, nil) 294 gaugeCheckStmt, err := conn.PrepareContext(ctx, agmGaugeCheckSQL)
295 if err != nil { 295 if err != nil {
296 return nil, err 296 return nil, err
297 } 297 }
298 defer tx.Rollback() 298 defer gaugeCheckStmt.Close()
299 299
300 gaugeCheckStmt, err := tx.PrepareContext(ctx, agmGaugeCheckSQL) 300 selectStmt, err := conn.PrepareContext(ctx, agmSelectSQL)
301 if err != nil { 301 if err != nil {
302 return nil, err 302 return nil, err
303 } 303 }
304 defer gaugeCheckStmt.Close() 304 defer selectStmt.Close()
305 305
306 selectStmt, err := tx.PrepareContext(ctx, agmSelectSQL) 306 insertStmt, err := conn.PrepareContext(ctx, agmInsertSQL)
307 if err != nil { 307 if err != nil {
308 return nil, err 308 return nil, err
309 } 309 }
310 defer selectStmt.Close()
311
312 insertStmt, err := tx.PrepareContext(ctx, agmInsertSQL)
313 if err != nil {
314 return nil, err
315 }
316 defer insertStmt.Close() 310 defer insertStmt.Close()
317 311
318 trackStmt, err := tx.PrepareContext(ctx, trackImportSQL) 312 trackStmt, err := conn.PrepareContext(ctx, trackImportSQL)
319 if err != nil { 313 if err != nil {
320 return nil, err 314 return nil, err
321 } 315 }
322 defer trackStmt.Close() 316 defer trackStmt.Close()
323 317
452 446
453 newDateInfo := newDateIssue 447 newDateInfo := newDateIssue
454 448
455 newSourceOrganization := newSender 449 newSourceOrganization := newSender
456 450
451 tx, err := conn.BeginTx(ctx, nil)
452 if err != nil {
453 return nil, err
454 }
455 defer tx.Rollback()
456
457 var newID int64 457 var newID int64
458 458
459 if err := insertStmt.QueryRowContext( 459 if err := tx.StmtContext(ctx, insertStmt).QueryRowContext(
460 ctx, 460 ctx,
461 gid.CountryCode, 461 gid.CountryCode,
462 gid.LoCode, 462 gid.LoCode,
463 gid.FairwaySection, 463 gid.FairwaySection,
464 gid.Orc, 464 gid.Orc,
474 newValueMin, 474 newValueMin,
475 newValueMax, 475 newValueMax,
476 newDateInfo, 476 newDateInfo,
477 newSourceOrganization, 477 newSourceOrganization,
478 ).Scan(&newID); err != nil { 478 ).Scan(&newID); err != nil {
479 return nil, err 479 feedback.Warn(handleError(err).Error())
480 } 480 if err := tx.Rollback(); err != nil {
481 if _, err := trackStmt.ExecContext( 481 return nil, err
482 }
483 ignored++
484 continue
485 }
486 if _, err := tx.StmtContext(ctx, trackStmt).ExecContext(
482 ctx, importID, "waterway.gauge_measurements", newID, 487 ctx, importID, "waterway.gauge_measurements", newID,
483 ); err != nil { 488 ); err != nil {
484 return nil, err 489 return nil, err
490 }
491 if err := tx.Commit(); err != nil {
492 return nil, fmt.Errorf("Commit failed: %v", err)
485 } 493 }
486 494
487 n := newAGMLine( 495 n := newAGMLine(
488 newCountryCode, 496 newCountryCode,
489 newSender, 497 newSender,
526 ase.Versions = []*agmLine{o, n} 534 ase.Versions = []*agmLine{o, n}
527 } 535 }
528 entries = append(entries, ase) 536 entries = append(entries, ase)
529 } 537 }
530 538
531 if err := tx.Commit(); err != nil {
532 return nil, fmt.Errorf("Commit failed: %v", err)
533 }
534
535 feedback.Info("Importing approved gauge measurements took %s", 539 feedback.Info("Importing approved gauge measurements took %s",
536 time.Since(start)) 540 time.Since(start))
537 541
538 return entries, nil 542 return entries, nil
539 } 543 }