Mercurial > gemma
view pkg/imports/agm.go @ 1927:5a37ee321651
client: make isolines legend graphic internally available
* Add isolines legend graphic to vuex store. Rename it to reflect
it being a dataURL now.
* License header: for store/map.js add 2019 and author BER.
author | Bernhard Reiter <bernhard@intevation.de> |
---|---|
date | Mon, 21 Jan 2019 10:56:14 +0100 |
parents | 0a53c33bc7b2 |
children | 206b1dd31112 |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> package imports import ( "bufio" "context" "database/sql" "encoding/csv" "fmt" "io" "os" "path/filepath" "strconv" "strings" "time" "gemma.intevation.de/gemma/pkg/common" "gemma.intevation.de/gemma/pkg/models" ) type ApprovedGaugeMeasurements struct { Dir string `json:"dir"` } // GMAPJobKind is the unique name of an approved gauge measurements import job. const AGMJobKind JobKind = "agm" type agmJobCreator struct{} func init() { RegisterJobCreator(AGMJobKind, agmJobCreator{}) } func (agmJobCreator) AutoAccept() bool { return false } func (agmJobCreator) Description() string { return "approved gauge measurements" } func (agmJobCreator) Create(_ JobKind, data string) (Job, error) { agm := new(ApprovedGaugeMeasurements) if err := common.FromJSONString(data, agm); err != nil { return nil, err } return agm, nil } func (agmJobCreator) Depends() []string { return []string{ "gauges", "gauge_measurements", } } const ( // delete the old and keep the new measures. 
agmStageDoneDeleteSQL = ` WITH staged AS ( SELECT key FROM waterway.track_imports WHERE import_id = $1 AND relation = 'waterway.gauge_measurements'::regclass ), to_delete AS ( SELECT o.id AS id FROM waterway.gauge_measurements o JOIN waterway.gauge_measurements n ON n.fk_gauge_id = o.fk_gauge_id AND n.measure_date = o.measure_date WHERE n.id IN (SELECT key FROM staged) AND o.id NOT IN (SELECT key FROM staged) ) DELETE FROM waterway.gauge_measurements WHERE id IN (SELECT id from to_delete)` agmStageDoneSQL = ` UPDATE waterway.gauge_measurements SET staging_done = true WHERE id IN ( SELECT key FROM waterway.track_imports WHERE import_id = $1 AND relation = 'waterway.gauge_measurements'::regclass)` ) func (agmJobCreator) StageDone( ctx context.Context, tx *sql.Tx, id int64, ) error { _, err := tx.ExecContext(ctx, agmStageDoneDeleteSQL, id) if err == nil { _, err = tx.ExecContext(ctx, agmStageDoneSQL, id) } return err } // CleanUp removes the folder containing the CSV file with the // the approved gauge measurements. func (agm *ApprovedGaugeMeasurements) CleanUp() error { return os.RemoveAll(agm.Dir) } func guessDate(s string) (time.Time, error) { var err error var t time.Time for _, layout := range [...]string{ "02.01.2006 15:04", "2006-01-02T15:04:05-07:00", } { if t, err = time.Parse(layout, s); err == nil { break } } return t, err } // Do executes the actual approved gauge measurements import. 
func (agm *ApprovedGaugeMeasurements) Do( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, ) (interface{}, error) { start := time.Now() f, err := os.Open(filepath.Join(agm.Dir, "agm.csv")) if err != nil { return nil, err } defer f.Close() r := csv.NewReader(bufio.NewReader(f)) r.Comma = ';' r.ReuseRecord = true headers, err := r.Read() if err != nil { return nil, err } headerIndices := map[string]int{} for i, f := range headers { headerIndices[strings.Replace( strings.ToLower( strings.TrimSpace(f)), " ", "_", -1)] = i } var missing []string for _, m := range [...]string{ "fk_gauge_id", "measure_date", "from", // "sender", "language_code", "country_code", "date_issue", "reference_code", "value", // "water_level", "predicted", // "is_waterlevel", "value_min", "value_max", "date_info", "originator", // "source_organization", } { if _, found := headerIndices[m]; !found { missing = append(missing, m) } } if len(missing) > 0 { return nil, fmt.Errorf("Missing columns: %s", strings.Join(missing, ", ")) } inCm, _ := rescale("cm") scaler := func(row []string) (func(float32) float32, error) { idx, found := headerIndices["unit"] if !found { return inCm, nil } unit := row[idx] if unit == "cm" { return inCm, nil } s, err := rescale(unit) return s, err } tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } defer tx.Rollback() insertStmt, err := tx.PrepareContext(ctx, insertGMSQL) if err != nil { return nil, err } defer insertStmt.Close() trackStmt, err := tx.PrepareContext(ctx, trackImportSQL) if err != nil { return nil, err } defer trackStmt.Close() ids := []int64{} args := make([]interface{}, 19) args[18] = false // staging_done lines: for line := 1; ; line++ { row, err := r.Read() switch { case err == io.EOF || len(row) == 0: break lines case err != nil: return nil, fmt.Errorf("CSV parsing failed: %v", err) } convert, err := scaler(row) if err != nil { return nil, fmt.Errorf("line %d: %v", line, err) } gids := 
row[headerIndices["fk_gauge_id"]] gid, err := models.IsrsFromString(gids) if err != nil { return nil, fmt.Errorf("Invalid ISRS code line %d: %v", line, err) } args[0] = gid.CountryCode args[1] = gid.LoCode args[2] = gid.FairwaySection args[3] = gid.Orc args[4] = gid.Hectometre md, err := guessDate(row[headerIndices["measure_date"]]) if err != nil { return nil, fmt.Errorf("Invalid 'measure_date' line %d: %v", line, err) } args[5] = md args[6] = row[headerIndices["from"]] args[7] = row[headerIndices["language_code"]] args[8] = row[headerIndices["country_code"]] dis, err := guessDate(row[headerIndices["date_issue"]]) if err != nil { return nil, fmt.Errorf("Invalid 'date_issue' line %d: %v", line, err) } args[9] = dis args[10] = row[headerIndices["reference_code"]] value, err := strconv.ParseFloat(row[headerIndices["value"]], 32) if err != nil { return nil, fmt.Errorf("Invalid 'value' line %d: %v", line, err) } args[11] = convert(float32(value)) predicted := strings.ToLower(row[headerIndices["predicted"]]) == "true" args[12] = predicted args[13] = true // is_waterlevel valueMin, err := strconv.ParseFloat(row[headerIndices["value_min"]], 32) if err != nil { return nil, fmt.Errorf("Invalid 'value_min' line %d: %v", line, err) } args[14] = convert(float32(valueMin)) valueMax, err := strconv.ParseFloat(row[headerIndices["value_max"]], 32) if err != nil { return nil, fmt.Errorf("Invalid 'value_max' line %d: %v", line, err) } args[15] = convert(float32(valueMax)) din, err := guessDate(row[headerIndices["date_info"]]) if err != nil { return nil, fmt.Errorf("Invalid 'date_info' line %d: %v", line, err) } args[16] = din args[17] = row[headerIndices["originator"]] // args[18] (staging_done) is set to true outside the loop. 
var id int64 if err := insertStmt.QueryRowContext(ctx, args...).Scan(&id); err != nil { return nil, fmt.Errorf("Failed to insert line %d: %v", line, err) } ids = append(ids, id) if _, err := trackStmt.ExecContext( ctx, importID, "waterway.gauge_measurements", id, ); err != nil { return nil, err } } if err := tx.Commit(); err != nil { return nil, fmt.Errorf("Commit failed: %v", err) } feedback.Info("Importing approved gauge measurements took %s", time.Since(start)) summary := struct { IDs []int64 `json:"ids"` }{ IDs: ids, } return &summary, nil }