Mercurial > gemma
view pkg/imports/bn.go @ 3703:b07511ff859e
Don't include calculated area in unchanged bottleneck detection.
This makes BN imports considerably faster. The only downside is that
when the waterway area changes there is no easy way to recalculate the
areas of existing BN. But the semantics in that case are somewhat
hard anyway (think of historization for the old area) so this should
be ok.
author | Sascha Wilde <wilde@intevation.de> |
---|---|
date | Wed, 19 Jun 2019 12:34:48 +0200 |
parents | 063a1883b5cb |
children | 7006b92c0334 |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018, 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
//  * Tom Gottfried <tom.gottfried@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"errors"
	"regexp"
	"strconv"
	"strings"
	"time"

	"gemma.intevation.de/gemma/pkg/soap/ifbn"

	"github.com/jackc/pgx/pgtype"
)

// Bottleneck is an import job to import
// bottlenecks from an IFBN SOAP service.
// The struct is deserialized from JSON when the
// job is created, hence the json tags on all fields.
type Bottleneck struct {
	// URL is the URL of the SOAP service.
	URL string `json:"url"`
	// Tolerance used for axis snapping
	// (passed through to the insert SQL as $15).
	Tolerance float64 `json:"tolerance"`
	// Insecure indicates if HTTPS traffic
	// should validate certificates or not.
	Insecure bool `json:"insecure"`
}

// BNJobKind is the import queue type identifier.
const BNJobKind JobKind = "bn"

const (
	// insertBottleneckSQL inserts one bottleneck version per matching
	// gauge validity range, including the area calculated from the
	// snapped waterway axis, and upserts on (bottleneck_id, validity).
	insertBottleneckSQL = `
WITH
bounds (b) AS (VALUES (isrs_fromText($6)), (isrs_fromText($7))),
r AS (SELECT isrsrange(
    (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY),
    (SELECT b FROM bounds ORDER BY b USING >~ FETCH FIRST ROW ONLY)) AS r)
INSERT INTO waterway.bottlenecks (
  bottleneck_id,
  validity,
  gauge_location,
  gauge_validity,
  objnam,
  nobjnm,
  stretch,
  area,
  rb,
  lb,
  responsible_country,
  revisiting_time,
  limiting,
  date_info,
  source_organization
) SELECT
  $1,
  validity * $2, -- intersections with gauge validity ranges
  location,
  validity,
  $4,
  $5,
  (SELECT r FROM r),
  ISRSrange_area(
    ISRSrange_axis((SELECT r FROM r), $15),
    (SELECT ST_Collect(CAST(area AS geometry))
        FROM waterway.waterway_area)),
  $8,
  $9,
  $10,
  $11,
  $12,
  $13,
  $14
FROM waterway.gauges
WHERE location = isrs_fromText($3) AND validity && $2
ON CONFLICT (bottleneck_id, validity) DO UPDATE SET
  gauge_location = EXCLUDED.gauge_location,
  gauge_validity = EXCLUDED.gauge_validity,
  objnam = EXCLUDED.objnam,
  nobjnm = EXCLUDED.nobjnm,
  stretch = EXCLUDED.stretch,
  area = EXCLUDED.area,
  rb = EXCLUDED.rb,
  lb = EXCLUDED.lb,
  responsible_country = EXCLUDED.responsible_country,
  revisiting_time = EXCLUDED.revisiting_time,
  limiting = EXCLUDED.limiting,
  date_info = EXCLUDED.date_info,
  source_organization = EXCLUDED.source_organization
RETURNING id
`

	// findExactMatchBottleneckSQL detects unchanged bottlenecks by
	// comparing all imported attributes (deliberately NOT the
	// calculated area, see commit message) against already staged-done
	// rows. A hit means the incoming record needs no re-import.
	findExactMatchBottleneckSQL = `
WITH
bounds (b) AS (VALUES (isrs_fromText($6)), (isrs_fromText($7))),
r AS (SELECT isrsrange(
    (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY),
    (SELECT b FROM bounds ORDER BY b USING >~ FETCH FIRST ROW ONLY)) AS r)
SELECT id FROM waterway.bottlenecks
WHERE (
  bottleneck_id,
  validity,
  gauge_location,
  gauge_validity,
  objnam,
  nobjnm,
  stretch,
  rb,
  lb,
  responsible_country,
  revisiting_time,
  limiting,
  date_info,
  source_organization,
  staging_done
) = (
  SELECT
    $1,
    validity * $2, -- intersections with gauge validity ranges
    location,
    validity,
    $4,
    $5,
    (SELECT r FROM r),
    $8,
    $9,
    $10,
    $11::smallint,
    $12,
    $13::timestamptz,
    $14,
    true
  FROM waterway.gauges
  WHERE location = isrs_fromText($3) AND validity && $2
)
`

	// Alignment with gauge validity might have generated new entries
	// for the same time range. Thus, remove the old ones.
	deleteObsoleteBNSQL = `
DELETE FROM waterway.bottlenecks
WHERE bottleneck_id = $1 AND validity <@ $2 AND id <> ALL($3)
`

	// fixBNValiditySQL truncates overlapping older versions so their
	// validity ends where the new version's validity begins.
	fixBNValiditySQL = `
UPDATE waterway.bottlenecks SET
  -- Set enddate of old entry to new startdate in case of overlap:
  validity = validity - $2
WHERE bottleneck_id = $1
  AND validity && $2 AND NOT validity <@ $2
`

	// deleteBottleneckMaterialSQL removes riverbed materials no longer
	// present in the import and returns the distinct removed names for
	// feedback reporting.
	deleteBottleneckMaterialSQL = `
WITH del AS (
  DELETE FROM waterway.bottlenecks_riverbed_materials
  WHERE bottleneck_id = ANY($1)
    AND riverbed <> ALL($2)
  RETURNING riverbed)
SELECT DISTINCT riverbed FROM del
`

	// insertBottleneckMaterialSQL inserts the cross product of the new
	// bottleneck ids and the imported materials, ignoring duplicates.
	insertBottleneckMaterialSQL = `
INSERT INTO waterway.bottlenecks_riverbed_materials (
  bottleneck_id,
  riverbed
) SELECT *
FROM unnest(CAST($1 AS int[])) AS bns,
  unnest(CAST($2 AS varchar[])) AS materials
ON CONFLICT (bottleneck_id, riverbed) DO NOTHING
`
)

// bnJobCreator implements the job-creator hooks for bottleneck imports.
type bnJobCreator struct{}

func init() {
	RegisterJobCreator(BNJobKind, bnJobCreator{})
}

// Description returns the human-readable name of this import kind.
func (bnJobCreator) Description() string { return "bottlenecks" }

// AutoAccept reports that bottleneck imports require manual review.
func (bnJobCreator) AutoAccept() bool { return false }

// Create returns a fresh, empty Bottleneck job to be configured.
func (bnJobCreator) Create() Job { return new(Bottleneck) }

// Depends lists the written relations (first slice) and the read
// relations (second slice) of a bottleneck import.
func (bnJobCreator) Depends() [2][]string {
	return [2][]string{
		{"bottlenecks", "bottlenecks_riverbed_materials"},
		{"gauges", "distance_marks_virtual", "waterway_axis", "waterway_area"},
	}
}

const (
	bnStageDoneSQL = `
UPDATE waterway.bottlenecks SET staging_done = true
WHERE id IN (
  SELECT key from import.track_imports
  WHERE import_id = $1 AND
        relation = 'waterway.bottlenecks'::regclass)`
)

// StageDone moves the imported bottleneck out of the staging area.
func (bnJobCreator) StageDone(
	ctx context.Context,
	tx *sql.Tx,
	id int64,
) error {
	_, err := tx.ExecContext(ctx, bnStageDoneSQL, id)
	return err
}

// CleanUp of a bottleneck import is a NOP.
func (*Bottleneck) CleanUp() error { return nil }

// rblbRe splits a combined "XX_YY" right-bank/left-bank code into its
// two two-character halves.
var rblbRe = regexp.MustCompile(`(..)_(..)`)

// splitRBLB returns pointers to the right-bank and left-bank parts of
// an "XX_YY" code, or (nil, nil) if s does not match that shape.
func splitRBLB(s string) (*string, *string) {
	m := rblbRe.FindStringSubmatch(s)
	if len(m) == 0 {
		return nil, nil
	}
	return &m[1], &m[2]
}

// Do executes the actual bottleneck import.
func (bn *Bottleneck) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	// fetch pulls all bottlenecks from the configured IFBN SOAP service.
	fetch := func() ([]*ifbn.BottleNeckType, error) {
		client := ifbn.NewIBottleneckService(bn.URL, bn.Insecure, nil)

		req := &ifbn.Export_bn_by_isrs{}

		resp, err := client.Export_bn_by_isrs(req)
		if err != nil {
			return nil, err
		}

		if resp.Export_bn_by_isrsResult == nil {
			return nil, errors.New(
				"The requested service returned no bottlenecks")
		}

		return resp.Export_bn_by_isrsResult.BottleNeckType, nil
	}

	return storeBottlenecks(ctx, fetch, importID, conn, feedback, bn.Tolerance)
}

// storeBottlenecks prepares all statements once, then stores each
// fetched bottleneck via storeBottleneck. It returns an UnchangedError
// if no bottleneck was inserted, otherwise a summary of the inserted
// bottleneck ids.
func storeBottlenecks(
	ctx context.Context,
	fetch func() ([]*ifbn.BottleNeckType, error),
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
	tolerance float64,
) (interface{}, error) {
	start := time.Now()

	bns, err := fetch()
	if err != nil {
		return nil, err
	}

	feedback.Info("Found %d bottlenecks for import", len(bns))

	var insertStmt, findExactMatchingBNStmt, deleteObsoleteBNStmt,
		fixValidityStmt, deleteMaterialStmt, insertMaterialStmt,
		trackStmt *sql.Stmt

	// Prepare every statement up front; the deferred Closes run when
	// the whole import is done.
	for _, x := range []struct {
		sql  string
		stmt **sql.Stmt
	}{
		{insertBottleneckSQL, &insertStmt},
		{findExactMatchBottleneckSQL, &findExactMatchingBNStmt},
		{deleteObsoleteBNSQL, &deleteObsoleteBNStmt},
		{fixBNValiditySQL, &fixValidityStmt},
		{deleteBottleneckMaterialSQL, &deleteMaterialStmt},
		{insertBottleneckMaterialSQL, &insertMaterialStmt},
		{trackImportSQL, &trackStmt},
	} {
		var err error
		if *x.stmt, err = conn.PrepareContext(ctx, x.sql); err != nil {
			return nil, err
		}
		defer (*x.stmt).Close()
	}

	// nids collects the bottleneck_ids actually (re-)inserted.
	var nids []string

	feedback.Info("Tolerance used to snap waterway axis: %g", tolerance)

	for _, bn := range bns {
		if err := storeBottleneck(
			ctx, importID, conn, feedback, bn, &nids, tolerance,
			insertStmt, findExactMatchingBNStmt,
			deleteObsoleteBNStmt, fixValidityStmt,
			deleteMaterialStmt, insertMaterialStmt,
			trackStmt); err != nil {
			return nil, err
		}
	}
	if len(nids) == 0 {
		return nil, UnchangedError("No new bottlenecks inserted")
	}

	feedback.Info("Storing %d bottlenecks took %s",
		len(nids), time.Since(start))
	feedback.Info("Import of bottlenecks was successful")

	summary := struct {
		Bottlenecks []string `json:"bottlenecks"`
	}{
		Bottlenecks: nids,
	}

	return &summary, nil
}

// storeBottleneck stores one bottleneck record: it skips records that
// exactly match an already accepted version, otherwise inserts new
// versions aligned to gauge validity ranges inside a transaction,
// cleans up obsolete versions and riverbed materials, and tracks the
// inserted rows for this import. Data problems are reported via
// feedback and make the record be skipped (nil return); only
// infrastructure errors are returned as errors.
func storeBottleneck(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
	bn *ifbn.BottleNeckType,
	nids *[]string,
	tolerance float64,
	insertStmt, findExactMatchingBNStmt,
	deleteObsoleteBNStmt, fixValidityStmt,
	deleteMaterialStmt, insertMaterialStmt,
	trackStmt *sql.Stmt,
) error {
	feedback.Info("Processing %s (%s)", bn.OBJNAM, bn.Bottleneck_id)

	var tfrom, tto pgtype.Timestamptz
	var uBound pgtype.BoundType

	if bn.AdditionalData == nil || bn.AdditionalData.KeyValuePair == nil {
		// This is a workaround for the fact that most BN data does not
		// contain the optional validity information.
		//
		// The current solution makes every change in the data an
		// update, which might not be the best solution. Alternative
		// approaches could include using the Date Info of the data as
		// start date, and maybe even inspecting the details of changed
		// data for hints whether an update or historisation with a new
		// version is advisable.
		//
		// Nevertheless, the current solution "just works" for the
		// time being...  -- sw
		feedback.Warn("No validity information, assuming infinite validity.")
		tfrom.Set(time.Date(1970, time.January, 1, 0, 0, 0, 0, time.UTC))
		uBound = pgtype.Unbounded
	} else {
		const (
			fromKey = "Valid_from_date"
			toKey   = "Valid_to_date"
		)
		fromTo := make(map[string]time.Time)
		for _, kv := range bn.AdditionalData.KeyValuePair {
			k := string(kv.Key)
			if k == fromKey || k == toKey {
				// Timestamps are expected in RFC 3339 format.
				if t, err := time.Parse(time.RFC3339, kv.Value); err != nil {
					return err
				} else {
					fromTo[k] = t
				}
			}
		}

		if t, ok := fromTo[fromKey]; ok {
			tfrom.Set(t)
		} else {
			// Without a start date the record cannot be stored; skip it.
			feedback.Warn("Missing start date")
			return nil
		}

		if t, ok := fromTo[toKey]; ok {
			tto.Set(t)
			uBound = pgtype.Exclusive
		} else {
			uBound = pgtype.Unbounded
		}
	}

	// Half-open [from, to) validity range of the incoming record.
	validity := pgtype.Tstzrange{
		Lower:     tfrom,
		Upper:     tto,
		LowerType: pgtype.Inclusive,
		UpperType: uBound,
		Status:    pgtype.Present,
	}

	rb, lb := splitRBLB(bn.Rb_lb)

	// Revisiting time is optional and free-text; non-numeric values are
	// reported and treated as absent.
	var revisitingTime *int
	if bn.Revisiting_time != nil &&
		len(strings.TrimSpace(*bn.Revisiting_time)) > 0 {
		i, err := strconv.Atoi(*bn.Revisiting_time)
		if err != nil {
			feedback.Warn(
				"Cannot convert given revisiting time '%s' to number of months",
				*bn.Revisiting_time)
		} else {
			revisitingTime = &i
		}
	}

	var limiting, country string
	if bn.Limiting_factor != nil {
		limiting = string(*bn.Limiting_factor)
	}
	if bn.Responsible_country != nil {
		country = string(*bn.Responsible_country)
	}

	var materials []string
	if bn.Riverbed != nil {
		for _, material := range bn.Riverbed.Material {
			if material != nil {
				materials = append(materials, string(*material))
			}
		}
	}

	// Check if a bottleneck identical to the one we would insert already
	// exists:
	bns, err := findExactMatchingBNStmt.QueryContext(ctx,
		bn.Bottleneck_id,
		&validity,
		bn.Fk_g_fid,
		bn.OBJNAM,
		bn.NOBJNM,
		bn.From_ISRS, bn.To_ISRS,
		rb, lb,
		country,
		revisitingTime,
		limiting,
		bn.Date_Info,
		bn.Source,
	)
	if err != nil {
		return err
	}
	defer bns.Close()
	// NOTE(review): a query error during Next() would be treated as
	// "no match" here since bns.Err() is not checked — confirm this is
	// acceptable (the record would then be re-imported, not lost).
	if bns.Next() {
		feedback.Info("unchanged")
		return nil
	}

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	// Rollback is a NOP after a successful Commit.
	defer tx.Rollback()

	var bnIds []int64

	bns, err = tx.StmtContext(ctx, insertStmt).QueryContext(ctx,
		bn.Bottleneck_id,
		&validity,
		bn.Fk_g_fid,
		bn.OBJNAM,
		bn.NOBJNM,
		bn.From_ISRS, bn.To_ISRS,
		rb, lb,
		country,
		revisitingTime,
		limiting,
		bn.Date_Info,
		bn.Source,
		tolerance,
	)
	if err != nil {
		// Data-level insert problems are reported, the record skipped.
		feedback.Warn(handleError(err).Error())
		return nil
	}
	defer bns.Close()

	// Collect the ids of all inserted versions (one per intersected
	// gauge validity range).
	for bns.Next() {
		var nid int64
		if err := bns.Scan(&nid); err != nil {
			return err
		}
		bnIds = append(bnIds, nid)
	}
	if err := bns.Err(); err != nil {
		feedback.Warn(handleError(err).Error())
		return nil
	}

	if len(bnIds) == 0 {
		feedback.Warn(
			"No gauge matching '%s' or given time available", bn.Fk_g_fid)
		return nil
	}

	// Remove obsolete bottleneck version entries
	var pgBnIds pgtype.Int8Array
	pgBnIds.Set(bnIds)
	if _, err = tx.StmtContext(ctx, deleteObsoleteBNStmt).ExecContext(ctx,
		bn.Bottleneck_id,
		&validity,
		&pgBnIds,
	); err != nil {
		feedback.Warn(handleError(err).Error())
		if err2 := tx.Rollback(); err2 != nil {
			return err2
		}
		return nil
	}

	// Set end of validity of old version to start of new version
	// in case of overlap
	if _, err = tx.StmtContext(ctx, fixValidityStmt).ExecContext(ctx,
		bn.Bottleneck_id,
		validity,
	); err != nil {
		feedback.Warn(handleError(err).Error())
		if err2 := tx.Rollback(); err2 != nil {
			return err2
		}
		return nil
	}

	if materials != nil {
		// Remove obsolete riverbed materials
		var pgMaterials pgtype.VarcharArray
		pgMaterials.Set(materials)
		mtls, err := tx.StmtContext(ctx,
			deleteMaterialStmt).QueryContext(ctx,
			&pgBnIds,
			&pgMaterials,
		)
		if err != nil {
			return err
		}
		defer mtls.Close()
		for mtls.Next() {
			var delMat string
			if err := mtls.Scan(&delMat); err != nil {
				return err
			}
			feedback.Warn("Removed riverbed material %s", delMat)
		}
		if err := mtls.Err(); err != nil {
			return err
		}

		// Insert riverbed materials
		if _, err := tx.StmtContext(ctx,
			insertMaterialStmt).ExecContext(ctx,
			&pgBnIds,
			&pgMaterials,
		); err != nil {
			feedback.Warn("Failed to insert riverbed materials")
			feedback.Warn(handleError(err).Error())
			return nil
		}
	}

	// Track every inserted version so StageDone can find it later.
	for _, nid := range bnIds {
		if _, err := tx.StmtContext(ctx, trackStmt).ExecContext(
			ctx, importID, "waterway.bottlenecks", nid,
		); err != nil {
			return err
		}
	}

	if err = tx.Commit(); err != nil {
		return err
	}
	*nids = append(*nids, bn.Bottleneck_id)
	return nil
}