Mercurial > gemma
view pkg/imports/bn.go @ 2307:e1aa9bb65da6
import_schedule: send email when manually triggering import
author | Thomas Junk <thomas.junk@intevation.de> |
---|---|
date | Mon, 18 Feb 2019 13:32:44 +0100 |
parents | cfc523c70e90 |
children | 452bc714bfd9 |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> package imports import ( "context" "database/sql" "errors" "regexp" "strconv" "time" "gemma.intevation.de/gemma/pkg/soap/ifbn" ) // Bottleneck is an import job to import // bottlenecks from an IFBN SOAP service. type Bottleneck struct { // URL is the URL of the SOAP service. URL string `json:"url"` // Insecure indicates if HTTPS traffic // should validate certificates or not. Insecure bool `json:"insecure"` } // BNJobKind is the import queue type identifier. const BNJobKind JobKind = "bn" const ( hasBottleneckSQL = ` SELECT true FROM waterway.bottlenecks WHERE bottleneck_id = $1` insertBottleneckSQL = ` INSERT INTO waterway.bottlenecks ( bottleneck_id, fk_g_fid, objnam, nobjnm, stretch, area, rb, lb, responsible_country, revisiting_time, limiting, date_info, source_organization ) VALUES( $1, isrs_fromText($2), $3, $4, isrsrange(isrs_fromText($5), isrs_fromText($6)), ISRSrange_area( isrsrange(isrs_fromText($5), isrs_fromText($6)), (SELECT ST_Collect(CAST(area AS geometry)) FROM waterway.waterway_area)), $7, $8, $9, $10, $11, $12, $13 ) RETURNING id` ) type bnJobCreator struct{} func init() { RegisterJobCreator(BNJobKind, bnJobCreator{}) } func (bnJobCreator) Description() string { return "bottlenecks" } func (bnJobCreator) AutoAccept() bool { return false } func (bnJobCreator) Create() Job { return new(Bottleneck) } func (bnJobCreator) Depends() []string { return []string{ "gauges", "bottlenecks", } } const ( bnStageDoneSQL = ` UPDATE waterway.bottlenecks SET staging_done = true WHERE id IN ( SELECT key from import.track_imports WHERE import_id = $1 AND relation = 'waterway.bottlenecks'::regclass)` ) // StageDone moves the imported bottleneck out of the staging area. func (bnJobCreator) StageDone( ctx context.Context, tx *sql.Tx, id int64, ) error { _, err := tx.ExecContext(ctx, bnStageDoneSQL, id) return err } // CleanUp of a bottleneck import is a NOP. func (*Bottleneck) CleanUp() error { return nil } var rblbRe = regexp.MustCompile(`(..)_(..)`) func splitRBLB(s string) (string, string) { m := rblbRe.FindStringSubmatch(s) if len(m) == 0 { return "", "" } return m[1], m[2] } func revisitingTime(s string) int { v, err := strconv.Atoi(s) if err != nil { v = 0 } return v } // Do executes the actual bottleneck import. func (bn *Bottleneck) Do( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, ) (interface{}, error) { fetch := func() ([]*ifbn.BottleNeckType, error) { client := ifbn.NewIBottleneckService(bn.URL, bn.Insecure, nil) req := &ifbn.Export_bn_by_isrs{} resp, err := client.Export_bn_by_isrs(req) if err != nil { return nil, err } if resp.Export_bn_by_isrsResult == nil { return nil, errors.New("no Bottlenecks found") } return resp.Export_bn_by_isrsResult.BottleNeckType, nil } return storeBottlenecks(ctx, fetch, importID, conn, feedback) } func storeBottlenecks( ctx context.Context, fetch func() ([]*ifbn.BottleNeckType, error), importID int64, conn *sql.Conn, feedback Feedback, ) (interface{}, error) { start := time.Now() bns, err := fetch() if err != nil { return nil, err } feedback.Info("Found %d bottlenecks for import", len(bns)) tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } defer tx.Rollback() var hasStmt, insertStmt, trackStmt *sql.Stmt for _, x := range []struct { sql string stmt **sql.Stmt }{ {hasBottleneckSQL, &hasStmt}, {insertBottleneckSQL, &insertStmt}, {trackImportSQL, &trackStmt}, } { var err error if *x.stmt, err = tx.PrepareContext(ctx, x.sql); err != nil { return nil, err } defer (*x.stmt).Close() } var nids []string nextBN: for _, bn := range bns { var found bool err := hasStmt.QueryRowContext(ctx, bn.Bottleneck_id).Scan(&found) switch { case err == sql.ErrNoRows: // This is good. case err != nil: return nil, err case found: // TODO: Deep comparison database vs. SOAP. continue nextBN } rb, lb := splitRBLB(bn.Rb_lb) var limiting, country string if bn.Limiting_factor != nil { limiting = string(*bn.Limiting_factor) } if bn.Responsible_country != nil { country = string(*bn.Responsible_country) } var nid int64 err = insertStmt.QueryRowContext( ctx, bn.Bottleneck_id, bn.Fk_g_fid, bn.OBJNAM, bn.NOBJNM, bn.From_ISRS, bn.To_ISRS, rb, lb, country, revisitingTime(bn.Revisiting_time), limiting, bn.Date_Info, bn.Source, ).Scan(&nid) if err != nil { return nil, err } nids = append(nids, bn.Bottleneck_id) if _, err := trackStmt.ExecContext( ctx, importID, "waterway.bottlenecks", nid, ); err != nil { return nil, err } feedback.Info("Inserted '%s' into database", bn.OBJNAM) } if len(nids) == 0 { return nil, UnchangedError("No new bottlenecks found") } feedback.Info("Storing %d bottlenecks took %s", len(nids), time.Since(start)) if err := tx.Commit(); err != nil { return nil, err } feedback.Info("Import of bottlenecks was successful") summary := struct { Bottlenecks []string `json:"bottlenecks"` }{ Bottlenecks: nids, } return &summary, nil }