Mercurial > gemma
view pkg/imports/st.go @ 2307:e1aa9bb65da6
import_schedule: send email when manually triggering import
author | Thomas Junk <thomas.junk@intevation.de> |
---|---|
date | Mon, 18 Feb 2019 13:32:44 +0100 |
parents | c5bbe2409a52 |
children | 6b34d0fb4498 |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> package imports import ( "context" "database/sql" "errors" "time" "gemma.intevation.de/gemma/pkg/models" ) type Stretch struct { Name string `json:"name"` From models.Isrs `json:"from"` To models.Isrs `json:"to"` ObjNam string `json:"objnam"` NObjNam *string `json:"nobjnam"` Source string `json:"source-organization"` Date models.Date `json:"date-info"` Countries models.UniqueCountries `json:"countries"` } const STJobKind JobKind = "st" type stJobCreator struct{} func init() { RegisterJobCreator(STJobKind, stJobCreator{}) } func (stJobCreator) Description() string { return "stretch" } func (stJobCreator) AutoAccept() bool { return false } func (stJobCreator) Create() Job { return new(Stretch) } func (stJobCreator) Depends() []string { return []string{ "stretches", } } const ( stDeleteSQL = ` DELETE FROM waterway.stretches WHERE staging_done AND name = ( SELECT name FROM waterway.stretches WHERE id = ( SELECT key from import.track_imports WHERE import_id = $1 AND relation = 'waterway.stretches'::regclass) AND NOT staging_done )` stStageDoneSQL = ` UPDATE waterway.stretches SET staging_done = true WHERE id IN ( SELECT key from import.track_imports WHERE import_id = $1 AND relation = 'waterway.stretches'::regclass)` stInsertSQL = ` WITH r AS ( SELECT isrsrange( ( $1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int), ( $6::char(2), $7::char(3), $8::char(5), $9::char(5), $10::int)) AS r ) INSERT INTO waterway.stretches ( name, stretch, area, objnam, nobjnam, date_info, source_organization ) VALUES ( $11, (SELECT r FROM r), ISRSrange_area( (SELECT r FROM r), (SELECT ST_Collect(CAST(area AS geometry)) FROM waterway.waterway_area)), $12, $13, $14, $15) RETURNING id` stInsertCountrySQL = ` INSERT INTO waterway.stretch_countries ( stretches_id, country_code ) VALUES ( $1, $2 )` ) // StageDone moves the imported stretch out of the staging area. func (stJobCreator) StageDone( ctx context.Context, tx *sql.Tx, id int64, ) error { if _, err := tx.ExecContext(ctx, stDeleteSQL, id); err != nil { return err } _, err := tx.ExecContext(ctx, stStageDoneSQL, id) return err } // CleanUp of a stretch import is a NOP. func (*Stretch) CleanUp() error { return nil } // Do executes the actual stretch import. func (st *Stretch) Do( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, ) (interface{}, error) { start := time.Now() if st.Date.Time.IsZero() { st.Date = models.Date{Time: start} } feedback.Info("Storing stretch '%s'", st.Name) if len(st.Countries) == 0 { return nil, errors.New("List of countries is empty") } tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } defer tx.Rollback() insertCountryStmt, err := tx.PrepareContext(ctx, stInsertCountrySQL) if err != nil { return nil, err } var nobjnm sql.NullString if st.NObjNam != nil { nobjnm = sql.NullString{String: *st.NObjNam, Valid: true} } feedback.Info("Stretch from %s to %s.", st.From.String(), st.To.String()) var id int64 if err := tx.QueryRowContext( ctx, stInsertSQL, st.From.CountryCode, st.From.LoCode, st.From.FairwaySection, st.From.Orc, st.From.Hectometre, st.To.CountryCode, st.To.LoCode, st.To.FairwaySection, st.To.Orc, st.To.Hectometre, st.Name, st.ObjNam, nobjnm, st.Date.Time, st.Source, ).Scan(&id); err != nil { return nil, err } // store the associated countries. feedback.Info("Countries associated with stretch: %s.", st.Countries) for _, c := range st.Countries { if _, err := insertCountryStmt.ExecContext(ctx, id, c); err != nil { return nil, err } } if err := track(ctx, tx, importID, "waterway.stretches", id); err != nil { return nil, err } feedback.Info("Storing stretch '%s' took %s", st.Name, time.Since(start)) if err := tx.Commit(); err != nil { return nil, err } feedback.Info("Import of stretch was successful") summary := struct { Stretch string `json:"stretch"` }{ Stretch: st.Name, } return &summary, nil }