Mercurial > gemma
changeset 2079:9318973487a1
Waterway profiles import: Implemented parsing of the data lines of the CSV file and storing of the parsed values in the database.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Wed, 30 Jan 2019 23:57:51 +0100 |
parents | dacf79a0658e |
children | 1dafdbfca100 |
files | pkg/imports/wp.go schema/auth.sql |
diffstat | 2 files changed, 216 insertions(+), 6 deletions(-) [+] |
line wrap: on
line diff
--- a/pkg/imports/wp.go Wed Jan 30 20:27:07 2019 +0100 +++ b/pkg/imports/wp.go Wed Jan 30 23:57:51 2019 +0100 @@ -18,13 +18,18 @@ "context" "database/sql" "encoding/csv" - "errors" "fmt" + "io" "os" "path/filepath" + "strconv" "strings" + "time" "gemma.intevation.de/gemma/pkg/common" + "gemma.intevation.de/gemma/pkg/misc" + "gemma.intevation.de/gemma/pkg/models" + "github.com/jackc/pgx/pgtype" ) type WaterwayProfiles struct { @@ -59,19 +64,63 @@ } } +const ( + insertWaterwayProfileSQL = ` +INSERT INTO waterway.waterway_profiles ( + location, + validity, + lnwl, + mwl, + hnwl, + fe30, + fe100, + date_info, + source_organization +) VALUES ( + ($1, $2, $3, $4, $5), + $6, + $7, + $8, + $9, + $10, + $11, + $12, + $13 +) RETURNING id` + + wpStageDoneSQL = ` +UPDATE waterway.waterway_profiles SET staging_done = true +WHERE id IN ( + SELECT key FROM import.track_imports + WHERE import_id = $1 AND + relation = 'waterway.waterway_profiles'::regclass)` +) + func (wpJobCreator) StageDone( ctx context.Context, tx *sql.Tx, id int64, ) error { - // TODO: Implement me! - return nil + _, err := tx.ExecContext(ctx, wpStageDoneSQL, id) + return err } func (wp *WaterwayProfiles) CleanUp() error { return os.RemoveAll(wp.Dir) } +func parseFloat64(s string) (sql.NullFloat64, error) { + if s == "" { + return sql.NullFloat64{}, nil + } + s = strings.Replace(s, ",", ".", -1) + v, err := strconv.ParseFloat(s, 64) + if err != nil { + return sql.NullFloat64{}, err + } + return sql.NullFloat64{Float64: v, Valid: true}, nil +} + func (wp *WaterwayProfiles) Do( ctx context.Context, importID int64, @@ -79,7 +128,7 @@ feedback Feedback, ) (interface{}, error) { - //start := time.Now() + start := time.Now() f, err := os.Open(filepath.Join(wp.Dir, "wp.csv")) if err != nil { @@ -155,7 +204,164 @@ strings.Join(missing, ", ")) } - // TODO: Implement me! 
+ parseDate := misc.TimeGuesser([]string{"02.01.2006"}).Guess + + tx, err := conn.BeginTx(ctx, nil) + if err != nil { + return nil, err + } + defer tx.Rollback() + + insertStmt, err := tx.PrepareContext(ctx, insertWaterwayProfileSQL) + if err != nil { + return nil, err + } + defer insertStmt.Close() + + trackStmt, err := tx.PrepareContext(ctx, trackImportSQL) + if err != nil { + return nil, err + } + defer trackStmt.Close() + + var ids []int64 + +lines: + for line := 1; ; line++ { + + row, err := r.Read() + switch { + case err == io.EOF || len(row) == 0: + break lines + case err != nil: + return nil, fmt.Errorf("CSV parsing failed: %v", err) + } + + location, err := models.IsrsFromString(row[locationIdx]) + if err != nil { + return nil, fmt.Errorf( + "Invalid ISRS location code in line %d: %v", + line, err) + } + + validFromTime, err := parseDate(row[validFromIdx]) + if err != nil { + return nil, fmt.Errorf( + "Invalid 'valid_from' value in line %d: %v", + line, err) + } + validToTime, err := parseDate(row[validToIdx]) + if err != nil { + return nil, fmt.Errorf( + "Invalid 'valid_to' value in line %d: %v", + line, err) + } + + validFrom := pgtype.Timestamptz{ + Time: validFromTime, + Status: pgtype.Present, + } + + validTo := pgtype.Timestamptz{ + Time: validToTime, + Status: pgtype.Present, + } + + validity := pgtype.Tstzrange{ + Lower: validFrom, + Upper: validTo, + LowerType: pgtype.Inclusive, + UpperType: pgtype.Exclusive, + Status: pgtype.Present, + } - return nil, errors.New("Not implemented, yet!") + lnwl, err := parseFloat64(row[lnwlIdx]) + if err != nil { + return nil, fmt.Errorf( + "Invalid 'lnwl' value in line %d: %v", + line, err) + } + mwl, err := parseFloat64(row[mwlIdx]) + if err != nil { + return nil, fmt.Errorf( + "Invalid 'mwl' value in line %d: %v", + line, err) + } + hnwl, err := parseFloat64(row[hnwlIdx]) + if err != nil { + return nil, fmt.Errorf( + "Invalid 'hnwl' value in line %d: %v", + line, err) + } + fe30, err := 
parseFloat64(row[fe30Idx]) + if err != nil { + return nil, fmt.Errorf( + "Invalid 'fe30' value in line %d: %v", + line, err) + } + fe100, err := parseFloat64(row[fe100Idx]) + if err != nil { + return nil, fmt.Errorf( + "Invalid 'fe100' value in line %d: %v", + line, err) + } + + var dateInfo time.Time + + if di := row[dateInfoIdx]; di == "" { + dateInfo = start + } else if dateInfo, err = parseDate(di); err != nil { + return nil, fmt.Errorf( + "Invalid 'date_info' value in line %d: %v", + line, err) + } + + source := row[sourceIdx] + + var id int64 + if err := insertStmt.QueryRowContext( + ctx, + location.CountryCode, + location.LoCode, + location.FairwaySection, + location.Orc, + location.Hectometre, + &validity, + lnwl, + mwl, + hnwl, + fe30, + fe100, + dateInfo, + source, + ).Scan(&id); err != nil { + return nil, err + } + + if _, err := trackStmt.ExecContext( + ctx, importID, "waterway.waterway_profiles", id); err != nil { + return nil, err + } + ids = append(ids, id) + } + if len(ids) == 0 { + return nil, UnchangedError("No new entries in waterway profiles.") + } + + if err := tx.Commit(); err != nil { + return nil, fmt.Errorf( + "Importing waterway profiles failed after %s: %v", + time.Since(start), err) + } + + feedback.Info("%d new entries in waterway profiles.", len(ids)) + feedback.Info("Importing waterway profiles took %s", + time.Since(start)) + + summary := struct { + IDs []int64 `json:"ids"` + }{ + IDs: ids, + } + return &summary, nil }
--- a/schema/auth.sql Wed Jan 30 20:27:07 2019 +0100 +++ b/schema/auth.sql Wed Jan 30 23:57:51 2019 +0100 @@ -124,6 +124,10 @@ FOR ALL TO waterway_admin USING ((fk_gauge_id).country_code = users.current_user_country()); +CREATE POLICY same_country ON waterway.waterway_profiles + FOR ALL TO waterway_admin + USING ((location).country_code = users.current_user_country()); + CREATE POLICY responsibility_area ON waterway.bottlenecks FOR ALL TO waterway_admin USING (utm_covers(area));