changeset 2200:64147a137e0a

Fairway availability import: Run all db ops in same transaction.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Tue, 12 Feb 2019 17:12:12 +0100
parents b1735b09df6f
children cae0b597aefc
files pkg/imports/fa.go
diffstat 1 files changed, 35 insertions(+), 26 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/imports/fa.go	Tue Feb 12 16:54:48 2019 +0100
+++ b/pkg/imports/fa.go	Tue Feb 12 17:12:12 2019 +0100
@@ -17,6 +17,7 @@
 	"context"
 	"database/sql"
 	"errors"
+	"fmt"
 	"time"
 
 	"github.com/jackc/pgx/pgtype"
@@ -184,10 +185,10 @@
 	ResponsibleCountry string
 }
 
-func loadBottleneckCountries(ctx context.Context, conn *sql.Conn) ([]bottleneckCountry, error) {
+func loadBottleneckCountries(ctx context.Context, tx *sql.Tx) ([]bottleneckCountry, error) {
 
 	// Get available bottlenecks from database for use as filter in SOAP request
-	rows, err := conn.QueryContext(ctx, listBottlenecksSQL)
+	rows, err := tx.QueryContext(ctx, listBottlenecksSQL)
 	if err != nil {
 		return nil, err
 	}
@@ -211,8 +212,8 @@
 	return bottlenecks, nil
 }
 
-func loadFairwayAvailabilities(ctx context.Context, conn *sql.Conn) (map[uniqueFairwayAvailability]int64, error) {
-	rows, err := conn.QueryContext(ctx, listFairwayAvailabilitySQL)
+func loadFairwayAvailabilities(ctx context.Context, tx *sql.Tx) (map[uniqueFairwayAvailability]int64, error) {
+	rows, err := tx.QueryContext(ctx, listFairwayAvailabilitySQL)
 	if err != nil {
 		return nil, err
 	}
@@ -241,9 +242,9 @@
 	return fairwayAvailabilities, nil
 }
 
-func latestDate(ctx context.Context, conn *sql.Conn) (pgtype.Timestamp, error) {
+func latestDate(ctx context.Context, tx *sql.Tx) (pgtype.Timestamp, error) {
 	var date pgtype.Timestamp
-	err := conn.QueryRowContext(ctx, latestMeasureDateSQL).Scan(&date)
+	err := tx.QueryRowContext(ctx, latestMeasureDateSQL).Scan(&date)
 	switch {
 	case err == sql.ErrNoRows:
 		date = pgtype.Timestamp{
@@ -264,37 +265,57 @@
 	feedback Feedback,
 ) (interface{}, error) {
 
-	bottlenecks, err := loadBottleneckCountries(ctx, conn)
+	start := time.Now()
+
+	tx, err := conn.BeginTx(ctx, nil)
+	if err != nil {
+		return nil, err
+	}
+	defer tx.Rollback()
+
+	bottlenecks, err := loadBottleneckCountries(ctx, tx)
 	if err != nil {
 		return nil, err
 	}
 
-	fairwayAvailabilities, err := loadFairwayAvailabilities(ctx, conn)
+	fairwayAvailabilities, err := loadFairwayAvailabilities(ctx, tx)
 	if err != nil {
 		return nil, err
 	}
 
-	latest, err := latestDate(ctx, conn)
+	latest, err := latestDate(ctx, tx)
 	if err != nil {
 		return nil, err
 	}
 
-	faids, err := fa.doForFAs(ctx, bottlenecks, fairwayAvailabilities, latest, conn, feedback)
+	faids, err := fa.doForFAs(ctx, bottlenecks, fairwayAvailabilities, latest, tx, feedback)
 	if err != nil {
-		feedback.Error("Error processing data: %s", err)
+		return nil, fmt.Errorf("Error processing data: %v", err)
 	}
 	if len(faids) == 0 {
 		feedback.Info("No new fairway availablity data found")
-		return nil, nil
+		return nil, UnchangedError("No new fairway availablity data found")
 	}
 	feedback.Info("Processed %d fairway availabilities", len(faids))
+
+	feedback.Info("Storing fairway availabilities took %s", time.Since(start))
+
+	if err = tx.Commit(); err == nil {
+		feedback.Info(
+			"Importing fairway availabilities successfully took %s", time.Since(start))
+	} else {
+		feedback.Info(
+			"Importing fairway availabilities failed after %s", time.Since(start))
+		return nil, err
+	}
+
 	// TODO: needs to be filled more useful.
 	summary := struct {
 		FairwayAvailabilities []string `json:"fairwayAvailabilities"`
 	}{
 		FairwayAvailabilities: faids,
 	}
-	return &summary, err
+	return &summary, nil
 }
 
 func (fa *FairwayAvailability) doForFAs(
@@ -302,10 +323,9 @@
 	bottlenecks []bottleneckCountry,
 	fairwayAvailabilities map[uniqueFairwayAvailability]int64,
 	latestDate pgtype.Timestamp,
-	conn *sql.Conn,
+	tx *sql.Tx,
 	feedback Feedback,
 ) ([]string, error) {
-	start := time.Now()
 
 	client := ifaf.NewFairwayAvailabilityService(fa.URL, fa.Insecure, nil)
 
@@ -337,12 +357,6 @@
 
 	result := resp.Get_bottleneck_faResult
 
-	tx, err := conn.BeginTx(ctx, nil)
-	if err != nil {
-		return nil, err
-	}
-	defer tx.Rollback()
-
 	insertFAStmt, err := tx.PrepareContext(ctx, insertFASQL)
 	if err != nil {
 		return nil, err
@@ -475,10 +489,5 @@
 			feedback.Info("Add %d Reference Values", rvCount)
 		}
 	}
-	feedback.Info("Storing fairway availabilities took %s", time.Since(start))
-	if err = tx.Commit(); err == nil {
-		feedback.Info("Import of fairway availabilities was successful")
-	}
-
 	return faIDs, nil
 }