Mercurial > gemma
view pkg/imports/bn.go @ 3678:8f58851927c0
client: make layer factory only return new layer config for individual maps
instead of each time it is invoked. The purpose of the factory was to support multiple maps with individual layers.
But returning a new config each time it is invoked leads to bugs in code that relies on the layer's state. Now this factory
reuses the same objects it created before, per map.
author | Markus Kottlaender <markus@intevation.de> |
---|---|
date | Mon, 17 Jun 2019 17:31:35 +0200 |
parents | db87f34805fb |
children | 0227670dedd5 |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018, 2019 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> // * Tom Gottfried <tom.gottfried@intevation.de> package imports import ( "context" "database/sql" "errors" "regexp" "strconv" "strings" "time" "gemma.intevation.de/gemma/pkg/soap/ifbn" "github.com/jackc/pgx/pgtype" ) // Bottleneck is an import job to import // bottlenecks from an IFBN SOAP service. type Bottleneck struct { // URL is the URL of the SOAP service. URL string `json:"url"` // Tolerance used for axis snapping Tolerance float64 `json:"tolerance"` // Insecure indicates if HTTPS traffic // should validate certificates or not. Insecure bool `json:"insecure"` } // BNJobKind is the import queue type identifier. 
const BNJobKind JobKind = "bn" const ( insertBottleneckSQL = ` WITH bounds (b) AS (VALUES (isrs_fromText($6)), (isrs_fromText($7))), r AS (SELECT isrsrange( (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY), (SELECT b FROM bounds ORDER BY b USING >~ FETCH FIRST ROW ONLY)) AS r) INSERT INTO waterway.bottlenecks ( bottleneck_id, validity, gauge_location, gauge_validity, objnam, nobjnm, stretch, area, rb, lb, responsible_country, revisiting_time, limiting, date_info, source_organization ) SELECT $1, validity * $2, -- intersections with gauge validity ranges location, validity, $4, $5, (SELECT r FROM r), ISRSrange_area( ISRSrange_axis((SELECT r FROM r), $15), (SELECT ST_Collect(CAST(area AS geometry)) FROM waterway.waterway_area)), $8, $9, $10, $11, $12, $13, $14 FROM waterway.gauges WHERE location = isrs_fromText($3) AND validity && $2 ON CONFLICT (bottleneck_id, validity) DO UPDATE SET gauge_location = EXCLUDED.gauge_location, gauge_validity = EXCLUDED.gauge_validity, objnam = EXCLUDED.objnam, nobjnm = EXCLUDED.nobjnm, stretch = EXCLUDED.stretch, area = EXCLUDED.area, rb = EXCLUDED.rb, lb = EXCLUDED.lb, responsible_country = EXCLUDED.responsible_country, revisiting_time = EXCLUDED.revisiting_time, limiting = EXCLUDED.limiting, date_info = EXCLUDED.date_info, source_organization = EXCLUDED.source_organization RETURNING id ` // Alignment with gauge validity might have generated new entries // for the same time range. 
Thus, remove the old ones deleteObsoleteBNSQL = ` DELETE FROM waterway.bottlenecks WHERE bottleneck_id = $1 AND validity <@ $2 AND id <> ALL($3) ` fixBNValiditySQL = ` UPDATE waterway.bottlenecks SET -- Set enddate of old entry to new startdate in case of overlap: validity = validity - $2 WHERE bottleneck_id = $1 AND validity && $2 AND NOT validity <@ $2 ` deleteBottleneckMaterialSQL = ` WITH del AS ( DELETE FROM waterway.bottlenecks_riverbed_materials WHERE bottleneck_id = ANY($1) AND riverbed <> ALL($2) RETURNING riverbed) SELECT DISTINCT riverbed FROM del ` insertBottleneckMaterialSQL = ` INSERT INTO waterway.bottlenecks_riverbed_materials ( bottleneck_id, riverbed ) SELECT * FROM unnest(CAST($1 AS int[])) AS bns, unnest(CAST($2 AS varchar[])) AS materials ON CONFLICT (bottleneck_id, riverbed) DO NOTHING ` ) type bnJobCreator struct{} func init() { RegisterJobCreator(BNJobKind, bnJobCreator{}) } func (bnJobCreator) Description() string { return "bottlenecks" } func (bnJobCreator) AutoAccept() bool { return false } func (bnJobCreator) Create() Job { return new(Bottleneck) } func (bnJobCreator) Depends() [2][]string { return [2][]string{ {"bottlenecks", "bottlenecks_riverbed_materials"}, {"gauges", "distance_marks_virtual", "waterway_axis", "waterway_area"}, } } const ( bnStageDoneSQL = ` UPDATE waterway.bottlenecks SET staging_done = true WHERE id IN ( SELECT key from import.track_imports WHERE import_id = $1 AND relation = 'waterway.bottlenecks'::regclass)` ) // StageDone moves the imported bottleneck out of the staging area. func (bnJobCreator) StageDone( ctx context.Context, tx *sql.Tx, id int64, ) error { _, err := tx.ExecContext(ctx, bnStageDoneSQL, id) return err } // CleanUp of a bottleneck import is a NOP. 
func (*Bottleneck) CleanUp() error { return nil } var rblbRe = regexp.MustCompile(`(..)_(..)`) func splitRBLB(s string) (*string, *string) { m := rblbRe.FindStringSubmatch(s) if len(m) == 0 { return nil, nil } return &m[1], &m[2] } // Do executes the actual bottleneck import. func (bn *Bottleneck) Do( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, ) (interface{}, error) { fetch := func() ([]*ifbn.BottleNeckType, error) { client := ifbn.NewIBottleneckService(bn.URL, bn.Insecure, nil) req := &ifbn.Export_bn_by_isrs{} resp, err := client.Export_bn_by_isrs(req) if err != nil { return nil, err } if resp.Export_bn_by_isrsResult == nil { return nil, errors.New( "The requested service returned no bottlenecks") } return resp.Export_bn_by_isrsResult.BottleNeckType, nil } return storeBottlenecks(ctx, fetch, importID, conn, feedback, bn.Tolerance) } func storeBottlenecks( ctx context.Context, fetch func() ([]*ifbn.BottleNeckType, error), importID int64, conn *sql.Conn, feedback Feedback, tolerance float64, ) (interface{}, error) { start := time.Now() bns, err := fetch() if err != nil { return nil, err } feedback.Info("Found %d bottlenecks for import", len(bns)) var insertStmt, deleteObsoleteBNStmt, fixValidityStmt, deleteMaterialStmt, insertMaterialStmt, trackStmt *sql.Stmt for _, x := range []struct { sql string stmt **sql.Stmt }{ {insertBottleneckSQL, &insertStmt}, {deleteObsoleteBNSQL, &deleteObsoleteBNStmt}, {fixBNValiditySQL, &fixValidityStmt}, {deleteBottleneckMaterialSQL, &deleteMaterialStmt}, {insertBottleneckMaterialSQL, &insertMaterialStmt}, {trackImportSQL, &trackStmt}, } { var err error if *x.stmt, err = conn.PrepareContext(ctx, x.sql); err != nil { return nil, err } defer (*x.stmt).Close() } var nids []string feedback.Info("Tolerance used to snap waterway axis: %g", tolerance) for _, bn := range bns { if err := storeBottleneck( ctx, importID, conn, feedback, bn, &nids, tolerance, insertStmt, deleteObsoleteBNStmt, fixValidityStmt, 
deleteMaterialStmt, insertMaterialStmt, trackStmt); err != nil { return nil, err } } if len(nids) == 0 { return nil, UnchangedError("No new bottlenecks inserted") } feedback.Info("Storing %d bottlenecks took %s", len(nids), time.Since(start)) feedback.Info("Import of bottlenecks was successful") summary := struct { Bottlenecks []string `json:"bottlenecks"` }{ Bottlenecks: nids, } return &summary, nil } func storeBottleneck( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, bn *ifbn.BottleNeckType, nids *[]string, tolerance float64, insertStmt, deleteObsoleteBNStmt, fixValidityStmt, deleteMaterialStmt, insertMaterialStmt, trackStmt *sql.Stmt, ) error { feedback.Info("Processing %s (%s)", bn.OBJNAM, bn.Bottleneck_id) if bn.AdditionalData == nil || bn.AdditionalData.KeyValuePair == nil { feedback.Warn("Missing validity information") return nil } const ( fromKey = "Valid_from_date" toKey = "Valid_to_date" ) fromTo := make(map[string]time.Time) for _, kv := range bn.AdditionalData.KeyValuePair { k := string(kv.Key) if k == fromKey || k == toKey { if t, err := time.Parse(time.RFC3339, kv.Value); err != nil { return err } else { fromTo[k] = t } } } var tfrom, tto pgtype.Timestamptz if t, ok := fromTo[fromKey]; ok { tfrom.Set(t) } else { feedback.Warn("Missing start date") return nil } var uBound pgtype.BoundType if t, ok := fromTo[toKey]; ok { tto.Set(t) uBound = pgtype.Exclusive } else { uBound = pgtype.Unbounded } validity := pgtype.Tstzrange{ Lower: tfrom, Upper: tto, LowerType: pgtype.Inclusive, UpperType: uBound, Status: pgtype.Present, } rb, lb := splitRBLB(bn.Rb_lb) var revisitingTime *int if bn.Revisiting_time != nil && len(strings.TrimSpace(*bn.Revisiting_time)) > 0 { i, err := strconv.Atoi(*bn.Revisiting_time) if err != nil { feedback.Warn( "Cannot convert given revisiting time '%s' to number of months", *bn.Revisiting_time) } else { revisitingTime = &i } } var limiting, country string if bn.Limiting_factor != nil { limiting = 
string(*bn.Limiting_factor) } if bn.Responsible_country != nil { country = string(*bn.Responsible_country) } var materials []string if bn.Riverbed != nil { for _, material := range bn.Riverbed.Material { if material != nil { materials = append(materials, string(*material)) } } } tx, err := conn.BeginTx(ctx, nil) if err != nil { return err } defer tx.Rollback() var bnIds []int64 bns, err := tx.StmtContext(ctx, insertStmt).QueryContext(ctx, bn.Bottleneck_id, &validity, bn.Fk_g_fid, bn.OBJNAM, bn.NOBJNM, bn.From_ISRS, bn.To_ISRS, rb, lb, country, revisitingTime, limiting, bn.Date_Info, bn.Source, tolerance, ) if err != nil { feedback.Warn(handleError(err).Error()) return nil } defer bns.Close() for bns.Next() { var nid int64 if err := bns.Scan(&nid); err != nil { return err } bnIds = append(bnIds, nid) } if err := bns.Err(); err != nil { return err } if len(bnIds) == 0 { feedback.Warn( "No gauge matching '%s' or given time available", bn.Fk_g_fid) return nil } // Remove obsolete bottleneck version entries var pgBnIds pgtype.Int8Array pgBnIds.Set(bnIds) if _, err = tx.StmtContext(ctx, deleteObsoleteBNStmt).ExecContext(ctx, bn.Bottleneck_id, &validity, &pgBnIds, ); err != nil { feedback.Warn(handleError(err).Error()) if err2 := tx.Rollback(); err2 != nil { return err2 } return nil } // Set end of validity of old version to start of new version // in case of overlap if _, err = tx.StmtContext(ctx, fixValidityStmt).ExecContext(ctx, bn.Bottleneck_id, validity, ); err != nil { feedback.Warn(handleError(err).Error()) if err2 := tx.Rollback(); err2 != nil { return err2 } return nil } if materials != nil { // Remove obsolete riverbed materials var pgMaterials pgtype.VarcharArray pgMaterials.Set(materials) mtls, err := tx.StmtContext(ctx, deleteMaterialStmt).QueryContext(ctx, &pgBnIds, &pgMaterials, ) if err != nil { return err } defer mtls.Close() for mtls.Next() { var delMat string if err := mtls.Scan(&delMat); err != nil { return err } feedback.Warn("Removed riverbed 
material %s", delMat) } if err := mtls.Err(); err != nil { return err } // Insert riverbed materials if _, err := tx.StmtContext(ctx, insertMaterialStmt).ExecContext(ctx, &pgBnIds, &pgMaterials, ); err != nil { feedback.Warn("Failed to insert riverbed materials") feedback.Warn(handleError(err).Error()) return nil } } for _, nid := range bnIds { if _, err := tx.StmtContext(ctx, trackStmt).ExecContext( ctx, importID, "waterway.bottlenecks", nid, ); err != nil { return err } } if err = tx.Commit(); err != nil { return err } *nids = append(*nids, bn.Bottleneck_id) return nil }