diff pkg/imports/agm.go @ 2105:58a28715e386

Approved gauge measurement import: Added diff-summary. XXX: May be broken!
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Mon, 04 Feb 2019 15:39:58 +0100
parents 8a986d80e1c6
children 817cd8b89a86
line wrap: on
line diff
--- a/pkg/imports/agm.go	Mon Feb 04 14:35:47 2019 +0100
+++ b/pkg/imports/agm.go	Mon Feb 04 15:39:58 2019 +0100
@@ -18,6 +18,7 @@
 	"context"
 	"database/sql"
 	"encoding/csv"
+	"encoding/json"
 	"fmt"
 	"io"
 	"os"
@@ -115,6 +116,89 @@
 	"2006-01-02T15:04:05-07:00",
 }).Guess
 
+type timetz struct{ time.Time }
+
+func (ttz *timetz) MarshalJSON() ([]byte, error) {
+	return json.Marshal(ttz.Time.Format("2006-01-02T15:04:05-07:00"))
+}
+
+type agmLine struct {
+	CountryCode        string   `json:"country-code"`
+	Sender             string   `json:"sender"`
+	LanguageCode       string   `json:"language-code"`
+	DateIssue          timetz   `json:"date-issue"`
+	ReferenceCode      string   `json:"reference-code"`
+	WaterLevel         float64  `json:"water-level"`
+	Predicted          bool     `json:"predicted"`
+	ValueMin           *float64 `json:"value-min"`
+	ValueMax           *float64 `json:"value-max"`
+	DateInfo           timetz   `json:"date-info"`
+	SourceOrganization string   `json:"source-organization"`
+}
+
+type agmSummaryEntry struct {
+	FKGaugeID   models.Isrs `json:"fk-gauge-id"`
+	MeasureDate timetz      `json:"measure-date"`
+	Versions    []*agmLine  `json:"versions"`
+}
+
+const (
+	agmSelectSQL = `
+SELECT
+  id,
+  country_code,
+  sender,
+  language_code,
+  date_issue,
+  reference_code,
+  water_level,
+  predicted,
+  value_min,
+  value_max,
+  date_info,
+  source_organization
+FROM waterway.gauge_measurements
+WHERE
+  fk_gauge_id = ($1, $2, $3, $4, $5) AND
+  measure_date = $6 AND staging_done`
+
+	agmInsertSQL = `
+INSERT INTO waterway.gauge_measurements (
+  fk_gauge_id,
+  measure_date,
+  country_code,
+  sender,
+  language_code,
+  date_issue,
+  reference_code,
+  water_level,
+  predicted,
+  value_min,
+  value_max,
+  date_info,
+  source_organization,
+  is_waterlevel,
+  staging_done
+) VALUES(
+  ($1, $2, $3, $4, $5),
+  $6,
+  $7,
+  $8,
+  $9,
+  $10,
+  $11,
+  $12,
+  $13,
+  $14,
+  $15,
+  $16,
+  $17,
+  true,
+  false
+)
+RETURNING id`
+)
+
 // Do executes the actual approved gauge measurements import.
 func (agm *ApprovedGaugeMeasurements) Do(
 	ctx context.Context,
@@ -196,13 +280,11 @@
 	}
 
 	var missing []string
-
 	for i := range headerFields {
 		if headerFields[i].name != "unit" && *headerFields[i].idx == -1 {
 			missing = append(missing, headerFields[i].name)
 		}
 	}
-
 	if len(missing) > 0 {
 		return nil, fmt.Errorf("Missing columns: %s", strings.Join(missing, ", "))
 	}
@@ -226,7 +308,12 @@
 	}
 	defer tx.Rollback()
 
-	insertStmt, err := tx.PrepareContext(ctx, insertGMSQL)
+	selectStmt, err := tx.PrepareContext(ctx, agmSelectSQL)
+	if err != nil {
+		return nil, err
+	}
+	defer selectStmt.Close()
+	insertStmt, err := tx.PrepareContext(ctx, agmInsertSQL)
 	if err != nil {
 		return nil, err
 	}
@@ -237,11 +324,7 @@
 	}
 	defer trackStmt.Close()
 
-	ids := []int64{}
-
-	args := make([]interface{}, 19)
-
-	args[18] = false // staging_done
+	entries := []*agmSummaryEntry{}
 
 lines:
 	for line := 1; ; line++ {
@@ -264,74 +347,179 @@
 			return nil, fmt.Errorf("Invalid ISRS code line %d: %v", line, err)
 		}
 
-		args[0] = gid.CountryCode
-		args[1] = gid.LoCode
-		args[2] = gid.FairwaySection
-		args[3] = gid.Orc
-		args[4] = gid.Hectometre
-
 		md, err := guessDate(row[measureDateIdx])
 		if err != nil {
 			return nil, fmt.Errorf("Invalid 'measure_date' line %d: %v", line, err)
 		}
-		args[5] = md
+
+		var (
+			oldID                 int64
+			oldCountryCode        string
+			oldSender             string
+			oldLanguageCode       string
+			oldDateIssue          time.Time
+			oldReferenceCode      string
+			oldValue              float64
+			oldPredicted          bool
+			oldValueMin           sql.NullFloat64
+			oldValueMax           sql.NullFloat64
+			oldDateInfo           time.Time
+			oldSourceOrganization string
+		)
 
-		args[6] = row[fromIdx]
-		args[7] = row[languageCodeIdx]
-		args[8] = row[countryCodeIdx]
+		err = selectStmt.QueryRowContext(
+			ctx,
+			gid.CountryCode,
+			gid.LoCode,
+			gid.FairwaySection,
+			gid.Orc,
+			gid.Hectometre,
+			md,
+		).Scan(
+			&oldID,
+			&oldCountryCode,
+			&oldSender,
+			&oldLanguageCode,
+			&oldDateIssue,
+			&oldReferenceCode,
+			&oldValue,
+			&oldPredicted,
+			&oldValueMin,
+			&oldValueMax,
+			&oldDateInfo,
+			&oldSourceOrganization,
+		)
+
+		var newEntry bool
+		switch {
+		case err == sql.ErrNoRows:
+			// Complete new one
+			newEntry = true
+		case err != nil:
+			return nil, err
+		}
+
+		newSender := row[fromIdx]
+		newLanguageCode := row[languageCodeIdx]
+		newCountryCode := row[countryCodeIdx]
 
 		dis, err := guessDate(row[dateIssueIdx])
 		if err != nil {
 			return nil, fmt.Errorf("Invalid 'date_issue' line %d: %v", line, err)
 		}
-		args[9] = dis
+		newDateIssue := dis
 
-		args[10] = row[referenceCodeIdx]
+		newReferenceCode := row[referenceCodeIdx]
 
 		value, err := strconv.ParseFloat(row[valueIdx], 32)
 		if err != nil {
 			return nil, fmt.Errorf("Invalid 'value' line %d: %v", line, err)
 		}
-		args[11] = convert(float32(value))
+		newValue := float64(convert(float32(value)))
 
-		predicted := strings.ToLower(row[predictedIdx]) == "true"
-		args[12] = predicted
-
-		args[13] = true // is_waterlevel
+		newPredicted := strings.ToLower(row[predictedIdx]) == "true"
 
-		valueMin, err := strconv.ParseFloat(row[valueMinIdx], 32)
-		if err != nil {
-			return nil, fmt.Errorf("Invalid 'value_min' line %d: %v", line, err)
+		var newValueMin sql.NullFloat64
+		if vm := row[valueMinIdx]; vm != "" {
+			valueMin, err := strconv.ParseFloat(vm, 32)
+			if err != nil {
+				return nil, fmt.Errorf("Invalid 'value_min' line %d: %v", line, err)
+			}
+			newValueMin = sql.NullFloat64{
+				Float64: float64(convert(float32(valueMin))),
+				Valid:   true,
+			}
 		}
-		args[14] = convert(float32(valueMin))
 
-		valueMax, err := strconv.ParseFloat(row[valueMaxIdx], 32)
-		if err != nil {
-			return nil, fmt.Errorf("Invalid 'value_max' line %d: %v", line, err)
+		var newValueMax sql.NullFloat64
+		if vm := row[valueMaxIdx]; vm != "" {
+			valueMax, err := strconv.ParseFloat(vm, 32)
+			if err != nil {
+				return nil, fmt.Errorf("Invalid 'value_max' line %d: %v", line, err)
+			}
+			newValueMax = sql.NullFloat64{
+				Float64: float64(convert(float32(valueMax))),
+				Valid:   true,
+			}
 		}
-		args[15] = convert(float32(valueMax))
 
 		din, err := guessDate(row[dateInfoIdx])
 		if err != nil {
 			return nil, fmt.Errorf("Invalid 'date_info' line %d: %v", line, err)
 		}
-		args[16] = din
+		newDateInfo := din
 
-		args[17] = row[originatorIdx]
+		newSourceOrganization := row[originatorIdx]
 
-		// args[18] (staging_done) is set to true outside the loop.
+		var newID int64
 
-		var id int64
-		if err := insertStmt.QueryRowContext(ctx, args...).Scan(&id); err != nil {
-			return nil, fmt.Errorf("Failed to insert line %d: %v", line, err)
+		if err := insertStmt.QueryRowContext(
+			ctx,
+			gid.CountryCode,
+			gid.LoCode,
+			gid.FairwaySection,
+			gid.Orc,
+			gid.Hectometre,
+			md,
+			newCountryCode,
+			newSender,
+			newLanguageCode,
+			newDateIssue,
+			newReferenceCode,
+			newValue,
+			newPredicted,
+			newValueMin,
+			newValueMax,
+			newDateInfo,
+			newSourceOrganization,
+		).Scan(&newID); err != nil {
+			return nil, err
 		}
-		ids = append(ids, id)
-
 		if _, err := trackStmt.ExecContext(
-			ctx, importID, "waterway.gauge_measurements", id,
+			ctx, importID, "waterway.gauge_measurements", newID,
 		); err != nil {
 			return nil, err
 		}
+
+		ase := &agmSummaryEntry{
+			FKGaugeID:   *gid,
+			MeasureDate: timetz{md},
+			Versions: []*agmLine{
+				newAGMLine(
+					newCountryCode,
+					newSender,
+					newLanguageCode,
+					newDateIssue,
+					newReferenceCode,
+					newValue,
+					newPredicted,
+					newValueMin,
+					newValueMax,
+					newDateInfo,
+					newSourceOrganization,
+				),
+			},
+		}
+
+		if !newEntry {
+			ase.Versions = []*agmLine{
+				newAGMLine(
+					oldCountryCode,
+					oldSender,
+					oldLanguageCode,
+					oldDateIssue,
+					oldReferenceCode,
+					oldValue,
+					oldPredicted,
+					oldValueMin,
+					oldValueMax,
+					oldDateInfo,
+					oldSourceOrganization,
+				),
+				ase.Versions[0],
+			}
+		}
+		entries = append(entries, ase)
 	}
 
 	if err := tx.Commit(); err != nil {
@@ -341,10 +529,40 @@
 	feedback.Info("Importing approved gauge measurements took %s",
 		time.Since(start))
 
-	summary := struct {
-		IDs []int64 `json:"ids"`
-	}{
-		IDs: ids,
+	return entries, nil
+}
+
+func newAGMLine(
+	countryCode string,
+	sender string,
+	languageCode string,
+	dateIssue time.Time,
+	referenceCode string,
+	waterLevel float64,
+	predicted bool,
+	valueMin sql.NullFloat64,
+	valueMax sql.NullFloat64,
+	dateInfo time.Time,
+	sourceOrganization string,
+) *agmLine {
+	nilFloat := func(v sql.NullFloat64) *float64 {
+		var p *float64
+		if v.Valid {
+			p = &v.Float64
+		}
+		return p
 	}
-	return &summary, nil
+	return &agmLine{
+		CountryCode:        countryCode,
+		Sender:             sender,
+		LanguageCode:       languageCode,
+		DateIssue:          timetz{dateIssue},
+		ReferenceCode:      referenceCode,
+		WaterLevel:         waterLevel,
+		Predicted:          predicted,
+		ValueMin:           nilFloat(valueMin),
+		ValueMax:           nilFloat(valueMax),
+		DateInfo:           timetz{dateInfo},
+		SourceOrganization: sourceOrganization,
+	}
 }