Mercurial > gemma
view pkg/imports/agm.go @ 2104:c9af355d4a2c
staging: display stretch name
author | Thomas Junk <thomas.junk@intevation.de> |
---|---|
date | Mon, 04 Feb 2019 14:35:47 +0100 |
parents | 8a986d80e1c6 |
children | 58a28715e386 |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> package imports import ( "bufio" "context" "database/sql" "encoding/csv" "fmt" "io" "os" "path/filepath" "strconv" "strings" "time" "gemma.intevation.de/gemma/pkg/common" "gemma.intevation.de/gemma/pkg/misc" "gemma.intevation.de/gemma/pkg/models" ) type ApprovedGaugeMeasurements struct { Dir string `json:"dir"` } // GMAPJobKind is the unique name of an approved gauge measurements import job. const AGMJobKind JobKind = "agm" type agmJobCreator struct{} func init() { RegisterJobCreator(AGMJobKind, agmJobCreator{}) } func (agmJobCreator) AutoAccept() bool { return false } func (agmJobCreator) Description() string { return "approved gauge measurements" } func (agmJobCreator) Create(_ JobKind, data string) (Job, error) { agm := new(ApprovedGaugeMeasurements) if err := common.FromJSONString(data, agm); err != nil { return nil, err } return agm, nil } func (agmJobCreator) Depends() []string { return []string{ "gauges", "gauge_measurements", } } const ( // delete the old and keep the new measures. 
agmStageDoneDeleteSQL = ` WITH staged AS ( SELECT key FROM import.track_imports WHERE import_id = $1 AND relation = 'waterway.gauge_measurements'::regclass ), to_delete AS ( SELECT o.id AS id FROM waterway.gauge_measurements o JOIN waterway.gauge_measurements n ON n.fk_gauge_id = o.fk_gauge_id AND n.measure_date = o.measure_date WHERE n.id IN (SELECT key FROM staged) AND o.id NOT IN (SELECT key FROM staged) ) DELETE FROM waterway.gauge_measurements WHERE id IN (SELECT id from to_delete)` agmStageDoneSQL = ` UPDATE waterway.gauge_measurements SET staging_done = true WHERE id IN ( SELECT key FROM import.track_imports WHERE import_id = $1 AND relation = 'waterway.gauge_measurements'::regclass)` ) func (agmJobCreator) StageDone( ctx context.Context, tx *sql.Tx, id int64, ) error { _, err := tx.ExecContext(ctx, agmStageDoneDeleteSQL, id) if err == nil { _, err = tx.ExecContext(ctx, agmStageDoneSQL, id) } return err } // CleanUp removes the folder containing the CSV file with the // the approved gauge measurements. func (agm *ApprovedGaugeMeasurements) CleanUp() error { return os.RemoveAll(agm.Dir) } var guessDate = misc.TimeGuesser([]string{ "02.01.2006 15:04", "2006-01-02T15:04:05-07:00", }).Guess // Do executes the actual approved gauge measurements import. 
//
// It reads the semicolon separated file agm.csv from agm.Dir. Header
// columns are matched after trimming surrounding blanks, lower-casing,
// and replacing inner blanks with '_'. Every column except "unit" is
// mandatory. Duplicate columns are rejected.
//
// Every data record is inserted into the database inside a single
// transaction, and its new id is recorded with trackImportSQL for
// importID. Any invalid record or CSV syntax error aborts the whole
// import, and the transaction is rolled back.
//
// The water level values (value, value_min, value_max) are rescaled
// according to the optional "unit" column. Without that column, the
// "cm" scaling is used. The returned summary lists the ids of all
// inserted rows.
func (agm *ApprovedGaugeMeasurements) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	start := time.Now()

	f, err := os.Open(filepath.Join(agm.Dir, "agm.csv"))
	if err != nil {
		return nil, err
	}
	defer f.Close()

	r := csv.NewReader(bufio.NewReader(f))
	r.Comma = ';'
	// The record slice is reused between reads. This is safe because
	// every field is copied into args (strings are immutable) before
	// the next Read.
	r.ReuseRecord = true

	headers, err := r.Read()
	if err != nil {
		// Without wrapping, an empty file would only report "EOF".
		return nil, fmt.Errorf("Failed to read CSV header: %v", err)
	}

	var (
		fkGaugeIDIdx     = -1
		measureDateIdx   = -1
		fromIdx          = -1
		languageCodeIdx  = -1
		countryCodeIdx   = -1
		dateIssueIdx     = -1
		referenceCodeIdx = -1
		valueIdx         = -1
		predictedIdx     = -1
		valueMinIdx      = -1
		valueMaxIdx      = -1
		dateInfoIdx      = -1
		originatorIdx    = -1
		unitIdx          = -1
	)

	headerFields := []struct {
		idx  *int
		name string
	}{
		{&fkGaugeIDIdx, "fk_gauge_id"},
		{&measureDateIdx, "measure_date"},
		{&fromIdx, "from"}, // "sender",
		{&languageCodeIdx, "language_code"},
		{&countryCodeIdx, "country_code"},
		{&dateIssueIdx, "date_issue"},
		{&referenceCodeIdx, "reference_code"},
		{&valueIdx, "value"}, // "water_level",
		{&predictedIdx, "predicted"}, // "is_waterlevel",
		{&valueMinIdx, "value_min"},
		{&valueMaxIdx, "value_max"},
		{&dateInfoIdx, "date_info"},
		{&originatorIdx, "originator"}, // "source_organization",
		{&unitIdx, "unit"},
	}

	// Map each normalized header name to its column index. Unknown
	// columns are ignored.
nextHeader:
	for i, header := range headers {
		h := strings.Replace(strings.ToLower(
			strings.TrimSpace(header)), " ", "_", -1)
		for j := range headerFields {
			if headerFields[j].name == h {
				if *headerFields[j].idx != -1 {
					return nil, fmt.Errorf(
						"There is more than one column named '%s'", h)
				}
				*headerFields[j].idx = i
				continue nextHeader
			}
		}
	}

	var missing []string
	for i := range headerFields {
		if headerFields[i].name != "unit" && *headerFields[i].idx == -1 {
			missing = append(missing, headerFields[i].name)
		}
	}
	if len(missing) > 0 {
		return nil, fmt.Errorf("Missing columns: %s", strings.Join(missing, ", "))
	}

	// The error is ignored on the assumption that "cm" is always a unit
	// rescale knows. NOTE(review): confirm in rescale.
	inCm, _ := rescale("cm")

	// scaler returns the value conversion for a record. It uses the
	// record's unit if a "unit" column exists, and "cm" otherwise.
	scaler := func(row []string) (func(float32) float32, error) {
		if unitIdx == -1 {
			return inCm, nil
		}
		if unit := row[unitIdx]; unit != "cm" {
			return rescale(unit)
		}
		return inCm, nil
	}

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	// Rolling back after a successful Commit is a harmless no-op.
	defer tx.Rollback()

	insertStmt, err := tx.PrepareContext(ctx, insertGMSQL)
	if err != nil {
		return nil, err
	}
	defer insertStmt.Close()
	trackStmt, err := tx.PrepareContext(ctx, trackImportSQL)
	if err != nil {
		return nil, err
	}
	defer trackStmt.Close()

	// ids is non-nil so the JSON summary contains [] rather than null
	// when no row was imported.
	ids := []int64{}

	args := make([]interface{}, 19)

	// staging_done stays false here. agmJobCreator.StageDone sets it
	// to true later.
	args[18] = false

	// line counts data records. 1 is the first record after the header.
lines:
	for line := 1; ; line++ {

		row, err := r.Read()
		// Check the error before the record length: on a parse error,
		// csv.Reader returns a nil record. Testing len(row) first would
		// silently treat a malformed file as a clean end of input.
		switch {
		case err == io.EOF:
			break lines
		case err != nil:
			return nil, fmt.Errorf("CSV parsing failed: %v", err)
		case len(row) == 0:
			break lines
		}

		convert, err := scaler(row)
		if err != nil {
			return nil, fmt.Errorf("line %d: %v", line, err)
		}

		gids := row[fkGaugeIDIdx]
		gid, err := models.IsrsFromString(gids)
		if err != nil {
			return nil, fmt.Errorf("Invalid ISRS code line %d: %v", line, err)
		}

		args[0] = gid.CountryCode
		args[1] = gid.LoCode
		args[2] = gid.FairwaySection
		args[3] = gid.Orc
		args[4] = gid.Hectometre

		md, err := guessDate(row[measureDateIdx])
		if err != nil {
			return nil, fmt.Errorf("Invalid 'measure_date' line %d: %v", line, err)
		}
		args[5] = md

		args[6] = row[fromIdx]
		args[7] = row[languageCodeIdx]
		args[8] = row[countryCodeIdx]

		dis, err := guessDate(row[dateIssueIdx])
		if err != nil {
			return nil, fmt.Errorf("Invalid 'date_issue' line %d: %v", line, err)
		}
		args[9] = dis

		args[10] = row[referenceCodeIdx]

		value, err := strconv.ParseFloat(row[valueIdx], 32)
		if err != nil {
			return nil, fmt.Errorf("Invalid 'value' line %d: %v", line, err)
		}
		args[11] = convert(float32(value))

		predicted := strings.ToLower(row[predictedIdx]) == "true"
		args[12] = predicted

		args[13] = true // is_waterlevel

		valueMin, err := strconv.ParseFloat(row[valueMinIdx], 32)
		if err != nil {
			return nil, fmt.Errorf("Invalid 'value_min' line %d: %v", line, err)
		}
		args[14] = convert(float32(valueMin))

		valueMax, err := strconv.ParseFloat(row[valueMaxIdx], 32)
		if err != nil {
			return nil, fmt.Errorf("Invalid 'value_max' line %d: %v", line, err)
		}
		args[15] = convert(float32(valueMax))

		din, err := guessDate(row[dateInfoIdx])
		if err != nil {
			return nil, fmt.Errorf("Invalid 'date_info' line %d: %v", line, err)
		}
		args[16] = din

		args[17] = row[originatorIdx]

		// args[18] (staging_done) keeps the false set before the loop.

		var id int64
		if err := insertStmt.QueryRowContext(ctx, args...).Scan(&id); err != nil {
			return nil, fmt.Errorf("Failed to insert line %d: %v", line, err)
		}
		ids = append(ids, id)

		if _, err := trackStmt.ExecContext(
			ctx, importID, "waterway.gauge_measurements", id,
		); err != nil {
			return nil, fmt.Errorf("Failed to track line %d: %v", line, err)
		}
	}

	if err := tx.Commit(); err != nil {
		return nil, fmt.Errorf("Commit failed: %v", err)
	}

	feedback.Info("Importing approved gauge measurements took %s",
		time.Since(start))

	summary := struct {
		IDs []int64 `json:"ids"`
	}{
		IDs: ids,
	}
	return &summary, nil
}