Mercurial > gemma
changeset 4113:f39d20427e89 request_hist_bns
WIP: Reworked bottleneck import:
- all available bottlenecks from the service are loaded (including
historic data) - kudos to Tom Gottfried
- Bottlenecks which are identical to an existing (non staged) one are
recognized as "unchanged" and ignored.
- All remaining bottlenecks are inserted into the staging area
- On "staging done" all existing bottlenecks with conflicting
(bottleneck_id,validity) are deleted (replaced by the new
bottlenecks).
Known Issues:
- Sometimes the identification of unchanged bottlenecks does not work,
needs debugging.
- The front end does not handle historic data correctly (we need to
define what "correctly" should be).
- Save points should be used to distinguish ignored errors from fatal
ones.
author | Sascha Wilde <wilde@intevation.de> |
---|---|
date | Wed, 31 Jul 2019 17:04:31 +0200 |
parents | 692aba3e8b85 |
children | b9ddc6cdc871 |
files | pkg/imports/bn.go |
diffstat | 1 files changed, 123 insertions(+), 230 deletions(-) [+] |
line wrap: on
line diff
--- a/pkg/imports/bn.go Tue Jul 30 13:14:47 2019 +0200 +++ b/pkg/imports/bn.go Wed Jul 31 17:04:31 2019 +0200 @@ -90,51 +90,6 @@ RETURNING id ` - updateBottleneckSQL = ` -WITH -bounds (b) AS (VALUES (isrs_fromText($7)), (isrs_fromText($8))), -r AS (SELECT isrsrange( - (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY), - (SELECT b FROM bounds ORDER BY b USING >~ FETCH FIRST ROW ONLY)) AS r) -UPDATE waterway.bottlenecks SET ( - bottleneck_id, - validity, - gauge_location, - objnam, - nobjnm, - stretch, - area, - rb, - lb, - responsible_country, - revisiting_time, - limiting, - date_info, - source_organization -) = ( - $2, - $3::tstzrange, - isrs_fromText($4), - $5, - $6, - (SELECT r FROM r), - ISRSrange_area( - ISRSrange_axis((SELECT r FROM r), - $16), - (SELECT ST_Collect(CAST(area AS geometry)) - FROM waterway.waterway_area)), - $9, - $10, - $11, - $12::smallint, - $13, - $14::timestamptz, - $15 -) -WHERE id=$1 -RETURNING id -` - findExactMatchBottleneckSQL = ` WITH bounds (b) AS (VALUES (isrs_fromText($6)), (isrs_fromText($7))), @@ -175,44 +130,10 @@ ) ` - findMatchBottleneckSQL = ` + findIntersectingBottleneckSQL = ` SELECT id FROM waterway.bottlenecks -WHERE ( - bottleneck_id, - validity, - staging_done -) = ( - $1, - $2::tstzrange, - true -) -` - // FIXME: Is this still neede wtih the new simplified historization - // model? My intuition is: no it isn't and should be removed, but we - // should double check before doing so... [sw] - // - // Alignment with gauge validity might have generated new entries - // for the same time range. 
Thus, remove the old ones - deleteObsoleteBNSQL = ` -DELETE FROM waterway.bottlenecks -WHERE bottleneck_id = $1 AND validity <@ $2 AND id <> ALL($3) -` - - fixBNValiditySQL = ` -UPDATE waterway.bottlenecks SET - -- Set enddate of old entry to new startdate in case of overlap: - validity = validity - $2 -WHERE bottleneck_id = $1 - AND validity && $2 AND NOT validity <@ $2 -` - - deleteBottleneckMaterialSQL = ` -WITH del AS ( - DELETE FROM waterway.bottlenecks_riverbed_materials - WHERE bottleneck_id = ANY($1) - AND riverbed <> ALL($2) - RETURNING riverbed) -SELECT DISTINCT riverbed FROM del +WHERE (bottleneck_id, staging_done) = ($1, true) + AND $2::tstzrange && validity ` insertBottleneckMaterialSQL = ` @@ -224,6 +145,25 @@ unnest(CAST($2 AS varchar[])) AS materials ON CONFLICT (bottleneck_id, riverbed) DO NOTHING ` + + bnStageDoneDeleteSQL = ` +DELETE FROM waterway.bottlenecks WHERE id IN ( + SELECT key + FROM import.track_imports + WHERE import_id = $1 + AND relation = 'waterway.bottlenecks'::regclass + AND deletion +)` + + bnStageDoneSQL = ` +UPDATE waterway.bottlenecks SET staging_done = true +WHERE id IN ( + SELECT key + FROM import.track_imports + WHERE import_id = $1 + AND relation = 'waterway.bottlenecks'::regclass + AND NOT deletion +)` ) type bnJobCreator struct{} @@ -245,22 +185,16 @@ } } -const ( - bnStageDoneSQL = ` -UPDATE waterway.bottlenecks SET staging_done = true -WHERE id IN ( - SELECT key from import.track_imports - WHERE import_id = $1 AND - relation = 'waterway.bottlenecks'::regclass)` -) - // StageDone moves the imported bottleneck out of the staging area. 
func (bnJobCreator) StageDone( ctx context.Context, tx *sql.Tx, id int64, ) error { - _, err := tx.ExecContext(ctx, bnStageDoneSQL, id) + _, err := tx.ExecContext(ctx, bnStageDoneDeleteSQL, id) + if err == nil { + _, err = tx.ExecContext(ctx, bnStageDoneSQL, id) + } return err } @@ -325,8 +259,7 @@ feedback.Info("Found %d bottlenecks for import", len(bns)) - var insertStmt, updateStmt, findExactMatchingBNStmt, findMatchingBNStmt, - deleteObsoleteBNStmt, fixValidityStmt, deleteMaterialStmt, + var insertStmt, findExactMatchingBNStmt, findIntersectingBNStmt, insertMaterialStmt, trackStmt *sql.Stmt for _, x := range []struct { @@ -334,14 +267,10 @@ stmt **sql.Stmt }{ {insertBottleneckSQL, &insertStmt}, - {updateBottleneckSQL, &updateStmt}, {findExactMatchBottleneckSQL, &findExactMatchingBNStmt}, - {findMatchBottleneckSQL, &findMatchingBNStmt}, - {deleteObsoleteBNSQL, &deleteObsoleteBNStmt}, - {fixBNValiditySQL, &fixValidityStmt}, - {deleteBottleneckMaterialSQL, &deleteMaterialStmt}, + {findIntersectingBottleneckSQL, &findIntersectingBNStmt}, {insertBottleneckMaterialSQL, &insertMaterialStmt}, - {trackImportSQL, &trackStmt}, + {trackImportDeletionSQL, &trackStmt}, } { var err error if *x.stmt, err = conn.PrepareContext(ctx, x.sql); err != nil { @@ -351,16 +280,18 @@ } var nids []string + seenOldBnIds := make(map[int64]bool) feedback.Info("Tolerance used to snap waterway axis: %g", tolerance) for _, bn := range bns { if err := storeBottleneck( - ctx, importID, conn, feedback, bn, &nids, tolerance, - insertStmt, updateStmt, - findExactMatchingBNStmt, findMatchingBNStmt, - deleteObsoleteBNStmt, fixValidityStmt, - deleteMaterialStmt, insertMaterialStmt, + ctx, importID, conn, feedback, bn, + &nids, &seenOldBnIds, tolerance, + insertStmt, + findExactMatchingBNStmt, + findIntersectingBNStmt, + insertMaterialStmt, trackStmt); err != nil { return nil, err } @@ -386,11 +317,11 @@ feedback Feedback, bn *ifbn.BottleNeckType, nids *[]string, + seenOldBnIds *map[int64]bool, 
tolerance float64, - insertStmt, updateStmt, - findExactMatchingBNStmt, findMatchingBNStmt, - deleteObsoleteBNStmt, fixValidityStmt, - deleteMaterialStmt, insertMaterialStmt, + insertStmt, + findExactMatchingBNStmt, findIntersectingBNStmt, + insertMaterialStmt, trackStmt *sql.Stmt, ) error { feedback.Info("Processing %s (%s)", bn.OBJNAM, bn.Bottleneck_id) @@ -410,7 +341,7 @@ // version is advisable. // // Never the less, the current solution "just works" for the - // rtime being... -- sw + // time being and reflects the upstream systems... -- sw feedback.Warn("No validity information, assuming infinite validity.") tfrom.Set(time.Date(1970, time.January, 1, 0, 0, 0, 0, time.UTC)) uBound = pgtype.Unbounded @@ -510,11 +441,14 @@ bn.Date_Info, bn.Source, ) - if err != nil { return err } defer bns.Close() + // We could check if the materials are also matching -- but per + // specification the Date_Info would hvae to change on that kind of + // change anyway. So actualy we ar alreayd checking more in dpth than + // required. if bns.Next() { feedback.Info("unchanged") return nil @@ -524,71 +458,80 @@ // it can be used for debugging if something goes wrong... 
feedback.Info("Range: from %s to %s", bn.From_ISRS, bn.To_ISRS) - // Check if an bottleneck with the same identity - // (bottleneck_id,validity) already exists: - // Check if an bottleneck identical to the one we would insert already - // exists: - var existing_bn_id *int64 - err = findMatchingBNStmt.QueryRowContext(ctx, - bn.Bottleneck_id, - &validity, - ).Scan(&existing_bn_id) - switch { - case err == sql.ErrNoRows: - existing_bn_id = nil - case err != nil: - // This is unexpected and propably a serious error - return err - } - tx, err := conn.BeginTx(ctx, nil) if err != nil { return err } defer tx.Rollback() - var bnIds []int64 - if existing_bn_id != nil { + // Check if the new bottleneck intersects with the validity of existing + // for the same bottleneck_id we consider this an update and mark the + // old data for deletion. + bns, err = tx.StmtContext(ctx, findIntersectingBNStmt).QueryContext( + ctx, bn.Bottleneck_id, &validity, + ) + if err != nil { + return err + } + defer bns.Close() + + // Mark old intersecting bottleneck data for deletion. Don't worry about + // materials, they will be deleted via cascading. 
+ var oldBnIds []int64 + for bns.Next() { + var oldID int64 + err := bns.Scan(&oldID) + if err != nil { + return err + } + oldBnIds = append(oldBnIds, oldID) + } + + switch { + case len(oldBnIds) == 1: feedback.Info("Bottelneck '%s' "+ - "with matching validity already exists:"+ + "with intersecting validity already exists: "+ "UPDATING", bn.Bottleneck_id) - // Updating existnig BN data: - bns, err = tx.StmtContext(ctx, updateStmt).QueryContext(ctx, - existing_bn_id, - bn.Bottleneck_id, - &validity, - bn.Fk_g_fid, - bn.OBJNAM, - bn.NOBJNM, - bn.From_ISRS, bn.To_ISRS, - rb, - lb, - country, - revisitingTime, - limiting, - bn.Date_Info, - bn.Source, - tolerance, - ) - } else { - // New BN data: - bns, err = tx.StmtContext(ctx, insertStmt).QueryContext(ctx, - bn.Bottleneck_id, - &validity, - bn.Fk_g_fid, - bn.OBJNAM, - bn.NOBJNM, - bn.From_ISRS, bn.To_ISRS, - rb, - lb, - country, - revisitingTime, - limiting, - bn.Date_Info, - bn.Source, - tolerance, - ) + case len(oldBnIds) > 1: + // This case is unexpected and should only happen when historic + // data in the bottleneck service was changed subsequently... + // We handle it gracefully anyway, but warn. + feedback.Warn("More than one Bottelneck '%s' "+ + "with intersecting validity already exists: "+ + "REPLACING all of them!", bn.Bottleneck_id) } + + for _, oldID := range oldBnIds { + // It is possible, that two new bottlenecks intersect with the + // same old noe, therefor we have to handle duplicates in + // oldBnIds. 
+ if !(*seenOldBnIds)[oldID] { + if _, err := tx.StmtContext(ctx, trackStmt).ExecContext( + ctx, importID, "waterway.bottlenecks", oldID, true, + ); err != nil { + return err + } + (*seenOldBnIds)[oldID] = true + } + } + + var bnIds []int64 + // Add new BN data: + bns, err = tx.StmtContext(ctx, insertStmt).QueryContext(ctx, + bn.Bottleneck_id, + &validity, + bn.Fk_g_fid, + bn.OBJNAM, + bn.NOBJNM, + bn.From_ISRS, bn.To_ISRS, + rb, + lb, + country, + revisitingTime, + limiting, + bn.Date_Info, + bn.Source, + tolerance) if err != nil { feedback.Warn(pgxutils.ReadableError{err}.Error()) return nil @@ -611,58 +554,15 @@ return nil } - // Remove obsolete bottleneck version entries - var pgBnIds pgtype.Int8Array + // Add new materials + var ( + pgBnIds pgtype.Int8Array + pgMaterials pgtype.VarcharArray + ) pgBnIds.Set(bnIds) - if _, err = tx.StmtContext(ctx, deleteObsoleteBNStmt).ExecContext(ctx, - bn.Bottleneck_id, - &validity, - &pgBnIds, - ); err != nil { - feedback.Warn(pgxutils.ReadableError{err}.Error()) - if err2 := tx.Rollback(); err2 != nil { - return err2 - } - return nil - } - - // Set end of validity of old version to start of new version - // in case of overlap - if _, err = tx.StmtContext(ctx, fixValidityStmt).ExecContext(ctx, - bn.Bottleneck_id, - validity, - ); err != nil { - feedback.Warn(pgxutils.ReadableError{err}.Error()) - if err2 := tx.Rollback(); err2 != nil { - return err2 - } - return nil - } + pgMaterials.Set(materials) if materials != nil { - // Remove obsolete riverbed materials - var pgMaterials pgtype.VarcharArray - pgMaterials.Set(materials) - mtls, err := tx.StmtContext(ctx, - deleteMaterialStmt).QueryContext(ctx, - &pgBnIds, - &pgMaterials, - ) - if err != nil { - return err - } - defer mtls.Close() - for mtls.Next() { - var delMat string - if err := mtls.Scan(&delMat); err != nil { - return err - } - feedback.Warn("Removed riverbed material %s", delMat) - } - if err := mtls.Err(); err != nil { - return err - } - // Insert riverbed 
materials if _, err := tx.StmtContext(ctx, insertMaterialStmt).ExecContext(ctx, &pgBnIds, @@ -675,25 +575,18 @@ } // Only add new BN data to tracking for staging review. - // - // FIXME: Review for updated bottlenecks is currently not possible, as - // the update is done instantly in place. - if existing_bn_id == nil { - for _, nid := range bnIds { - if _, err := tx.StmtContext(ctx, trackStmt).ExecContext( - ctx, importID, "waterway.bottlenecks", nid, - ); err != nil { - return err - } + for _, nid := range bnIds { + if _, err := tx.StmtContext(ctx, trackStmt).ExecContext( + ctx, importID, "waterway.bottlenecks", nid, false, + ); err != nil { + return err } } if err = tx.Commit(); err != nil { return err } - // See above... - if existing_bn_id == nil { - *nids = append(*nids, bn.Bottleneck_id) - } + + *nids = append(*nids, bn.Bottleneck_id) return nil }