Mercurial > gemma
view pkg/imports/agm.go @ 1743:85d0f017fbee
Approved gauges measurements: Open CSV and read headers.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Wed, 09 Jan 2019 21:58:35 +0100 |
parents | 44398a8bdf94 |
children | 807569b08513 |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> package imports import ( "bufio" "context" "database/sql" "encoding/csv" "log" "os" "path/filepath" "gemma.intevation.de/gemma/pkg/common" ) type ApprovedGaugeMeasurements struct { Dir string `json:"dir"` } // GMAPJobKind is the unique name of an approved gauge measurements import job. const AGMJobKind JobKind = "agm" type agmJobCreator struct{} func init() { RegisterJobCreator(AGMJobKind, agmJobCreator{}) } func (agmJobCreator) Description() string { return "approved gauge measurements" } func (agmJobCreator) Create(_ JobKind, data string) (Job, error) { agm := new(ApprovedGaugeMeasurements) if err := common.FromJSONString(data, agm); err != nil { return nil, err } return agm, nil } func (agmJobCreator) Depends() []string { return []string{ "gauges", "gauge_measurements", } } const ( // TODO: re-add staging_done field in table and fix RLS policy // issue for raw import. agmStageDoneSQL = ` UPDATE waterway.gauge_measurements SET staging_done = true WHERE id = ( SELECT key from waterway.track_imports WHERE import_id = $1 AND relation = 'waterway.gauge_measurements'::regclass)` ) func (agmJobCreator) StageDone( ctx context.Context, tx *sql.Tx, id int64, ) error { _, err := tx.ExecContext(ctx, agmStageDoneSQL, id) return err } // CleanUp removes the folder containing the CSV file with the // the approved gauge measurements. func (agm *ApprovedGaugeMeasurements) CleanUp() error { return os.RemoveAll(agm.Dir) } // Do executes the actual approved gauge measurements import. func (agm *ApprovedGaugeMeasurements) Do( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, ) (interface{}, error) { f, err := os.Open(filepath.Join(agm.Dir, "agm.csv")) if err != nil { return nil, err } defer f.Close() r := csv.NewReader(bufio.NewReader(f)) r.Comma = ';' r.ReuseRecord = true headers, err := r.Read() if err != nil { return nil, err } for i, f := range headers { log.Printf("%d: %s\n", i, f) } return nil, nil }