Mercurial > gemma
view pkg/imports/agm.go @ 2006:35acb7f9ae0c
Do everything else before expectedly failing role creation
Creating roles during database setup expectedly fails in case there
already is another gemma database in the cluster. Doing it at the end
of the transaction ensures it does not hide errors in other commands
in the script.
In passing, add the default admin via the designated view to ensure it
will become a correctly set up application user.
author | Tom Gottfried <tom@intevation.de> |
---|---|
date | Thu, 24 Jan 2019 17:23:43 +0100 |
parents | 59055c8301df |
children | 8a986d80e1c6 |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> package imports import ( "bufio" "context" "database/sql" "encoding/csv" "fmt" "io" "os" "path/filepath" "strconv" "strings" "time" "gemma.intevation.de/gemma/pkg/common" "gemma.intevation.de/gemma/pkg/misc" "gemma.intevation.de/gemma/pkg/models" ) type ApprovedGaugeMeasurements struct { Dir string `json:"dir"` } // GMAPJobKind is the unique name of an approved gauge measurements import job. const AGMJobKind JobKind = "agm" type agmJobCreator struct{} func init() { RegisterJobCreator(AGMJobKind, agmJobCreator{}) } func (agmJobCreator) AutoAccept() bool { return false } func (agmJobCreator) Description() string { return "approved gauge measurements" } func (agmJobCreator) Create(_ JobKind, data string) (Job, error) { agm := new(ApprovedGaugeMeasurements) if err := common.FromJSONString(data, agm); err != nil { return nil, err } return agm, nil } func (agmJobCreator) Depends() []string { return []string{ "gauges", "gauge_measurements", } } const ( // delete the old and keep the new measures. 
agmStageDoneDeleteSQL = ` WITH staged AS ( SELECT key FROM import.track_imports WHERE import_id = $1 AND relation = 'waterway.gauge_measurements'::regclass ), to_delete AS ( SELECT o.id AS id FROM waterway.gauge_measurements o JOIN waterway.gauge_measurements n ON n.fk_gauge_id = o.fk_gauge_id AND n.measure_date = o.measure_date WHERE n.id IN (SELECT key FROM staged) AND o.id NOT IN (SELECT key FROM staged) ) DELETE FROM waterway.gauge_measurements WHERE id IN (SELECT id from to_delete)` agmStageDoneSQL = ` UPDATE waterway.gauge_measurements SET staging_done = true WHERE id IN ( SELECT key FROM import.track_imports WHERE import_id = $1 AND relation = 'waterway.gauge_measurements'::regclass)` ) func (agmJobCreator) StageDone( ctx context.Context, tx *sql.Tx, id int64, ) error { _, err := tx.ExecContext(ctx, agmStageDoneDeleteSQL, id) if err == nil { _, err = tx.ExecContext(ctx, agmStageDoneSQL, id) } return err } // CleanUp removes the folder containing the CSV file with the // the approved gauge measurements. func (agm *ApprovedGaugeMeasurements) CleanUp() error { return os.RemoveAll(agm.Dir) } var guessDate = misc.TimeGuesser([]string{ "02.01.2006 15:04", "2006-01-02T15:04:05-07:00", }).Guess // Do executes the actual approved gauge measurements import. 
func (agm *ApprovedGaugeMeasurements) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	start := time.Now()

	// The measurements are expected in a semicolon separated CSV file.
	f, err := os.Open(filepath.Join(agm.Dir, "agm.csv"))
	if err != nil {
		return nil, err
	}
	defer f.Close()

	r := csv.NewReader(bufio.NewReader(f))
	r.Comma = ';'
	r.ReuseRecord = true

	headers, err := r.Read()
	if err != nil {
		return nil, err
	}

	// Normalize header names: trimmed, lower case, spaces -> underscores.
	headerIndices := map[string]int{}
	for i, f := range headers {
		headerIndices[strings.Replace(
			strings.ToLower(
				strings.TrimSpace(f)), " ", "_", -1)] = i
	}

	// Reject the file if any mandatory column is missing.
	var missing []string
	for _, m := range [...]string{
		"fk_gauge_id",
		"measure_date",
		"from", // "sender",
		"language_code",
		"country_code",
		"date_issue",
		"reference_code",
		"value", // "water_level",
		"predicted", // "is_waterlevel",
		"value_min",
		"value_max",
		"date_info",
		"originator", // "source_organization",
	} {
		if _, found := headerIndices[m]; !found {
			missing = append(missing, m)
		}
	}
	if len(missing) > 0 {
		return nil, fmt.Errorf("Missing columns: %s",
			strings.Join(missing, ", "))
	}

	// Without a 'unit' column all values default to cm.
	inCm, _ := rescale("cm")

	// scaler returns a conversion function for the row's unit
	// (presumably rescale converts to the storage unit — TODO confirm).
	scaler := func(row []string) (func(float32) float32, error) {
		idx, found := headerIndices["unit"]
		if !found {
			return inCm, nil
		}
		unit := row[idx]
		if unit == "cm" {
			return inCm, nil
		}
		s, err := rescale(unit)
		return s, err
	}

	// All inserts happen in one transaction; rolled back unless the
	// final Commit succeeds.
	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	defer tx.Rollback()

	insertStmt, err := tx.PrepareContext(ctx, insertGMSQL)
	if err != nil {
		return nil, err
	}
	defer insertStmt.Close()

	trackStmt, err := tx.PrepareContext(ctx, trackImportSQL)
	if err != nil {
		return nil, err
	}
	defer trackStmt.Close()

	ids := []int64{}

	// Argument vector for insertGMSQL; slots 0-17 are refilled per row.
	args := make([]interface{}, 19)

	args[18] = false // staging_done

lines:
	for line := 1; ; line++ {

		row, err := r.Read()
		switch {
		case err == io.EOF || len(row) == 0:
			break lines
		case err != nil:
			return nil, fmt.Errorf("CSV parsing failed: %v", err)
		}

		convert, err := scaler(row)
		if err != nil {
			return nil, fmt.Errorf("line %d: %v", line, err)
		}

		// The gauge is referenced by its ISRS code.
		gids := row[headerIndices["fk_gauge_id"]]
		gid, err := models.IsrsFromString(gids)
		if err != nil {
			return nil, fmt.Errorf("Invalid ISRS code line %d: %v",
				line, err)
		}
		args[0] = gid.CountryCode
		args[1] = gid.LoCode
		args[2] = gid.FairwaySection
		args[3] = gid.Orc
		args[4] = gid.Hectometre

		md, err := guessDate(row[headerIndices["measure_date"]])
		if err != nil {
			return nil, fmt.Errorf("Invalid 'measure_date' line %d: %v",
				line, err)
		}
		args[5] = md

		args[6] = row[headerIndices["from"]]
		args[7] = row[headerIndices["language_code"]]
		args[8] = row[headerIndices["country_code"]]

		dis, err := guessDate(row[headerIndices["date_issue"]])
		if err != nil {
			return nil, fmt.Errorf("Invalid 'date_issue' line %d: %v",
				line, err)
		}
		args[9] = dis

		args[10] = row[headerIndices["reference_code"]]

		// Numeric values are converted with the row's unit scaler.
		value, err := strconv.ParseFloat(row[headerIndices["value"]], 32)
		if err != nil {
			return nil, fmt.Errorf("Invalid 'value' line %d: %v",
				line, err)
		}
		args[11] = convert(float32(value))

		// Anything other than (case-insensitive) "true" counts as false.
		predicted := strings.ToLower(row[headerIndices["predicted"]]) == "true"
		args[12] = predicted

		args[13] = true // is_waterlevel

		valueMin, err := strconv.ParseFloat(row[headerIndices["value_min"]], 32)
		if err != nil {
			return nil, fmt.Errorf("Invalid 'value_min' line %d: %v",
				line, err)
		}
		args[14] = convert(float32(valueMin))

		valueMax, err := strconv.ParseFloat(row[headerIndices["value_max"]], 32)
		if err != nil {
			return nil, fmt.Errorf("Invalid 'value_max' line %d: %v",
				line, err)
		}
		args[15] = convert(float32(valueMax))

		din, err := guessDate(row[headerIndices["date_info"]])
		if err != nil {
			return nil, fmt.Errorf("Invalid 'date_info' line %d: %v",
				line, err)
		}
		args[16] = din

		args[17] = row[headerIndices["originator"]]

		// args[18] (staging_done) was already set to false before the
		// loop; StageDone later flips the accepted rows to true.

		var id int64
		if err := insertStmt.QueryRowContext(ctx, args...).Scan(&id); err != nil {
			return nil, fmt.Errorf("Failed to insert line %d: %v", line, err)
		}
		ids = append(ids, id)

		// Record the new row so StageDone/CleanUp can find it later.
		if _, err := trackStmt.ExecContext(
			ctx, importID, "waterway.gauge_measurements", id,
		); err != nil {
			return nil, err
		}
	}

	if err := tx.Commit(); err != nil {
		return nil, fmt.Errorf("Commit failed: %v", err)
	}

	feedback.Info("Importing approved gauge measurements took %s",
		time.Since(start))

	// The summary (the inserted row ids) is stored with the import job.
	summary := struct {
		IDs []int64 `json:"ids"`
	}{
		IDs: ids,
	}
	return &summary, nil
}