changeset 2079:9318973487a1

Waterway profiles import: Implemented the parsing of the data lines of the CSV file and stored the values into the database.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Wed, 30 Jan 2019 23:57:51 +0100
parents dacf79a0658e
children 1dafdbfca100
files pkg/imports/wp.go schema/auth.sql
diffstat 2 files changed, 216 insertions(+), 6 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/imports/wp.go	Wed Jan 30 20:27:07 2019 +0100
+++ b/pkg/imports/wp.go	Wed Jan 30 23:57:51 2019 +0100
@@ -18,13 +18,18 @@
 	"context"
 	"database/sql"
 	"encoding/csv"
-	"errors"
 	"fmt"
+	"io"
 	"os"
 	"path/filepath"
+	"strconv"
 	"strings"
+	"time"
 
 	"gemma.intevation.de/gemma/pkg/common"
+	"gemma.intevation.de/gemma/pkg/misc"
+	"gemma.intevation.de/gemma/pkg/models"
+	"github.com/jackc/pgx/pgtype"
 )
 
 type WaterwayProfiles struct {
@@ -59,19 +64,63 @@
 	}
 }
 
+const (
+	insertWaterwayProfileSQL = `
+INSERT INTO waterway.waterway_profiles (
+  location,
+  validity,
+  lnwl,
+  mwl,
+  hnwl,
+  fe30,
+  fe100,
+  date_info,
+  source_organization
+) VALUES (
+  ($1, $2, $3, $4, $5),
+  $6,
+  $7,
+  $8,
+  $9,
+  $10,
+  $11,
+  $12,
+  $13
+) RETURNING id`
+
+	wpStageDoneSQL = `
+UPDATE waterway.waterway_profiles SET staging_done = true
+WHERE id IN (
+  SELECT key FROM import.track_imports
+  WHERE import_id = $1 AND
+    relation = 'waterway.waterway_profiles'::regclass)`
+)
+
 func (wpJobCreator) StageDone(
 	ctx context.Context,
 	tx *sql.Tx,
 	id int64,
 ) error {
-	// TODO: Implement me!
-	return nil
+	_, err := tx.ExecContext(ctx, wpStageDoneSQL, id)
+	return err
 }
 
 func (wp *WaterwayProfiles) CleanUp() error {
 	return os.RemoveAll(wp.Dir)
 }
 
+func parseFloat64(s string) (sql.NullFloat64, error) {
+	if s == "" {
+		return sql.NullFloat64{}, nil
+	}
+	s = strings.Replace(s, ",", ".", -1)
+	v, err := strconv.ParseFloat(s, 64)
+	if err != nil {
+		return sql.NullFloat64{}, err
+	}
+	return sql.NullFloat64{Float64: v, Valid: true}, nil
+}
+
 func (wp *WaterwayProfiles) Do(
 	ctx context.Context,
 	importID int64,
@@ -79,7 +128,7 @@
 	feedback Feedback,
 ) (interface{}, error) {
 
-	//start := time.Now()
+	start := time.Now()
 
 	f, err := os.Open(filepath.Join(wp.Dir, "wp.csv"))
 	if err != nil {
@@ -155,7 +204,164 @@
 			strings.Join(missing, ", "))
 	}
 
-	// TODO: Implement me!
+	parseDate := misc.TimeGuesser([]string{"02.01.2006"}).Guess
+
+	tx, err := conn.BeginTx(ctx, nil)
+	if err != nil {
+		return nil, err
+	}
+	defer tx.Rollback()
+
+	insertStmt, err := tx.PrepareContext(ctx, insertWaterwayProfileSQL)
+	if err != nil {
+		return nil, err
+	}
+	defer insertStmt.Close()
+
+	trackStmt, err := tx.PrepareContext(ctx, trackImportSQL)
+	if err != nil {
+		return nil, err
+	}
+	defer trackStmt.Close()
+
+	var ids []int64
+
+lines:
+	for line := 1; ; line++ {
+
+		row, err := r.Read()
+		switch {
+		case err == io.EOF || len(row) == 0:
+			break lines
+		case err != nil:
+			return nil, fmt.Errorf("CSV parsing failed: %v", err)
+		}
+
+		location, err := models.IsrsFromString(row[locationIdx])
+		if err != nil {
+			return nil, fmt.Errorf(
+				"Invalid ISRS location code in line %d: %v",
+				line, err)
+		}
+
+		validFromTime, err := parseDate(row[validFromIdx])
+		if err != nil {
+			return nil, fmt.Errorf(
+				"Invalid 'valid_from' value in line %d: %v",
+				line, err)
+		}
+		validToTime, err := parseDate(row[validToIdx])
+		if err != nil {
+			return nil, fmt.Errorf(
+				"Invalid 'valid_to' value in line %d: %v",
+				line, err)
+		}
+
+		validFrom := pgtype.Timestamptz{
+			Time:   validFromTime,
+			Status: pgtype.Present,
+		}
+
+		validTo := pgtype.Timestamptz{
+			Time:   validToTime,
+			Status: pgtype.Present,
+		}
+
+		validity := pgtype.Tstzrange{
+			Lower:     validFrom,
+			Upper:     validTo,
+			LowerType: pgtype.Inclusive,
+			UpperType: pgtype.Exclusive,
+			Status:    pgtype.Present,
+		}
 
-	return nil, errors.New("Not implemented, yet!")
+		lnwl, err := parseFloat64(row[lnwlIdx])
+		if err != nil {
+			return nil, fmt.Errorf(
+				"Invalid 'lnwl' value in line %d: %v",
+				line, err)
+		}
+		mwl, err := parseFloat64(row[mwlIdx])
+		if err != nil {
+			return nil, fmt.Errorf(
+				"Invalid 'mwl' value in line %d: %v",
+				line, err)
+		}
+		hnwl, err := parseFloat64(row[hnwlIdx])
+		if err != nil {
+			return nil, fmt.Errorf(
+				"Invalid 'hnwl' value in line %d: %v",
+				line, err)
+		}
+		fe30, err := parseFloat64(row[fe30Idx])
+		if err != nil {
+			return nil, fmt.Errorf(
+				"Invalid 'fe30' value in line %d: %v",
+				line, err)
+		}
+		fe100, err := parseFloat64(row[fe100Idx])
+		if err != nil {
+			return nil, fmt.Errorf(
+				"Invalid 'fe100' value in line %d: %v",
+				line, err)
+		}
+
+		var dateInfo time.Time
+
+		if di := row[dateInfoIdx]; di == "" {
+			dateInfo = start
+		} else if dateInfo, err = parseDate(di); err != nil {
+			return nil, fmt.Errorf(
+				"Invalid 'date_info' value in line %d: %v",
+				line, err)
+		}
+
+		source := row[sourceIdx]
+
+		var id int64
+		if err := insertStmt.QueryRowContext(
+			ctx,
+			location.CountryCode,
+			location.LoCode,
+			location.FairwaySection,
+			location.Orc,
+			location.Hectometre,
+			&validity,
+			lnwl,
+			mwl,
+			hnwl,
+			fe30,
+			fe100,
+			dateInfo,
+			source,
+		).Scan(&id); err != nil {
+			return nil, err
+		}
+
+		if _, err := trackStmt.ExecContext(
+			ctx, importID, "waterway.waterway_profiles", id); err != nil {
+			return nil, err
+		}
+		ids = append(ids, id)
+	}
+	if len(ids) == 0 {
+		return nil, UnchangedError("No new entries in waterway profiles.")
+	}
+
+	if err := tx.Commit(); err != nil {
+		return nil, fmt.Errorf(
+			"Importing waterway profiles failed after %s: %v",
+			time.Since(start), err)
+	}
+
+	feedback.Info("%d new entries in waterway profiles.", len(ids))
+	feedback.Info("Importing waterway profiles took %s",
+		time.Since(start))
+
+	summary := struct {
+		IDs []int64 `json:"ids"`
+	}{
+		IDs: ids,
+	}
+	return &summary, nil
 }
--- a/schema/auth.sql	Wed Jan 30 20:27:07 2019 +0100
+++ b/schema/auth.sql	Wed Jan 30 23:57:51 2019 +0100
@@ -124,6 +124,10 @@
     FOR ALL TO waterway_admin
     USING ((fk_gauge_id).country_code = users.current_user_country());
 
+CREATE POLICY same_country ON waterway.waterway_profiles
+    FOR ALL TO waterway_admin
+    USING ((location).country_code = users.current_user_country());
+
 CREATE POLICY responsibility_area ON waterway.bottlenecks
     FOR ALL TO waterway_admin
     USING (utm_covers(area));