Mercurial > gemma
view pkg/imports/st.go @ 3163:d9903cb34842
Handle failing INSERTs gracefully during gauges import
Using the special table EXCLUDED in INSERT statements makes
functionally no difference, but makes editing of the statements easier.
Since reference water levels are not deleted all at once before
(re-)importing anymore, take the chance to report those that were
deleted.
author | Tom Gottfried <tom@intevation.de> |
---|---|
date | Mon, 06 May 2019 13:25:49 +0200 |
parents | a75c546ef498 |
children | 5c8ecab9f2d4 |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> package imports import ( "context" "database/sql" "errors" "time" "gemma.intevation.de/gemma/pkg/models" ) type Stretch struct { Name string `json:"name"` From models.Isrs `json:"from"` To models.Isrs `json:"to"` Tolerance float32 `json:"tolerance"` ObjNam string `json:"objnam"` NObjNam *string `json:"nobjnam"` Source string `json:"source-organization"` Date models.Date `json:"date-info"` Countries models.UniqueCountries `json:"countries"` } const STJobKind JobKind = "st" type stJobCreator struct{} func init() { RegisterJobCreator(STJobKind, stJobCreator{}) } func (stJobCreator) Description() string { return "stretch" } func (stJobCreator) AutoAccept() bool { return false } func (stJobCreator) Create() Job { return new(Stretch) } func (stJobCreator) Depends() []string { return []string{ "stretches", } } const ( stDeleteSQL = ` DELETE FROM waterway.stretches WHERE staging_done AND name = ( SELECT name FROM waterway.stretches WHERE id = ( SELECT key from import.track_imports WHERE import_id = $1 AND relation = 'waterway.stretches'::regclass) AND NOT staging_done )` stStageDoneSQL = ` UPDATE waterway.stretches SET staging_done = true WHERE id IN ( SELECT key from import.track_imports WHERE import_id = $1 AND relation = 'waterway.stretches'::regclass)` stInsertSQL = ` WITH r AS ( SELECT isrsrange( least(($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)::isrs, ($6::char(2), $7::char(3), $8::char(5), $9::char(5), $10::int)::isrs), greatest(($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)::isrs, ($6::char(2), $7::char(3), $8::char(5), $9::char(5), $10::int)::isrs) ) AS r), axs AS ( SELECT ISRSrange_axis((SELECT r FROM r), $16::double precision) AS axs) INSERT INTO waterway.stretches ( name, stretch, area, objnam, nobjnam, date_info, source_organization ) VALUES ( $11, (SELECT r FROM r), ST_Transform(ISRSrange_area( (SELECT axs FROM axs), (SELECT ST_Buffer(axs, 10000) FROM axs)), 4326), $12, $13, $14, $15) RETURNING id` stInsertCountrySQL = ` INSERT INTO waterway.stretch_countries ( stretches_id, country_code ) VALUES ( $1, $2 )` ) // StageDone moves the imported stretch out of the staging area. func (stJobCreator) StageDone( ctx context.Context, tx *sql.Tx, id int64, ) error { if _, err := tx.ExecContext(ctx, stDeleteSQL, id); err != nil { return err } _, err := tx.ExecContext(ctx, stStageDoneSQL, id) return err } // CleanUp of a stretch import is a NOP. func (*Stretch) CleanUp() error { return nil } // Do executes the actual stretch import. func (st *Stretch) Do( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, ) (interface{}, error) { start := time.Now() if st.Date.Time.IsZero() { st.Date = models.Date{Time: start} } feedback.Info("Storing stretch '%s'", st.Name) if len(st.Countries) == 0 { return nil, errors.New("List of countries is empty") } tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } defer tx.Rollback() insertCountryStmt, err := tx.PrepareContext(ctx, stInsertCountrySQL) if err != nil { return nil, err } var nobjnm sql.NullString if st.NObjNam != nil { nobjnm = sql.NullString{String: *st.NObjNam, Valid: true} } feedback.Info("Stretch from %s to %s.", st.From.String(), st.To.String()) feedback.Info("Tolerance used to snap waterway axis: %g", st.Tolerance) var id int64 if err := tx.QueryRowContext( ctx, stInsertSQL, st.From.CountryCode, st.From.LoCode, st.From.FairwaySection, st.From.Orc, st.From.Hectometre, st.To.CountryCode, st.To.LoCode, st.To.FairwaySection, st.To.Orc, st.To.Hectometre, st.Name, st.ObjNam, nobjnm, st.Date.Time, st.Source, st.Tolerance, ).Scan(&id); err != nil { return nil, handleError(err) } // store the associated countries. feedback.Info("Countries associated with stretch: %s.", st.Countries) for _, c := range st.Countries { if _, err := insertCountryStmt.ExecContext(ctx, id, c); err != nil { return nil, err } } if err := track(ctx, tx, importID, "waterway.stretches", id); err != nil { return nil, err } feedback.Info("Storing stretch '%s' took %s", st.Name, time.Since(start)) if err := tx.Commit(); err != nil { return nil, err } feedback.Info("Import of stretch was successful") summary := struct { Stretch string `json:"stretch"` }{ Stretch: st.Name, } return &summary, nil }