changeset 1778:164b46ebd60d

Approved gauge measurement import: Implemented. TODO: Fix staging.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Fri, 11 Jan 2019 15:28:10 +0100
parents e70b7b8e7b74
children ad1c12e999df
files pkg/imports/agm.go
diffstat 1 files changed, 171 insertions(+), 8 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/imports/agm.go	Fri Jan 11 12:58:01 2019 +0100
+++ b/pkg/imports/agm.go	Fri Jan 11 15:28:10 2019 +0100
@@ -18,13 +18,17 @@
 	"context"
 	"database/sql"
 	"encoding/csv"
-	"errors"
+	"fmt"
+	"io"
 	"log"
 	"os"
 	"path/filepath"
+	"strconv"
 	"strings"
+	"time"
 
 	"gemma.intevation.de/gemma/pkg/common"
+	"gemma.intevation.de/gemma/pkg/models"
 )
 
 type ApprovedGaugeMeasurements struct {
@@ -87,6 +91,20 @@
 	return os.RemoveAll(agm.Dir)
 }
 
+func guessDate(s string) (time.Time, error) {
+	var err error
+	var t time.Time
+	for _, layout := range [...]string{
+		"02.01.2006 15:04",
+		"2006-01-02T15:04:05-07:00",
+	} {
+		if t, err = time.Parse(layout, s); err == nil {
+			break
+		}
+	}
+	return t, err
+}
+
 // Do executes the actual approved gauge measurements import.
 func (agm *ApprovedGaugeMeasurements) Do(
 	ctx context.Context,
@@ -95,6 +113,8 @@
 	feedback Feedback,
 ) (interface{}, error) {
 
+	start := time.Now()
+
 	f, err := os.Open(filepath.Join(agm.Dir, "agm.csv"))
 	if err != nil {
 		return nil, err
@@ -114,9 +134,13 @@
 
 	for i, f := range headers {
 		log.Printf("%d: %s\n", i, f)
-		headerIndices[strings.ToLower(strings.TrimSpace(f))] = i
+		headerIndices[strings.Replace(
+			strings.ToLower(
+				strings.TrimSpace(f)), " ", "_", -1)] = i
 	}
 
+	var missing []string
+
 	for _, m := range [...]string{
 		"fk_gauge_id",
 		"measure_date",
@@ -125,20 +149,159 @@
 		"country_code",
 		"date_issue",
 		"reference_code",
-		"water_level",
+		"value", // "water_level",
 		"predicted",
-		"is_waterlevel",
+		// "is_waterlevel",
 		"value_min",
 		"value_max",
 		"date_info",
 		"originator", // "source_organization",
 	} {
-		idx, found := headerIndices[m]
+		if _, found := headerIndices[m]; !found {
+			missing = append(missing, m)
+		}
+	}
+
+	if len(missing) > 0 {
+		return nil, fmt.Errorf("Missing columns: %s", strings.Join(missing, ", "))
+	}
+
+	inCm, _ := rescale("cm")
+	scaler := func(row []string) (func(float32) float32, error) {
+		idx, found := headerIndices["unit"]
 		if !found {
-			log.Printf("missing column '%s'\n", m)
+			return inCm, nil
 		}
-		_ = idx
+		log.Printf("scaler index: %d %d\n", idx, len(row))
+		unit := row[idx]
+		if unit == "cm" {
+			return inCm, nil
+		}
+		s, err := rescale(unit)
+		return s, err
 	}
 
-	return nil, errors.New("Not implemented, yet!")
+	tx, err := conn.BeginTx(ctx, nil)
+	if err != nil {
+		return nil, err
+	}
+	defer tx.Rollback()
+
+	insertStmt, err := tx.PrepareContext(ctx, insertGMSQL)
+	if err != nil {
+		return nil, err
+	}
+	defer insertStmt.Close()
+	trackStmt, err := tx.PrepareContext(ctx, trackImportSQL)
+	if err != nil {
+		return nil, err
+	}
+	defer trackStmt.Close()
+
+	ids := []int64{}
+
+	args := make([]interface{}, 18)
+
+lines:
+	for line := 1; ; line++ {
+
+		row, err := r.Read()
+		switch {
+		case err == io.EOF || len(row) == 0:
+			break lines
+		case err != nil:
+			return nil, fmt.Errorf("CSV parsing failed: %v", err)
+		}
+		convert, err := scaler(row)
+		if err != nil {
+			return nil, fmt.Errorf("line %d: %v", line, err)
+		}
+
+		gids := row[headerIndices["fk_gauge_id"]]
+		gid, err := models.IsrsFromString(gids)
+		if err != nil {
+			return nil, fmt.Errorf("Invalid ISRS code line %d: %v", line, err)
+		}
+
+		args[0] = gid.CountryCode
+		args[1] = gid.LoCode
+		args[2] = gid.FairwaySection
+		args[3] = gid.Orc
+		args[4] = gid.Hectometre
+
+		md, err := guessDate(row[headerIndices["measure_date"]])
+		if err != nil {
+			return nil, fmt.Errorf("Invalid 'measure_date' line %d: %v", line, err)
+		}
+		args[5] = md
+
+		args[6] = row[headerIndices["from"]]
+		args[7] = row[headerIndices["language_code"]]
+		args[8] = row[headerIndices["country_code"]]
+
+		dis, err := guessDate(row[headerIndices["date_issue"]])
+		if err != nil {
+			return nil, fmt.Errorf("Invalid 'date_issue' line %d: %v", line, err)
+		}
+		args[9] = dis
+
+		args[10] = row[headerIndices["reference_code"]]
+
+		value, err := strconv.ParseFloat(row[headerIndices["value"]], 32)
+		if err != nil {
+			return nil, fmt.Errorf("Invalid 'value' line %d: %v", line, err)
+		}
+		args[11] = convert(float32(value))
+
+		predicted := strings.ToLower(row[headerIndices["predicted"]]) == "true"
+		args[12] = predicted
+
+		args[13] = true // is_waterlevel
+
+		valueMin, err := strconv.ParseFloat(row[headerIndices["value_min"]], 32)
+		if err != nil {
+			return nil, fmt.Errorf("Invalid 'value_min' line %d: %v", line, err)
+		}
+		args[14] = convert(float32(valueMin))
+
+		valueMax, err := strconv.ParseFloat(row[headerIndices["value_max"]], 32)
+		if err != nil {
+			return nil, fmt.Errorf("Invalid 'value_max' line %d: %v", line, err)
+		}
+		args[15] = convert(float32(valueMax))
+
+		din, err := guessDate(row[headerIndices["date_info"]])
+		if err != nil {
+			return nil, fmt.Errorf("Invalid 'date_info' line %d: %v", line, err)
+		}
+		args[16] = din
+
+		args[17] = row[headerIndices["originator"]]
+
+		var id int64
+		if err := insertStmt.QueryRowContext(ctx, args...).Scan(&id); err != nil {
+			return nil, fmt.Errorf("Failed to insert line %d: %v", line, err)
+		}
+		ids = append(ids, id)
+
+		if _, err := trackStmt.ExecContext(
+			ctx, importID, "waterway.gauge_measurements", id,
+		); err != nil {
+			return nil, err
+		}
+	}
+
+	if err := tx.Commit(); err != nil {
+		return nil, fmt.Errorf("Commit failed: %v", err)
+	}
+
+	feedback.Info("Importing approved gauge measurements took %s",
+		time.Since(start))
+
+	summary := struct {
+		IDs []int64 `json:"ids"`
+	}{
+		IDs: ids,
+	}
+	return &summary, nil
 }