# HG changeset patch # User Sascha Wilde # Date 1564678431 -7200 # Node ID 49ec33a7f954040af926ca8eb5c7291a6a0af455 # Parent 52f7264265bbe62b971fb83a80f33ccb3e5d5a2e# Parent 980f12d3c76615fb2083cf1981d86e7e75702775 Merged request_hist_bns branch with improved bottleneck import. diff -r 52f7264265bb -r 49ec33a7f954 pkg/imports/bn.go --- a/pkg/imports/bn.go Thu Aug 01 17:02:09 2019 +0200 +++ b/pkg/imports/bn.go Thu Aug 01 18:53:51 2019 +0200 @@ -24,6 +24,7 @@ "strings" "time" + "gemma.intevation.de/gemma/pkg/common" "gemma.intevation.de/gemma/pkg/pgxutils" "gemma.intevation.de/gemma/pkg/soap/ifbn" "github.com/jackc/pgx/pgtype" @@ -89,69 +90,23 @@ RETURNING id ` - updateBottleneckSQL = ` -WITH -bounds (b) AS (VALUES (isrs_fromText($7)), (isrs_fromText($8))), -r AS (SELECT isrsrange( - (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY), - (SELECT b FROM bounds ORDER BY b USING >~ FETCH FIRST ROW ONLY)) AS r) -UPDATE waterway.bottlenecks SET ( - bottleneck_id, - validity, - gauge_location, - objnam, - nobjnm, - stretch, - area, - rb, - lb, - responsible_country, - revisiting_time, - limiting, - date_info, - source_organization -) = ( - $2, - $3::tstzrange, - isrs_fromText($4), - $5, - $6, - (SELECT r FROM r), - ISRSrange_area( - ISRSrange_axis((SELECT r FROM r), - $16), - (SELECT ST_Collect(CAST(area AS geometry)) - FROM waterway.waterway_area)), - $9, - $10, - $11, - $12::smallint, - $13, - $14::timestamptz, - $15 -) -WHERE id=$1 -RETURNING id -` - + // We only check for NOT NULL values, for correct compairison with + // values, which might be null (and then muyst not be compairt with `=' + // but with `IS NULL' is comlicated and that we are checking more than + // only (bottleneck_id, validity, date_info) is luxury already. findExactMatchBottleneckSQL = ` WITH -bounds (b) AS (VALUES (isrs_fromText($6)), (isrs_fromText($7))), +bounds (b) AS (VALUES (isrs_fromText($4)), (isrs_fromText($5))), r AS (SELECT isrsrange( - (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY), - (SELECT b FROM bounds ORDER BY b USING >~ FETCH FIRST ROW ONLY)) AS r) + (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY), + (SELECT b FROM bounds ORDER BY b USING >~ FETCH FIRST ROW ONLY)) AS r) SELECT id FROM waterway.bottlenecks WHERE ( bottleneck_id, validity, gauge_location, - objnam, - nobjnm, stretch, - rb, - lb, responsible_country, - revisiting_time, limiting, date_info, source_organization, @@ -160,58 +115,19 @@ $1, $2::tstzrange, isrs_fromText($3), - $4, - $5, (SELECT r FROM r), - $8, + $6, + $7, + $8::timestamptz, $9, - $10, - $11::smallint, - $12, - $13::timestamptz, - $14, true ) ` - findMatchBottleneckSQL = ` + findIntersectingBottleneckSQL = ` SELECT id FROM waterway.bottlenecks -WHERE ( - bottleneck_id, - validity, - staging_done -) = ( - $1, - $2::tstzrange, - true -) -` - // FIXME: Is this still neede wtih the new simplified historization - // model? My intuition is: no it isn't and should be removed, but we - // should double check before doing so... [sw] - // - // Alignment with gauge validity might have generated new entries - // for the same time range. Thus, remove the old ones - deleteObsoleteBNSQL = ` -DELETE FROM waterway.bottlenecks -WHERE bottleneck_id = $1 AND validity <@ $2 AND id <> ALL($3) -` - - fixBNValiditySQL = ` -UPDATE waterway.bottlenecks SET - -- Set enddate of old entry to new startdate in case of overlap: - validity = validity - $2 -WHERE bottleneck_id = $1 - AND validity && $2 AND NOT validity <@ $2 -` - - deleteBottleneckMaterialSQL = ` -WITH del AS ( - DELETE FROM waterway.bottlenecks_riverbed_materials - WHERE bottleneck_id = ANY($1) - AND riverbed <> ALL($2) - RETURNING riverbed) -SELECT DISTINCT riverbed FROM del +WHERE (bottleneck_id, staging_done) = ($1, true) + AND $2::tstzrange && validity ` insertBottleneckMaterialSQL = ` @@ -223,6 +139,25 @@ unnest(CAST($2 AS varchar[])) AS materials ON CONFLICT (bottleneck_id, riverbed) DO NOTHING ` + + bnStageDoneDeleteSQL = ` +DELETE FROM waterway.bottlenecks WHERE id IN ( + SELECT key + FROM import.track_imports + WHERE import_id = $1 + AND relation = 'waterway.bottlenecks'::regclass + AND deletion +)` + + bnStageDoneSQL = ` +UPDATE waterway.bottlenecks SET staging_done = true +WHERE id IN ( + SELECT key + FROM import.track_imports + WHERE import_id = $1 + AND relation = 'waterway.bottlenecks'::regclass + AND NOT deletion +)` ) type bnJobCreator struct{} @@ -244,22 +179,16 @@ } } -const ( - bnStageDoneSQL = ` -UPDATE waterway.bottlenecks SET staging_done = true -WHERE id IN ( - SELECT key from import.track_imports - WHERE import_id = $1 AND - relation = 'waterway.bottlenecks'::regclass)` -) - // StageDone moves the imported bottleneck out of the staging area. func (bnJobCreator) StageDone( ctx context.Context, tx *sql.Tx, id int64, ) error { - _, err := tx.ExecContext(ctx, bnStageDoneSQL, id) + _, err := tx.ExecContext(ctx, bnStageDoneDeleteSQL, id) + if err == nil { + _, err = tx.ExecContext(ctx, bnStageDoneSQL, id) + } return err } @@ -287,7 +216,9 @@ fetch := func() ([]*ifbn.BottleNeckType, error) { client := ifbn.NewIBottleneckService(bn.URL, bn.Insecure, nil) - req := &ifbn.Export_bn_by_isrs{} + req := &ifbn.Export_bn_by_isrs{ + Period: &ifbn.RequestedPeriod{Date_start: &time.Time{}}, + } resp, err := client.Export_bn_by_isrs(req) if err != nil { @@ -305,6 +236,48 @@ return storeBottlenecks(ctx, fetch, importID, conn, feedback, bn.Tolerance) } +type bnStmts struct { + insert *sql.Stmt + findExactMatch *sql.Stmt + findIntersecting *sql.Stmt + insertMaterial *sql.Stmt + track *sql.Stmt +} + +func (bs *bnStmts) close() { + for _, s := range []**sql.Stmt{ + &bs.insert, + &bs.findExactMatch, + &bs.findIntersecting, + &bs.insertMaterial, + &bs.track, + } { + if *s != nil { + (*s).Close() + *s = nil + } + } +} + +func (bs *bnStmts) prepare(ctx context.Context, conn *sql.Conn) error { + for _, x := range []struct { + sql string + stmt **sql.Stmt + }{ + {insertBottleneckSQL, &bs.insert}, + {findExactMatchBottleneckSQL, &bs.findExactMatch}, + {findIntersectingBottleneckSQL, &bs.findIntersecting}, + {insertBottleneckMaterialSQL, &bs.insertMaterial}, + {trackImportDeletionSQL, &bs.track}, + } { + var err error + if *x.stmt, err = conn.PrepareContext(ctx, x.sql); err != nil { + return err + } + } + return nil +} + func storeBottlenecks( ctx context.Context, fetch func() ([]*ifbn.BottleNeckType, error), @@ -322,46 +295,37 @@ feedback.Info("Found %d bottlenecks for import", len(bns)) - var insertStmt, updateStmt, findExactMatchingBNStmt, findMatchingBNStmt, - deleteObsoleteBNStmt, fixValidityStmt, deleteMaterialStmt, - insertMaterialStmt, trackStmt *sql.Stmt + var bs bnStmts + defer bs.close() - for _, x := range []struct { - sql string - stmt **sql.Stmt - }{ - {insertBottleneckSQL, &insertStmt}, - {updateBottleneckSQL, &updateStmt}, - {findExactMatchBottleneckSQL, &findExactMatchingBNStmt}, - {findMatchBottleneckSQL, &findMatchingBNStmt}, - {deleteObsoleteBNSQL, &deleteObsoleteBNStmt}, - {fixBNValiditySQL, &fixValidityStmt}, - {deleteBottleneckMaterialSQL, &deleteMaterialStmt}, - {insertBottleneckMaterialSQL, &insertMaterialStmt}, - {trackImportSQL, &trackStmt}, - } { - var err error - if *x.stmt, err = conn.PrepareContext(ctx, x.sql); err != nil { - return nil, err - } - defer (*x.stmt).Close() + if err := bs.prepare(ctx, conn); err != nil { + return nil, err } var nids []string + seenOldBnIds := make(map[int64]bool) feedback.Info("Tolerance used to snap waterway axis: %g", tolerance) + tx, err := conn.BeginTx(ctx, nil) + if err != nil { + return nil, err + } + defer tx.Rollback() + for _, bn := range bns { if err := storeBottleneck( - ctx, importID, conn, feedback, bn, &nids, tolerance, - insertStmt, updateStmt, - findExactMatchingBNStmt, findMatchingBNStmt, - deleteObsoleteBNStmt, fixValidityStmt, - deleteMaterialStmt, insertMaterialStmt, - trackStmt); err != nil { + tx, ctx, importID, feedback, bn, + &nids, seenOldBnIds, tolerance, + &bs, + ); err != nil { return nil, err } } + if err = tx.Commit(); err != nil { + return nil, err + } + if len(nids) == 0 { return nil, UnchangedError("No new bottlenecks inserted") } @@ -377,18 +341,15 @@ } func storeBottleneck( + tx *sql.Tx, ctx context.Context, importID int64, - conn *sql.Conn, feedback Feedback, bn *ifbn.BottleNeckType, nids *[]string, + seenOldBnIds map[int64]bool, tolerance float64, - insertStmt, updateStmt, - findExactMatchingBNStmt, findMatchingBNStmt, - deleteObsoleteBNStmt, fixValidityStmt, - deleteMaterialStmt, insertMaterialStmt, - trackStmt *sql.Stmt, + bs *bnStmts, ) error { feedback.Info("Processing %s (%s)", bn.OBJNAM, bn.Bottleneck_id) @@ -407,7 +368,7 @@ // version is advisable. // // Never the less, the current solution "just works" for the - // rtime being... -- sw + // time being and reflects the upstream systems... -- sw feedback.Warn("No validity information, assuming infinite validity.") tfrom.Set(time.Date(1970, time.January, 1, 0, 0, 0, 0, time.UTC)) uBound = pgtype.Unbounded @@ -438,8 +399,13 @@ if t, ok := fromTo[toKey]; ok { tto.Set(t) uBound = pgtype.Exclusive + feedback.Info("Valid from %s to %s", + fromTo[fromKey].Format(common.TimeFormat), + fromTo[toKey].Format(common.TimeFormat)) } else { uBound = pgtype.Unbounded + feedback.Info("Valid from %s", + fromTo[fromKey].Format(common.TimeFormat)) } } @@ -487,27 +453,28 @@ // Check if an bottleneck identical to the one we would insert already // exists: - bns, err := findExactMatchingBNStmt.QueryContext(ctx, + var old int64 + err := tx.StmtContext(ctx, bs.findExactMatch).QueryRowContext( + ctx, bn.Bottleneck_id, &validity, bn.Fk_g_fid, - bn.OBJNAM, - bn.NOBJNM, bn.From_ISRS, bn.To_ISRS, - rb, - lb, country, - revisitingTime, limiting, bn.Date_Info, bn.Source, - ) - - if err != nil { + ).Scan(&old) + switch { + case err == sql.ErrNoRows: + // We dont have a matching old. + case err != nil: return err - } - defer bns.Close() - if bns.Next() { + default: + // We could check if the materials are also matching -- but per + // specification the Date_Info would hvae to change on that kind of + // change anyway. So actualy we ar alreayd checking more in dpth than + // required. feedback.Info("unchanged") return nil } @@ -516,55 +483,67 @@ // it can be used for debugging if something goes wrong... feedback.Info("Range: from %s to %s", bn.From_ISRS, bn.To_ISRS) - // Check if an bottleneck with the same identity - // (bottleneck_id,validity) already exists: - // Check if an bottleneck identical to the one we would insert already - // exists: - var existing_bn_id *int64 - err = findMatchingBNStmt.QueryRowContext(ctx, - bn.Bottleneck_id, - &validity, - ).Scan(&existing_bn_id) - switch { - case err == sql.ErrNoRows: - existing_bn_id = nil - case err != nil: - // This is unexpected and propably a serious error - return err - } - - tx, err := conn.BeginTx(ctx, nil) + // Check if the new bottleneck intersects with the validity of existing + // for the same bottleneck_id we consider this an update and mark the + // old data for deletion. + bns, err := tx.StmtContext(ctx, bs.findIntersecting).QueryContext( + ctx, bn.Bottleneck_id, &validity, + ) if err != nil { return err } - defer tx.Rollback() + defer bns.Close() + + // Mark old intersecting bottleneck data for deletion. Don't worry about + // materials, they will be deleted via cascading. + var oldBnIds []int64 + for bns.Next() { + var oldID int64 + err := bns.Scan(&oldID) + if err != nil { + return err + } + oldBnIds = append(oldBnIds, oldID) + } + + if err := bns.Err(); err != nil { + return err + } + + switch { + case len(oldBnIds) == 1: + feedback.Info("Bottelneck '%s' "+ + "with intersecting validity already exists: "+ + "UPDATING", bn.Bottleneck_id) + case len(oldBnIds) > 1: + // This case is unexpected and should only happen when historic + // data in the bottleneck service was changed subsequently... + // We handle it gracefully anyway, but warn. + feedback.Warn("More than one Bottelneck '%s' "+ + "with intersecting validity already exists: "+ + "REPLACING all of them!", bn.Bottleneck_id) + } + + for _, oldID := range oldBnIds { + // It is possible, that two new bottlenecks intersect with the + // same old noe, therefor we have to handle duplicates in + // oldBnIds. + if !seenOldBnIds[oldID] { + if _, err := tx.StmtContext(ctx, bs.track).ExecContext( + ctx, importID, "waterway.bottlenecks", oldID, true, + ); err != nil { + return err + } + seenOldBnIds[oldID] = true + } + } var bnIds []int64 - if existing_bn_id != nil { - feedback.Info("Bottelneck '%s' "+ - "with matching validity already exists:"+ - "UPDATING", bn.Bottleneck_id) - // Updating existnig BN data: - bns, err = tx.StmtContext(ctx, updateStmt).QueryContext(ctx, - existing_bn_id, - bn.Bottleneck_id, - &validity, - bn.Fk_g_fid, - bn.OBJNAM, - bn.NOBJNM, - bn.From_ISRS, bn.To_ISRS, - rb, - lb, - country, - revisitingTime, - limiting, - bn.Date_Info, - bn.Source, - tolerance, - ) - } else { - // New BN data: - bns, err = tx.StmtContext(ctx, insertStmt).QueryContext(ctx, + // Add new BN data: + savepoint := Savepoint(ctx, tx, "insert_bottlenck") + + err = savepoint(func() error { + bns, err := tx.StmtContext(ctx, bs.insert).QueryContext(ctx, bn.Bottleneck_id, &validity, bn.Fk_g_fid, @@ -578,114 +557,56 @@ limiting, bn.Date_Info, bn.Source, - tolerance, - ) - } + tolerance) + if err != nil { + return err + } + defer bns.Close() + for bns.Next() { + var nid int64 + if err := bns.Scan(&nid); err != nil { + return err + } + bnIds = append(bnIds, nid) + } + if err := bns.Err(); err != nil { + return err + } + + // Add new materials + if len(bnIds) > 0 && materials != nil { + var ( + pgBnIds pgtype.Int8Array + pgMaterials pgtype.VarcharArray + ) + pgBnIds.Set(bnIds) + pgMaterials.Set(materials) + + // Insert riverbed materials + if _, err := tx.StmtContext(ctx, bs.insertMaterial).ExecContext(ctx, + + &pgBnIds, + &pgMaterials, + ); err != nil { + return err + } + } + return nil + }) if err != nil { feedback.Warn(pgxutils.ReadableError{err}.Error()) return nil } - defer bns.Close() - for bns.Next() { - var nid int64 - if err := bns.Scan(&nid); err != nil { + + // Only add new BN data to tracking for staging review. + for _, nid := range bnIds { + if _, err := tx.StmtContext(ctx, bs.track).ExecContext( + ctx, importID, "waterway.bottlenecks", nid, false, + ); err != nil { return err } - bnIds = append(bnIds, nid) - } - if err := bns.Err(); err != nil { - feedback.Warn(pgxutils.ReadableError{err}.Error()) - return nil - } - if len(bnIds) == 0 { - feedback.Warn( - "No gauge matching '%s' or given time available", bn.Fk_g_fid) - return nil - } - - // Remove obsolete bottleneck version entries - var pgBnIds pgtype.Int8Array - pgBnIds.Set(bnIds) - if _, err = tx.StmtContext(ctx, deleteObsoleteBNStmt).ExecContext(ctx, - bn.Bottleneck_id, - &validity, - &pgBnIds, - ); err != nil { - feedback.Warn(pgxutils.ReadableError{err}.Error()) - if err2 := tx.Rollback(); err2 != nil { - return err2 - } - return nil - } - - // Set end of validity of old version to start of new version - // in case of overlap - if _, err = tx.StmtContext(ctx, fixValidityStmt).ExecContext(ctx, - bn.Bottleneck_id, - validity, - ); err != nil { - feedback.Warn(pgxutils.ReadableError{err}.Error()) - if err2 := tx.Rollback(); err2 != nil { - return err2 - } - return nil - } - - if materials != nil { - // Remove obsolete riverbed materials - var pgMaterials pgtype.VarcharArray - pgMaterials.Set(materials) - mtls, err := tx.StmtContext(ctx, - deleteMaterialStmt).QueryContext(ctx, - &pgBnIds, - &pgMaterials, - ) - if err != nil { - return err - } - defer mtls.Close() - for mtls.Next() { - var delMat string - if err := mtls.Scan(&delMat); err != nil { - return err - } - feedback.Warn("Removed riverbed material %s", delMat) - } - if err := mtls.Err(); err != nil { - return err - } - - // Insert riverbed materials - if _, err := tx.StmtContext(ctx, insertMaterialStmt).ExecContext(ctx, - &pgBnIds, - &pgMaterials, - ); err != nil { - feedback.Warn("Failed to insert riverbed materials") - feedback.Warn(pgxutils.ReadableError{err}.Error()) - return nil - } } - // Only add new BN data to tracking for staging review. - // - // FIXME: Review for updated bottlenecks is currently not possible, as - // the update is done instantly in place. - if existing_bn_id == nil { - for _, nid := range bnIds { - if _, err := tx.StmtContext(ctx, trackStmt).ExecContext( - ctx, importID, "waterway.bottlenecks", nid, - ); err != nil { - return err - } - } - } - - if err = tx.Commit(); err != nil { - return err - } - // See above... - if existing_bn_id == nil { - *nids = append(*nids, bn.Bottleneck_id) - } + *nids = append(*nids, bn.Bottleneck_id) return nil } diff -r 52f7264265bb -r 49ec33a7f954 pkg/soap/ifbn/service.go --- a/pkg/soap/ifbn/service.go Thu Aug 01 17:02:09 2019 +0200 +++ b/pkg/soap/ifbn/service.go Thu Aug 01 18:53:51 2019 +0200 @@ -36,7 +36,6 @@ } type Export_bn_by_isrs struct { - //XMLName xml.Name `xml:"http://www.ris.eu/bottleneck/3.0 export_bn_by_isrs"` XMLName xml.Name `xml:"http://www.ris.eu/bottleneck/3.0 export_bn_by_isrs"` ISRS *ArrayOfISRSPair `xml:"ISRS,omitempty"` @@ -826,13 +825,11 @@ } type RequestedPeriod struct { - //XMLName xml.Name `xml:"http://www.ris.eu/wamos/common/3.0 RequestedPeriod"` - - Date_start time.Time `xml:"Date_start,omitempty"` + Date_start *time.Time `xml:"http://www.ris.eu/wamos/common/3.0 Date_start,omitempty"` - Date_end time.Time `xml:"Date_end,omitempty"` + Date_end *time.Time `xml:"http://www.ris.eu/wamos/common/3.0 Date_end,omitempty"` - Value_interval int32 `xml:"Value_interval,omitempty"` + Value_interval *int32 `xml:"http://www.ris.eu/wamos/common/3.0 Value_interval,omitempty"` } type ArrayOfString struct { diff -r 52f7264265bb -r 49ec33a7f954 schema/gemma.sql --- a/schema/gemma.sql Thu Aug 01 17:02:09 2019 +0200 +++ b/schema/gemma.sql Thu Aug 01 18:53:51 2019 +0200 @@ -649,9 +649,6 @@ id int PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY, bottleneck_id varchar NOT NULL, validity tstzrange NOT NULL CHECK (NOT isempty(validity)), - UNIQUE (bottleneck_id, validity), - EXCLUDE USING GiST (bottleneck_id WITH =, validity WITH &&) - DEFERRABLE INITIALLY DEFERRED, gauge_location isrs NOT NULL, objnam varchar, nobjnm varchar, @@ -673,7 +670,12 @@ -- XXX: Also an attribut of sounding result? date_info timestamp with time zone NOT NULL, source_organization varchar NOT NULL, - staging_done boolean NOT NULL DEFAULT false + staging_done boolean NOT NULL DEFAULT false, + UNIQUE (bottleneck_id, validity, staging_done), + EXCLUDE USING GiST (bottleneck_id WITH =, + validity WITH &&, + CAST(staging_done AS int) WITH =) + DEFERRABLE INITIALLY DEFERRED ) CREATE CONSTRAINT TRIGGER waterway_bottlenecks_reference_gauge AFTER INSERT OR UPDATE OF gauge_location ON bottlenecks diff -r 52f7264265bb -r 49ec33a7f954 schema/updates/1102/01.bn_constraint.sql --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/schema/updates/1102/01.bn_constraint.sql Thu Aug 01 18:53:51 2019 +0200 @@ -0,0 +1,14 @@ +ALTER TABLE waterway.bottlenecks + DROP CONSTRAINT IF EXISTS bottlenecks_bottleneck_id_validity_key, + DROP CONSTRAINT IF EXISTS bottlenecks_bottleneck_id_validity_excl, + DROP CONSTRAINT IF EXISTS bottlenecks_bottleneck_id_validity_staging_done_key, + DROP CONSTRAINT IF EXISTS bottlenecks_bottleneck_id_validity_staging_done_excl; + +ALTER TABLE waterway.bottlenecks + ADD CONSTRAINT bottlenecks_bottleneck_id_validity_staging_done_key + UNIQUE (bottleneck_id, validity, staging_done), + ADD CONSTRAINT bottlenecks_bottleneck_id_validity_staging_done_excl + EXCLUDE USING GiST (bottleneck_id WITH =, + validity WITH &&, + CAST(staging_done AS int) WITH =) + DEFERRABLE INITIALLY DEFERRED; diff -r 52f7264265bb -r 49ec33a7f954 schema/version.sql --- a/schema/version.sql Thu Aug 01 17:02:09 2019 +0200 +++ b/schema/version.sql Thu Aug 01 18:53:51 2019 +0200 @@ -1,1 +1,1 @@ -INSERT INTO gemma_schema_version(version) VALUES (1101); +INSERT INTO gemma_schema_version(version) VALUES (1102);