Mercurial > gemma
changeset 2200:64147a137e0a
Fairway availability import: Run all db ops in same transaction.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Tue, 12 Feb 2019 17:12:12 +0100 |
parents | b1735b09df6f |
children | cae0b597aefc |
files | pkg/imports/fa.go |
diffstat | 1 files changed, 35 insertions(+), 26 deletions(-) [+] |
line wrap: on
line diff
--- a/pkg/imports/fa.go Tue Feb 12 16:54:48 2019 +0100 +++ b/pkg/imports/fa.go Tue Feb 12 17:12:12 2019 +0100 @@ -17,6 +17,7 @@ "context" "database/sql" "errors" + "fmt" "time" "github.com/jackc/pgx/pgtype" @@ -184,10 +185,10 @@ ResponsibleCountry string } -func loadBottleneckCountries(ctx context.Context, conn *sql.Conn) ([]bottleneckCountry, error) { +func loadBottleneckCountries(ctx context.Context, tx *sql.Tx) ([]bottleneckCountry, error) { // Get available bottlenecks from database for use as filter in SOAP request - rows, err := conn.QueryContext(ctx, listBottlenecksSQL) + rows, err := tx.QueryContext(ctx, listBottlenecksSQL) if err != nil { return nil, err } @@ -211,8 +212,8 @@ return bottlenecks, nil } -func loadFairwayAvailabilities(ctx context.Context, conn *sql.Conn) (map[uniqueFairwayAvailability]int64, error) { - rows, err := conn.QueryContext(ctx, listFairwayAvailabilitySQL) +func loadFairwayAvailabilities(ctx context.Context, tx *sql.Tx) (map[uniqueFairwayAvailability]int64, error) { + rows, err := tx.QueryContext(ctx, listFairwayAvailabilitySQL) if err != nil { return nil, err } @@ -241,9 +242,9 @@ return fairwayAvailabilities, nil } -func latestDate(ctx context.Context, conn *sql.Conn) (pgtype.Timestamp, error) { +func latestDate(ctx context.Context, tx *sql.Tx) (pgtype.Timestamp, error) { var date pgtype.Timestamp - err := conn.QueryRowContext(ctx, latestMeasureDateSQL).Scan(&date) + err := tx.QueryRowContext(ctx, latestMeasureDateSQL).Scan(&date) switch { case err == sql.ErrNoRows: date = pgtype.Timestamp{ @@ -264,37 +265,57 @@ feedback Feedback, ) (interface{}, error) { - bottlenecks, err := loadBottleneckCountries(ctx, conn) + start := time.Now() + + tx, err := conn.BeginTx(ctx, nil) + if err != nil { + return nil, err + } + defer tx.Rollback() + + bottlenecks, err := loadBottleneckCountries(ctx, tx) if err != nil { return nil, err } - fairwayAvailabilities, err := loadFairwayAvailabilities(ctx, conn) + fairwayAvailabilities, err := loadFairwayAvailabilities(ctx, tx) if err != nil { return nil, err } - latest, err := latestDate(ctx, conn) + latest, err := latestDate(ctx, tx) if err != nil { return nil, err } - faids, err := fa.doForFAs(ctx, bottlenecks, fairwayAvailabilities, latest, conn, feedback) + faids, err := fa.doForFAs(ctx, bottlenecks, fairwayAvailabilities, latest, tx, feedback) if err != nil { - feedback.Error("Error processing data: %s", err) + return nil, fmt.Errorf("Error processing data: %v", err) } if len(faids) == 0 { feedback.Info("No new fairway availablity data found") - return nil, nil + return nil, UnchangedError("No new fairway availablity data found") } feedback.Info("Processed %d fairway availabilities", len(faids)) + + feedback.Info("Storing fairway availabilities took %s", time.Since(start)) + + if err = tx.Commit(); err == nil { + feedback.Info( + "Importing fairway availabilities successfully took %s", time.Since(start)) + } else { + feedback.Info( + "Importing fairway availabilities failed after %s", time.Since(start)) + return nil, err + } + // TODO: needs to be filled more useful. summary := struct { FairwayAvailabilities []string `json:"fairwayAvailabilities"` }{ FairwayAvailabilities: faids, } - return &summary, err + return &summary, nil } func (fa *FairwayAvailability) doForFAs( @@ -302,10 +323,9 @@ bottlenecks []bottleneckCountry, fairwayAvailabilities map[uniqueFairwayAvailability]int64, latestDate pgtype.Timestamp, - conn *sql.Conn, + tx *sql.Tx, feedback Feedback, ) ([]string, error) { - start := time.Now() client := ifaf.NewFairwayAvailabilityService(fa.URL, fa.Insecure, nil) @@ -337,12 +357,6 @@ result := resp.Get_bottleneck_faResult - tx, err := conn.BeginTx(ctx, nil) - if err != nil { - return nil, err - } - defer tx.Rollback() - insertFAStmt, err := tx.PrepareContext(ctx, insertFASQL) if err != nil { return nil, err @@ -475,10 +489,5 @@ feedback.Info("Add %d Reference Values", rvCount) } } - feedback.Info("Storing fairway availabilities took %s", time.Since(start)) - if err = tx.Commit(); err == nil { - feedback.Info("Import of fairway availabilities was successful") - } - return faIDs, nil }