Mercurial > gemma
view pkg/imports/bn.go @ 4134:9864d682ab47
bn-import: Downgraded info on missing validity from Warning to Info.
The validity is optional in the upstream data and currently only
available in AT. As warnings this was cluttering the log, therefore
the downgrade to regular "Info".
author | Sascha Wilde <wilde@intevation.de> |
---|---|
date | Thu, 01 Aug 2019 19:27:30 +0200 |
parents | eb08fbe33074 |
children | f464cbcdf2f2 |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018, 2019 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> // * Tom Gottfried <tom.gottfried@intevation.de> // * Sascha Wilde <sascha.wilde@intevation.de> package imports import ( "context" "database/sql" "errors" "regexp" "strconv" "strings" "time" "gemma.intevation.de/gemma/pkg/common" "gemma.intevation.de/gemma/pkg/pgxutils" "gemma.intevation.de/gemma/pkg/soap/ifbn" "github.com/jackc/pgx/pgtype" ) // Bottleneck is an import job to import // bottlenecks from an IFBN SOAP service. type Bottleneck struct { // URL is the URL of the SOAP service. URL string `json:"url"` // Tolerance used for axis snapping Tolerance float64 `json:"tolerance"` // Insecure indicates if HTTPS traffic // should validate certificates or not. Insecure bool `json:"insecure"` } // BNJobKind is the import queue type identifier. 
const BNJobKind JobKind = "bn" const ( insertBottleneckSQL = ` WITH bounds (b) AS (VALUES (isrs_fromText($6)), (isrs_fromText($7))), r AS (SELECT isrsrange( (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY), (SELECT b FROM bounds ORDER BY b USING >~ FETCH FIRST ROW ONLY)) AS r) INSERT INTO waterway.bottlenecks ( bottleneck_id, validity, gauge_location, objnam, nobjnm, stretch, area, rb, lb, responsible_country, revisiting_time, limiting, date_info, source_organization ) VALUES ( $1, $2::tstzrange, isrs_fromText($3), $4, $5, (SELECT r FROM r), ISRSrange_area( ISRSrange_axis((SELECT r FROM r), $15), (SELECT ST_Collect(CAST(area AS geometry)) FROM waterway.waterway_area)), $8, $9, $10, $11, $12, $13, $14 ) RETURNING id ` // We only check for NOT NULL values, for correct compairison with // values, which might be null (and then muyst not be compairt with `=' // but with `IS NULL' is comlicated and that we are checking more than // only (bottleneck_id, validity, date_info) is luxury already. 
findExactMatchBottleneckSQL = ` WITH bounds (b) AS (VALUES (isrs_fromText($4)), (isrs_fromText($5))), r AS (SELECT isrsrange( (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY), (SELECT b FROM bounds ORDER BY b USING >~ FETCH FIRST ROW ONLY)) AS r) SELECT id FROM waterway.bottlenecks WHERE ( bottleneck_id, validity, gauge_location, stretch, responsible_country, limiting, date_info, source_organization, staging_done ) = ( SELECT $1, $2::tstzrange, isrs_fromText($3), (SELECT r FROM r), $6, $7, $8::timestamptz, $9, true ) ` findIntersectingBottleneckSQL = ` SELECT id FROM waterway.bottlenecks WHERE (bottleneck_id, staging_done) = ($1, true) AND $2::tstzrange && validity ` insertBottleneckMaterialSQL = ` INSERT INTO waterway.bottlenecks_riverbed_materials ( bottleneck_id, riverbed ) SELECT * FROM unnest(CAST($1 AS int[])) AS bns, unnest(CAST($2 AS varchar[])) AS materials ON CONFLICT (bottleneck_id, riverbed) DO NOTHING ` bnStageDoneDeleteSQL = ` DELETE FROM waterway.bottlenecks WHERE id IN ( SELECT key FROM import.track_imports WHERE import_id = $1 AND relation = 'waterway.bottlenecks'::regclass AND deletion )` bnStageDoneSQL = ` UPDATE waterway.bottlenecks SET staging_done = true WHERE id IN ( SELECT key FROM import.track_imports WHERE import_id = $1 AND relation = 'waterway.bottlenecks'::regclass AND NOT deletion )` ) type bnJobCreator struct{} func init() { RegisterJobCreator(BNJobKind, bnJobCreator{}) } func (bnJobCreator) Description() string { return "bottlenecks" } func (bnJobCreator) AutoAccept() bool { return false } func (bnJobCreator) Create() Job { return new(Bottleneck) } func (bnJobCreator) Depends() [2][]string { return [2][]string{ {"bottlenecks", "bottlenecks_riverbed_materials"}, {"gauges", "distance_marks_virtual", "waterway_axis", "waterway_area"}, } } // StageDone moves the imported bottleneck out of the staging area. 
func (bnJobCreator) StageDone( ctx context.Context, tx *sql.Tx, id int64, ) error { _, err := tx.ExecContext(ctx, bnStageDoneDeleteSQL, id) if err == nil { _, err = tx.ExecContext(ctx, bnStageDoneSQL, id) } return err } // CleanUp of a bottleneck import is a NOP. func (*Bottleneck) CleanUp() error { return nil } var rblbRe = regexp.MustCompile(`(..)_(..)`) func splitRBLB(s string) (*string, *string) { m := rblbRe.FindStringSubmatch(s) if len(m) == 0 { return nil, nil } return &m[1], &m[2] } // Do executes the actual bottleneck import. func (bn *Bottleneck) Do( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, ) (interface{}, error) { fetch := func() ([]*ifbn.BottleNeckType, error) { client := ifbn.NewIBottleneckService(bn.URL, bn.Insecure, nil) req := &ifbn.Export_bn_by_isrs{ Period: &ifbn.RequestedPeriod{Date_start: &time.Time{}}, } resp, err := client.Export_bn_by_isrs(req) if err != nil { return nil, err } if resp.Export_bn_by_isrsResult == nil { return nil, errors.New( "The requested service returned no bottlenecks") } return resp.Export_bn_by_isrsResult.BottleNeckType, nil } return storeBottlenecks(ctx, fetch, importID, conn, feedback, bn.Tolerance) } type bnStmts struct { insert *sql.Stmt findExactMatch *sql.Stmt findIntersecting *sql.Stmt insertMaterial *sql.Stmt track *sql.Stmt } func (bs *bnStmts) close() { for _, s := range []**sql.Stmt{ &bs.insert, &bs.findExactMatch, &bs.findIntersecting, &bs.insertMaterial, &bs.track, } { if *s != nil { (*s).Close() *s = nil } } } func (bs *bnStmts) prepare(ctx context.Context, conn *sql.Conn) error { for _, x := range []struct { sql string stmt **sql.Stmt }{ {insertBottleneckSQL, &bs.insert}, {findExactMatchBottleneckSQL, &bs.findExactMatch}, {findIntersectingBottleneckSQL, &bs.findIntersecting}, {insertBottleneckMaterialSQL, &bs.insertMaterial}, {trackImportDeletionSQL, &bs.track}, } { var err error if *x.stmt, err = conn.PrepareContext(ctx, x.sql); err != nil { return err } } return nil } func 
storeBottlenecks( ctx context.Context, fetch func() ([]*ifbn.BottleNeckType, error), importID int64, conn *sql.Conn, feedback Feedback, tolerance float64, ) (interface{}, error) { start := time.Now() bns, err := fetch() if err != nil { return nil, err } feedback.Info("Found %d bottlenecks for import", len(bns)) var bs bnStmts defer bs.close() if err := bs.prepare(ctx, conn); err != nil { return nil, err } var nids []string seenOldBnIds := make(map[int64]bool) feedback.Info("Tolerance used to snap waterway axis: %g", tolerance) tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } defer tx.Rollback() for _, bn := range bns { if err := storeBottleneck( tx, ctx, importID, feedback, bn, &nids, seenOldBnIds, tolerance, &bs, ); err != nil { return nil, err } } if err = tx.Commit(); err != nil { return nil, err } if len(nids) == 0 { return nil, UnchangedError("No new bottlenecks inserted") } feedback.Info("Storing %d bottlenecks took %s", len(nids), time.Since(start)) feedback.Info("Import of bottlenecks was successful") summary := struct { Bottlenecks []string `json:"bottlenecks"` }{ Bottlenecks: nids, } return &summary, nil } func storeBottleneck( tx *sql.Tx, ctx context.Context, importID int64, feedback Feedback, bn *ifbn.BottleNeckType, nids *[]string, seenOldBnIds map[int64]bool, tolerance float64, bs *bnStmts, ) error { feedback.Info("Processing %s (%s)", bn.OBJNAM, bn.Bottleneck_id) var tfrom, tto pgtype.Timestamptz var uBound pgtype.BoundType if bn.AdditionalData == nil || bn.AdditionalData.KeyValuePair == nil { // This is a workaround for the fact that most BN data does not // contain the optional validity information. // // The current solution makes every change in the data an // update, which might be not the best solution. Alternative // approaches could include usingthe Date Info of the data as // start date, and maybe even inspecting the details of changed // data for hints wether an update or historisation with a new // version is advisable. 
// // Never the less, the current solution "just works" for the // time being and reflects the upstream systems... -- sw feedback.Info("No validity information, assuming infinite validity.") tfrom.Set(time.Date(1970, time.January, 1, 0, 0, 0, 0, time.UTC)) uBound = pgtype.Unbounded } else { const ( fromKey = "Valid_from_date" toKey = "Valid_to_date" ) fromTo := make(map[string]time.Time) for _, kv := range bn.AdditionalData.KeyValuePair { k := string(kv.Key) if k == fromKey || k == toKey { if t, err := time.Parse(time.RFC3339, kv.Value); err != nil { return err } else { fromTo[k] = t } } } if t, ok := fromTo[fromKey]; ok { tfrom.Set(t) } else { feedback.Warn("Missing start date") return nil } if t, ok := fromTo[toKey]; ok { tto.Set(t) uBound = pgtype.Exclusive feedback.Info("Valid from %s to %s", fromTo[fromKey].Format(common.TimeFormat), fromTo[toKey].Format(common.TimeFormat)) } else { uBound = pgtype.Unbounded feedback.Info("Valid from %s", fromTo[fromKey].Format(common.TimeFormat)) } } validity := pgtype.Tstzrange{ Lower: tfrom, Upper: tto, LowerType: pgtype.Inclusive, UpperType: uBound, Status: pgtype.Present, } rb, lb := splitRBLB(bn.Rb_lb) var revisitingTime *int if bn.Revisiting_time != nil && len(strings.TrimSpace(*bn.Revisiting_time)) > 0 { i, err := strconv.Atoi(*bn.Revisiting_time) if err != nil { feedback.Warn( "Cannot convert given revisiting time '%s' to number of months", *bn.Revisiting_time) } else { revisitingTime = &i } } var limiting, country string if bn.Limiting_factor != nil { limiting = string(*bn.Limiting_factor) } if bn.Responsible_country != nil { country = string(*bn.Responsible_country) } var materials []string if bn.Riverbed != nil { for _, material := range bn.Riverbed.Material { if material != nil { materials = append(materials, string(*material)) } } } // Check if an bottleneck identical to the one we would insert already // exists: var old int64 err := tx.StmtContext(ctx, bs.findExactMatch).QueryRowContext( ctx, bn.Bottleneck_id, 
&validity, bn.Fk_g_fid, bn.From_ISRS, bn.To_ISRS, country, limiting, bn.Date_Info, bn.Source, ).Scan(&old) switch { case err == sql.ErrNoRows: // We dont have a matching old. case err != nil: return err default: // We could check if the materials are also matching -- but per // specification the Date_Info would hvae to change on that kind of // change anyway. So actualy we ar alreayd checking more in dpth than // required. feedback.Info("unchanged") return nil } // Additional Information, only of interest when we change something, so // it can be used for debugging if something goes wrong... feedback.Info("Range: from %s to %s", bn.From_ISRS, bn.To_ISRS) // Check if the new bottleneck intersects with the validity of existing // for the same bottleneck_id we consider this an update and mark the // old data for deletion. bns, err := tx.StmtContext(ctx, bs.findIntersecting).QueryContext( ctx, bn.Bottleneck_id, &validity, ) if err != nil { return err } defer bns.Close() // Mark old intersecting bottleneck data for deletion. Don't worry about // materials, they will be deleted via cascading. var oldBnIds []int64 for bns.Next() { var oldID int64 err := bns.Scan(&oldID) if err != nil { return err } oldBnIds = append(oldBnIds, oldID) } if err := bns.Err(); err != nil { return err } switch { case len(oldBnIds) == 1: feedback.Info("Bottelneck '%s' "+ "with intersecting validity already exists: "+ "UPDATING", bn.Bottleneck_id) case len(oldBnIds) > 1: // This case is unexpected and should only happen when historic // data in the bottleneck service was changed subsequently... // We handle it gracefully anyway, but warn. feedback.Warn("More than one Bottelneck '%s' "+ "with intersecting validity already exists: "+ "REPLACING all of them!", bn.Bottleneck_id) } // We write the actual tracking information for deletion of superseded // bottlenecks later to the databs -- AFTER the new bottleneck was // created successfully. 
That way, we don't change the database, when // an error arises during inserting the new data. var bnIds []int64 // Add new BN data: savepoint := Savepoint(ctx, tx, "insert_bottlenck") err = savepoint(func() error { bns, err := tx.StmtContext(ctx, bs.insert).QueryContext(ctx, bn.Bottleneck_id, &validity, bn.Fk_g_fid, bn.OBJNAM, bn.NOBJNM, bn.From_ISRS, bn.To_ISRS, rb, lb, country, revisitingTime, limiting, bn.Date_Info, bn.Source, tolerance) if err != nil { return err } defer bns.Close() for bns.Next() { var nid int64 if err := bns.Scan(&nid); err != nil { return err } bnIds = append(bnIds, nid) } if err := bns.Err(); err != nil { return err } // Add new materials if len(bnIds) > 0 && materials != nil { var ( pgBnIds pgtype.Int8Array pgMaterials pgtype.VarcharArray ) pgBnIds.Set(bnIds) pgMaterials.Set(materials) // Insert riverbed materials if _, err := tx.StmtContext(ctx, bs.insertMaterial).ExecContext(ctx, &pgBnIds, &pgMaterials, ); err != nil { return err } } return nil }) if err != nil { feedback.Warn(pgxutils.ReadableError{err}.Error()) return nil } // Now that adding BNs to staging was successful, write import tracking // information to database: for _, oldID := range oldBnIds { // It is possible, that two new bottlenecks intersect with the // same old noe, therefor we have to handle duplicates in // oldBnIds. if !seenOldBnIds[oldID] { if _, err := tx.StmtContext(ctx, bs.track).ExecContext( ctx, importID, "waterway.bottlenecks", oldID, true, ); err != nil { return err } seenOldBnIds[oldID] = true } } for _, nid := range bnIds { if _, err := tx.StmtContext(ctx, bs.track).ExecContext( ctx, importID, "waterway.bottlenecks", nid, false, ); err != nil { return err } } *nids = append(*nids, bn.Bottleneck_id) return nil }