changeset 2233:137addc77b1b

Gauge measurement imports: Do all database ops in one transaction.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Wed, 13 Feb 2019 12:39:56 +0100
parents 7936b46a88d4
children 9b2f8e94671e
files pkg/imports/fa.go pkg/imports/gm.go
diffstat 2 files changed, 61 insertions(+), 48 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/imports/fa.go	Wed Feb 13 11:44:27 2019 +0100
+++ b/pkg/imports/fa.go	Wed Feb 13 12:39:56 2019 +0100
@@ -300,14 +300,13 @@
 	}
 	feedback.Info("Processed %d fairway availabilities", len(faids))
 
-	if err = tx.Commit(); err == nil {
-		feedback.Info(
-			"Importing fairway availabilities successfully took %s", time.Since(start))
-	} else {
+	if err = tx.Commit(); err != nil {
 		feedback.Info(
 			"Importing fairway availabilities failed after %s", time.Since(start))
 		return nil, err
 	}
+	feedback.Info(
+		"Importing fairway availabilities successfully took %s", time.Since(start))
 
 	// TODO: needs to be filled more useful.
 	summary := struct {
--- a/pkg/imports/gm.go	Wed Feb 13 11:44:27 2019 +0100
+++ b/pkg/imports/gm.go	Wed Feb 13 12:39:56 2019 +0100
@@ -34,10 +34,10 @@
 	Insecure bool `json:"insecure"`
 }
 
-// gaugeMeasurement holds information about a gauge and the latest measurement
-type gaugeMeasurement struct {
-	Gauge           models.Isrs
-	LatestDateIssue time.Time
+// gauge holds information about a gauge and the latest measurement
+type gauge struct {
+	location models.Isrs
+	latest   time.Time
 }
 
 // GMJobKind is the import queue type identifier.
@@ -137,6 +137,37 @@
 // CleanUp of a gauge measurement import is a NOP.
 func (*GaugeMeasurement) CleanUp() error { return nil }
 
+func loadGauges(ctx context.Context, tx *sql.Tx) ([]*gauge, error) {
+
+	rows, err := tx.QueryContext(ctx, listGaugesSQL)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+
+	var gauges []*gauge
+
+	for rows.Next() {
+		var g gauge
+		if err = rows.Scan(
+			&g.location.CountryCode,
+			&g.location.LoCode,
+			&g.location.FairwaySection,
+			&g.location.Orc,
+			&g.location.Hectometre,
+		); err != nil {
+			return nil, err
+		}
+		gauges = append(gauges, &g)
+	}
+
+	if err = rows.Err(); err != nil {
+		return nil, err
+	}
+
+	return gauges, nil
+}
+
 // Do executes the actual bottleneck import.
 func (gm *GaugeMeasurement) Do(
 	ctx context.Context,
@@ -145,45 +176,40 @@
 	feedback Feedback,
 ) (interface{}, error) {
 
-	// Get available gauges from database for use as filter in SOAP request
-	var rows *sql.Rows
+	start := time.Now()
 
-	rows, err := conn.QueryContext(ctx, listGaugesSQL)
+	tx, err := conn.BeginTx(ctx, nil)
 	if err != nil {
 		return nil, err
 	}
-	defer rows.Close()
-
-	gauges := []gaugeMeasurement{}
+	defer tx.Rollback()
 
-	for rows.Next() {
-		var g gaugeMeasurement
-		if err = rows.Scan(
-			&g.Gauge.CountryCode,
-			&g.Gauge.LoCode,
-			&g.Gauge.FairwaySection,
-			&g.Gauge.Orc,
-			&g.Gauge.Hectometre,
-		); err != nil {
-			return nil, err
-		}
-		gauges = append(gauges, g)
-	}
-
-	if err = rows.Err(); err != nil {
+	// Get available gauges from database for use as filter in SOAP request
+	gauges, err := loadGauges(ctx, tx)
+	if err != nil {
 		return nil, err
 	}
 
 	// TODO get date_issue for selected gauges
-	gids, err := gm.doForGM(ctx, gauges, conn, feedback)
+	gids, err := gm.doForGM(ctx, gauges, tx, feedback)
 	if err != nil {
 		feedback.Error("Error processing %d gauges: %v", len(gauges), err)
 		return nil, err
 	}
+
 	if len(gids) == 0 {
 		feedback.Info("No new gauge measurements found")
-		return nil, nil
+		return nil, UnchangedError("No new gauge measurements found")
 	}
+
+	if err = tx.Commit(); err != nil {
+		feedback.Info(
+			"Importing gauge measurements failed after %s", time.Since(start))
+		return nil, err
+	}
+	feedback.Info(
+		"Importing gauge measurements successfully took %s", time.Since(start))
+
 	// TODO: needs to be filled more useful.
 	summary := struct {
 		GaugeMeasuremets []string `json:"gaugeMeasurements"`
@@ -221,27 +247,20 @@
 
 func (gm *GaugeMeasurement) doForGM(
 	ctx context.Context,
-	gauges []gaugeMeasurement,
-	conn *sql.Conn,
+	gauges []*gauge,
+	tx *sql.Tx,
 	feedback Feedback,
 ) ([]string, error) {
-	start := time.Now()
 
 	client := nts.NewINtSMessageService(gm.URL, gm.Insecure, nil)
 
-	tx, err := conn.BeginTx(ctx, nil)
-	if err != nil {
-		return nil, err
-	}
-	defer tx.Rollback()
-
 	mt := nts.Message_type_typeWRM
 
 	var gids []string
 	for _, g := range gauges {
 
 		var idPair []*nts.Id_pair
-		isrs := g.Gauge.String()
+		isrs := g.location.String()
 		isrsID := nts.Isrs_code_type(isrs)
 		idPair = append(idPair, &nts.Id_pair{
 			Id: &isrsID,
@@ -261,7 +280,7 @@
 		if resp.Result_message == nil {
 			for _, e := range resp.Result_error {
 				if e != nil {
-					feedback.Error("No gauge measurements found for %s", g.Gauge.String())
+					feedback.Error("No gauge measurements found for %s", g.location.String())
 				} else {
 					feedback.Error("unknown")
 				}
@@ -278,7 +297,7 @@
 
 		for _, msg := range result {
 			var gid int64
-			feedback.Info("Found measurements for %s", g.Gauge.String())
+			feedback.Info("Found measurements for %s", g.location.String())
 			for _, wrm := range msg.Wrm {
 				currIsrs, err := models.IsrsFromString(string(*wrm.Geo_object.Id))
 				if err != nil {
@@ -337,10 +356,5 @@
 			}
 		}
 	}
-	feedback.Info("Storing gauge measurements took %s", time.Since(start))
-	if err = tx.Commit(); err == nil {
-		feedback.Info("Import of gauge measurements was successful")
-	}
-
 	return gids, nil
 }