Mercurial > gemma
view pkg/imports/bn.go @ 5591:0011f50cf216 surveysperbottleneckid
Removed no longer used alternative api for surveys/ endpoint.
As bottlenecks in the summary for SR imports are now identified by
their id and no longer by the (not guaranteed to be unique!) name,
there is no longer the need to request survey data by the name+date
tuple (which isn't reliable anyway). So the workaround was now
reversed.
author | Sascha Wilde <wilde@sha-bang.de> |
---|---|
date | Wed, 06 Apr 2022 13:30:29 +0200 |
parents | f2204f91d286 |
children | 6270951dda28 |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018, 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
//  * Tom Gottfried <tom.gottfried@intevation.de>
//  * Sascha Wilde <sascha.wilde@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"errors"
	"regexp"
	"strconv"
	"strings"
	"time"

	"github.com/jackc/pgx/pgtype"

	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/pgxutils"
	"gemma.intevation.de/gemma/pkg/soap/ifbn"
)

// Bottleneck is an import job to import
// bottlenecks from an IFBN SOAP service.
//
// The JSON tags define the field names of the serialized job
// configuration.
type Bottleneck struct {
	// URL is the URL of the SOAP service.
	URL string `json:"url"`
	// Tolerance is the tolerance used for axis snapping.
	Tolerance float64 `json:"tolerance"`
	// Insecure indicates if HTTPS traffic
	// should validate certificates or not.
	// NOTE(review): presumably true disables the validation --
	// confirm against the ifbn client.
	Insecure bool `json:"insecure"`
}

// BNJobKind is the import queue type identifier.
const BNJobKind JobKind = "bn" const ( insertBottleneckSQL = ` WITH bounds (b) AS (VALUES (isrs_fromText($6)), (isrs_fromText($7))), r AS (SELECT isrsrange( (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY), (SELECT b FROM bounds ORDER BY b USING >~ FETCH FIRST ROW ONLY)) AS r) INSERT INTO waterway.bottlenecks ( bottleneck_id, validity, gauge_location, objnam, nobjnm, stretch, area, rb, lb, responsible_country, revisiting_time, limiting, date_info, source_organization ) VALUES ( $1, $2::tstzrange, isrs_fromText($3), $4, $5, (SELECT r FROM r), ISRSrange_area( ISRSrange_axis((SELECT r FROM r), $15), (SELECT ST_Collect(CAST(area AS geometry)) FROM waterway.waterway_area)), $8, $9, $10, $11, $12, $13, $14 ) RETURNING id ` // We only check for NOT NULL values, for correct compairison with // values, which might be null (and then muyst not be compairt with `=' // but with `IS NULL' is comlicated and that we are checking more than // only (bottleneck_id, validity, date_info) is luxury already. 
findExactMatchBottleneckSQL = ` WITH bounds (b) AS (VALUES (isrs_fromText($4)), (isrs_fromText($5))), r AS (SELECT isrsrange( (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY), (SELECT b FROM bounds ORDER BY b USING >~ FETCH FIRST ROW ONLY)) AS r) SELECT id FROM waterway.bottlenecks WHERE ( bottleneck_id, validity, gauge_location, stretch, responsible_country, limiting, date_info, source_organization, staging_done ) = ( SELECT $1, $2::tstzrange, isrs_fromText($3), (SELECT r FROM r), $6, $7, $8::timestamptz, $9, true ) ` findIntersectingBottleneckSQL = ` SELECT id FROM waterway.bottlenecks WHERE (bottleneck_id, staging_done) = ($1, true) AND $2::tstzrange && validity ` insertBottleneckMaterialSQL = ` INSERT INTO waterway.bottlenecks_riverbed_materials ( bottleneck_id, riverbed ) VALUES ($1, $2) ON CONFLICT (bottleneck_id, riverbed) DO NOTHING ` bnStageDoneDeleteSQL = ` DELETE FROM waterway.bottlenecks WHERE id IN ( SELECT key FROM import.track_imports WHERE import_id = $1 AND relation = 'waterway.bottlenecks'::regclass AND deletion )` bnStageDoneSQL = ` UPDATE waterway.bottlenecks SET staging_done = true WHERE id IN ( SELECT key FROM import.track_imports WHERE import_id = $1 AND relation = 'waterway.bottlenecks'::regclass AND NOT deletion )` ) // Description gives a short info about relevant facts of this import. func (bn *Bottleneck) Description([]string) (string, error) { return bn.URL, nil } type bnJobCreator struct{} func init() { RegisterJobCreator(BNJobKind, bnJobCreator{}) } func (bnJobCreator) Description() string { return "bottlenecks" } func (bnJobCreator) AutoAccept() bool { return false } func (bnJobCreator) Create() Job { return new(Bottleneck) } func (bnJobCreator) Depends() [2][]string { return [2][]string{ {"bottlenecks", "bottlenecks_riverbed_materials"}, {"gauges", "distance_marks_virtual", "waterway_axis", "waterway_area"}, } } // StageDone moves the imported bottleneck out of the staging area. 
func (bnJobCreator) StageDone( ctx context.Context, tx *sql.Tx, id int64, _ Feedback, ) error { _, err := tx.ExecContext(ctx, bnStageDoneDeleteSQL, id) if err == nil { _, err = tx.ExecContext(ctx, bnStageDoneSQL, id) } return err } // CleanUp of a bottleneck import is a NOP. func (*Bottleneck) CleanUp() error { return nil } var rblbRe = regexp.MustCompile(`(..)_(..)`) func splitRBLB(s string) (*string, *string) { m := rblbRe.FindStringSubmatch(s) if len(m) == 0 { return nil, nil } return &m[1], &m[2] } // Do executes the actual bottleneck import. func (bn *Bottleneck) Do( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, ) (interface{}, error) { fetch := func() ([]*ifbn.BottleNeckType, error) { client := ifbn.NewIBottleneckService(bn.URL, bn.Insecure, nil) req := &ifbn.Export_bn_by_isrs{ Period: &ifbn.RequestedPeriod{Date_start: &time.Time{}}, } resp, err := client.Export_bn_by_isrs(req) if err != nil { return nil, err } if resp.Export_bn_by_isrsResult == nil { return nil, errors.New( "the requested service returned no bottlenecks") } return resp.Export_bn_by_isrsResult.BottleNeckType, nil } return storeBottlenecks(ctx, fetch, importID, conn, feedback, bn.Tolerance) } type bnStmts struct { insert *sql.Stmt findExactMatch *sql.Stmt findIntersecting *sql.Stmt insertMaterial *sql.Stmt track *sql.Stmt } func (bs *bnStmts) close() { for _, s := range []**sql.Stmt{ &bs.insert, &bs.findExactMatch, &bs.findIntersecting, &bs.insertMaterial, &bs.track, } { if *s != nil { (*s).Close() *s = nil } } } func (bs *bnStmts) prepare(ctx context.Context, conn *sql.Conn) error { for _, x := range []struct { sql string stmt **sql.Stmt }{ {insertBottleneckSQL, &bs.insert}, {findExactMatchBottleneckSQL, &bs.findExactMatch}, {findIntersectingBottleneckSQL, &bs.findIntersecting}, {insertBottleneckMaterialSQL, &bs.insertMaterial}, {trackImportDeletionSQL, &bs.track}, } { var err error if *x.stmt, err = conn.PrepareContext(ctx, x.sql); err != nil { return err } } return 
nil } func storeBottlenecks( ctx context.Context, fetch func() ([]*ifbn.BottleNeckType, error), importID int64, conn *sql.Conn, feedback Feedback, tolerance float64, ) (interface{}, error) { start := time.Now() bns, err := fetch() if err != nil { return nil, err } feedback.Info("Found %d bottlenecks for import", len(bns)) var bs bnStmts defer bs.close() if err := bs.prepare(ctx, conn); err != nil { return nil, err } var nids []string seenOldBnIDs := make(map[int64]bool) feedback.Info("Tolerance used to snap waterway axis: %g", tolerance) tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } defer tx.Rollback() savepoint := Savepoint(ctx, tx, "insert_bottlenck") for _, bn := range bns { if err := storeBottleneck( ctx, tx, importID, feedback, bn, &nids, seenOldBnIDs, tolerance, &bs, savepoint, ); err != nil { return nil, err } } if err = tx.Commit(); err != nil { return nil, err } if len(nids) == 0 { return nil, UnchangedError("No new bottlenecks inserted") } feedback.Info("Storing %d bottlenecks took %s", len(nids), time.Since(start)) feedback.Info("Import of bottlenecks was successful") summary := struct { Bottlenecks []string `json:"bottlenecks"` }{ Bottlenecks: nids, } return &summary, nil } func storeBottleneck( ctx context.Context, tx *sql.Tx, importID int64, feedback Feedback, bn *ifbn.BottleNeckType, nids *[]string, seenOldBnIDs map[int64]bool, tolerance float64, bs *bnStmts, savepoint func(func() error) error, ) error { feedback.Info("Processing %s (%s)", bn.OBJNAM, bn.Bottleneck_id) var ( tfrom, tto pgtype.Timestamptz uBound pgtype.BoundType ) if bn.AdditionalData == nil || bn.AdditionalData.KeyValuePair == nil { // This is a workaround for the fact that most BN data does not // contain the optional validity information. // // The current solution makes every change in the data an // update, which might be not the best solution. 
Alternative // approaches could include usingthe Date Info of the data as // start date, and maybe even inspecting the details of changed // data for hints wether an update or historisation with a new // version is advisable. // // Never the less, the current solution "just works" for the // time being and reflects the upstream systems... -- sw feedback.Info("No validity information, assuming infinite validity.") tfrom.Set(time.Date(1970, time.January, 1, 0, 0, 0, 0, time.UTC)) uBound = pgtype.Unbounded } else { const ( fromKey = "Valid_from_date" toKey = "Valid_to_date" ) fromTo := make(map[string]time.Time) for _, kv := range bn.AdditionalData.KeyValuePair { if kv.Key == fromKey || kv.Key == toKey { t, err := time.Parse(time.RFC3339, kv.Value) if err != nil { return err } fromTo[kv.Key] = t } } if t, ok := fromTo[fromKey]; ok { tfrom.Set(t) } else { feedback.Error("Missing start date") return nil } if t, ok := fromTo[toKey]; ok { tto.Set(t) uBound = pgtype.Exclusive feedback.Info("Valid from %s to %s", fromTo[fromKey].Format(common.TimeFormat), fromTo[toKey].Format(common.TimeFormat)) } else { uBound = pgtype.Unbounded feedback.Info("Valid from %s", fromTo[fromKey].Format(common.TimeFormat)) } } validity := pgtype.Tstzrange{ Lower: tfrom, Upper: tto, LowerType: pgtype.Inclusive, UpperType: uBound, Status: pgtype.Present, } rb, lb := splitRBLB(bn.Rb_lb) var revisitingTime *int if bn.Revisiting_time != nil && len(strings.TrimSpace(*bn.Revisiting_time)) > 0 { i, err := strconv.Atoi(*bn.Revisiting_time) if err != nil { feedback.Warn( "Cannot convert given revisiting time '%s' to number of months", *bn.Revisiting_time) } else { revisitingTime = &i } } var limiting, country string if bn.Limiting_factor != nil { limiting = string(*bn.Limiting_factor) } if bn.Responsible_country != nil { country = string(*bn.Responsible_country) } var materials []string if bn.Riverbed != nil { for _, material := range bn.Riverbed.Material { if material != nil { materials = 
append(materials, string(*material)) } } } // Check if a bottleneck identical to the one we would insert already // exists: var old int64 err := tx.StmtContext(ctx, bs.findExactMatch).QueryRowContext( ctx, bn.Bottleneck_id, &validity, bn.Fk_g_fid, bn.From_ISRS, bn.To_ISRS, country, limiting, bn.Date_Info, bn.Source, ).Scan(&old) switch { case err == sql.ErrNoRows: // We dont have a matching old. case err != nil: return err default: // We could check if the materials are also matching -- but per // specification the Date_Info would hvae to change on that kind of // change anyway. So actually we are already checking more in depth than // required. feedback.Info("unchanged") return nil } // Additional Information, only of interest when we change something, so // it can be used for debugging if something goes wrong... feedback.Info("Range: from %s to %s", bn.From_ISRS, bn.To_ISRS) feedback.Info("Reference gauge: %s", bn.Fk_g_fid) // Check if the new bottleneck intersects with the validity of existing // for the same bottleneck_id we consider this an update and mark the // old data for deletion. bns, err := tx.StmtContext(ctx, bs.findIntersecting).QueryContext( ctx, bn.Bottleneck_id, &validity, ) if err != nil { return err } defer bns.Close() // Mark old intersecting bottleneck data for deletion. Don't worry about // materials, they will be deleted via cascading. var oldBnIDs []int64 for bns.Next() { var oldID int64 err := bns.Scan(&oldID) if err != nil { return err } oldBnIDs = append(oldBnIDs, oldID) } if err := bns.Err(); err != nil { return err } switch { case len(oldBnIDs) == 1: feedback.Info("Bottleneck '%s' "+ "with intersecting validity already exists: "+ "UPDATING", bn.Bottleneck_id) case len(oldBnIDs) > 1: // This case is unexpected and should only happen when historic // data in the bottleneck service was changed subsequently... // We handle it gracefully anyway, but warn. 
feedback.Warn("More than one Bottelneck '%s' "+ "with intersecting validity already exists: "+ "REPLACING all of them!", bn.Bottleneck_id) } // We write the actual tracking information for deletion of superseded // bottlenecks later to the database -- AFTER the new bottleneck was // created successfully. That way, we don't change the database, when // an error arises during inserting the new data. var bnID int64 // Add new BN data: if err := savepoint(func() error { if err := tx.StmtContext(ctx, bs.insert).QueryRowContext( ctx, bn.Bottleneck_id, &validity, bn.Fk_g_fid, bn.OBJNAM, bn.NOBJNM, bn.From_ISRS, bn.To_ISRS, rb, lb, country, revisitingTime, limiting, bn.Date_Info, bn.Source, tolerance, ).Scan(&bnID); err != nil { return err } // Add new materials for _, material := range materials { if _, err := tx.StmtContext(ctx, bs.insertMaterial).ExecContext( ctx, bnID, material, ); err != nil { return err } } return nil }); err != nil { feedback.Error(pgxutils.ReadableError{Err: err}.Error()) return nil } // Now that adding BNs to staging was successful, write import tracking // information to database: for _, oldID := range oldBnIDs { // It is possible, that two new bottlenecks intersect with the // same old one, therefore we have to handle duplicates in // oldBnIds. if !seenOldBnIDs[oldID] { if _, err := tx.StmtContext(ctx, bs.track).ExecContext( ctx, importID, "waterway.bottlenecks", oldID, true, ); err != nil { return err } seenOldBnIDs[oldID] = true } } if _, err := tx.StmtContext(ctx, bs.track).ExecContext( ctx, importID, "waterway.bottlenecks", bnID, false, ); err != nil { return err } *nids = append(*nids, bn.Bottleneck_id) return nil }