Mercurial > gemma
diff pkg/imports/bn.go @ 3666:db87f34805fb
Align bottleneck validity at gauges
Ensuring the validity of a bottleneck version is always contained
by the validity of the referenced gauge version allows to reliably
determine matching reference values of the gauge at a point in time.
Since this implies that a bottleneck version might be cut into more
than one time ranges, the concept of having only one non-erased
version is no longer applicable and replaced by using the 'current'
version of a bottleneck.
Fairway availability data are always kept with the 'current'
bottleneck version to have them at hand alltogether for analyses
over longer time ranges.
author | Tom Gottfried <tom@intevation.de> |
---|---|
date | Sat, 15 Jun 2019 14:36:50 +0200 |
parents | 29ef6d41e4af |
children | 0227670dedd5 |
line wrap: on
line diff
--- a/pkg/imports/bn.go Sat Jun 15 09:24:28 2019 +0200 +++ b/pkg/imports/bn.go Sat Jun 15 14:36:50 2019 +0200 @@ -43,21 +43,6 @@ const BNJobKind JobKind = "bn" const ( - hasBottleneckSQL = ` -WITH upd AS ( - UPDATE waterway.bottlenecks SET - erased = true - WHERE bottleneck_id = $1 - AND NOT erased - -- Don't touch old entry if new validity contains old: will be updated - AND NOT validity <@ $2 - RETURNING 1 -) --- Decide whether a new version will be INSERTed -SELECT EXISTS(SELECT 1 FROM upd) - OR NOT EXISTS(SELECT 1 FROM waterway.bottlenecks WHERE bottleneck_id = $1) -` - insertBottleneckSQL = ` WITH bounds (b) AS (VALUES (isrs_fromText($6)), (isrs_fromText($7))), @@ -80,15 +65,11 @@ limiting, date_info, source_organization -) VALUES ( +) SELECT $1, - $2, - isrs_fromText($3), - COALESCE( - (SELECT validity FROM waterway.gauges - WHERE location = isrs_fromText($3) - AND validity @> lower(CAST($2 AS tstzrange))), - tstzrange(NULL, NULL)), + validity * $2, -- intersections with gauge validity ranges + location, + validity, $4, $5, (SELECT r FROM r), @@ -104,67 +85,57 @@ $12, $13, $14 -) -RETURNING id` + FROM waterway.gauges + WHERE location = isrs_fromText($3) AND validity && $2 +ON CONFLICT (bottleneck_id, validity) DO UPDATE SET + gauge_location = EXCLUDED.gauge_location, + gauge_validity = EXCLUDED.gauge_validity, + objnam = EXCLUDED.objnam, + nobjnm = EXCLUDED.nobjnm, + stretch = EXCLUDED.stretch, + area = EXCLUDED.area, + rb = EXCLUDED.rb, + lb = EXCLUDED.lb, + responsible_country = EXCLUDED.responsible_country, + revisiting_time = EXCLUDED.revisiting_time, + limiting = EXCLUDED.limiting, + date_info = EXCLUDED.date_info, + source_organization = EXCLUDED.source_organization +RETURNING id +` + + // Alignment with gauge validity might have generated new entries + // for the same time range. Thus, remove the old ones + deleteObsoleteBNSQL = ` +DELETE FROM waterway.bottlenecks +WHERE bottleneck_id = $1 AND validity <@ $2 AND id <> ALL($3) +` fixBNValiditySQL = ` UPDATE waterway.bottlenecks SET -- Set enddate of old entry to new startdate in case of overlap: validity = validity - $2 WHERE bottleneck_id = $1 - AND validity && $2 - AND erased -` - - updateBottleneckSQL = ` -WITH -bounds (b) AS (VALUES (isrs_fromText($5)), (isrs_fromText($6))), -r AS (SELECT isrsrange( - (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY), - (SELECT b FROM bounds ORDER BY b USING >~ FETCH FIRST ROW ONLY)) AS r) -UPDATE waterway.bottlenecks b SET - gauge_location = isrs_fromtext($2), - gauge_validity = COALESCE( - (SELECT validity FROM waterway.gauges g - WHERE g.location = isrs_fromText($2) - AND g.validity @> lower(b.validity)), - tstzrange(NULL, NULL)), - objnam = $3, - nobjnm = $4, - stretch = (SELECT r FROM r), - area = ISRSrange_area( - ISRSrange_axis((SELECT r FROM r), $14), - (SELECT ST_Collect(CAST(area AS geometry)) - FROM waterway.waterway_area)), - rb = $7, - lb = $8, - responsible_country = $9, - revisiting_time = $10, - limiting = $11, - date_info = $12, - source_organization = $13, - validity = $15 -WHERE bottleneck_id = $1 - AND NOT erased - AND $12 > date_info -RETURNING id + AND validity && $2 AND NOT validity <@ $2 ` deleteBottleneckMaterialSQL = ` -DELETE FROM waterway.bottlenecks_riverbed_materials -WHERE bottleneck_id = $1 - AND riverbed <> ALL($2) -RETURNING riverbed +WITH del AS ( + DELETE FROM waterway.bottlenecks_riverbed_materials + WHERE bottleneck_id = ANY($1) + AND riverbed <> ALL($2) + RETURNING riverbed) +SELECT DISTINCT riverbed FROM del ` insertBottleneckMaterialSQL = ` INSERT INTO waterway.bottlenecks_riverbed_materials ( bottleneck_id, riverbed -) VALUES ( - $1, - $2 -) ON CONFLICT (bottleneck_id, riverbed) DO NOTHING +) SELECT * +FROM unnest(CAST($1 AS int[])) AS bns, + unnest(CAST($2 AS varchar[])) AS materials +ON CONFLICT (bottleneck_id, riverbed) DO NOTHING ` ) @@ -265,17 +236,16 @@ feedback.Info("Found %d bottlenecks for import", len(bns)) - var hasStmt, insertStmt, fixValidityStmt, updateStmt, + var insertStmt, deleteObsoleteBNStmt, fixValidityStmt, deleteMaterialStmt, insertMaterialStmt, trackStmt *sql.Stmt for _, x := range []struct { sql string stmt **sql.Stmt }{ - {hasBottleneckSQL, &hasStmt}, {insertBottleneckSQL, &insertStmt}, + {deleteObsoleteBNSQL, &deleteObsoleteBNStmt}, {fixBNValiditySQL, &fixValidityStmt}, - {updateBottleneckSQL, &updateStmt}, {deleteBottleneckMaterialSQL, &deleteMaterialStmt}, {insertBottleneckMaterialSQL, &insertMaterialStmt}, {trackImportSQL, &trackStmt}, @@ -294,7 +264,7 @@ for _, bn := range bns { if err := storeBottleneck( ctx, importID, conn, feedback, bn, &nids, tolerance, - hasStmt, insertStmt, fixValidityStmt, updateStmt, + insertStmt, deleteObsoleteBNStmt, fixValidityStmt, deleteMaterialStmt, insertMaterialStmt, trackStmt); err != nil { return nil, err } @@ -321,7 +291,7 @@ bn *ifbn.BottleNeckType, nids *[]string, tolerance float64, - hasStmt, insertStmt, fixValidityStmt, updateStmt, + insertStmt, deleteObsoleteBNStmt, fixValidityStmt, deleteMaterialStmt, insertMaterialStmt, trackStmt *sql.Stmt, ) error { feedback.Info("Processing %s (%s)", bn.OBJNAM, bn.Bottleneck_id) @@ -408,99 +378,57 @@ } defer tx.Rollback() - var isNew bool - var nid int64 - err = tx.StmtContext(ctx, hasStmt).QueryRowContext(ctx, + var bnIds []int64 + bns, err := tx.StmtContext(ctx, insertStmt).QueryContext(ctx, bn.Bottleneck_id, - validity, - ).Scan(&isNew) - switch { - case err != nil: + &validity, + bn.Fk_g_fid, + bn.OBJNAM, + bn.NOBJNM, + bn.From_ISRS, bn.To_ISRS, + rb, + lb, + country, + revisitingTime, + limiting, + bn.Date_Info, + bn.Source, + tolerance, + ) + if err != nil { + feedback.Warn(handleError(err).Error()) + return nil + } + defer bns.Close() + for bns.Next() { + var nid int64 + if err := bns.Scan(&nid); err != nil { + return err + } + bnIds = append(bnIds, nid) + } + if err := bns.Err(); err != nil { + return err + } + if len(bnIds) == 0 { + feedback.Warn( + "No gauge matching '%s' or given time available", bn.Fk_g_fid) + return nil + } + + // Remove obsolete bottleneck version entries + var pgBnIds pgtype.Int8Array + pgBnIds.Set(bnIds) + if _, err = tx.StmtContext(ctx, deleteObsoleteBNStmt).ExecContext(ctx, + bn.Bottleneck_id, + &validity, + &pgBnIds, + ); err != nil { feedback.Warn(handleError(err).Error()) if err2 := tx.Rollback(); err2 != nil { return err2 } return nil - case isNew: - err = tx.StmtContext(ctx, insertStmt).QueryRowContext( - ctx, - bn.Bottleneck_id, - &validity, - bn.Fk_g_fid, - bn.OBJNAM, - bn.NOBJNM, - bn.From_ISRS, bn.To_ISRS, - rb, - lb, - country, - revisitingTime, - limiting, - bn.Date_Info, - bn.Source, - tolerance, - ).Scan(&nid) - if err != nil { - feedback.Warn(handleError(err).Error()) - return nil - } - feedback.Info("insert new version") - case !isNew: - // try to update - err := tx.StmtContext(ctx, updateStmt).QueryRowContext(ctx, - bn.Bottleneck_id, - bn.Fk_g_fid, - bn.OBJNAM, - bn.NOBJNM, - bn.From_ISRS, bn.To_ISRS, - rb, - lb, - country, - revisitingTime, - limiting, - bn.Date_Info, - bn.Source, - tolerance, - &validity, - ).Scan(&nid) - switch { - case err == sql.ErrNoRows: - feedback.Info("unchanged") - if err := tx.Rollback(); err != nil { - return err - } - return nil - case err != nil: - feedback.Warn(handleError(err).Error()) - if err := tx.Rollback(); err != nil { - return err - } - return nil - default: - feedback.Info("update") - - // Remove obsolete riverbed materials - var pgMaterials pgtype.VarcharArray - pgMaterials.Set(materials) - mtls, err := tx.StmtContext(ctx, - deleteMaterialStmt).QueryContext(ctx, - nid, - &pgMaterials, - ) - if err != nil { - return err - } - defer mtls.Close() - for mtls.Next() { - var delMat string - if err := mtls.Scan(&delMat); err != nil { - return err - } - feedback.Warn("Removed riverbed material %s", delMat) - } - if err := mtls.Err(); err != nil { - return err - } - } } // Set end of validity of old version to start of new version @@ -516,22 +444,47 @@ return nil } - // Insert riverbed materials if materials != nil { - for _, mat := range materials { - if _, err := tx.StmtContext(ctx, - insertMaterialStmt).ExecContext( - ctx, nid, mat); err != nil { - feedback.Warn("Failed to insert riverbed material '%s'", mat) - feedback.Warn(handleError(err).Error()) + // Remove obsolete riverbed materials + var pgMaterials pgtype.VarcharArray + pgMaterials.Set(materials) + mtls, err := tx.StmtContext(ctx, + deleteMaterialStmt).QueryContext(ctx, + &pgBnIds, + &pgMaterials, + ) + if err != nil { + return err + } + defer mtls.Close() + for mtls.Next() { + var delMat string + if err := mtls.Scan(&delMat); err != nil { + return err } + feedback.Warn("Removed riverbed material %s", delMat) + } + if err := mtls.Err(); err != nil { + return err + } + + // Insert riverbed materials + if _, err := tx.StmtContext(ctx, insertMaterialStmt).ExecContext(ctx, + &pgBnIds, + &pgMaterials, + ); err != nil { + feedback.Warn("Failed to insert riverbed materials") + feedback.Warn(handleError(err).Error()) + return nil } } - if _, err := tx.StmtContext(ctx, trackStmt).ExecContext( - ctx, importID, "waterway.bottlenecks", nid, - ); err != nil { - return err + for _, nid := range bnIds { + if _, err := tx.StmtContext(ctx, trackStmt).ExecContext( + ctx, importID, "waterway.bottlenecks", nid, + ); err != nil { + return err + } } if err = tx.Commit(); err != nil { return err