changeset 2242:786c3fb7efe1

Gauge measurements import: Re-factored to be re-usable for upcoming uploaded gauge measurements.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Wed, 13 Feb 2019 15:38:24 +0100
parents 5529e1f08dba
children 783c1454ab47
files pkg/imports/gm.go
diffstat 1 files changed, 133 insertions(+), 100 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/imports/gm.go	Wed Feb 13 15:16:12 2019 +0100
+++ b/pkg/imports/gm.go	Wed Feb 13 15:38:24 2019 +0100
@@ -17,6 +17,7 @@
 	"context"
 	"database/sql"
 	"fmt"
+	"sort"
 	"strings"
 	"time"
 
@@ -137,7 +138,51 @@
 // CleanUp of a gauge measurement import is a NOP.
 func (*GaugeMeasurement) CleanUp() error { return nil }
 
-func loadGauges(ctx context.Context, tx *sql.Tx) ([]*gauge, error) {
+// Do executes the actual bottleneck import.
+func (gm *GaugeMeasurement) Do(
+	ctx context.Context,
+	importID int64,
+	conn *sql.Conn,
+	feedback Feedback,
+) (interface{}, error) {
+
+	fetch := func() ([]*nts.RIS_Message_Type, error) {
+		client := nts.NewINtSMessageService(gm.URL, gm.Insecure, nil)
+
+		mt := nts.Message_type_typeWRM
+
+		req := &nts.Get_messages_query{
+			Message_type: &mt,
+		}
+
+		resp, err := client.Get_messages(req)
+		if err != nil {
+			return nil, err
+		}
+
+		result := resp.Result_message
+		if result == nil {
+			for _, e := range resp.Result_error {
+				if e != nil {
+					feedback.Error("Error code: %s", *e)
+				} else {
+					feedback.Error("Unknown error")
+				}
+			}
+		}
+		return result, nil
+	}
+
+	return storeGaugeMeasurements(
+		ctx,
+		importID,
+		fetch,
+		conn,
+		feedback,
+	)
+}
+
+func loadGauges(ctx context.Context, tx *sql.Tx) ([]string, error) {
 
 	rows, err := tx.QueryContext(ctx, listGaugesSQL)
 	if err != nil {
@@ -145,33 +190,35 @@
 	}
 	defer rows.Close()
 
-	var gauges []*gauge
+	var gauges []string
 
 	for rows.Next() {
-		var g gauge
+		var g models.Isrs
 		if err = rows.Scan(
-			&g.location.CountryCode,
-			&g.location.LoCode,
-			&g.location.FairwaySection,
-			&g.location.Orc,
-			&g.location.Hectometre,
+			&g.CountryCode,
+			&g.LoCode,
+			&g.FairwaySection,
+			&g.Orc,
+			&g.Hectometre,
 		); err != nil {
 			return nil, err
 		}
-		gauges = append(gauges, &g)
+		gauges = append(gauges, g.String())
 	}
 
 	if err = rows.Err(); err != nil {
 		return nil, err
 	}
 
+	sort.Strings(gauges)
+
 	return gauges, nil
 }
 
-// Do executes the actual bottleneck import.
-func (gm *GaugeMeasurement) Do(
+func storeGaugeMeasurements(
 	ctx context.Context,
 	importID int64,
+	fetch func() ([]*nts.RIS_Message_Type, error),
 	conn *sql.Conn,
 	feedback Feedback,
 ) (interface{}, error) {
@@ -191,7 +238,7 @@
 	}
 
 	// TODO get date_issue for selected gauges
-	gids, err := gm.doForGM(ctx, gauges, tx, feedback)
+	gids, err := doForGM(ctx, gauges, fetch, tx, feedback)
 	if err != nil {
 		feedback.Error("Error processing %d gauges: %v", len(gauges), err)
 		return nil, err
@@ -245,110 +292,96 @@
 	return fn, nil
 }
 
-func (gm *GaugeMeasurement) doForGM(
+func doForGM(
 	ctx context.Context,
-	gauges []*gauge,
+	gauges []string,
+	fetch func() ([]*nts.RIS_Message_Type, error),
 	tx *sql.Tx,
 	feedback Feedback,
 ) ([]string, error) {
 
-	client := nts.NewINtSMessageService(gm.URL, gm.Insecure, nil)
-
-	mt := nts.Message_type_typeWRM
-
 	insertStmt, err := tx.PrepareContext(ctx, insertGMSQL)
 	if err != nil {
 		return nil, err
 	}
 	defer insertStmt.Close()
 
-	var gids []string
-	for _, g := range gauges {
+	// lookup to see if we have gauges in the database.
+	isKnown := func(s string) bool {
+		idx := sort.SearchStrings(gauges, s)
+		return idx < len(gauges) && gauges[idx] == s
+	}
+
+	result, err := fetch()
+	if err != nil {
+		return nil, err
+	}
 
-		isrsID := nts.Isrs_code_type(g.location.String())
-		idPair := []*nts.Id_pair{{Id: &isrsID}}
-
-		req := &nts.Get_messages_query{
-			Message_type: &mt,
-			Ids:          idPair,
-		}
+	var gids []string
+	for _, msg := range result {
+		var gid int64
+		for _, wrm := range msg.Wrm {
+			curr := string(*wrm.Geo_object.Id)
+			currIsrs, err := models.IsrsFromString(curr)
+			if err != nil {
+				feedback.Warn("Invalid ISRS code %v", err)
+				continue
+			}
+			feedback.Info("Found measurements for %s", curr)
+			if !isKnown(curr) {
+				feedback.Warn("Gauge '%s' is not in database.", curr)
+				continue
+			}
 
-		resp, err := client.Get_messages(req)
-		if err != nil {
-			return nil, err
-		}
-
-		if resp.Result_message == nil {
-			for _, e := range resp.Result_error {
-				if e != nil {
-					feedback.Error("No gauge measurements found for %s", g.location)
+			var referenceCode string
+			if wrm.Reference_code == nil {
+				feedback.Info("'Reference_code' not specified. Assuming 'ZPG'")
+				referenceCode = "ZPG"
+			} else {
+				referenceCode = string(*wrm.Reference_code)
+			}
+			for _, measure := range wrm.Measure {
+				var unit string
+				if measure.Unit == nil {
+					feedback.Info("'Unit' not specified. Assuming 'cm'")
+					unit = "cm"
 				} else {
-					feedback.Error("unknown")
+					unit = string(*measure.Unit)
+				}
+				convert, err := rescale(unit)
+				if err != nil {
+					return nil, err
+				}
+				isWaterlevel := *measure.Measure_code == nts.Measure_code_enumWAL
+				err = insertStmt.QueryRowContext(
+					ctx,
+					currIsrs.CountryCode,
+					currIsrs.LoCode,
+					currIsrs.FairwaySection,
+					currIsrs.Orc,
+					currIsrs.Hectometre,
+					measure.Measuredate,
+					msg.Identification.From,
+					msg.Identification.Language_code,
+					msg.Identification.Country_code,
+					msg.Identification.Date_issue,
+					referenceCode,
+					convert(measure.Value),
+					measure.Predicted,
+					isWaterlevel,
+					convert(measure.Value_min),
+					convert(measure.Value_max),
+					msg.Identification.Date_issue,
+					msg.Identification.Originator,
+					true, // staging_done
+				).Scan(&gid)
+				if err != nil {
+					return nil, err
 				}
 			}
-			continue
-		}
-		result := resp.Result_message
-
-		for _, msg := range result {
-			var gid int64
-			feedback.Info("Found measurements for %s", g.location)
-			for _, wrm := range msg.Wrm {
-				currIsrs, err := models.IsrsFromString(string(*wrm.Geo_object.Id))
-				if err != nil {
-					feedback.Warn("Invalid ISRS code %v", err)
-					continue
-				}
-				var referenceCode string
-				if wrm.Reference_code == nil {
-					feedback.Info("'Reference_code' not specified. Assuming 'ZPG'")
-					referenceCode = "ZPG"
-				} else {
-					referenceCode = string(*wrm.Reference_code)
-				}
-				for _, measure := range wrm.Measure {
-					var unit string
-					if measure.Unit == nil {
-						feedback.Info("'Unit' not specified. Assuming 'cm'")
-						unit = "cm"
-					} else {
-						unit = string(*measure.Unit)
-					}
-					convert, err := rescale(unit)
-					if err != nil {
-						return nil, err
-					}
-					isWaterlevel := *measure.Measure_code == nts.Measure_code_enumWAL
-					err = insertStmt.QueryRowContext(
-						ctx,
-						currIsrs.CountryCode,
-						currIsrs.LoCode,
-						currIsrs.FairwaySection,
-						currIsrs.Orc,
-						currIsrs.Hectometre,
-						measure.Measuredate,
-						msg.Identification.From,
-						msg.Identification.Language_code,
-						msg.Identification.Country_code,
-						msg.Identification.Date_issue,
-						referenceCode,
-						convert(measure.Value),
-						measure.Predicted,
-						isWaterlevel,
-						convert(measure.Value_min),
-						convert(measure.Value_max),
-						msg.Identification.Date_issue,
-						msg.Identification.Originator,
-						true, // staging_done
-					).Scan(&gid)
-					if err != nil {
-						return nil, err
-					}
-				}
-				feedback.Info("Inserted %d measurements for %s",
-					len(wrm.Measure), currIsrs)
-				gids = append(gids, currIsrs.String())
-			}
+			feedback.Info("Inserted %d measurements for %s",
+				len(wrm.Measure), curr)
+			gids = append(gids, curr)
 		}
 	}
 	return gids, nil