Mercurial > gemma
changeset 1778:164b46ebd60d
Approved gauge measurement import: Implemented. TODO: Fix staging.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Fri, 11 Jan 2019 15:28:10 +0100 |
parents | e70b7b8e7b74 |
children | ad1c12e999df |
files | pkg/imports/agm.go |
diffstat | 1 files changed, 171 insertions(+), 8 deletions(-) [+] |
line wrap: on
line diff
```diff
--- a/pkg/imports/agm.go	Fri Jan 11 12:58:01 2019 +0100
+++ b/pkg/imports/agm.go	Fri Jan 11 15:28:10 2019 +0100
@@ -18,13 +18,17 @@
 	"context"
 	"database/sql"
 	"encoding/csv"
-	"errors"
+	"fmt"
+	"io"
 	"log"
 	"os"
 	"path/filepath"
+	"strconv"
 	"strings"
+	"time"
 
 	"gemma.intevation.de/gemma/pkg/common"
+	"gemma.intevation.de/gemma/pkg/models"
 )
 
 type ApprovedGaugeMeasurements struct {
@@ -87,6 +91,20 @@
 	return os.RemoveAll(agm.Dir)
 }
 
+func guessDate(s string) (time.Time, error) {
+	var err error
+	var t time.Time
+	for _, layout := range [...]string{
+		"02.01.2006 15:04",
+		"2006-01-02T15:04:05-07:00",
+	} {
+		if t, err = time.Parse(layout, s); err == nil {
+			break
+		}
+	}
+	return t, err
+}
+
 // Do executes the actual approved gauge measurements import.
 func (agm *ApprovedGaugeMeasurements) Do(
 	ctx context.Context,
@@ -95,6 +113,8 @@
 	feedback Feedback,
 ) (interface{}, error) {
 
+	start := time.Now()
+
 	f, err := os.Open(filepath.Join(agm.Dir, "agm.csv"))
 	if err != nil {
 		return nil, err
@@ -114,9 +134,13 @@
 
 	for i, f := range headers {
 		log.Printf("%d: %s\n", i, f)
-		headerIndices[strings.ToLower(strings.TrimSpace(f))] = i
+		headerIndices[strings.Replace(
+			strings.ToLower(
+				strings.TrimSpace(f)), " ", "_", -1)] = i
 	}
 
+	var missing []string
+
 	for _, m := range [...]string{
 		"fk_gauge_id",
 		"measure_date",
@@ -125,20 +149,159 @@
 		"country_code",
 		"date_issue",
 		"reference_code",
-		"water_level",
+		"value", // "water_level",
 		"predicted",
-		"is_waterlevel",
+		// "is_waterlevel",
 		"value_min",
 		"value_max",
 		"date_info",
 		"originator", // "source_organization",
 	} {
-		idx, found := headerIndices[m]
+		if _, found := headerIndices[m]; !found {
+			missing = append(missing, m)
+		}
+	}
+
+	if len(missing) > 0 {
+		return nil, fmt.Errorf("Missing columns: %s", strings.Join(missing, ", "))
+	}
+
+	inCm, _ := rescale("cm")
+	scaler := func(row []string) (func(float32) float32, error) {
+		idx, found := headerIndices["unit"]
 		if !found {
-			log.Printf("missing column '%s'\n", m)
+			return inCm, nil
 		}
-		_ = idx
+		log.Printf("scaler index: %d %d\n", idx, len(row))
+		unit := row[idx]
+		if unit == "cm" {
+			return inCm, nil
+		}
+		s, err := rescale(unit)
+		return s, err
 	}
 
-	return nil, errors.New("Not implemented, yet!")
+	tx, err := conn.BeginTx(ctx, nil)
+	if err != nil {
+		return nil, err
+	}
+	defer tx.Rollback()
+
+	insertStmt, err := tx.PrepareContext(ctx, insertGMSQL)
+	if err != nil {
+		return nil, err
+	}
+	defer insertStmt.Close()
+	trackStmt, err := tx.PrepareContext(ctx, trackImportSQL)
+	if err != nil {
+		return nil, err
+	}
+	defer trackStmt.Close()
+
+	ids := []int64{}
+
+	args := make([]interface{}, 18)
+
+lines:
+	for line := 1; ; line++ {
+
+		row, err := r.Read()
+		switch {
+		case err == io.EOF || len(row) == 0:
+			break lines
+		case err != nil:
+			return nil, fmt.Errorf("CSV parsing failed: %v", err)
+		}
+		convert, err := scaler(row)
+		if err != nil {
+			return nil, fmt.Errorf("line %d: %v", line, err)
+		}
+
+		gids := row[headerIndices["fk_gauge_id"]]
+		gid, err := models.IsrsFromString(gids)
+		if err != nil {
+			return nil, fmt.Errorf("Invalid ISRS code line %d: %v", line, err)
+		}
+
+		args[0] = gid.CountryCode
+		args[1] = gid.LoCode
+		args[2] = gid.FairwaySection
+		args[3] = gid.Orc
+		args[4] = gid.Hectometre
+
+		md, err := guessDate(row[headerIndices["measure_date"]])
+		if err != nil {
+			return nil, fmt.Errorf("Invalid 'measure_date' line %d: %v", line, err)
+		}
+		args[5] = md
+
+		args[6] = row[headerIndices["from"]]
+		args[7] = row[headerIndices["language_code"]]
+		args[8] = row[headerIndices["country_code"]]
+
+		dis, err := guessDate(row[headerIndices["date_issue"]])
+		if err != nil {
+			return nil, fmt.Errorf("Invalid 'date_issue' line %d: %v", line, err)
+		}
+		args[9] = dis
+
+		args[10] = row[headerIndices["reference_code"]]
+
+		value, err := strconv.ParseFloat(row[headerIndices["value"]], 32)
+		if err != nil {
+			return nil, fmt.Errorf("Invalid 'value' line %d: %v", line, err)
+		}
+		args[11] = convert(float32(value))
+
+		predicted := strings.ToLower(row[headerIndices["predicted"]]) == "true"
+		args[12] = predicted
+
+		args[13] = true // is_waterlevel
+
+		valueMin, err := strconv.ParseFloat(row[headerIndices["value_min"]], 32)
+		if err != nil {
+			return nil, fmt.Errorf("Invalid 'value_min' line %d: %v", line, err)
+		}
+		args[14] = convert(float32(valueMin))
+
+		valueMax, err := strconv.ParseFloat(row[headerIndices["value_max"]], 32)
+		if err != nil {
+			return nil, fmt.Errorf("Invalid 'value_max' line %d: %v", line, err)
+		}
+		args[15] = convert(float32(valueMax))
+
+		din, err := guessDate(row[headerIndices["date_info"]])
+		if err != nil {
+			return nil, fmt.Errorf("Invalid 'date_info' line %d: %v", line, err)
+		}
+		args[16] = din
+
+		args[17] = row[headerIndices["originator"]]
+
+		var id int64
+		if err := insertStmt.QueryRowContext(ctx, args...).Scan(&id); err != nil {
+			return nil, fmt.Errorf("Failed to insert line %d: %v", line, err)
+		}
+		ids = append(ids, id)
+
+		if _, err := trackStmt.ExecContext(
+			ctx, importID, "waterway.gauge_measurements", id,
+		); err != nil {
+			return nil, err
+		}
+	}
+
+	if err := tx.Commit(); err != nil {
+		return nil, fmt.Errorf("Commit failed: %v", err)
+	}
+
+	feedback.Info("Importing approved gauge measurements took %s",
+		time.Since(start))
+
+	summary := struct {
+		IDs []int64 `json:"ids"`
+	}{
+		IDs: ids,
+	}
+	return &summary, nil
 }
```