changeset 1981:e89368aec538

Import: Make one request per gauge for gauge measurement data.
author Raimund Renkert <raimund.renkert@intevation.de>
date Wed, 23 Jan 2019 15:02:50 +0100
parents c8e2f6838eaf
children a7e47a9d890b
files pkg/imports/gm.go
diffstat 1 files changed, 95 insertions(+), 94 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/imports/gm.go	Wed Jan 23 14:17:14 2019 +0100
+++ b/pkg/imports/gm.go	Wed Jan 23 15:02:50 2019 +0100
@@ -16,7 +16,6 @@
 import (
 	"context"
 	"database/sql"
-	"errors"
 	"fmt"
 	"strings"
 	"time"
@@ -233,110 +232,112 @@
 
 	client := nts.NewINtSMessageService(gm.URL, gm.Insecure, nil)
 
-	var idPairs []*nts.Id_pair
-	for _, g := range gauges {
-		isrs := g.Gauge.String()
-		isrsID := nts.Isrs_code_type(isrs)
-		idPairs = append(idPairs, &nts.Id_pair{
-			Id: &isrsID,
-		})
-	}
-	mt := nts.Message_type_typeWRM
-	req := &nts.Get_messages_query{
-		Message_type: &mt,
-		Ids:          idPairs,
-	}
-	resp, err := client.Get_messages(req)
-	if err != nil {
-		feedback.Error("%v", err)
-		return nil, err
-	}
-
-	if resp.Result_message == nil {
-		err := errors.New("no gauge measurements found")
-		for _, e := range resp.Result_error {
-			if e != nil {
-				feedback.Error("%v", *e)
-			} else {
-				feedback.Error("unknown")
-			}
-		}
-		return nil, err
-	}
-
-	result := resp.Result_message
-
 	tx, err := conn.BeginTx(ctx, nil)
 	if err != nil {
 		return nil, err
 	}
 	defer tx.Rollback()
 
-	insertStmt, err := tx.PrepareContext(ctx, insertGMSQL)
-	if err != nil {
-		return nil, err
-	}
-	defer insertStmt.Close()
+	mt := nts.Message_type_typeWRM
 
-	var gid int64
 	var gids []string
-	for _, msg := range result {
-		feedback.Info("Found %d gauges with measurements", len(msg.Wrm))
-		for _, wrm := range msg.Wrm {
-			currIsrs, err := models.IsrsFromString(string(*wrm.Geo_object.Id))
-			if err != nil {
-				feedback.Warn("Invalid ISRS code %v", err)
-				continue
-			}
-			var referenceCode string
-			if wrm.Reference_code == nil {
-				feedback.Info("'Reference_code' not specified. Assuming 'ZPG'")
-				referenceCode = "ZPG"
-			} else {
-				referenceCode = string(*wrm.Reference_code)
-			}
-			for _, measure := range wrm.Measure {
-				var unit string
-				if measure.Unit == nil {
-					feedback.Info("'Unit' not specified. Assuming 'cm'")
-					unit = "cm"
+	for _, g := range gauges {
+
+		var idPair []*nts.Id_pair
+		isrs := g.Gauge.String()
+		isrsID := nts.Isrs_code_type(isrs)
+		idPair = append(idPair, &nts.Id_pair{
+			Id: &isrsID,
+		})
+
+		req := &nts.Get_messages_query{
+			Message_type: &mt,
+			Ids:          idPair,
+		}
+
+		resp, err := client.Get_messages(req)
+		if err != nil {
+			feedback.Error("%v", err)
+			return nil, err
+		}
+
+		if resp.Result_message == nil {
+			for _, e := range resp.Result_error {
+				if e != nil {
+					feedback.Error("No gauge measurements found for %s", g.Gauge.String())
 				} else {
-					unit = string(*measure.Unit)
-				}
-				convert, err := rescale(unit)
-				if err != nil {
-					return nil, err
-				}
-				isWaterlevel := *measure.Measure_code == nts.Measure_code_enumWAL
-				err = insertStmt.QueryRowContext(
-					ctx,
-					currIsrs.CountryCode,
-					currIsrs.LoCode,
-					currIsrs.FairwaySection,
-					currIsrs.Orc,
-					currIsrs.Hectometre,
-					measure.Measuredate,
-					msg.Identification.From,
-					msg.Identification.Language_code,
-					msg.Identification.Country_code,
-					msg.Identification.Date_issue,
-					referenceCode,
-					convert(measure.Value),
-					measure.Predicted,
-					isWaterlevel,
-					convert(measure.Value_min),
-					convert(measure.Value_max),
-					msg.Identification.Date_issue,
-					msg.Identification.Originator,
-					true, // staging_done
-				).Scan(&gid)
-				if err != nil {
-					return nil, err
+					feedback.Error("unknown")
 				}
 			}
-			feedback.Info("Inserted %d measurements for %s",
-				len(wrm.Measure), currIsrs)
-			gids = append(gids, currIsrs.String())
+			continue
+		}
+		result := resp.Result_message
+
+		insertStmt, err := tx.PrepareContext(ctx, insertGMSQL)
+		if err != nil {
+			return nil, err
+		}
+		defer insertStmt.Close()
+
+		for _, msg := range result {
+			var gid int64
+			feedback.Info("Found measurements for %s", g.Gauge.String())
+			for _, wrm := range msg.Wrm {
+				currIsrs, err := models.IsrsFromString(string(*wrm.Geo_object.Id))
+				if err != nil {
+					feedback.Warn("Invalid ISRS code %v", err)
+					continue
+				}
+				var referenceCode string
+				if wrm.Reference_code == nil {
+					feedback.Info("'Reference_code' not specified. Assuming 'ZPG'")
+					referenceCode = "ZPG"
+				} else {
+					referenceCode = string(*wrm.Reference_code)
+				}
+				for _, measure := range wrm.Measure {
+					var unit string
+					if measure.Unit == nil {
+						feedback.Info("'Unit' not specified. Assuming 'cm'")
+						unit = "cm"
+					} else {
+						unit = string(*measure.Unit)
+					}
+					convert, err := rescale(unit)
+					if err != nil {
+						return nil, err
+					}
+					isWaterlevel := *measure.Measure_code == nts.Measure_code_enumWAL
+					err = insertStmt.QueryRowContext(
+						ctx,
+						currIsrs.CountryCode,
+						currIsrs.LoCode,
+						currIsrs.FairwaySection,
+						currIsrs.Orc,
+						currIsrs.Hectometre,
+						measure.Measuredate,
+						msg.Identification.From,
+						msg.Identification.Language_code,
+						msg.Identification.Country_code,
+						msg.Identification.Date_issue,
+						referenceCode,
+						convert(measure.Value),
+						measure.Predicted,
+						isWaterlevel,
+						convert(measure.Value_min),
+						convert(measure.Value_max),
+						msg.Identification.Date_issue,
+						msg.Identification.Originator,
+						true, // staging_done
+					).Scan(&gid)
+					if err != nil {
+						return nil, err
+					}
+				}
+				feedback.Info("Inserted %d measurements for %s",
+					len(wrm.Measure), currIsrs)
+				gids = append(gids, currIsrs.String())
+			}
 		}
 	}
 	feedback.Info("Storing gauge measurements took %s", time.Since(start))