Mercurial > gemma
view pkg/imports/bn.go @ 4850:18d5461bec5d
Fixed some golint issues.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Tue, 19 Nov 2019 12:45:52 +0100 |
parents | ca6a5f722471 |
children | faabfed7f0e9 |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018, 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
//  * Tom Gottfried <tom.gottfried@intevation.de>
//  * Sascha Wilde <sascha.wilde@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"errors"
	"regexp"
	"strconv"
	"strings"
	"time"

	"github.com/jackc/pgx/pgtype"

	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/pgxutils"
	"gemma.intevation.de/gemma/pkg/soap/ifbn"
)

// Bottleneck is an import job to import
// bottlenecks from an IFBN SOAP service.
type Bottleneck struct {
	// URL is the URL of the SOAP service.
	URL string `json:"url"`
	// Tolerance is the tolerance used when snapping the bottleneck
	// stretch to the waterway axis.
	Tolerance float64 `json:"tolerance"`
	// Insecure controls certificate validation of the HTTPS connection
	// to the SOAP service. It is handed to the IFBN SOAP client;
	// NOTE(review): presumably true disables certificate validation —
	// confirm in package ifbn.
	Insecure bool `json:"insecure"`
}

// BNJobKind is the import queue type identifier.
const BNJobKind JobKind = "bn" const ( insertBottleneckSQL = ` WITH bounds (b) AS (VALUES (isrs_fromText($6)), (isrs_fromText($7))), r AS (SELECT isrsrange( (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY), (SELECT b FROM bounds ORDER BY b USING >~ FETCH FIRST ROW ONLY)) AS r) INSERT INTO waterway.bottlenecks ( bottleneck_id, validity, gauge_location, objnam, nobjnm, stretch, area, rb, lb, responsible_country, revisiting_time, limiting, date_info, source_organization ) VALUES ( $1, $2::tstzrange, isrs_fromText($3), $4, $5, (SELECT r FROM r), ISRSrange_area( ISRSrange_axis((SELECT r FROM r), $15), (SELECT ST_Collect(CAST(area AS geometry)) FROM waterway.waterway_area)), $8, $9, $10, $11, $12, $13, $14 ) RETURNING id ` // We only check for NOT NULL values, for correct compairison with // values, which might be null (and then muyst not be compairt with `=' // but with `IS NULL' is comlicated and that we are checking more than // only (bottleneck_id, validity, date_info) is luxury already. 
findExactMatchBottleneckSQL = ` WITH bounds (b) AS (VALUES (isrs_fromText($4)), (isrs_fromText($5))), r AS (SELECT isrsrange( (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY), (SELECT b FROM bounds ORDER BY b USING >~ FETCH FIRST ROW ONLY)) AS r) SELECT id FROM waterway.bottlenecks WHERE ( bottleneck_id, validity, gauge_location, stretch, responsible_country, limiting, date_info, source_organization, staging_done ) = ( SELECT $1, $2::tstzrange, isrs_fromText($3), (SELECT r FROM r), $6, $7, $8::timestamptz, $9, true ) ` findIntersectingBottleneckSQL = ` SELECT id FROM waterway.bottlenecks WHERE (bottleneck_id, staging_done) = ($1, true) AND $2::tstzrange && validity ` insertBottleneckMaterialSQL = ` INSERT INTO waterway.bottlenecks_riverbed_materials ( bottleneck_id, riverbed ) VALUES ($1, $2) ON CONFLICT (bottleneck_id, riverbed) DO NOTHING ` bnStageDoneDeleteSQL = ` DELETE FROM waterway.bottlenecks WHERE id IN ( SELECT key FROM import.track_imports WHERE import_id = $1 AND relation = 'waterway.bottlenecks'::regclass AND deletion )` bnStageDoneSQL = ` UPDATE waterway.bottlenecks SET staging_done = true WHERE id IN ( SELECT key FROM import.track_imports WHERE import_id = $1 AND relation = 'waterway.bottlenecks'::regclass AND NOT deletion )` ) func (bn *Bottleneck) Description() (string, error) { var descs []string descs = append(descs, bn.URL) return strings.Join(descs, "|"), nil } type bnJobCreator struct{} func init() { RegisterJobCreator(BNJobKind, bnJobCreator{}) } func (bnJobCreator) Description() string { return "bottlenecks" } func (bnJobCreator) AutoAccept() bool { return false } func (bnJobCreator) Create() Job { return new(Bottleneck) } func (bnJobCreator) Depends() [2][]string { return [2][]string{ {"bottlenecks", "bottlenecks_riverbed_materials"}, {"gauges", "distance_marks_virtual", "waterway_axis", "waterway_area"}, } } // StageDone moves the imported bottleneck out of the staging area. 
func (bnJobCreator) StageDone( ctx context.Context, tx *sql.Tx, id int64, ) error { _, err := tx.ExecContext(ctx, bnStageDoneDeleteSQL, id) if err == nil { _, err = tx.ExecContext(ctx, bnStageDoneSQL, id) } return err } // CleanUp of a bottleneck import is a NOP. func (*Bottleneck) CleanUp() error { return nil } var rblbRe = regexp.MustCompile(`(..)_(..)`) func splitRBLB(s string) (*string, *string) { m := rblbRe.FindStringSubmatch(s) if len(m) == 0 { return nil, nil } return &m[1], &m[2] } // Do executes the actual bottleneck import. func (bn *Bottleneck) Do( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, ) (interface{}, error) { fetch := func() ([]*ifbn.BottleNeckType, error) { client := ifbn.NewIBottleneckService(bn.URL, bn.Insecure, nil) req := &ifbn.Export_bn_by_isrs{ Period: &ifbn.RequestedPeriod{Date_start: &time.Time{}}, } resp, err := client.Export_bn_by_isrs(req) if err != nil { return nil, err } if resp.Export_bn_by_isrsResult == nil { return nil, errors.New( "the requested service returned no bottlenecks") } return resp.Export_bn_by_isrsResult.BottleNeckType, nil } return storeBottlenecks(ctx, fetch, importID, conn, feedback, bn.Tolerance) } type bnStmts struct { insert *sql.Stmt findExactMatch *sql.Stmt findIntersecting *sql.Stmt insertMaterial *sql.Stmt track *sql.Stmt } func (bs *bnStmts) close() { for _, s := range []**sql.Stmt{ &bs.insert, &bs.findExactMatch, &bs.findIntersecting, &bs.insertMaterial, &bs.track, } { if *s != nil { (*s).Close() *s = nil } } } func (bs *bnStmts) prepare(ctx context.Context, conn *sql.Conn) error { for _, x := range []struct { sql string stmt **sql.Stmt }{ {insertBottleneckSQL, &bs.insert}, {findExactMatchBottleneckSQL, &bs.findExactMatch}, {findIntersectingBottleneckSQL, &bs.findIntersecting}, {insertBottleneckMaterialSQL, &bs.insertMaterial}, {trackImportDeletionSQL, &bs.track}, } { var err error if *x.stmt, err = conn.PrepareContext(ctx, x.sql); err != nil { return err } } return nil } func 
storeBottlenecks( ctx context.Context, fetch func() ([]*ifbn.BottleNeckType, error), importID int64, conn *sql.Conn, feedback Feedback, tolerance float64, ) (interface{}, error) { start := time.Now() bns, err := fetch() if err != nil { return nil, err } feedback.Info("Found %d bottlenecks for import", len(bns)) var bs bnStmts defer bs.close() if err := bs.prepare(ctx, conn); err != nil { return nil, err } var nids []string seenOldBnIDs := make(map[int64]bool) feedback.Info("Tolerance used to snap waterway axis: %g", tolerance) tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } defer tx.Rollback() savepoint := Savepoint(ctx, tx, "insert_bottlenck") for _, bn := range bns { if err := storeBottleneck( ctx, tx, importID, feedback, bn, &nids, seenOldBnIDs, tolerance, &bs, savepoint, ); err != nil { return nil, err } } if err = tx.Commit(); err != nil { return nil, err } if len(nids) == 0 { return nil, UnchangedError("No new bottlenecks inserted") } feedback.Info("Storing %d bottlenecks took %s", len(nids), time.Since(start)) feedback.Info("Import of bottlenecks was successful") summary := struct { Bottlenecks []string `json:"bottlenecks"` }{ Bottlenecks: nids, } return &summary, nil } func storeBottleneck( ctx context.Context, tx *sql.Tx, importID int64, feedback Feedback, bn *ifbn.BottleNeckType, nids *[]string, seenOldBnIDs map[int64]bool, tolerance float64, bs *bnStmts, savepoint func(func() error) error, ) error { feedback.Info("Processing %s (%s)", bn.OBJNAM, bn.Bottleneck_id) var ( tfrom, tto pgtype.Timestamptz uBound pgtype.BoundType ) if bn.AdditionalData == nil || bn.AdditionalData.KeyValuePair == nil { // This is a workaround for the fact that most BN data does not // contain the optional validity information. // // The current solution makes every change in the data an // update, which might be not the best solution. 
Alternative // approaches could include usingthe Date Info of the data as // start date, and maybe even inspecting the details of changed // data for hints wether an update or historisation with a new // version is advisable. // // Never the less, the current solution "just works" for the // time being and reflects the upstream systems... -- sw feedback.Info("No validity information, assuming infinite validity.") tfrom.Set(time.Date(1970, time.January, 1, 0, 0, 0, 0, time.UTC)) uBound = pgtype.Unbounded } else { const ( fromKey = "Valid_from_date" toKey = "Valid_to_date" ) fromTo := make(map[string]time.Time) for _, kv := range bn.AdditionalData.KeyValuePair { if kv.Key == fromKey || kv.Key == toKey { t, err := time.Parse(time.RFC3339, kv.Value) if err != nil { return err } fromTo[kv.Key] = t } } if t, ok := fromTo[fromKey]; ok { tfrom.Set(t) } else { feedback.Error("Missing start date") return nil } if t, ok := fromTo[toKey]; ok { tto.Set(t) uBound = pgtype.Exclusive feedback.Info("Valid from %s to %s", fromTo[fromKey].Format(common.TimeFormat), fromTo[toKey].Format(common.TimeFormat)) } else { uBound = pgtype.Unbounded feedback.Info("Valid from %s", fromTo[fromKey].Format(common.TimeFormat)) } } validity := pgtype.Tstzrange{ Lower: tfrom, Upper: tto, LowerType: pgtype.Inclusive, UpperType: uBound, Status: pgtype.Present, } rb, lb := splitRBLB(bn.Rb_lb) var revisitingTime *int if bn.Revisiting_time != nil && len(strings.TrimSpace(*bn.Revisiting_time)) > 0 { i, err := strconv.Atoi(*bn.Revisiting_time) if err != nil { feedback.Warn( "Cannot convert given revisiting time '%s' to number of months", *bn.Revisiting_time) } else { revisitingTime = &i } } var limiting, country string if bn.Limiting_factor != nil { limiting = string(*bn.Limiting_factor) } if bn.Responsible_country != nil { country = string(*bn.Responsible_country) } var materials []string if bn.Riverbed != nil { for _, material := range bn.Riverbed.Material { if material != nil { materials = 
append(materials, string(*material)) } } } // Check if a bottleneck identical to the one we would insert already // exists: var old int64 err := tx.StmtContext(ctx, bs.findExactMatch).QueryRowContext( ctx, bn.Bottleneck_id, &validity, bn.Fk_g_fid, bn.From_ISRS, bn.To_ISRS, country, limiting, bn.Date_Info, bn.Source, ).Scan(&old) switch { case err == sql.ErrNoRows: // We dont have a matching old. case err != nil: return err default: // We could check if the materials are also matching -- but per // specification the Date_Info would hvae to change on that kind of // change anyway. So actually we are already checking more in depth than // required. feedback.Info("unchanged") return nil } // Additional Information, only of interest when we change something, so // it can be used for debugging if something goes wrong... feedback.Info("Range: from %s to %s", bn.From_ISRS, bn.To_ISRS) // Check if the new bottleneck intersects with the validity of existing // for the same bottleneck_id we consider this an update and mark the // old data for deletion. bns, err := tx.StmtContext(ctx, bs.findIntersecting).QueryContext( ctx, bn.Bottleneck_id, &validity, ) if err != nil { return err } defer bns.Close() // Mark old intersecting bottleneck data for deletion. Don't worry about // materials, they will be deleted via cascading. var oldBnIDs []int64 for bns.Next() { var oldID int64 err := bns.Scan(&oldID) if err != nil { return err } oldBnIDs = append(oldBnIDs, oldID) } if err := bns.Err(); err != nil { return err } switch { case len(oldBnIDs) == 1: feedback.Info("Bottleneck '%s' "+ "with intersecting validity already exists: "+ "UPDATING", bn.Bottleneck_id) case len(oldBnIDs) > 1: // This case is unexpected and should only happen when historic // data in the bottleneck service was changed subsequently... // We handle it gracefully anyway, but warn. 
feedback.Warn("More than one Bottelneck '%s' "+ "with intersecting validity already exists: "+ "REPLACING all of them!", bn.Bottleneck_id) } // We write the actual tracking information for deletion of superseded // bottlenecks later to the database -- AFTER the new bottleneck was // created successfully. That way, we don't change the database, when // an error arises during inserting the new data. var bnID int64 // Add new BN data: if err := savepoint(func() error { if err := tx.StmtContext(ctx, bs.insert).QueryRowContext( ctx, bn.Bottleneck_id, &validity, bn.Fk_g_fid, bn.OBJNAM, bn.NOBJNM, bn.From_ISRS, bn.To_ISRS, rb, lb, country, revisitingTime, limiting, bn.Date_Info, bn.Source, tolerance, ).Scan(&bnID); err != nil { return err } // Add new materials for _, material := range materials { if _, err := tx.StmtContext(ctx, bs.insertMaterial).ExecContext( ctx, bnID, material, ); err != nil { return err } } return nil }); err != nil { feedback.Error(pgxutils.ReadableError{Err: err}.Error()) return nil } // Now that adding BNs to staging was successful, write import tracking // information to database: for _, oldID := range oldBnIDs { // It is possible, that two new bottlenecks intersect with the // same old one, therefore we have to handle duplicates in // oldBnIds. if !seenOldBnIDs[oldID] { if _, err := tx.StmtContext(ctx, bs.track).ExecContext( ctx, importID, "waterway.bottlenecks", oldID, true, ); err != nil { return err } seenOldBnIDs[oldID] = true } } if _, err := tx.StmtContext(ctx, bs.track).ExecContext( ctx, importID, "waterway.bottlenecks", bnID, false, ); err != nil { return err } *nids = append(*nids, bn.Bottleneck_id) return nil }