changeset 4023:baa51bb82364

AGM import: parse CSV to completion before requesting database. This is in preparation for avoiding one SELECT per line.
author Tom Gottfried <tom@intevation.de>
date Mon, 22 Jul 2019 14:45:41 +0200
parents efe0904b1d45
children 7afa18971d38
files pkg/imports/agm.go
diffstat 1 files changed, 81 insertions(+), 64 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/imports/agm.go	Mon Jul 22 13:28:04 2019 +0200
+++ b/pkg/imports/agm.go	Mon Jul 22 14:45:41 2019 +0200
@@ -4,13 +4,14 @@
 // SPDX-License-Identifier: AGPL-3.0-or-later
 // License-Filename: LICENSES/AGPL-3.0.txt
 //
-// Copyright (C) 2018 by via donau
+// Copyright (C) 2018, 2019 by via donau
 //   – Österreichische Wasserstraßen-Gesellschaft mbH
 // Software engineering by Intevation GmbH
 //
 // Author(s):
 //  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
 //  * Sascha Wilde <wilde@intevation.de>
+//  * Tom Gottfried <tom.gottfried@intevation.de>
 
 package imports
 
@@ -120,14 +121,16 @@
 }
 
 type agmLine struct {
-	CountryCode        string  `json:"country-code"`
-	Sender             string  `json:"sender"`
-	LanguageCode       string  `json:"language-code"`
-	DateIssue          timetz  `json:"date-issue"`
-	ReferenceCode      string  `json:"reference-code"`
-	WaterLevel         float64 `json:"water-level"`
-	DateInfo           timetz  `json:"date-info"`
-	SourceOrganization string  `json:"source-organization"`
+	Location           models.Isrs `json:"fk-gauge-id"`
+	CountryCode        string      `json:"country-code"`
+	Sender             string      `json:"sender"`
+	LanguageCode       string      `json:"language-code"`
+	DateIssue          timetz      `json:"date-issue"`
+	ReferenceCode      string      `json:"reference-code"`
+	MeasureDate        timetz      `json:"measure-date"`
+	WaterLevel         float64     `json:"water-level"`
+	DateInfo           timetz      `json:"date-info"`
+	SourceOrganization string      `json:"source-organization"`
 }
 
 func (a *agmLine) hasDiff(b *agmLine) bool {
@@ -309,8 +312,11 @@
 	warn := warnLimiter.Warn
 	defer warnLimiter.Close()
 
+	agmLines := []*agmLine{}
+	ignored := 0
+
 lines:
-	for line, ignored := 1, 0; ; line++ {
+	for line := 1; ; line++ {
 
 		row, err := r.Read()
 		switch {
@@ -322,7 +328,6 @@
 			if ignored == line-1 {
 				return nil, UnchangedError("No entries imported")
 			}
-			feedback.Info("Imported %d entries with changes", len(entries))
 			break lines
 		case err != nil:
 			return nil, fmt.Errorf("CSV parsing failed: %v", err)
@@ -364,6 +369,38 @@
 			return nil, fmt.Errorf("Invalid 'measure_date' line %d: %v", line, err)
 		}
 
+		newSender := agm.Originator
+		newCountryCode := gid.CountryCode
+		newLanguageCode := misc.CCtoLang[gid.CountryCode]
+		newDateIssue := time.Now()
+		newReferenceCode := "ZPG"
+
+		value, err := strconv.ParseFloat(row[valueIdx], 32)
+		if err != nil {
+			return nil, fmt.Errorf("Invalid 'value' line %d: %v", line, err)
+		}
+		newValue := value
+
+		newDateInfo := newDateIssue
+
+		newSourceOrganization := newSender
+
+		agmLines = append(agmLines, newAGMLine(
+			*gid,
+			newCountryCode,
+			newSender,
+			newLanguageCode,
+			newDateIssue,
+			newReferenceCode,
+			md,
+			newValue,
+			newDateInfo,
+			newSourceOrganization,
+		))
+	}
+
+agmLines:
+	for _, line := range agmLines {
 		var (
 			oldID                 int64
 			oldCountryCode        string
@@ -378,12 +415,12 @@
 
 		err = selectStmt.QueryRowContext(
 			ctx,
-			gid.CountryCode,
-			gid.LoCode,
-			gid.FairwaySection,
-			gid.Orc,
-			gid.Hectometre,
-			md,
+			line.Location.CountryCode,
+			line.Location.LoCode,
+			line.Location.FairwaySection,
+			line.Location.Orc,
+			line.Location.Hectometre,
+			line.MeasureDate.Time,
 		).Scan(
 			&oldID,
 			&oldCountryCode,
@@ -405,22 +442,6 @@
 			return nil, err
 		}
 
-		newSender := agm.Originator
-		newCountryCode := gid.CountryCode
-		newLanguageCode := misc.CCtoLang[gid.CountryCode]
-		newDateIssue := time.Now()
-		newReferenceCode := "ZPG"
-
-		value, err := strconv.ParseFloat(row[valueIdx], 32)
-		if err != nil {
-			return nil, fmt.Errorf("Invalid 'value' line %d: %v", line, err)
-		}
-		newValue := value
-
-		newDateInfo := newDateIssue
-
-		newSourceOrganization := newSender
-
 		switch err := func() error {
 			tx, err := conn.BeginTx(ctx, nil)
 			if err != nil {
@@ -432,20 +453,20 @@
 
 			if err := tx.StmtContext(ctx, insertStmt).QueryRowContext(
 				ctx,
-				gid.CountryCode,
-				gid.LoCode,
-				gid.FairwaySection,
-				gid.Orc,
-				gid.Hectometre,
-				md,
-				newCountryCode,
-				newSender,
-				newLanguageCode,
-				newDateIssue,
-				newReferenceCode,
-				newValue,
-				newDateInfo,
-				newSourceOrganization,
+				line.Location.CountryCode,
+				line.Location.LoCode,
+				line.Location.FairwaySection,
+				line.Location.Orc,
+				line.Location.Hectometre,
+				line.MeasureDate.Time,
+				line.CountryCode,
+				line.Sender,
+				line.LanguageCode,
+				line.DateIssue.Time,
+				line.ReferenceCode,
+				line.WaterLevel,
+				line.DateInfo.Time,
+				line.SourceOrganization,
 			).Scan(&newID); err != nil {
 				warn(handleError(err).Error())
 				ignored++
@@ -464,49 +485,41 @@
 			return err
 		}(); {
 		case err == errContinue:
-			continue lines
+			continue agmLines
 		case err != nil:
 			return nil, err
 		}
 
-		n := newAGMLine(
-			newCountryCode,
-			newSender,
-			newLanguageCode,
-			newDateIssue,
-			newReferenceCode,
-			newValue,
-			newDateInfo,
-			newSourceOrganization,
-		)
-
 		ase := &agmSummaryEntry{
-			FKGaugeID:   *gid,
-			MeasureDate: timetz{md},
+			FKGaugeID:   line.Location,
+			MeasureDate: line.MeasureDate,
 		}
 
 		if newEntry {
-			ase.Versions = []*agmLine{n}
+			ase.Versions = []*agmLine{line}
 		} else {
 			o := newAGMLine(
+				line.Location,
 				oldCountryCode,
 				oldSender,
 				oldLanguageCode,
 				oldDateIssue,
 				oldReferenceCode,
+				line.MeasureDate.Time,
 				oldValue,
 				oldDateInfo,
 				oldSourceOrganization,
 			)
 			// Ignore if there is no diff.
-			if !n.hasDiff(o) {
+			if !line.hasDiff(o) {
 				continue
 			}
-			ase.Versions = []*agmLine{o, n}
+			ase.Versions = []*agmLine{o, line}
 		}
 		entries = append(entries, ase)
 	}
 
+	feedback.Info("Imported %d entries with changes", len(entries))
 	feedback.Info("Importing approved gauge measurements took %s",
 		time.Since(start))
 
@@ -514,21 +527,25 @@
 }
 
 func newAGMLine(
+	location models.Isrs,
 	countryCode string,
 	sender string,
 	languageCode string,
 	dateIssue time.Time,
 	referenceCode string,
+	measureDate time.Time,
 	waterLevel float64,
 	dateInfo time.Time,
 	sourceOrganization string,
 ) *agmLine {
 	return &agmLine{
+		Location:           location,
 		CountryCode:        countryCode,
 		Sender:             sender,
 		LanguageCode:       languageCode,
 		DateIssue:          timetz{dateIssue},
 		ReferenceCode:      referenceCode,
+		MeasureDate:        timetz{measureDate},
 		WaterLevel:         waterLevel,
 		DateInfo:           timetz{dateInfo},
 		SourceOrganization: sourceOrganization,