Mercurial > gemma
view pkg/imports/bn.go @ 3705:7006b92c0334
Handle updates (vs. historized and new versions) separately.
We need this distinction because updated data currently cannot be
reviewed. More precisely: it cannot be declined after review, as the
old data is updated in place.
The current exclusion from the review is a workaround and not meant to
be the final solution. Note that there are additional minor problems,
like the fact that the updated data is not counted as changed data for
the import.
author | Sascha Wilde <wilde@intevation.de> |
---|---|
date | Wed, 19 Jun 2019 17:00:08 +0200 |
parents | b07511ff859e |
children | aa7bede70b96 |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018, 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
//  * Tom Gottfried <tom.gottfried@intevation.de>
//  * Sascha Wilde <sascha.wilde@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"errors"
	"regexp"
	"strconv"
	"strings"
	"time"

	"gemma.intevation.de/gemma/pkg/soap/ifbn"

	"github.com/jackc/pgx/pgtype"
)

// Bottleneck is an import job to import
// bottlenecks from an IFBN SOAP service.
type Bottleneck struct {
	// URL is the URL of the SOAP service.
	URL string `json:"url"`
	// Tolerance used for axis snapping
	Tolerance float64 `json:"tolerance"`
	// Insecure indicates if HTTPS traffic
	// should validate certificates or not.
	Insecure bool `json:"insecure"`
}

// BNJobKind is the import queue type identifier.
const BNJobKind JobKind = "bn"

const (
	// insertBottleneckSQL inserts a new bottleneck version, snapping
	// its stretch to the waterway axis and intersecting its validity
	// with the matching gauge's validity ranges.
	insertBottleneckSQL = `
WITH
bounds (b) AS (VALUES (isrs_fromText($6)), (isrs_fromText($7))),
r AS (SELECT isrsrange(
    (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY),
    (SELECT b FROM bounds ORDER BY b USING >~ FETCH FIRST ROW ONLY)) AS r)
INSERT INTO waterway.bottlenecks (
    bottleneck_id, validity, gauge_location, gauge_validity,
    objnam, nobjnm, stretch, area, rb, lb,
    responsible_country, revisiting_time, limiting,
    date_info, source_organization
)
SELECT
    $1,
    validity * $2, -- intersections with gauge validity ranges
    location,
    validity,
    $4,
    $5,
    (SELECT r FROM r),
    ISRSrange_area(
        ISRSrange_axis((SELECT r FROM r), $15),
        (SELECT ST_Collect(CAST(area AS geometry))
            FROM waterway.waterway_area)),
    $8, $9, $10, $11, $12, $13, $14
FROM waterway.gauges
WHERE location = isrs_fromText($3) AND validity && $2
RETURNING id
`

	// updateBottleneckSQL updates an existing bottleneck (identified by
	// its row id in $1) in place; see insertBottleneckSQL for the
	// gauge/axis handling.
	updateBottleneckSQL = `
WITH
bounds (b) AS (VALUES (isrs_fromText($7)), (isrs_fromText($8))),
r AS (SELECT isrsrange(
    (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY),
    (SELECT b FROM bounds ORDER BY b USING >~ FETCH FIRST ROW ONLY)) AS r)
UPDATE waterway.bottlenecks
SET (
    bottleneck_id, validity, gauge_location, gauge_validity,
    objnam, nobjnm, stretch, area, rb, lb,
    responsible_country, revisiting_time, limiting,
    date_info, source_organization
) = (
    SELECT
        $2,
        validity * $3, -- intersections with gauge validity ranges
        location,
        validity,
        $5,
        $6,
        (SELECT r FROM r),
        ISRSrange_area(
            ISRSrange_axis((SELECT r FROM r), $16),
            (SELECT ST_Collect(CAST(area AS geometry))
                FROM waterway.waterway_area)),
        $9, $10, $11, $12::smallint, $13, $14::timestamptz, $15
    FROM waterway.gauges
    WHERE location = isrs_fromText($4) AND validity && $3
)
WHERE id=$1
RETURNING id
`

	// findExactMatchBottleneckSQL returns the id of an already staged
	// bottleneck whose attributes all equal the candidate's — i.e. an
	// unchanged re-import.
	findExactMatchBottleneckSQL = `
WITH
bounds (b) AS (VALUES (isrs_fromText($6)), (isrs_fromText($7))),
r AS (SELECT isrsrange(
    (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY),
    (SELECT b FROM bounds ORDER BY b USING >~ FETCH FIRST ROW ONLY)) AS r)
SELECT id
FROM waterway.bottlenecks
WHERE (
    bottleneck_id, validity, gauge_location, gauge_validity,
    objnam, nobjnm, stretch, rb, lb,
    responsible_country, revisiting_time, limiting,
    date_info, source_organization, staging_done
) = (
    SELECT
        $1,
        validity * $2, -- intersections with gauge validity ranges
        location,
        validity,
        $4,
        $5,
        (SELECT r FROM r),
        $8, $9, $10, $11::smallint, $12, $13::timestamptz, $14,
        true
    FROM waterway.gauges
    WHERE location = isrs_fromText($3) AND validity && $2
)
`

	// findMatchBottleneckSQL returns the id of an already staged
	// bottleneck with the same identity (bottleneck_id, validity)
	// as the candidate, regardless of the other attributes.
	findMatchBottleneckSQL = `
SELECT id
FROM waterway.bottlenecks
WHERE (
    bottleneck_id, validity, staging_done
) = (
    SELECT
        $1,
        validity * $2, -- intersections with gauge validity ranges
        true
    FROM waterway.gauges
    WHERE location = isrs_fromText($3) AND validity && $2
)
`

	// Alignment with gauge validity might have generated new entries
	// for the same time range. Thus, remove the old ones
	deleteObsoleteBNSQL = `
DELETE FROM waterway.bottlenecks
WHERE bottleneck_id = $1 AND validity <@ $2 AND id <> ALL($3)
`

	fixBNValiditySQL = `
UPDATE waterway.bottlenecks SET
    -- Set enddate of old entry to new startdate in case of overlap:
    validity = validity - $2
WHERE bottleneck_id = $1
    AND validity && $2 AND NOT validity <@ $2
`

	// deleteBottleneckMaterialSQL removes riverbed materials no longer
	// reported for the given bottlenecks and returns the removed ones.
	deleteBottleneckMaterialSQL = `
WITH del AS (
    DELETE FROM waterway.bottlenecks_riverbed_materials
    WHERE bottleneck_id = ANY($1) AND riverbed <> ALL($2)
    RETURNING riverbed)
SELECT DISTINCT riverbed FROM del
`

	insertBottleneckMaterialSQL = `
INSERT INTO waterway.bottlenecks_riverbed_materials (
    bottleneck_id,
    riverbed
) SELECT *
FROM unnest(CAST($1 AS int[])) AS bns,
    unnest(CAST($2 AS varchar[])) AS materials
ON CONFLICT (bottleneck_id, riverbed) DO NOTHING
`
)

// bnJobCreator integrates the bottleneck import into the import queue.
type bnJobCreator struct{}

func init() {
	RegisterJobCreator(BNJobKind, bnJobCreator{})
}

// Description returns the human-readable job type name.
func (bnJobCreator) Description() string { return "bottlenecks" }

// AutoAccept: bottleneck imports always need a review.
func (bnJobCreator) AutoAccept() bool { return false }

// Create builds a fresh, empty job configuration.
func (bnJobCreator) Create() Job { return new(Bottleneck) }

// Depends lists written (first slot) and read (second slot) relations.
func (bnJobCreator) Depends() [2][]string {
	return [2][]string{
		{"bottlenecks", "bottlenecks_riverbed_materials"},
		{"gauges", "distance_marks_virtual", "waterway_axis", "waterway_area"},
	}
}

const (
	bnStageDoneSQL = `
UPDATE waterway.bottlenecks SET staging_done = true
WHERE id IN (
    SELECT key from import.track_imports
    WHERE import_id = $1 AND
        relation = 'waterway.bottlenecks'::regclass)`
)

// StageDone moves the imported bottleneck out of the staging area.
func (bnJobCreator) StageDone(
	ctx context.Context,
	tx *sql.Tx,
	id int64,
) error {
	_, err := tx.ExecContext(ctx, bnStageDoneSQL, id)
	return err
}

// CleanUp of a bottleneck import is a NOP.
func (*Bottleneck) CleanUp() error { return nil }

// rblbRe splits strings like "RB_LB" into a right-bank and a
// left-bank code of two characters each.
var rblbRe = regexp.MustCompile(`(..)_(..)`)

// splitRBLB returns the right-bank/left-bank parts of s, or
// (nil, nil) if s does not match the expected "xx_yy" pattern.
func splitRBLB(s string) (*string, *string) {
	m := rblbRe.FindStringSubmatch(s)
	if len(m) == 0 {
		return nil, nil
	}
	return &m[1], &m[2]
}

// Do executes the actual bottleneck import.
func (bn *Bottleneck) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	// fetch pulls all bottlenecks from the configured SOAP endpoint.
	fetch := func() ([]*ifbn.BottleNeckType, error) {
		client := ifbn.NewIBottleneckService(bn.URL, bn.Insecure, nil)

		req := &ifbn.Export_bn_by_isrs{}

		resp, err := client.Export_bn_by_isrs(req)
		if err != nil {
			return nil, err
		}

		if resp.Export_bn_by_isrsResult == nil {
			return nil, errors.New(
				"The requested service returned no bottlenecks")
		}

		return resp.Export_bn_by_isrsResult.BottleNeckType, nil
	}

	return storeBottlenecks(ctx, fetch, importID, conn, feedback, bn.Tolerance)
}

// storeBottlenecks fetches the bottlenecks and stores each of them,
// returning a summary of the imported bottleneck IDs.
func storeBottlenecks(
	ctx context.Context,
	fetch func() ([]*ifbn.BottleNeckType, error),
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
	tolerance float64,
) (interface{}, error) {
	start := time.Now()

	bns, err := fetch()
	if err != nil {
		return nil, err
	}

	feedback.Info("Found %d bottlenecks for import", len(bns))

	var insertStmt, updateStmt,
		findExactMatchingBNStmt, findMatchingBNStmt,
		deleteObsoleteBNStmt, fixValidityStmt,
		deleteMaterialStmt, insertMaterialStmt,
		trackStmt *sql.Stmt

	for _, x :=
range []struct { sql string stmt **sql.Stmt }{ {insertBottleneckSQL, &insertStmt}, {updateBottleneckSQL, &updateStmt}, {findExactMatchBottleneckSQL, &findExactMatchingBNStmt}, {findMatchBottleneckSQL, &findMatchingBNStmt}, {deleteObsoleteBNSQL, &deleteObsoleteBNStmt}, {fixBNValiditySQL, &fixValidityStmt}, {deleteBottleneckMaterialSQL, &deleteMaterialStmt}, {insertBottleneckMaterialSQL, &insertMaterialStmt}, {trackImportSQL, &trackStmt}, } { var err error if *x.stmt, err = conn.PrepareContext(ctx, x.sql); err != nil { return nil, err } defer (*x.stmt).Close() } var nids []string feedback.Info("Tolerance used to snap waterway axis: %g", tolerance) for _, bn := range bns { if err := storeBottleneck( ctx, importID, conn, feedback, bn, &nids, tolerance, insertStmt, updateStmt, findExactMatchingBNStmt, findMatchingBNStmt, deleteObsoleteBNStmt, fixValidityStmt, deleteMaterialStmt, insertMaterialStmt, trackStmt); err != nil { return nil, err } } if len(nids) == 0 { return nil, UnchangedError("No new bottlenecks inserted") } feedback.Info("Storing %d bottlenecks took %s", len(nids), time.Since(start)) feedback.Info("Import of bottlenecks was successful") summary := struct { Bottlenecks []string `json:"bottlenecks"` }{ Bottlenecks: nids, } return &summary, nil } func storeBottleneck( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, bn *ifbn.BottleNeckType, nids *[]string, tolerance float64, insertStmt, updateStmt, findExactMatchingBNStmt, findMatchingBNStmt, deleteObsoleteBNStmt, fixValidityStmt, deleteMaterialStmt, insertMaterialStmt, trackStmt *sql.Stmt, ) error { feedback.Info("Processing %s (%s)", bn.OBJNAM, bn.Bottleneck_id) var tfrom, tto pgtype.Timestamptz var uBound pgtype.BoundType if bn.AdditionalData == nil || bn.AdditionalData.KeyValuePair == nil { // This is a workaround for the fact that most BN data does not // contain the optional validity information. 
// // The current solution makes every change in the data an // update, which might be not the best solution. Alternative // approaches could include usingthe Date Info of the data as // start date, and maybe even inspecting the details of changed // data for hints wether an update or historisation with a new // version is advisable. // // Never the less, the current solution "just works" for the // rtime being... -- sw feedback.Warn("No validity information, assuming infinite validity.") tfrom.Set(time.Date(1970, time.January, 1, 0, 0, 0, 0, time.UTC)) uBound = pgtype.Unbounded } else { const ( fromKey = "Valid_from_date" toKey = "Valid_to_date" ) fromTo := make(map[string]time.Time) for _, kv := range bn.AdditionalData.KeyValuePair { k := string(kv.Key) if k == fromKey || k == toKey { if t, err := time.Parse(time.RFC3339, kv.Value); err != nil { return err } else { fromTo[k] = t } } } if t, ok := fromTo[fromKey]; ok { tfrom.Set(t) } else { feedback.Warn("Missing start date") return nil } if t, ok := fromTo[toKey]; ok { tto.Set(t) uBound = pgtype.Exclusive } else { uBound = pgtype.Unbounded } } validity := pgtype.Tstzrange{ Lower: tfrom, Upper: tto, LowerType: pgtype.Inclusive, UpperType: uBound, Status: pgtype.Present, } rb, lb := splitRBLB(bn.Rb_lb) var revisitingTime *int if bn.Revisiting_time != nil && len(strings.TrimSpace(*bn.Revisiting_time)) > 0 { i, err := strconv.Atoi(*bn.Revisiting_time) if err != nil { feedback.Warn( "Cannot convert given revisiting time '%s' to number of months", *bn.Revisiting_time) } else { revisitingTime = &i } } var limiting, country string if bn.Limiting_factor != nil { limiting = string(*bn.Limiting_factor) } if bn.Responsible_country != nil { country = string(*bn.Responsible_country) } var materials []string if bn.Riverbed != nil { for _, material := range bn.Riverbed.Material { if material != nil { materials = append(materials, string(*material)) } } } // Check if an bottleneck identical to the one we would insert already // 
exists: bns, err := findExactMatchingBNStmt.QueryContext(ctx, bn.Bottleneck_id, &validity, bn.Fk_g_fid, bn.OBJNAM, bn.NOBJNM, bn.From_ISRS, bn.To_ISRS, rb, lb, country, revisitingTime, limiting, bn.Date_Info, bn.Source, ) if err != nil { return err } defer bns.Close() if bns.Next() { feedback.Info("unchanged") return nil } // Check if an bottleneck with the same identity // (bottleneck_id,validity) already exists: // Check if an bottleneck identical to the one we would insert already // exists: var existing_bn_id *int64 err = findMatchingBNStmt.QueryRowContext(ctx, bn.Bottleneck_id, &validity, bn.Fk_g_fid, ).Scan(&existing_bn_id) switch { case err == sql.ErrNoRows: existing_bn_id = nil case err != nil: // This is unexpected and propably a serious error return err } tx, err := conn.BeginTx(ctx, nil) if err != nil { return err } defer tx.Rollback() var bnIds []int64 if existing_bn_id != nil { feedback.Info("Bottelneck '%s' "+ "with matching validity already exists:"+ "UPDATING", bn.Bottleneck_id) // Updating existnig BN data: bns, err = tx.StmtContext(ctx, updateStmt).QueryContext(ctx, existing_bn_id, bn.Bottleneck_id, &validity, bn.Fk_g_fid, bn.OBJNAM, bn.NOBJNM, bn.From_ISRS, bn.To_ISRS, rb, lb, country, revisitingTime, limiting, bn.Date_Info, bn.Source, tolerance, ) } else { // New BN data: bns, err = tx.StmtContext(ctx, insertStmt).QueryContext(ctx, bn.Bottleneck_id, &validity, bn.Fk_g_fid, bn.OBJNAM, bn.NOBJNM, bn.From_ISRS, bn.To_ISRS, rb, lb, country, revisitingTime, limiting, bn.Date_Info, bn.Source, tolerance, ) } if err != nil { feedback.Warn(handleError(err).Error()) return nil } defer bns.Close() for bns.Next() { var nid int64 if err := bns.Scan(&nid); err != nil { return err } bnIds = append(bnIds, nid) } if err := bns.Err(); err != nil { feedback.Warn(handleError(err).Error()) return nil } if len(bnIds) == 0 { feedback.Warn( "No gauge matching '%s' or given time available", bn.Fk_g_fid) return nil } // Remove obsolete bottleneck version entries var 
pgBnIds pgtype.Int8Array pgBnIds.Set(bnIds) if _, err = tx.StmtContext(ctx, deleteObsoleteBNStmt).ExecContext(ctx, bn.Bottleneck_id, &validity, &pgBnIds, ); err != nil { feedback.Warn(handleError(err).Error()) if err2 := tx.Rollback(); err2 != nil { return err2 } return nil } // Set end of validity of old version to start of new version // in case of overlap if _, err = tx.StmtContext(ctx, fixValidityStmt).ExecContext(ctx, bn.Bottleneck_id, validity, ); err != nil { feedback.Warn(handleError(err).Error()) if err2 := tx.Rollback(); err2 != nil { return err2 } return nil } if materials != nil { // Remove obsolete riverbed materials var pgMaterials pgtype.VarcharArray pgMaterials.Set(materials) mtls, err := tx.StmtContext(ctx, deleteMaterialStmt).QueryContext(ctx, &pgBnIds, &pgMaterials, ) if err != nil { return err } defer mtls.Close() for mtls.Next() { var delMat string if err := mtls.Scan(&delMat); err != nil { return err } feedback.Warn("Removed riverbed material %s", delMat) } if err := mtls.Err(); err != nil { return err } // Insert riverbed materials if _, err := tx.StmtContext(ctx, insertMaterialStmt).ExecContext(ctx, &pgBnIds, &pgMaterials, ); err != nil { feedback.Warn("Failed to insert riverbed materials") feedback.Warn(handleError(err).Error()) return nil } } // Only add new BN data to tracking for staging review. // // FIXME: Review for updated bottlenecks is currently not possible, as // the update is done instantly in place. if existing_bn_id == nil { for _, nid := range bnIds { if _, err := tx.StmtContext(ctx, trackStmt).ExecContext( ctx, importID, "waterway.bottlenecks", nid, ); err != nil { return err } } } if err = tx.Commit(); err != nil { return err } // See above... if existing_bn_id == nil { *nids = append(*nids, bn.Bottleneck_id) } return nil }