Mercurial > gemma
view pkg/imports/bn.go @ 3645:02951a62e8c6
'Historicise' bottlenecks on import
author | Tom Gottfried <tom@intevation.de> |
---|---|
date | Wed, 12 Jun 2019 17:11:15 +0200 |
parents | 4c585b5d4fe8 |
children | 0ec5c8ec1e44 |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018, 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
//  * Tom Gottfried <tom.gottfried@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"errors"
	"regexp"
	"strconv"
	"strings"
	"time"

	"gemma.intevation.de/gemma/pkg/soap/ifbn"
	"github.com/jackc/pgx/pgtype"
)

// Bottleneck is an import job to import
// bottlenecks from an IFBN SOAP service.
// Its fields are the JSON-encoded parameters of the job.
type Bottleneck struct {
	// URL is the URL of the SOAP service.
	URL string `json:"url"`
	// Tolerance is passed to the database as the tolerance used
	// when snapping a bottleneck's stretch to the waterway axis
	// to derive its area.
	// NOTE(review): unit not visible here (presumably metres) — confirm.
	Tolerance float64 `json:"tolerance"`
	// Insecure is handed to the SOAP client. As the name suggests,
	// true presumably disables the validation of HTTPS certificates
	// (TODO confirm in ifbn.NewIBottleneckService).
	Insecure bool `json:"insecure"`
}

// BNJobKind is the import queue type identifier.
const BNJobKind JobKind = "bn"

const (
	// hasBottleneckSQL marks the current (not erased) version of
	// bottleneck $1 as erased if its validity differs from $2.
	// It yields true if a new version has to be INSERTed, i.e. if such
	// a version was just erased or no version of $1 exists at all.
	// Otherwise ($1 exists with the same validity) the caller updates it.
	hasBottleneckSQL = `
WITH upd AS (
  UPDATE waterway.bottlenecks SET
    erased = true
  WHERE bottleneck_id = $1
    AND NOT erased
    -- Don't touch old entry if validity did not change: will be updated
    AND validity <> $2
  RETURNING 1
)
-- Decide whether a new version will be INSERTed
SELECT EXISTS(SELECT 1 FROM upd)
  OR NOT EXISTS(SELECT 1 FROM waterway.bottlenecks WHERE bottleneck_id = $1)
`

	// insertBottleneckSQL inserts a new version of a bottleneck and
	// returns its row id. The stretch is built from the two ISRS
	// locations $6 and $7 regardless of their order; the area is derived
	// from the waterway area along the axis of that stretch, snapped with
	// tolerance $15. The gauge validity is taken from the gauge $3 valid
	// at the start of $2, or left unbounded if there is none.
	insertBottleneckSQL = `
WITH
bounds (b) AS (VALUES (isrs_fromText($6)), (isrs_fromText($7))),
r AS (SELECT isrsrange(
    (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY),
    (SELECT b FROM bounds ORDER BY b USING >~ FETCH FIRST ROW ONLY)) AS r)
INSERT INTO waterway.bottlenecks (
  bottleneck_id,
  validity,
  gauge_location,
  gauge_validity,
  objnam,
  nobjnm,
  stretch,
  area,
  rb,
  lb,
  responsible_country,
  revisiting_time,
  limiting,
  date_info,
  source_organization
) VALUES (
  $1,
  $2,
  isrs_fromText($3),
  COALESCE(
    (SELECT validity FROM waterway.gauges
       WHERE location = isrs_fromText($3)
         AND validity @> lower(CAST($2 AS tstzrange))),
    tstzrange(NULL, NULL)),
  $4,
  $5,
  (SELECT r FROM r),
  ISRSrange_area(
    ISRSrange_axis((SELECT r FROM r),
                   $15),
    (SELECT ST_Collect(CAST(area AS geometry))
        FROM waterway.waterway_area)),
  $8,
  $9,
  $10,
  $11,
  $12,
  $13,
  $14
)
RETURNING id`

	// fixBNValiditySQL cuts the validity $2 of a new version out of the
	// validity of erased versions of bottleneck $1 that overlap it.
	// NOTE(review): if $2 lies strictly inside an old validity, the range
	// difference is not contiguous and PostgreSQL raises an error.
	fixBNValiditySQL = `
UPDATE waterway.bottlenecks SET
   -- Set enddate of old entry to new startdate in case of overlap:
  validity = validity - $2
WHERE bottleneck_id = $1
  AND validity && $2
  AND erased
`

	// updateBottleneckSQL updates the current (not erased) version of
	// bottleneck $1 in place and returns its row id. It only matches if
	// the new date_info $12 is newer than the stored one, so an unchanged
	// bottleneck yields no row. Stretch and area are derived as in
	// insertBottleneckSQL, with ISRS bounds $5/$6 and tolerance $14.
	updateBottleneckSQL = `
WITH
bounds (b) AS (VALUES (isrs_fromText($5)), (isrs_fromText($6))),
r AS (SELECT isrsrange(
    (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY),
    (SELECT b FROM bounds ORDER BY b USING >~ FETCH FIRST ROW ONLY)) AS r)
UPDATE waterway.bottlenecks b SET
  gauge_location = isrs_fromtext($2),
  gauge_validity = COALESCE(
    (SELECT validity FROM waterway.gauges g
       WHERE g.location = isrs_fromText($2)
         AND g.validity @> lower(b.validity)),
    tstzrange(NULL, NULL)),
  objnam = $3,
  nobjnm = $4,
  stretch = (SELECT r FROM r),
  area = ISRSrange_area(
    ISRSrange_axis((SELECT r FROM r),
                   $14),
    (SELECT ST_Collect(CAST(area AS geometry))
        FROM waterway.waterway_area)),
  rb = $7,
  lb = $8,
  responsible_country = $9,
  revisiting_time = $10,
  limiting = $11,
  date_info = $12,
  source_organization = $13
WHERE bottleneck_id = $1
  AND NOT erased
  AND $12 > date_info
RETURNING id
`

	// deleteBottleneckMaterialSQL deletes the riverbed materials of
	// bottleneck row $1 that are not contained in the array $2 and
	// returns the deleted materials.
	deleteBottleneckMaterialSQL = `
DELETE FROM waterway.bottlenecks_riverbed_materials
WHERE bottleneck_id = $1
  AND riverbed <> ALL($2)
RETURNING riverbed
`

	// insertBottleneckMaterialSQL adds riverbed material $2 to bottleneck
	// row $1; an already existing pair is silently kept.
	insertBottleneckMaterialSQL = `
INSERT INTO waterway.bottlenecks_riverbed_materials (
   bottleneck_id,
   riverbed
) VALUES (
   $1,
   $2
) ON CONFLICT (bottleneck_id, riverbed) DO NOTHING
`
)

// bnJobCreator is the JobCreator registered for BNJobKind.
type bnJobCreator struct{}

// init registers the bottleneck job creator under BNJobKind.
func init() {
	RegisterJobCreator(BNJobKind, bnJobCreator{})
}

// Description returns the human readable name of this import kind.
func (bnJobCreator) Description() string {
	return "bottlenecks"
}

// AutoAccept returns false: bottleneck imports are not accepted
// automatically (presumably they need a manual review — confirm).
func (bnJobCreator) AutoAccept() bool {
	return false
}

// Create returns a new, empty bottleneck import job.
func (bnJobCreator) Create() Job {
	return new(Bottleneck)
}

// Depends returns two lists of table names. The first lists the tables
// this import writes to, the second the tables it only reads from.
// How the job queue uses them is not visible here.
func (bnJobCreator) Depends() [2][]string {
	return [2][]string{
		{"bottlenecks", "bottlenecks_riverbed_materials"},
		{"gauges", "distance_marks_virtual", "waterway_axis", "waterway_area"},
	}
}

const (
	// bnStageDoneSQL sets staging_done on all bottleneck rows that
	// import $1 tracked in import.track_imports.
	bnStageDoneSQL = `
UPDATE waterway.bottlenecks SET staging_done = true
WHERE id IN (
  SELECT key from import.track_imports
  WHERE import_id = $1 AND
        relation = 'waterway.bottlenecks'::regclass)`
)

// StageDone moves the imported bottleneck out of the staging area.
func (bnJobCreator) StageDone(
	ctx context.Context,
	tx *sql.Tx,
	id int64,
) error {
	_, err := tx.ExecContext(ctx, bnStageDoneSQL, id)
	return err
}

// CleanUp of a bottleneck import is a NOP.
func (*Bottleneck) CleanUp() error { return nil }

// rblbRe matches two characters, an underscore and two characters.
// It is not anchored, so the first such pattern anywhere in the
// input is used.
var rblbRe = regexp.MustCompile(`(..)_(..)`)

// splitRBLB splits a combined "RB_LB" value into its right bank and
// left bank parts. Both results are nil if s does not contain the
// pattern matched by rblbRe.
func splitRBLB(s string) (*string, *string) {
	m := rblbRe.FindStringSubmatch(s)
	if len(m) == 0 {
		return nil, nil
	}
	return &m[1], &m[2]
}

// Do executes the actual bottleneck import.
func (bn *Bottleneck) Do( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, ) (interface{}, error) { fetch := func() ([]*ifbn.BottleNeckType, error) { client := ifbn.NewIBottleneckService(bn.URL, bn.Insecure, nil) req := &ifbn.Export_bn_by_isrs{} resp, err := client.Export_bn_by_isrs(req) if err != nil { return nil, err } if resp.Export_bn_by_isrsResult == nil { return nil, errors.New( "The requested service returned no bottlenecks") } return resp.Export_bn_by_isrsResult.BottleNeckType, nil } return storeBottlenecks(ctx, fetch, importID, conn, feedback, bn.Tolerance) } func storeBottlenecks( ctx context.Context, fetch func() ([]*ifbn.BottleNeckType, error), importID int64, conn *sql.Conn, feedback Feedback, tolerance float64, ) (interface{}, error) { start := time.Now() bns, err := fetch() if err != nil { return nil, err } feedback.Info("Found %d bottlenecks for import", len(bns)) var hasStmt, insertStmt, fixValidityStmt, updateStmt, deleteMaterialStmt, insertMaterialStmt, trackStmt *sql.Stmt for _, x := range []struct { sql string stmt **sql.Stmt }{ {hasBottleneckSQL, &hasStmt}, {insertBottleneckSQL, &insertStmt}, {fixBNValiditySQL, &fixValidityStmt}, {updateBottleneckSQL, &updateStmt}, {deleteBottleneckMaterialSQL, &deleteMaterialStmt}, {insertBottleneckMaterialSQL, &insertMaterialStmt}, {trackImportSQL, &trackStmt}, } { var err error if *x.stmt, err = conn.PrepareContext(ctx, x.sql); err != nil { return nil, err } defer (*x.stmt).Close() } var nids []string feedback.Info("Tolerance used to snap waterway axis: %g", tolerance) for _, bn := range bns { if err := storeBottleneck( ctx, importID, conn, feedback, bn, &nids, tolerance, hasStmt, insertStmt, fixValidityStmt, updateStmt, deleteMaterialStmt, insertMaterialStmt, trackStmt); err != nil { return nil, err } } if len(nids) == 0 { return nil, UnchangedError("No new bottlenecks inserted") } feedback.Info("Storing %d bottlenecks took %s", len(nids), time.Since(start)) feedback.Info("Import of 
bottlenecks was successful") summary := struct { Bottlenecks []string `json:"bottlenecks"` }{ Bottlenecks: nids, } return &summary, nil } func storeBottleneck( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, bn *ifbn.BottleNeckType, nids *[]string, tolerance float64, hasStmt, insertStmt, fixValidityStmt, updateStmt, deleteMaterialStmt, insertMaterialStmt, trackStmt *sql.Stmt, ) error { feedback.Info("Processing %s (%s)", bn.OBJNAM, bn.Bottleneck_id) if bn.AdditionalData == nil || bn.AdditionalData.KeyValuePair == nil { feedback.Warn("Missing validity information") return nil } const ( fromKey = "Valid_from_date" toKey = "Valid_to_date" ) fromTo := make(map[string]time.Time) for _, kv := range bn.AdditionalData.KeyValuePair { k := string(kv.Key) if k == fromKey || k == toKey { if t, err := time.Parse(time.RFC3339, kv.Value); err != nil { return err } else { fromTo[k] = t } } } var tfrom, tto pgtype.Timestamptz if t, ok := fromTo[fromKey]; ok { tfrom.Set(t) } else { feedback.Warn("Missing start date") return nil } var uBound pgtype.BoundType if t, ok := fromTo[toKey]; ok { tto.Set(t) uBound = pgtype.Exclusive } else { uBound = pgtype.Unbounded } validity := pgtype.Tstzrange{ Lower: tfrom, Upper: tto, LowerType: pgtype.Inclusive, UpperType: uBound, Status: pgtype.Present, } rb, lb := splitRBLB(bn.Rb_lb) var revisitingTime *int if bn.Revisiting_time != nil && len(strings.TrimSpace(*bn.Revisiting_time)) > 0 { i, err := strconv.Atoi(*bn.Revisiting_time) if err != nil { feedback.Warn( "Cannot convert given revisiting time '%s' to number of months", *bn.Revisiting_time) } else { revisitingTime = &i } } var limiting, country string if bn.Limiting_factor != nil { limiting = string(*bn.Limiting_factor) } if bn.Responsible_country != nil { country = string(*bn.Responsible_country) } var materials []string if bn.Riverbed != nil { for _, material := range bn.Riverbed.Material { if material != nil { materials = append(materials, string(*material)) } } } 
tx, err := conn.BeginTx(ctx, nil) if err != nil { return err } defer tx.Rollback() var isNew bool var nid int64 err = tx.StmtContext(ctx, hasStmt).QueryRowContext(ctx, bn.Bottleneck_id, validity, ).Scan(&isNew) switch { case err != nil: feedback.Warn(handleError(err).Error()) if err2 := tx.Rollback(); err2 != nil { return err2 } return nil case isNew: err = tx.StmtContext(ctx, insertStmt).QueryRowContext( ctx, bn.Bottleneck_id, &validity, bn.Fk_g_fid, bn.OBJNAM, bn.NOBJNM, bn.From_ISRS, bn.To_ISRS, rb, lb, country, revisitingTime, limiting, bn.Date_Info, bn.Source, tolerance, ).Scan(&nid) if err != nil { feedback.Warn(handleError(err).Error()) return nil } // Set end of validity of old version to start of new version // in case of overlap if _, err = tx.StmtContext(ctx, fixValidityStmt).ExecContext(ctx, bn.Bottleneck_id, validity, ); err != nil { feedback.Warn(handleError(err).Error()) if err2 := tx.Rollback(); err2 != nil { return err2 } return nil } feedback.Info("insert new version") case !isNew: // try to update err := tx.StmtContext(ctx, updateStmt).QueryRowContext(ctx, bn.Bottleneck_id, bn.Fk_g_fid, bn.OBJNAM, bn.NOBJNM, bn.From_ISRS, bn.To_ISRS, rb, lb, country, revisitingTime, limiting, bn.Date_Info, bn.Source, tolerance, ).Scan(&nid) switch { case err == sql.ErrNoRows: feedback.Info("unchanged") if err := tx.Rollback(); err != nil { return err } return nil case err != nil: feedback.Warn(handleError(err).Error()) if err := tx.Rollback(); err != nil { return err } return nil default: feedback.Info("update") // Remove obsolete riverbed materials var pgMaterials pgtype.VarcharArray pgMaterials.Set(materials) mtls, err := tx.StmtContext(ctx, deleteMaterialStmt).QueryContext(ctx, nid, &pgMaterials, ) if err != nil { return err } defer mtls.Close() for mtls.Next() { var delMat string if err := mtls.Scan(&delMat); err != nil { return err } feedback.Warn("Removed riverbed material %s", delMat) } if err := mtls.Err(); err != nil { return err } } } if materials != 
nil { for _, mat := range materials { if _, err := tx.StmtContext(ctx, insertMaterialStmt).ExecContext( ctx, nid, mat); err != nil { feedback.Warn("Failed to insert riverbed material '%s'", mat) feedback.Warn(handleError(err).Error()) } } } if _, err := tx.StmtContext(ctx, trackStmt).ExecContext( ctx, importID, "waterway.bottlenecks", nid, ); err != nil { return err } if err = tx.Commit(); err != nil { return err } *nids = append(*nids, bn.Bottleneck_id) return nil }