comparison pkg/imports/agm.go @ 3502:45483dd0d801

AGM import: Don't pile up defer calls for every line in CSV file.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Tue, 28 May 2019 11:33:57 +0200
parents 45a629a3a8b8
children 6e748f31777a
comparison
equal deleted inserted replaced
3501:c5c7cc24fe72 3502:45483dd0d801
18 "bufio" 18 "bufio"
19 "context" 19 "context"
20 "database/sql" 20 "database/sql"
21 "encoding/csv" 21 "encoding/csv"
22 "encoding/json" 22 "encoding/json"
23 "errors"
23 "fmt" 24 "fmt"
24 "io" 25 "io"
25 "math" 26 "math"
26 "os" 27 "os"
27 "path/filepath" 28 "path/filepath"
203 SELECT 1 FROM waterway.gauges 204 SELECT 1 FROM waterway.gauges
204 WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)) 205 WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int))
205 ` 206 `
206 ) 207 )
207 208
209 var errContinue = errors.New("continue")
210
208 // Do executes the actual approved gauge measurements import. 211 // Do executes the actual approved gauge measurements import.
209 func (agm *ApprovedGaugeMeasurements) Do( 212 func (agm *ApprovedGaugeMeasurements) Do(
210 ctx context.Context, 213 ctx context.Context,
211 importID int64, 214 importID int64,
212 conn *sql.Conn, 215 conn *sql.Conn,
410 413
411 newDateInfo := newDateIssue 414 newDateInfo := newDateIssue
412 415
413 newSourceOrganization := newSender 416 newSourceOrganization := newSender
414 417
415 tx, err := conn.BeginTx(ctx, nil) 418 switch err := func() error {
416 if err != nil { 419 tx, err := conn.BeginTx(ctx, nil)
420 if err != nil {
421 return err
422 }
423 defer tx.Rollback()
424
425 var newID int64
426
427 if err := tx.StmtContext(ctx, insertStmt).QueryRowContext(
428 ctx,
429 gid.CountryCode,
430 gid.LoCode,
431 gid.FairwaySection,
432 gid.Orc,
433 gid.Hectometre,
434 md,
435 newCountryCode,
436 newSender,
437 newLanguageCode,
438 newDateIssue,
439 newReferenceCode,
440 newValue,
441 newDateInfo,
442 newSourceOrganization,
443 ).Scan(&newID); err != nil {
444 feedback.Warn(handleError(err).Error())
445 ignored++
446 return errContinue
447 }
448
449 if _, err := tx.StmtContext(ctx, trackStmt).ExecContext(
450 ctx, importID, "waterway.gauge_measurements", newID,
451 ); err != nil {
452 return err
453 }
454
455 if err = tx.Commit(); err != nil {
456 err = fmt.Errorf("Commit failed: %v", err)
457 }
458 return err
459 }(); {
460 case err == errContinue:
461 continue lines
462 case err != nil:
417 return nil, err 463 return nil, err
418 }
419 defer tx.Rollback()
420
421 var newID int64
422
423 if err := tx.StmtContext(ctx, insertStmt).QueryRowContext(
424 ctx,
425 gid.CountryCode,
426 gid.LoCode,
427 gid.FairwaySection,
428 gid.Orc,
429 gid.Hectometre,
430 md,
431 newCountryCode,
432 newSender,
433 newLanguageCode,
434 newDateIssue,
435 newReferenceCode,
436 newValue,
437 newDateInfo,
438 newSourceOrganization,
439 ).Scan(&newID); err != nil {
440 feedback.Warn(handleError(err).Error())
441 if err := tx.Rollback(); err != nil {
442 return nil, err
443 }
444 ignored++
445 continue
446 }
447 if _, err := tx.StmtContext(ctx, trackStmt).ExecContext(
448 ctx, importID, "waterway.gauge_measurements", newID,
449 ); err != nil {
450 return nil, err
451 }
452 if err := tx.Commit(); err != nil {
453 return nil, fmt.Errorf("Commit failed: %v", err)
454 } 464 }
455 465
456 n := newAGMLine( 466 n := newAGMLine(
457 newCountryCode, 467 newCountryCode,
458 newSender, 468 newSender,