# HG changeset patch
# User Thomas Junk
# Date 1564745429 -7200
# Node ID 552ea22ed2662c2d6739d896cc196eddfc29af9f
# Parent  4d7569cca5e69bda319eb307a5a170c8ac42f63b
# Parent  d54d86d27e82a5d9db1d9633b5c8f559b1ef8e13
merge default into improvepdf

diff -r 4d7569cca5e6 -r 552ea22ed266 client/src/components/gauge/Waterlevel.vue
--- a/client/src/components/gauge/Waterlevel.vue	Thu Aug 01 12:48:33 2019 +0200
+++ b/client/src/components/gauge/Waterlevel.vue	Fri Aug 02 13:30:29 2019 +0200
@@ -106,7 +106,7 @@
 import { endOfDay } from "date-fns";
 import debounce from "debounce";
 import { saveAs } from "file-saver";
-import { diagram, pdfgen, templateLoader } from "@/lib/mixins";
+import { diagram, pdfgen, templateLoader, refwaterlevels } from "@/lib/mixins";
 import { HTTP } from "@/lib/http";
 import { displayError } from "@/lib/errors";
 import { defaultDiagramTemplate } from "@/lib/DefaultDiagramTemplate";
@@ -116,7 +116,7 @@
 const d3 = Object.assign(d3Base, { lineChunked });
 
 export default {
-  mixins: [diagram, pdfgen, templateLoader],
+  mixins: [diagram, pdfgen, templateLoader, refwaterlevels],
   components: {
     DiagramLegend: () => import("@/components/DiagramLegend")
   },
@@ -465,6 +465,7 @@
         .attr("clip-path", "url(#waterlevel-clip)");
       svg.selectAll(".hdc-line").attr("stroke", "red");
       svg.selectAll(".ldc-line").attr("stroke", "green");
+      svg.selectAll(".mw-line").attr("stroke", "rgb(128,128,128)");
       svg.selectAll(".rn-line").attr("stroke", "rgb(128,128,128)");
       svg
@@ -559,32 +560,29 @@
         .style("font-size", "0.8em");
     },
     getExtent(refWaterLevels) {
+      let rest;
+      if (refWaterLevels) {
+        // set min/max values for the waterlevel axis
+        // including HDC (+ 1/8 HDC-LDC) and LDC (- 1/4 HDC-LDC)
+        const { LDC, HDC } = this.determineLDCHDC(refWaterLevels);
+        rest = [
+          {
+            waterlevel: HDC + (HDC - LDC) / 8
+          },
+          {
+            waterlevel: Math.max(LDC - (HDC - LDC) / 4, 0)
+          }
+        ];
+      } else {
+        rest = [];
+      }
       return {
         // set min/max values for the date axis
         date: [
           this.waterlevels[0].date,
           endOfDay(this.waterlevels[this.waterlevels.length - 1].date)
         ],
-        // set min/max values for the waterlevel axis
-        // including HDC (+ 1/8 HDC-LDC) and LDC (- 1/4 HDC-LDC)
-        waterlevel: d3.extent(
-          [
-            ...this.waterlevels,
-            {
-              waterlevel:
-                refWaterLevels.HDC +
-                (refWaterLevels.HDC - refWaterLevels.LDC) / 8
-            },
-            {
-              waterlevel: Math.max(
-                refWaterLevels.LDC -
-                  (refWaterLevels.HDC - refWaterLevels.LDC) / 4,
-                0
-              )
-            }
-          ],
-          d => d.waterlevel
-        )
+        waterlevel: d3.extent([...this.waterlevels, ...rest], d => d.waterlevel)
       };
     },
     getScale({ dimensions, extent }) {
@@ -593,10 +591,11 @@
       const y = d3.scaleLinear().range([dimensions.mainHeight, 0]);
       const x2 = d3.scaleTime().range([0, dimensions.width]);
      const y2 = d3.scaleLinear().range([dimensions.navHeight, 0]);
-
+      const [lo, hi] = extent.waterlevel;
       // setting the min and max values for the diagram axes
+      const dy = Math.ceil(0.15 * (hi - lo));
       x.domain(d3.extent(extent.date));
-      y.domain(extent.waterlevel);
+      y.domain([lo - dy, hi + dy]);
       x2.domain(x.domain());
       y2.domain(y.domain());
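For illustration (not part of the changeset): getScale() now derives the
y-domain from the raw waterlevel extent plus a symmetric 15% margin instead of
using the extent directly, so reference lines and labels stay inside the plot.
A minimal Go sketch of the same arithmetic; padDomain and the sample values are
ours, not from the codebase:

    package main

    import (
        "fmt"
        "math"
    )

    // padDomain widens [lo, hi] by 15% on each side, rounding the margin
    // up to a whole unit like Math.ceil in getScale does.
    func padDomain(lo, hi float64) (float64, float64) {
        dy := math.Ceil(0.15 * (hi - lo))
        return lo - dy, hi + dy
    }

    func main() {
        fmt.Println(padDomain(200, 400)) // 170 430
    }

The same dy is recomputed in drawNowLines below, so the now-label placed at
hi + dy*0.4 always lands inside the padded domain.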
@@ -701,6 +700,8 @@
       .call(lineChunked);
   },
   drawNowLines({ scale, extent, diagram, navigation }) {
+    const [lo, hi] = extent.waterlevel;
+    const dy = Math.ceil(0.15 * (hi - lo));
     const nowLine = d3
       .line()
       .x(d => scale.x(d.x))
       .y(d => scale.y(d.y));
     const nowLabel = selection => {
       selection.attr(
         "transform",
-        `translate(${scale.x(new Date())}, ${scale.y(
-          extent.waterlevel[1] - 16
-        )})`
+        `translate(${scale.x(new Date())}, ${scale.y(hi + dy * 0.4)})`
       );
     };
@@ -719,8 +718,8 @@
     diagram
       .append("path")
       .datum([
-        { x: new Date(), y: extent.waterlevel[0] },
-        { x: new Date(), y: extent.waterlevel[1] - 20 }
+        { x: new Date(), y: lo - dy },
+        { x: new Date(), y: hi + dy * 0.4 }
       ])
       .attr("class", "now-line")
       .attr("d", nowLine);
@@ -734,10 +733,7 @@
     // draw in nav
     navigation
       .append("path")
-      .datum([
-        { x: new Date(), y: extent.waterlevel[0] },
-        { x: new Date(), y: extent.waterlevel[1] - 20 }
-      ])
+      .datum([{ x: new Date(), y: hi + dy }, { x: new Date(), y: lo - dy }])
       .attr("class", "now-line")
       .attr(
         "d",
@@ -778,23 +774,31 @@
       };
     },
     drawRefLines({ refWaterLevels, diagram, scale, dimensions, extent }) {
-      // filling area between HDC and LDC
-      diagram
-        .append("rect")
-        .attr("class", "hdc-ldc-area")
-        .attr("x", 0)
-        .attr("y", scale.y(refWaterLevels.HDC))
-        .attr("width", dimensions.width)
-        .attr(
-          "height",
-          scale.y(refWaterLevels.LDC) - scale.y(refWaterLevels.HDC)
-        );
+      if (refWaterLevels) {
+        const { LDC, HDC } = this.determineLDCHDC(refWaterLevels);
+        // filling area between HDC and LDC if both of them are available
+        if (LDC && HDC) {
+          diagram
+            .append("rect")
+            .attr("class", "hdc-ldc-area")
+            .attr("x", 0)
+            .attr("y", scale.y(HDC))
+            .attr("width", dimensions.width)
+            .attr("height", refWaterLevels ? scale.y(LDC) - scale.y(HDC) : 0);
+        }
+      }
       const refWaterlevelLine = d3
         .line()
         .x(d => scale.x(d.x))
         .y(d => scale.y(d.y));
+      const levelStyle = name => {
+        if (/HDC/.test(name)) return "hdc-line";
+        if (/LDC/.test(name)) return "ldc-line";
+        return `${name.toLowerCase()}-line`;
+      };
+
       for (let ref in refWaterLevels) {
         if (refWaterLevels[ref]) {
           diagram
@@ -803,7 +807,7 @@
               { x: 0, y: refWaterLevels[ref] },
               { x: extent.date[1], y: refWaterLevels[ref] }
             ])
-            .attr("class", ref.toLowerCase() + "-line")
+            .attr("class", levelStyle(ref))
             .attr("d", refWaterlevelLine);
           diagram // label
             .append("rect")
@@ -955,6 +959,7 @@
         .attr("class", "chart-dots")
         .append("circle")
         .attr("class", "chart-dot")
+        .attr("opacity", 0)
         .attr("r", 4);
 
       // create container for the tooltip
diff -r 4d7569cca5e6 -r 552ea22ed266 client/src/components/importoverview/ImportOverview.vue
--- a/client/src/components/importoverview/ImportOverview.vue	Thu Aug 01 12:48:33 2019 +0200
+++ b/client/src/components/importoverview/ImportOverview.vue	Fri Aug 02 13:30:29 2019 +0200
@@ -414,6 +414,7 @@
         .then(response => {
           this.loadLogs();
           this.$store.commit("imports/setReviewed", []);
+          this.$store.commit("gauges/deleteNashSutcliffeCache");
           this.$store.dispatch("map/refreshLayers");
           this.$store.dispatch("imports/loadStagingNotifications");
           this.$store.dispatch("imports/loadStretches");
diff -r 4d7569cca5e6 -r 552ea22ed266 client/src/components/layers/Layers.vue
--- a/client/src/components/layers/Layers.vue	Thu Aug 01 12:48:33 2019 +0200
+++ b/client/src/components/layers/Layers.vue	Fri Aug 02 13:30:29 2019 +0200
@@ -86,6 +86,7 @@
       this.$store.commit("application/showLayers", false);
     },
     refreshLayers() {
+      this.$store.commit("gauges/deleteNashSutcliffeCache");
       this.$store.dispatch("map/refreshLayers");
     }
   }
diff -r 4d7569cca5e6 -r 552ea22ed266 client/src/components/map/Map.vue
--- a/client/src/components/map/Map.vue	Thu Aug 01 12:48:33 2019 +0200
+++ b/client/src/components/map/Map.vue	Fri Aug 02 13:30:29 2019 +0200
@@ -291,7 +291,7 @@
           })
         });
         this.layers.get("BOTTLENECKS").setStyle(newStyle);
-
+        this.$store.commit("gauges/deleteNashSutcliffeCache");
         this.$store.dispatch("map/refreshLayers");
       })
       .catch(error => {
diff -r 4d7569cca5e6 -r 552ea22ed266 client/src/lib/mixins.js
--- a/client/src/lib/mixins.js	Thu Aug 01 12:48:33 2019 +0200
+++ b/client/src/lib/mixins.js	Fri Aug 02 13:30:29 2019 +0200
@@ -47,6 +47,21 @@
     }
   }
 };
+/**
+ * Since the names of LDC and HDC aren't normalized, we have to do some
+ * guesswork: the best fit is a key with HDC or LDC in it.
+ */
+export const refwaterlevels = {
+  methods: {
+    determineLDCHDC(refWaterLevels) {
+      let HDC =
+        refWaterLevels[Object.keys(refWaterLevels).find(e => /HDC/.test(e))];
+      let LDC =
+        refWaterLevels[Object.keys(refWaterLevels).find(e => /LDC/.test(e))];
+      return { LDC, HDC };
+    }
+  }
+};
 
 export const diagram = {
   methods: {
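For illustration (not part of the changeset): the refwaterlevels mixin has to
guess which keys of a gauge's reference water levels mean HDC and LDC, because
the upstream data does not use normalized names (e.g. "HDC" vs. "HDC_2012").
A rough Go equivalent of that guesswork; determineLDCHDC over a plain map is
our stand-in, not project code:

    package main

    import (
        "fmt"
        "strings"
    )

    // determineLDCHDC picks values whose key contains "HDC" or "LDC",
    // mirroring the regex-based lookup in the JavaScript mixin.
    func determineLDCHDC(levels map[string]float64) (ldc, hdc float64) {
        for k, v := range levels {
            switch {
            case strings.Contains(k, "HDC"):
                hdc = v
            case strings.Contains(k, "LDC"):
                ldc = v
            }
        }
        return
    }

    func main() {
        fmt.Println(determineLDCHDC(map[string]float64{"HDC_2012": 680, "LDC": 210}))
    }

Like the mixin, this assumes at most one matching key of each kind.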
diff -r 4d7569cca5e6 -r 552ea22ed266 client/src/store/fairwayavailability.js
--- a/client/src/store/fairwayavailability.js	Thu Aug 01 12:48:33 2019 +0200
+++ b/client/src/store/fairwayavailability.js	Fri Aug 02 13:30:29 2019 +0200
@@ -18,9 +18,7 @@
 import {
   format,
   subYears,
-  startOfDay,
   startOfMonth,
-  endOfDay,
   endOfMonth,
   startOfYear,
   endOfYear,
@@ -240,13 +238,11 @@
     } else if (type == TYPES.SECTION || type == TYPES.STRETCH) {
       additionalParams = `&depthbreaks=${depthLimit1},${depthLimit2}&widthbreaks=${widthLimit1},${widthLimit2}`;
     }
+    const start = encodeURIComponent("00:00:00+00:00");
+    const end = encodeURIComponent("23:59:59+00:00");
     const URL = `data/${endpoint}/fairway-depth/${encodeURIComponent(
       name
-    )}?from=${encodeURIComponent(
-      format(startOfDay(from), "YYYY-MM-DDTHH:mm:ssZ")
-    )}&to=${encodeURIComponent(
-      format(endOfDay(to), "YYYY-MM-DDTHH:mm:ssZ")
-    )}&mode=${frequency}&los=${LOS}${additionalParams}`;
+    )}?from=${from}T${start}&to=${to}T${end}&mode=${frequency}&los=${LOS}${additionalParams}`;
     HTTP.get(URL, {
       headers: { "X-Gemma-Auth": localStorage.getItem("token") }
     })
@@ -282,6 +278,8 @@
       ? feature.properties.name
       : feature.get("objnam");
     [from, to] = getIntervallBorders(from, to, frequency);
+    const start = encodeURIComponent("00:00:00+00:00");
+    const end = encodeURIComponent("23:59:59+00:00");
     let additionalParams = "";
     let endpoint = type || TYPES.BOTTLENECK;
     if (type === TYPES.BOTTLENECK) {
@@ -294,11 +292,7 @@
     }
     const URL = `data/${endpoint}/availability/${encodeURIComponent(
       name
-    )}?from=${encodeURIComponent(
-      format(startOfDay(from), "YYYY-MM-DDTHH:mm:ssZ")
-    )}&to=${encodeURIComponent(
-      format(endOfDay(to), "YYYY-MM-DDTHH:mm:ssZ")
-    )}&mode=${frequency}&los=${LOS}${additionalParams}`;
+    )}?from=${from}T${start}&to=${to}T${end}&mode=${frequency}&los=${LOS}${additionalParams}`;
     HTTP.get(URL, {
       headers: { "X-Gemma-Auth": localStorage.getItem("token") }
     })
diff -r 4d7569cca5e6 -r 552ea22ed266 client/src/store/gauges.js
--- a/client/src/store/gauges.js	Thu Aug 01 12:48:33 2019 +0200
+++ b/client/src/store/gauges.js	Fri Aug 02 13:30:29 2019 +0200
@@ -78,6 +78,9 @@
     nashSutcliffe: (state, data) => {
       state.nashSutcliffe = data;
     },
+    deleteNashSutcliffeCache: state => {
+      state.nashSutcliffeOverview = [];
+    },
     addNashSutcliffeOverviewEntry: (state, data) => {
       let existingIndex = state.nashSutcliffeOverview.findIndex(
         d => d.feature.get("id") === data.feature.get("id")
diff -r 4d7569cca5e6 -r 552ea22ed266 pkg/controllers/bottlenecks.go
--- a/pkg/controllers/bottlenecks.go	Thu Aug 01 12:48:33 2019 +0200
+++ b/pkg/controllers/bottlenecks.go	Fri Aug 02 13:30:29 2019 +0200
@@ -257,7 +257,7 @@
     if f == "" {
         return def.UTC(), true
     }
-    v, err := time.Parse(common.TimeFormat, f)
+    v, err := common.ParseTime(f)
     if err != nil {
         http.Error(
             rw, fmt.Sprintf("Invalid format for '%s': %v.", field, err),
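For illustration (not part of the changeset): the client now sends day bounds
with an explicit UTC offset (e.g. 2019-08-02T23:59:59+00:00), and the server
switches from a single time.Parse layout to common.ParseTime. That helper is
not shown in this diff; a plausible sketch, under the assumption that it simply
tries several layouts (the layout list is ours):

    // parseTime is a stand-in for common.ParseTime; only the standard
    // library time package is needed.
    func parseTime(s string) (time.Time, error) {
        layouts := []string{
            "2006-01-02T15:04:05-07:00", // offset form now sent by the client
            "2006-01-02T15:04:05Z07:00", // RFC 3339
            "2006-01-02T15:04:05",       // no zone, interpreted as UTC
        }
        var err error
        for _, layout := range layouts {
            var t time.Time
            if t, err = time.Parse(layout, s); err == nil {
                return t.UTC(), nil
            }
        }
        return time.Time{}, err
    }

parseTime("2019-08-02T23:59:59+00:00") then yields the upper day bound built in
fairwayavailability.js.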
diff -r 4d7569cca5e6 -r 552ea22ed266 pkg/controllers/importqueue.go
--- a/pkg/controllers/importqueue.go	Thu Aug 01 12:48:33 2019 +0200
+++ b/pkg/controllers/importqueue.go	Fri Aug 02 13:30:29 2019 +0200
@@ -373,7 +373,7 @@
         if err = rows.Scan(&t, &entry.Kind, &entry.Message); err != nil {
             return
         }
-        entry.Time = models.ImportTime{t.UTC()}
+        entry.Time = models.ImportTime{Time: t.UTC()}
         entries = append(entries, &entry)
     }
 
diff -r 4d7569cca5e6 -r 552ea22ed266 pkg/controllers/user.go
--- a/pkg/controllers/user.go	Thu Aug 01 12:48:33 2019 +0200
+++ b/pkg/controllers/user.go	Fri Aug 02 13:30:29 2019 +0200
@@ -263,7 +263,7 @@
     }
 
     if err != nil {
-        m, c := pgxutils.ReadableError{err}.MessageAndCode()
+        m, c := pgxutils.ReadableError{Err: err}.MessageAndCode()
         err = JSONError{Code: c, Message: m}
         return
     }
diff -r 4d7569cca5e6 -r 552ea22ed266 pkg/imports/agm.go
--- a/pkg/imports/agm.go	Thu Aug 01 12:48:33 2019 +0200
+++ b/pkg/imports/agm.go	Fri Aug 02 13:30:29 2019 +0200
@@ -41,7 +41,7 @@
     Originator string `json:"originator"`
 }
 
-// GMAPJobKind is the unique name of an approved gauge measurements import job.
+// AGMJobKind is the unique name of an approved gauge measurements import job.
 const AGMJobKind JobKind = "agm"
 
 type agmJobCreator struct{}
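Background for the recurring {Err: err} and {Time: t.UTC()} changes throughout
this changeset: go vet warns about unkeyed composite literals of structs from
other packages, and keyed literals keep compiling when the struct gains or
reorders fields. A minimal illustration (the type is invented for the example):

    type ImportTime struct{ Time time.Time }

    t1 := ImportTime{time.Now()}       // unkeyed: flagged by `go vet`
    t2 := ImportTime{Time: time.Now()} // keyed: robust and vet-clean

Hence models.ImportTime{Time: t.UTC()} above rather than
models.ImportTime{t.UTC()}.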
diff -r 4d7569cca5e6 -r 552ea22ed266 pkg/imports/bn.go
--- a/pkg/imports/bn.go	Thu Aug 01 12:48:33 2019 +0200
+++ b/pkg/imports/bn.go	Fri Aug 02 13:30:29 2019 +0200
@@ -24,9 +24,11 @@
     "strings"
     "time"
 
+    "github.com/jackc/pgx/pgtype"
+
+    "gemma.intevation.de/gemma/pkg/common"
     "gemma.intevation.de/gemma/pkg/pgxutils"
     "gemma.intevation.de/gemma/pkg/soap/ifbn"
-    "github.com/jackc/pgx/pgtype"
 )
 
 // Bottleneck is an import job to import
@@ -89,69 +91,23 @@
 RETURNING id
 `
 
-    updateBottleneckSQL = `
-WITH
-bounds (b) AS (VALUES (isrs_fromText($7)), (isrs_fromText($8))),
-r AS (SELECT isrsrange(
-    (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY),
-    (SELECT b FROM bounds ORDER BY b USING >~ FETCH FIRST ROW ONLY)) AS r)
-UPDATE waterway.bottlenecks SET (
-  bottleneck_id,
-  validity,
-  gauge_location,
-  objnam,
-  nobjnm,
-  stretch,
-  area,
-  rb,
-  lb,
-  responsible_country,
-  revisiting_time,
-  limiting,
-  date_info,
-  source_organization
-) = (
-  $2,
-  $3::tstzrange,
-  isrs_fromText($4),
-  $5,
-  $6,
-  (SELECT r FROM r),
-  ISRSrange_area(
-    ISRSrange_axis((SELECT r FROM r),
-                   $16),
-    (SELECT ST_Collect(CAST(area AS geometry))
-     FROM waterway.waterway_area)),
-  $9,
-  $10,
-  $11,
-  $12::smallint,
-  $13,
-  $14::timestamptz,
-  $15
-)
-WHERE id=$1
-RETURNING id
-`
-
+    // We only check NOT NULL values: correct comparison with values
+    // which might be NULL (and then must not be compared with `=' but
+    // with `IS NULL') is complicated, and that we check more than just
+    // (bottleneck_id, validity, date_info) is luxury already.
     findExactMatchBottleneckSQL = `
 WITH
-bounds (b) AS (VALUES (isrs_fromText($6)), (isrs_fromText($7))),
+bounds (b) AS (VALUES (isrs_fromText($4)), (isrs_fromText($5))),
 r AS (SELECT isrsrange(
-    (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY),
-    (SELECT b FROM bounds ORDER BY b USING >~ FETCH FIRST ROW ONLY)) AS r)
+    (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY),
+    (SELECT b FROM bounds ORDER BY b USING >~ FETCH FIRST ROW ONLY)) AS r)
 SELECT id FROM waterway.bottlenecks
 WHERE (
   bottleneck_id,
   validity,
   gauge_location,
-  objnam,
-  nobjnm,
   stretch,
-  rb,
-  lb,
   responsible_country,
-  revisiting_time,
   limiting,
   date_info,
   source_organization,
@@ -160,69 +116,47 @@
   $1,
   $2::tstzrange,
   isrs_fromText($3),
-  $4,
-  $5,
   (SELECT r FROM r),
-  $8,
+  $6,
+  $7,
+  $8::timestamptz,
   $9,
-  $10,
-  $11::smallint,
-  $12,
-  $13::timestamptz,
-  $14,
   true
 )
 `
 
-    findMatchBottleneckSQL = `
+    findIntersectingBottleneckSQL = `
 SELECT id FROM waterway.bottlenecks
-WHERE (
-  bottleneck_id,
-  validity,
-  staging_done
-) = (
-  $1,
-  $2::tstzrange,
-  true
-)
-`
-    // FIXME: Is this still neede wtih the new simplified historization
-    // model? My intuition is: no it isn't and should be removed, but we
-    // should double check before doing so... [sw]
-    //
-    // Alignment with gauge validity might have generated new entries
-    // for the same time range. Thus, remove the old ones
-    deleteObsoleteBNSQL = `
-DELETE FROM waterway.bottlenecks
-WHERE bottleneck_id = $1 AND validity <@ $2 AND id <> ALL($3)
-`
-
-    fixBNValiditySQL = `
-UPDATE waterway.bottlenecks SET
-  -- Set enddate of old entry to new startdate in case of overlap:
-  validity = validity - $2
-WHERE bottleneck_id = $1
-  AND validity && $2 AND NOT validity <@ $2
-`
-
-    deleteBottleneckMaterialSQL = `
-WITH del AS (
-  DELETE FROM waterway.bottlenecks_riverbed_materials
-  WHERE bottleneck_id = ANY($1)
-  AND riverbed <> ALL($2)
-  RETURNING riverbed)
-SELECT DISTINCT riverbed FROM del
+WHERE (bottleneck_id, staging_done) = ($1, true)
+  AND $2::tstzrange && validity
 `
 
     insertBottleneckMaterialSQL = `
 INSERT INTO waterway.bottlenecks_riverbed_materials (
    bottleneck_id,
    riverbed
-) SELECT *
-FROM unnest(CAST($1 AS int[])) AS bns,
-     unnest(CAST($2 AS varchar[])) AS materials
+) VALUES ($1, $2)
 ON CONFLICT (bottleneck_id, riverbed) DO NOTHING
 `
+
+    bnStageDoneDeleteSQL = `
+DELETE FROM waterway.bottlenecks WHERE id IN (
+  SELECT key
+  FROM import.track_imports
+  WHERE import_id = $1
+    AND relation = 'waterway.bottlenecks'::regclass
+    AND deletion
+)`
+
+    bnStageDoneSQL = `
+UPDATE waterway.bottlenecks SET staging_done = true
+WHERE id IN (
+  SELECT key
+  FROM import.track_imports
+  WHERE import_id = $1
+    AND relation = 'waterway.bottlenecks'::regclass
+    AND NOT deletion
+)`
 )
 
 type bnJobCreator struct{}
@@ -244,22 +178,16 @@
     }
 }
 
-const (
-    bnStageDoneSQL = `
-UPDATE waterway.bottlenecks SET staging_done = true
-WHERE id IN (
-  SELECT key from import.track_imports
-  WHERE import_id = $1 AND
-        relation = 'waterway.bottlenecks'::regclass)`
-)
-
 // StageDone moves the imported bottleneck out of the staging area.
 func (bnJobCreator) StageDone(
     ctx context.Context,
     tx *sql.Tx,
     id int64,
 ) error {
-    _, err := tx.ExecContext(ctx, bnStageDoneSQL, id)
+    _, err := tx.ExecContext(ctx, bnStageDoneDeleteSQL, id)
+    if err == nil {
+        _, err = tx.ExecContext(ctx, bnStageDoneSQL, id)
+    }
     return err
 }
 
@@ -287,7 +215,9 @@
     fetch := func() ([]*ifbn.BottleNeckType, error) {
         client := ifbn.NewIBottleneckService(bn.URL, bn.Insecure, nil)
 
-        req := &ifbn.Export_bn_by_isrs{}
+        req := &ifbn.Export_bn_by_isrs{
+            Period: &ifbn.RequestedPeriod{Date_start: &time.Time{}},
+        }
 
         resp, err := client.Export_bn_by_isrs(req)
         if err != nil {
@@ -305,6 +235,48 @@
     return storeBottlenecks(ctx, fetch, importID, conn, feedback, bn.Tolerance)
 }
 
+type bnStmts struct {
+    insert           *sql.Stmt
+    findExactMatch   *sql.Stmt
+    findIntersecting *sql.Stmt
+    insertMaterial   *sql.Stmt
+    track            *sql.Stmt
+}
+
+func (bs *bnStmts) close() {
+    for _, s := range []**sql.Stmt{
+        &bs.insert,
+        &bs.findExactMatch,
+        &bs.findIntersecting,
+        &bs.insertMaterial,
+        &bs.track,
+    } {
+        if *s != nil {
+            (*s).Close()
+            *s = nil
+        }
+    }
+}
+
+func (bs *bnStmts) prepare(ctx context.Context, conn *sql.Conn) error {
+    for _, x := range []struct {
+        sql  string
+        stmt **sql.Stmt
+    }{
+        {insertBottleneckSQL, &bs.insert},
+        {findExactMatchBottleneckSQL, &bs.findExactMatch},
+        {findIntersectingBottleneckSQL, &bs.findIntersecting},
+        {insertBottleneckMaterialSQL, &bs.insertMaterial},
+        {trackImportDeletionSQL, &bs.track},
+    } {
+        var err error
+        if *x.stmt, err = conn.PrepareContext(ctx, x.sql); err != nil {
+            return err
+        }
+    }
+    return nil
+}
+
 func storeBottlenecks(
     ctx context.Context,
     fetch func() ([]*ifbn.BottleNeckType, error),
@@ -322,46 +294,41 @@
 
     feedback.Info("Found %d bottlenecks for import", len(bns))
 
-    var insertStmt, updateStmt, findExactMatchingBNStmt, findMatchingBNStmt,
-        deleteObsoleteBNStmt, fixValidityStmt, deleteMaterialStmt,
-        insertMaterialStmt, trackStmt *sql.Stmt
+    var bs bnStmts
+    defer bs.close()
 
-    for _, x := range []struct {
-        sql  string
-        stmt **sql.Stmt
-    }{
-        {insertBottleneckSQL, &insertStmt},
-        {updateBottleneckSQL, &updateStmt},
-        {findExactMatchBottleneckSQL, &findExactMatchingBNStmt},
-        {findMatchBottleneckSQL, &findMatchingBNStmt},
-        {deleteObsoleteBNSQL, &deleteObsoleteBNStmt},
-        {fixBNValiditySQL, &fixValidityStmt},
-        {deleteBottleneckMaterialSQL, &deleteMaterialStmt},
-        {insertBottleneckMaterialSQL, &insertMaterialStmt},
-        {trackImportSQL, &trackStmt},
-    } {
-        var err error
-        if *x.stmt, err = conn.PrepareContext(ctx, x.sql); err != nil {
-            return nil, err
-        }
-        defer (*x.stmt).Close()
+    if err := bs.prepare(ctx, conn); err != nil {
+        return nil, err
     }
 
     var nids []string
+    seenOldBnIDs := make(map[int64]bool)
 
     feedback.Info("Tolerance used to snap waterway axis: %g", tolerance)
 
+    tx, err := conn.BeginTx(ctx, nil)
+    if err != nil {
+        return nil, err
+    }
+    defer tx.Rollback()
+
+    savepoint := Savepoint(ctx, tx, "insert_bottleneck")
+
     for _, bn := range bns {
         if err := storeBottleneck(
-            ctx, importID, conn, feedback, bn, &nids, tolerance,
-            insertStmt, updateStmt,
-            findExactMatchingBNStmt, findMatchingBNStmt,
-            deleteObsoleteBNStmt, fixValidityStmt,
-            deleteMaterialStmt, insertMaterialStmt,
-            trackStmt); err != nil {
+            ctx, tx,
+            importID, feedback, bn,
+            &nids, seenOldBnIDs, tolerance,
+            &bs,
+            savepoint,
+        ); err != nil {
             return nil, err
         }
     }
 
+    if err = tx.Commit(); err != nil {
+        return nil, err
+    }
+
     if len(nids) == 0 {
         return nil, UnchangedError("No new bottlenecks inserted")
     }
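storeBottlenecks now opens a single transaction for the whole import and wraps
each record in a savepoint, so one failing bottleneck rolls back alone while
the rest of the import proceeds. The Savepoint helper used above is defined
elsewhere in this package; assuming it wraps the usual SAVEPOINT / RELEASE /
ROLLBACK TO commands, it would look roughly like this sketch:

    // Savepoint returns a runner that executes fn inside a savepoint:
    // on error it rolls back to the savepoint instead of aborting tx.
    func Savepoint(ctx context.Context, tx *sql.Tx, name string) func(func() error) error {
        return func(fn func() error) error {
            if _, err := tx.ExecContext(ctx, "SAVEPOINT "+name); err != nil {
                return err
            }
            if err := fn(); err != nil {
                tx.ExecContext(ctx, "ROLLBACK TO SAVEPOINT "+name)
                return err
            }
            _, err := tx.ExecContext(ctx, "RELEASE SAVEPOINT "+name)
            return err
        }
    }

This matches how storeBottleneck (below) treats a savepoint error: it warns and
skips the record instead of failing the whole import.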
@@ -378,22 +345,22 @@
 
 func storeBottleneck(
     ctx context.Context,
+    tx *sql.Tx,
     importID int64,
-    conn *sql.Conn,
     feedback Feedback,
     bn *ifbn.BottleNeckType,
     nids *[]string,
+    seenOldBnIDs map[int64]bool,
     tolerance float64,
-    insertStmt, updateStmt,
-    findExactMatchingBNStmt, findMatchingBNStmt,
-    deleteObsoleteBNStmt, fixValidityStmt,
-    deleteMaterialStmt, insertMaterialStmt,
-    trackStmt *sql.Stmt,
+    bs *bnStmts,
+    savepoint func(func() error) error,
 ) error {
     feedback.Info("Processing %s (%s)", bn.OBJNAM, bn.Bottleneck_id)
 
-    var tfrom, tto pgtype.Timestamptz
-    var uBound pgtype.BoundType
+    var (
+        tfrom, tto pgtype.Timestamptz
+        uBound     pgtype.BoundType
+    )
 
     if bn.AdditionalData == nil || bn.AdditionalData.KeyValuePair == nil {
         // This is a workaround for the fact that most BN data does not
@@ -407,8 +374,8 @@
         // version is advisable.
         //
         // Never the less, the current solution "just works" for the
-        // rtime being...  -- sw
-        feedback.Warn("No validity information, assuming infinite validity.")
+        // time being and reflects the upstream systems...  -- sw
+        feedback.Info("No validity information, assuming infinite validity.")
         tfrom.Set(time.Date(1970, time.January, 1, 0, 0, 0, 0, time.UTC))
         uBound = pgtype.Unbounded
     } else {
@@ -418,13 +385,12 @@
         )
         fromTo := make(map[string]time.Time)
         for _, kv := range bn.AdditionalData.KeyValuePair {
-            k := string(kv.Key)
-            if k == fromKey || k == toKey {
-                if t, err := time.Parse(time.RFC3339, kv.Value); err != nil {
+            if kv.Key == fromKey || kv.Key == toKey {
+                t, err := time.Parse(time.RFC3339, kv.Value)
+                if err != nil {
                     return err
-                } else {
-                    fromTo[k] = t
                 }
+                fromTo[kv.Key] = t
             }
         }
 
@@ -438,8 +404,13 @@
         if t, ok := fromTo[toKey]; ok {
             tto.Set(t)
             uBound = pgtype.Exclusive
+            feedback.Info("Valid from %s to %s",
+                fromTo[fromKey].Format(common.TimeFormat),
+                fromTo[toKey].Format(common.TimeFormat))
         } else {
             uBound = pgtype.Unbounded
+            feedback.Info("Valid from %s",
+                fromTo[fromKey].Format(common.TimeFormat))
         }
     }
 
@@ -485,29 +456,30 @@
         }
     }
 
-    // Check if an bottleneck identical to the one we would insert already
+    // Check if a bottleneck identical to the one we would insert already
     // exists:
-    bns, err := findExactMatchingBNStmt.QueryContext(ctx,
+    var old int64
+    err := tx.StmtContext(ctx, bs.findExactMatch).QueryRowContext(
+        ctx,
         bn.Bottleneck_id,
         &validity,
         bn.Fk_g_fid,
-        bn.OBJNAM,
-        bn.NOBJNM,
         bn.From_ISRS, bn.To_ISRS,
-        rb,
-        lb,
         country,
-        revisitingTime,
         limiting,
         bn.Date_Info,
         bn.Source,
-    )
-
-    if err != nil {
+    ).Scan(&old)
+    switch {
+    case err == sql.ErrNoRows:
+        // We don't have a matching old one.
+    case err != nil:
         return err
-    }
-    defer bns.Close()
-    if bns.Next() {
+    default:
+        // We could check if the materials are also matching -- but per
+        // specification the Date_Info would have to change on that kind
+        // of change anyway. So actually we are already checking more in
+        // depth than required.
         feedback.Info("unchanged")
         return nil
     }
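The &validity passed to the statements here is assembled from tfrom, tto and
uBound in unchanged code that the diff does not show. With the pgtype package
imported above, the construction presumably looks like this sketch (for
orientation only; the actual assembly lives in the unchanged part of
storeBottleneck):

    validity := pgtype.Tstzrange{
        Lower:     tfrom,
        Upper:     tto,
        LowerType: pgtype.Inclusive,
        UpperType: uBound, // Exclusive with an end date, Unbounded otherwise
        Status:    pgtype.Present,
    }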
feedback.Info("Range: from %s to %s", bn.From_ISRS, bn.To_ISRS) - // Check if an bottleneck with the same identity - // (bottleneck_id,validity) already exists: - // Check if an bottleneck identical to the one we would insert already - // exists: - var existing_bn_id *int64 - err = findMatchingBNStmt.QueryRowContext(ctx, - bn.Bottleneck_id, - &validity, - ).Scan(&existing_bn_id) - switch { - case err == sql.ErrNoRows: - existing_bn_id = nil - case err != nil: - // This is unexpected and propably a serious error - return err - } - - tx, err := conn.BeginTx(ctx, nil) + // Check if the new bottleneck intersects with the validity of existing + // for the same bottleneck_id we consider this an update and mark the + // old data for deletion. + bns, err := tx.StmtContext(ctx, bs.findIntersecting).QueryContext( + ctx, bn.Bottleneck_id, &validity, + ) if err != nil { return err } - defer tx.Rollback() + defer bns.Close() - var bnIds []int64 - if existing_bn_id != nil { - feedback.Info("Bottelneck '%s' "+ - "with matching validity already exists:"+ + // Mark old intersecting bottleneck data for deletion. Don't worry about + // materials, they will be deleted via cascading. + var oldBnIDs []int64 + for bns.Next() { + var oldID int64 + err := bns.Scan(&oldID) + if err != nil { + return err + } + oldBnIDs = append(oldBnIDs, oldID) + } + + if err := bns.Err(); err != nil { + return err + } + + switch { + case len(oldBnIDs) == 1: + feedback.Info("Bottleneck '%s' "+ + "with intersecting validity already exists: "+ "UPDATING", bn.Bottleneck_id) - // Updating existnig BN data: - bns, err = tx.StmtContext(ctx, updateStmt).QueryContext(ctx, - existing_bn_id, - bn.Bottleneck_id, - &validity, - bn.Fk_g_fid, - bn.OBJNAM, - bn.NOBJNM, - bn.From_ISRS, bn.To_ISRS, - rb, - lb, - country, - revisitingTime, - limiting, - bn.Date_Info, - bn.Source, - tolerance, - ) - } else { - // New BN data: - bns, err = tx.StmtContext(ctx, insertStmt).QueryContext(ctx, + case len(oldBnIDs) > 1: + // This case is unexpected and should only happen when historic + // data in the bottleneck service was changed subsequently... + // We handle it gracefully anyway, but warn. + feedback.Warn("More than one Bottelneck '%s' "+ + "with intersecting validity already exists: "+ + "REPLACING all of them!", bn.Bottleneck_id) + } + // We write the actual tracking information for deletion of superseded + // bottlenecks later to the database -- AFTER the new bottleneck was + // created successfully. That way, we don't change the database, when + // an error arises during inserting the new data. 
+
+    var bnID int64
+    // Add new BN data:
+    if err := savepoint(func() error {
+        if err := tx.StmtContext(ctx, bs.insert).QueryRowContext(
+            ctx,
             bn.Bottleneck_id,
             &validity,
             bn.Fk_g_fid,
@@ -579,113 +552,48 @@
             bn.Date_Info,
             bn.Source,
             tolerance,
-        )
-    }
-    if err != nil {
-        feedback.Warn(pgxutils.ReadableError{err}.Error())
-        return nil
-    }
-    defer bns.Close()
-    for bns.Next() {
-        var nid int64
-        if err := bns.Scan(&nid); err != nil {
-            return err
-        }
-        bnIds = append(bnIds, nid)
-    }
-    if err := bns.Err(); err != nil {
-        feedback.Warn(pgxutils.ReadableError{err}.Error())
-        return nil
-    }
-    if len(bnIds) == 0 {
-        feedback.Warn(
-            "No gauge matching '%s' or given time available", bn.Fk_g_fid)
-        return nil
-    }
-
-    // Remove obsolete bottleneck version entries
-    var pgBnIds pgtype.Int8Array
-    pgBnIds.Set(bnIds)
-    if _, err = tx.StmtContext(ctx, deleteObsoleteBNStmt).ExecContext(ctx,
-        bn.Bottleneck_id,
-        &validity,
-        &pgBnIds,
-    ); err != nil {
-        feedback.Warn(pgxutils.ReadableError{err}.Error())
-        if err2 := tx.Rollback(); err2 != nil {
-            return err2
-        }
-        return nil
-    }
-
-    // Set end of validity of old version to start of new version
-    // in case of overlap
-    if _, err = tx.StmtContext(ctx, fixValidityStmt).ExecContext(ctx,
-        bn.Bottleneck_id,
-        validity,
-    ); err != nil {
-        feedback.Warn(pgxutils.ReadableError{err}.Error())
-        if err2 := tx.Rollback(); err2 != nil {
-            return err2
-        }
-        return nil
-    }
-
-    if materials != nil {
-        // Remove obsolete riverbed materials
-        var pgMaterials pgtype.VarcharArray
-        pgMaterials.Set(materials)
-        mtls, err := tx.StmtContext(ctx,
-            deleteMaterialStmt).QueryContext(ctx,
-            &pgBnIds,
-            &pgMaterials,
-        )
-        if err != nil {
-            return err
-        }
-        defer mtls.Close()
-        for mtls.Next() {
-            var delMat string
-            if err := mtls.Scan(&delMat); err != nil {
-                return err
-            }
-            feedback.Warn("Removed riverbed material %s", delMat)
-        }
-        if err := mtls.Err(); err != nil {
+        ).Scan(&bnID); err != nil {
             return err
         }
-        // Insert riverbed materials
-        if _, err := tx.StmtContext(ctx, insertMaterialStmt).ExecContext(ctx,
-            &pgBnIds,
-            &pgMaterials,
-        ); err != nil {
-            feedback.Warn("Failed to insert riverbed materials")
-            feedback.Warn(pgxutils.ReadableError{err}.Error())
-            return nil
-        }
-    }
-
-    // Only add new BN data to tracking for staging review.
-    //
-    // FIXME: Review for updated bottlenecks is currently not possible, as
-    // the update is done instantly in place.
-    if existing_bn_id == nil {
-        for _, nid := range bnIds {
-            if _, err := tx.StmtContext(ctx, trackStmt).ExecContext(
-                ctx, importID, "waterway.bottlenecks", nid,
+        // Add new materials
+        for _, material := range materials {
+            if _, err := tx.StmtContext(ctx, bs.insertMaterial).ExecContext(
+                ctx,
+                bnID,
+                material,
             ); err != nil {
                 return err
             }
         }
+        return nil
+    }); err != nil {
+        feedback.Warn(pgxutils.ReadableError{Err: err}.Error())
+        return nil
     }
 
-    if err = tx.Commit(); err != nil {
+    // Now that adding BNs to staging was successful, write import tracking
+    // information to database:
+    for _, oldID := range oldBnIDs {
+        // It is possible that two new bottlenecks intersect with the
+        // same old one, therefore we have to handle duplicates in
+        // oldBnIDs.
+        if !seenOldBnIDs[oldID] {
+            if _, err := tx.StmtContext(ctx, bs.track).ExecContext(
+                ctx, importID, "waterway.bottlenecks", oldID, true,
+            ); err != nil {
+                return err
+            }
+            seenOldBnIDs[oldID] = true
+        }
+    }
+
+    if _, err := tx.StmtContext(ctx, bs.track).ExecContext(
+        ctx, importID, "waterway.bottlenecks", bnID, false,
+    ); err != nil {
         return err
     }
-    // See above...
-    if existing_bn_id == nil {
-        *nids = append(*nids, bn.Bottleneck_id)
-    }
+
+    *nids = append(*nids, bn.Bottleneck_id)
     return nil
 }
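bs.track is prepared from trackImportDeletionSQL, which this diff does not
show. Judging from the calls above -- (importID, relation, key, deletion) --
it is presumably a plain insert along these lines (an assumption, not the
project's actual statement):

    // Assumed shape of trackImportDeletionSQL (not shown in this patch):
    const trackImportDeletionSQL = `
    INSERT INTO import.track_imports (import_id, relation, key, deletion)
    VALUES ($1, $2::regclass, $3, $4)`

StageDone later replays exactly these rows: ids tracked with deletion = true
are deleted, the rest get staging_done = true (see bnStageDoneDeleteSQL and
bnStageDoneSQL above).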
fmt.Errorf("conversion of model for kind '%s' failed", kind) } var serialized string diff -r 4d7569cca5e6 -r 552ea22ed266 pkg/imports/sec.go --- a/pkg/imports/sec.go Thu Aug 01 12:48:33 2019 +0200 +++ b/pkg/imports/sec.go Fri Aug 02 13:30:29 2019 +0200 @@ -182,7 +182,7 @@ sec.Source, sec.Tolerance, ).Scan(&id); err != nil { - return nil, pgxutils.ReadableError{err} + return nil, pgxutils.ReadableError{Err: err} } if err := track(ctx, tx, importID, "waterway.sections", id); err != nil { diff -r 4d7569cca5e6 -r 552ea22ed266 pkg/imports/st.go --- a/pkg/imports/st.go Thu Aug 01 12:48:33 2019 +0200 +++ b/pkg/imports/st.go Fri Aug 02 13:30:29 2019 +0200 @@ -203,7 +203,7 @@ st.Source, st.Tolerance, ).Scan(&id); err != nil { - return nil, pgxutils.ReadableError{err} + return nil, pgxutils.ReadableError{Err: err} } // store the associated countries. diff -r 4d7569cca5e6 -r 552ea22ed266 pkg/imports/wa.go --- a/pkg/imports/wa.go Thu Aug 01 12:48:33 2019 +0200 +++ b/pkg/imports/wa.go Fri Aug 02 13:30:29 2019 +0200 @@ -247,7 +247,7 @@ outside++ // ignore -> filtered by responsibility_areas case err != nil: - feedback.Warn(pgxutils.ReadableError{err}.Error()) + feedback.Warn(pgxutils.ReadableError{Err: err}.Error()) default: features++ } diff -r 4d7569cca5e6 -r 552ea22ed266 pkg/imports/wg.go --- a/pkg/imports/wg.go Thu Aug 01 12:48:33 2019 +0200 +++ b/pkg/imports/wg.go Fri Aug 02 13:30:29 2019 +0200 @@ -327,7 +327,7 @@ ).Scan(&isNew) switch { case err != nil: - feedback.Warn(pgxutils.ReadableError{err}.Error()) + feedback.Warn(pgxutils.ReadableError{Err: err}.Error()) if err2 := tx.Rollback(); err2 != nil { return nil, err2 } @@ -352,7 +352,7 @@ source, time.Time(*dr.Lastupdate), ); err != nil { - feedback.Warn(pgxutils.ReadableError{err}.Error()) + feedback.Warn(pgxutils.ReadableError{Err: err}.Error()) if err2 := tx.Rollback(); err2 != nil { return nil, err2 } @@ -389,7 +389,7 @@ unchanged++ continue case err2 != nil: - feedback.Warn(pgxutils.ReadableError{err2}.Error()) + feedback.Warn(pgxutils.ReadableError{Err: err2}.Error()) if err3 := tx.Rollback(); err3 != nil { return nil, err3 } @@ -436,7 +436,7 @@ code.String(), &validity, ); err != nil { - feedback.Warn(pgxutils.ReadableError{err}.Error()) + feedback.Warn(pgxutils.ReadableError{Err: err}.Error()) if err2 := tx.Rollback(); err2 != nil { return nil, err2 } @@ -484,7 +484,7 @@ string(**wl.level), int64(**wl.value), ); err != nil { - feedback.Warn(pgxutils.ReadableError{err}.Error()) + feedback.Warn(pgxutils.ReadableError{Err: err}.Error()) tx.Rollback() continue } diff -r 4d7569cca5e6 -r 552ea22ed266 pkg/imports/wx.go --- a/pkg/imports/wx.go Thu Aug 01 12:48:33 2019 +0200 +++ b/pkg/imports/wx.go Fri Aug 02 13:30:29 2019 +0200 @@ -321,7 +321,7 @@ // ignore -> filtered by responsibility_areas return nil case err != nil: - feedback.Warn(pgxutils.ReadableError{err}.Error()) + feedback.Warn(pgxutils.ReadableError{Err: err}.Error()) default: *features++ } diff -r 4d7569cca5e6 -r 552ea22ed266 pkg/octree/tree.go --- a/pkg/octree/tree.go Thu Aug 01 12:48:33 2019 +0200 +++ b/pkg/octree/tree.go Fri Aug 02 13:30:29 2019 +0200 @@ -321,7 +321,7 @@ return result } -func (t *Tree) GenerateRandomVertices(n int, callback func([]Vertex)) { +func (ot *Tree) GenerateRandomVertices(n int, callback func([]Vertex)) { var wg sync.WaitGroup jobs := make(chan int) @@ -336,8 +336,8 @@ wg.Add(1) go func() { defer wg.Done() - xRange := common.Random(t.Min.X, t.Max.X) - yRange := common.Random(t.Min.Y, t.Max.Y) + xRange := common.Random(ot.Min.X, ot.Max.X) + yRange := 
diff -r 4d7569cca5e6 -r 552ea22ed266 pkg/octree/tree.go
--- a/pkg/octree/tree.go	Thu Aug 01 12:48:33 2019 +0200
+++ b/pkg/octree/tree.go	Fri Aug 02 13:30:29 2019 +0200
@@ -321,7 +321,7 @@
     return result
 }
 
-func (t *Tree) GenerateRandomVertices(n int, callback func([]Vertex)) {
+func (ot *Tree) GenerateRandomVertices(n int, callback func([]Vertex)) {
     var wg sync.WaitGroup
 
     jobs := make(chan int)
@@ -336,8 +336,8 @@
         wg.Add(1)
         go func() {
             defer wg.Done()
-            xRange := common.Random(t.Min.X, t.Max.X)
-            yRange := common.Random(t.Min.Y, t.Max.Y)
+            xRange := common.Random(ot.Min.X, ot.Max.X)
+            yRange := common.Random(ot.Min.Y, ot.Max.Y)
 
             for size := range jobs {
                 var vertices []Vertex
@@ -348,7 +348,7 @@
                 }
                 for len(vertices) < size {
                     x, y := xRange(), yRange()
-                    if z, ok := t.Value(x, y); ok {
+                    if z, ok := ot.Value(x, y); ok {
                         vertices = append(vertices, Vertex{X: x, Y: y, Z: z})
                     }
                 }
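For context (not part of the changeset): GenerateRandomVertices distributes
batch sizes to a fixed pool of goroutines over the jobs channel; each worker
samples points inside the tree's bounding box until its batch is full and
hands it to the callback. The bare pattern, stripped of the octree specifics:

    package main

    import (
        "fmt"
        "runtime"
        "sync"
    )

    func main() {
        jobs := make(chan int)
        out := make(chan []int)
        var wg sync.WaitGroup

        // fixed worker pool, one goroutine per CPU
        for i := 0; i < runtime.NumCPU(); i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                for size := range jobs {
                    out <- make([]int, size) // stand-in for sampled vertices
                }
            }()
        }
        go func() { wg.Wait(); close(out) }()

        // feed batch sizes, then close to let the workers drain
        go func() {
            for _, size := range []int{3, 5, 8} {
                jobs <- size
            }
            close(jobs)
        }()

        for batch := range out {
            fmt.Println(len(batch))
        }
    }

The t -> ot receiver rename in this hunk (and t -> tri below in
triangulator.go) is mechanical and does not change behavior.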
diff -r 4d7569cca5e6 -r 552ea22ed266 pkg/octree/triangulator.go
--- a/pkg/octree/triangulator.go	Thu Aug 01 12:48:33 2019 +0200
+++ b/pkg/octree/triangulator.go	Fri Aug 02 13:30:29 2019 +0200
@@ -20,7 +20,7 @@
 package octree
 
 import (
-    "fmt"
+    "errors"
     "math"
     "sort"
 )
@@ -43,22 +43,22 @@
 // sorting a triangulator sorts the `ids` such that the referenced points
 // are in order by their distance to `center`
-func (a *triangulator) Len() int {
-    return len(a.points)
+func (tri *triangulator) Len() int {
+    return len(tri.points)
 }
 
-func (a *triangulator) Swap(i, j int) {
-    a.ids[i], a.ids[j] = a.ids[j], a.ids[i]
+func (tri *triangulator) Swap(i, j int) {
+    tri.ids[i], tri.ids[j] = tri.ids[j], tri.ids[i]
 }
 
-func (a *triangulator) Less(i, j int) bool {
-    d1 := a.squaredDistances[a.ids[i]]
-    d2 := a.squaredDistances[a.ids[j]]
+func (tri *triangulator) Less(i, j int) bool {
+    d1 := tri.squaredDistances[tri.ids[i]]
+    d2 := tri.squaredDistances[tri.ids[j]]
     if d1 != d2 {
         return d1 < d2
     }
-    p1 := a.points[a.ids[i]]
-    p2 := a.points[a.ids[j]]
+    p1 := tri.points[tri.ids[i]]
+    p2 := tri.points[tri.ids[j]]
     if p1.X != p2.X {
         return p1.X < p2.X
     }
@@ -135,7 +135,7 @@
         }
     }
     if minRadius == infinity {
-        return fmt.Errorf("No Delaunay triangulation exists for this input.")
+        return errors.New("no delaunay triangulation exists for this input")
     }
 
     // swap the order of the seed points for counter-clockwise orientation
@@ -264,35 +264,35 @@
     return nil
 }
 
-func (t *triangulator) hashKey(point Vertex) int {
-    d := point.Sub2D(t.center)
-    return int(pseudoAngle(d.X, d.Y) * float64(len(t.hash)))
+func (tri *triangulator) hashKey(point Vertex) int {
+    d := point.Sub2D(tri.center)
+    return int(pseudoAngle(d.X, d.Y) * float64(len(tri.hash)))
 }
 
-func (t *triangulator) hashEdge(e *node) {
-    t.hash[t.hashKey(t.points[e.i])] = e
+func (tri *triangulator) hashEdge(e *node) {
+    tri.hash[tri.hashKey(tri.points[e.i])] = e
 }
 
-func (t *triangulator) addTriangle(i0, i1, i2, a, b, c int32) int32 {
-    i := int32(t.trianglesLen)
-    t.triangles[i] = i0
-    t.triangles[i+1] = i1
-    t.triangles[i+2] = i2
-    t.link(i, a)
-    t.link(i+1, b)
-    t.link(i+2, c)
-    t.trianglesLen += 3
+func (tri *triangulator) addTriangle(i0, i1, i2, a, b, c int32) int32 {
+    i := int32(tri.trianglesLen)
+    tri.triangles[i] = i0
+    tri.triangles[i+1] = i1
+    tri.triangles[i+2] = i2
+    tri.link(i, a)
+    tri.link(i+1, b)
+    tri.link(i+2, c)
+    tri.trianglesLen += 3
     return i
 }
 
-func (t *triangulator) link(a, b int32) {
-    t.halfedges[a] = b
+func (tri *triangulator) link(a, b int32) {
+    tri.halfedges[a] = b
     if b >= 0 {
-        t.halfedges[b] = a
+        tri.halfedges[b] = a
     }
 }
 
-func (t *triangulator) legalize(a int32) int32 {
+func (tri *triangulator) legalize(a int32) int32 {
     // if the pair of triangles doesn't satisfy the Delaunay condition
     // (p1 is inside the circumcircle of [p0, pl, pr]), flip them,
     // then do the same check/flip recursively for the new pair of triangles
@@ -308,7 +308,7 @@
     //          \||/                  \  /
     //           pr                    pr
 
-    b := t.halfedges[a]
+    b := tri.halfedges[a]
 
     a0 := a - a%3
     b0 := b - b%3
@@ -321,53 +321,53 @@
         return ar
     }
 
-    p0 := t.triangles[ar]
-    pr := t.triangles[a]
-    pl := t.triangles[al]
-    p1 := t.triangles[bl]
+    p0 := tri.triangles[ar]
+    pr := tri.triangles[a]
+    pl := tri.triangles[al]
+    p1 := tri.triangles[bl]
 
-    illegal := inCircle(t.points[p0], t.points[pr], t.points[pl], t.points[p1])
+    illegal := inCircle(tri.points[p0], tri.points[pr], tri.points[pl], tri.points[p1])
 
     if illegal {
-        t.triangles[a] = p1
-        t.triangles[b] = p0
+        tri.triangles[a] = p1
+        tri.triangles[b] = p0
 
         // edge swapped on the other side of the hull (rare)
         // fix the halfedge reference
-        if t.halfedges[bl] == -1 {
-            e := t.hull
+        if tri.halfedges[bl] == -1 {
+            e := tri.hull
             for {
                 if e.t == bl {
                     e.t = a
                     break
                 }
                 e = e.next
-                if e == t.hull {
+                if e == tri.hull {
                     break
                 }
             }
         }
 
-        t.link(a, t.halfedges[bl])
-        t.link(b, t.halfedges[ar])
-        t.link(ar, bl)
+        tri.link(a, tri.halfedges[bl])
+        tri.link(b, tri.halfedges[ar])
+        tri.link(ar, bl)
 
         br := b0 + (b+1)%3
 
-        t.legalize(a)
-        return t.legalize(br)
+        tri.legalize(a)
+        return tri.legalize(br)
     }
 
     return ar
 }
 
-func (t *triangulator) convexHull() []Vertex {
+func (tri *triangulator) convexHull() []Vertex {
     var result []Vertex
-    e := t.hull
+    e := tri.hull
     for e != nil {
-        result = append(result, t.points[e.i])
+        result = append(result, tri.points[e.i])
         e = e.prev
-        if e == t.hull {
+        if e == tri.hull {
             break
         }
     }
diff -r 4d7569cca5e6 -r 552ea22ed266 pkg/pgxutils/errors.go
--- a/pkg/pgxutils/errors.go	Thu Aug 01 12:48:33 2019 +0200
+++ b/pkg/pgxutils/errors.go	Fri Aug 02 13:30:29 2019 +0200
@@ -79,7 +79,7 @@
     switch err.TableName {
     case "gauge_measurements", "gauge_predictions", "bottlenecks":
         switch err.ConstraintName {
-        case "gauge_key":
+        case "gauge_key", "waterway_bottlenecks_reference_gauge":
             m = "Referenced gauge with matching temporal validity not available"
             return
         }
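The pgxutils change above, together with the ERRCODE fixes in schema/gemma.sql
further below, aligns the emulated referential checks with SQLSTATE 23503
(foreign_key_violation); 23505 is unique_violation and was simply the wrong
code for what the triggers emulate. Client code can then distinguish the cases
along these lines (pgx.PgError and its Code field are the pgx v3 error type;
the handling itself is illustrative):

    if pgErr, ok := err.(pgx.PgError); ok {
        switch pgErr.Code {
        case "23503": // foreign_key_violation: referenced gauge/bottleneck missing
            // report "referenced object not available"
        case "23505": // unique_violation: duplicate key
            // report "entry already exists"
        }
    }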
diff -r 4d7569cca5e6 -r 552ea22ed266 pkg/soap/ifbn/service.go
--- a/pkg/soap/ifbn/service.go	Thu Aug 01 12:48:33 2019 +0200
+++ b/pkg/soap/ifbn/service.go	Fri Aug 02 13:30:29 2019 +0200
@@ -36,7 +36,6 @@
 }
 
 type Export_bn_by_isrs struct {
-    //XMLName xml.Name `xml:"http://www.ris.eu/bottleneck/3.0 export_bn_by_isrs"`
     XMLName xml.Name `xml:"http://www.ris.eu/bottleneck/3.0 export_bn_by_isrs"`
 
     ISRS *ArrayOfISRSPair `xml:"ISRS,omitempty"`
@@ -826,13 +825,11 @@
 }
 
 type RequestedPeriod struct {
-    //XMLName xml.Name `xml:"http://www.ris.eu/wamos/common/3.0 RequestedPeriod"`
-
-    Date_start time.Time `xml:"Date_start,omitempty"`
+    Date_start *time.Time `xml:"http://www.ris.eu/wamos/common/3.0 Date_start,omitempty"`
 
-    Date_end time.Time `xml:"Date_end,omitempty"`
+    Date_end *time.Time `xml:"http://www.ris.eu/wamos/common/3.0 Date_end,omitempty"`
 
-    Value_interval int32 `xml:"Value_interval,omitempty"`
+    Value_interval *int32 `xml:"http://www.ris.eu/wamos/common/3.0 Value_interval,omitempty"`
 }
 
 type ArrayOfString struct {
diff -r 4d7569cca5e6 -r 552ea22ed266 schema/gemma.sql
--- a/schema/gemma.sql	Thu Aug 01 12:48:33 2019 +0200
+++ b/schema/gemma.sql	Fri Aug 02 13:30:29 2019 +0200
@@ -103,7 +103,7 @@
         USING
           DETAIL = format('No matching gauge %s found.',
                           isrs_AsText(referenced_gauge::isrs)),
-          ERRCODE = 23505,
+          ERRCODE = 23503,
           SCHEMA = TG_TABLE_SCHEMA,
           TABLE = TG_TABLE_NAME,
          COLUMN = TG_ARGV[0],
@@ -141,7 +141,7 @@
         USING
           DETAIL = format('No matching gauge %s found.',
                           isrs_AsText(referenced_gauge::isrs)),
-          ERRCODE = 23505,
+          ERRCODE = 23503,
           SCHEMA = TG_TABLE_SCHEMA,
           TABLE = TG_TABLE_NAME,
           COLUMN = TG_ARGV[0],
@@ -179,7 +179,7 @@
         USING
           DETAIL = format('No matching bottleneck %s for %s found.',
                           referenced_bottleneck_id, new_tstz),
-          ERRCODE = 23505,
+          ERRCODE = 23503,
           SCHEMA = TG_TABLE_SCHEMA,
           TABLE = TG_TABLE_NAME,
           COLUMN = TG_ARGV[0],
@@ -649,9 +649,6 @@
         id int PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
         bottleneck_id varchar NOT NULL,
         validity tstzrange NOT NULL CHECK (NOT isempty(validity)),
-        UNIQUE (bottleneck_id, validity),
-        EXCLUDE USING GiST (bottleneck_id WITH =, validity WITH &&)
-            DEFERRABLE INITIALLY DEFERRED,
         gauge_location isrs NOT NULL,
         objnam varchar,
         nobjnm varchar,
@@ -673,7 +670,12 @@
         -- XXX: Also an attribut of sounding result?
         date_info timestamp with time zone NOT NULL,
         source_organization varchar NOT NULL,
-        staging_done boolean NOT NULL DEFAULT false
+        staging_done boolean NOT NULL DEFAULT false,
+        UNIQUE (bottleneck_id, validity, staging_done),
+        EXCLUDE USING GiST (bottleneck_id WITH =,
+                            validity WITH &&,
+                            CAST(staging_done AS int) WITH =)
+            DEFERRABLE INITIALLY DEFERRED
     )
     CREATE CONSTRAINT TRIGGER waterway_bottlenecks_reference_gauge
     AFTER INSERT OR UPDATE OF gauge_location ON bottlenecks
@@ -881,9 +883,13 @@
     tmp RECORD;
 BEGIN
     FOR tmp IN
-        SELECT * FROM import.track_imports WHERE import_id = imp_id AND NOT deletion
+        SELECT relation, array_agg(key) AS keys
+            FROM import.track_imports
+            WHERE import_id = imp_id AND NOT deletion
+            GROUP BY relation
     LOOP
-        EXECUTE format('DELETE FROM %s WHERE id = $1', tmp.relation) USING tmp.key;
+        EXECUTE format('DELETE FROM %s WHERE id = ANY($1)', tmp.relation)
+            USING tmp.keys;
     END LOOP;
 END;
 $$
diff -r 4d7569cca5e6 -r 552ea22ed266 schema/import_tests.sql
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/schema/import_tests.sql	Fri Aug 02 13:30:29 2019 +0200
@@ -0,0 +1,81 @@
+-- This is Free Software under GNU Affero General Public License v >= 3.0
+-- without warranty, see README.md and license for details.
+
+-- SPDX-License-Identifier: AGPL-3.0-or-later
+-- License-Filename: LICENSES/AGPL-3.0.txt
+
+-- Copyright (C) 2019 by via donau
+--   – Österreichische Wasserstraßen-Gesellschaft mbH
+-- Software engineering by Intevation GmbH
+
+-- Author(s):
+--  * Tom Gottfried
+
+--
+-- pgTAP test script for import queue
+--
+
+\set imp_id 99
+PREPARE insert_gms AS WITH
+gms AS (
+    INSERT INTO waterway.gauge_measurements (
+        location,
+        measure_date,
+        country_code,
+        sender,
+        language_code,
+        date_issue,
+        reference_code,
+        water_level,
+        date_info,
+        source_organization
+    ) SELECT
+        ('AT', 'XXX', '00001', 'G0001', 1)::isrs,
+        t,
+        'AT',
+        'test',
+        'DE',
+        current_timestamp,
+        'ZPG',
+        0,
+        current_timestamp,
+        'test'
+    FROM generate_series(
+        current_timestamp - '12 h'::interval,
+        current_timestamp - '6 h'::interval,
+        '15 min'::interval) AS times (t)
+    RETURNING id),
+imps AS (
+    INSERT INTO import.imports (id, kind, username) VALUES (
+        $1, 'test', 'test_admin_ro')
+    RETURNING id)
+INSERT INTO import.track_imports (import_id, relation, key)
+    SELECT imps.id, 'waterway.gauge_measurements', gms.id
+    FROM imps, gms;
+
+EXECUTE insert_gms(:imp_id);
+EXECUTE insert_gms(:imp_id + 1);
+
+SELECT ok((SELECT count(*) FROM waterway.gauge_measurements
+           WHERE id IN(SELECT key
+                       FROM import.track_imports
+                       WHERE import_id IN(:imp_id, :imp_id + 1))
+          ) = (SELECT count(*)
+               FROM import.track_imports
+               WHERE import_id IN(:imp_id, :imp_id + 1)),
+    'Tracked entries of test imports exist');
+
+SELECT import.del_import(:imp_id);
+
+SELECT ok(0 = (SELECT count(*) FROM waterway.gauge_measurements
+               WHERE id IN(SELECT key
+                           FROM import.track_imports
+                           WHERE import_id = :imp_id)),
+    'Tracked entries of first test import are deleted');
+
+SELECT ok((SELECT count(*) FROM waterway.gauge_measurements
+           WHERE id IN(SELECT key
+                       FROM import.track_imports
+                       WHERE import_id = :imp_id + 1)
+          ) = (SELECT count(*)
+               FROM import.track_imports
+               WHERE import_id = :imp_id + 1),
+    'Tracked entries of second test import still exist');
diff -r 4d7569cca5e6 -r 552ea22ed266 schema/run_tests.sh
--- a/schema/run_tests.sh	Thu Aug 01 12:48:33 2019 +0200
+++ b/schema/run_tests.sh	Fri Aug 02 13:30:29 2019 +0200
@@ -80,7 +80,7 @@
     -c 'SET client_min_messages TO WARNING' \
     -c "DROP ROLE IF EXISTS $TEST_ROLES" \
     -f "$BASEDIR"/tap_tests_data.sql \
-    -c "SELECT plan(70 + (
+    -c "SELECT plan(73 + (
           SELECT count(*)::int
             FROM information_schema.tables
             WHERE table_schema = 'waterway'))" \
@@ -88,4 +88,5 @@
     -f "$BASEDIR"/isrs_tests.sql \
     -f "$BASEDIR"/auth_tests.sql \
     -f "$BASEDIR"/manage_users_tests.sql \
+    -f "$BASEDIR"/import_tests.sql \
     -c 'SELECT * FROM finish()'
diff -r 4d7569cca5e6 -r 552ea22ed266 schema/tap_tests_data.sql
--- a/schema/tap_tests_data.sql	Thu Aug 01 12:48:33 2019 +0200
+++ b/schema/tap_tests_data.sql	Fri Aug 02 13:30:29 2019 +0200
@@ -17,6 +17,8 @@
 --
 
 INSERT INTO countries VALUES ('AT'), ('RO'), ('DE');
+INSERT INTO language_codes VALUES ('DE');
+INSERT INTO depth_references VALUES ('ZPG');
 
 INSERT INTO users.responsibility_areas VALUES
     ('AT', ST_geomfromtext('MULTIPOLYGON(((0 0, 0 1, 1 1, 1 0, 0 0)))', 4326)),
diff -r 4d7569cca5e6 -r 552ea22ed266 schema/updates/1101/01.improve_del_import.sql
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/schema/updates/1101/01.improve_del_import.sql	Fri Aug 02 13:30:29 2019 +0200
@@ -0,0 +1,17 @@
+CREATE OR REPLACE FUNCTION import.del_import(imp_id int) RETURNS void AS
+$$
+DECLARE
+    tmp RECORD;
+BEGIN
+    FOR tmp IN
+        SELECT relation, array_agg(key) AS keys
+            FROM import.track_imports
+            WHERE import_id = imp_id AND NOT deletion
+            GROUP BY relation
+    LOOP
+        EXECUTE format('DELETE FROM %s WHERE id = ANY($1)', tmp.relation)
+            USING tmp.keys;
+    END LOOP;
+END;
+$$
+LANGUAGE plpgsql;
diff -r 4d7569cca5e6 -r 552ea22ed266 schema/updates/1102/01.bn_constraint.sql
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/schema/updates/1102/01.bn_constraint.sql	Fri Aug 02 13:30:29 2019 +0200
@@ -0,0 +1,14 @@
+ALTER TABLE waterway.bottlenecks
+    DROP CONSTRAINT IF EXISTS bottlenecks_bottleneck_id_validity_key,
+    DROP CONSTRAINT IF EXISTS bottlenecks_bottleneck_id_validity_excl,
+    DROP CONSTRAINT IF EXISTS bottlenecks_bottleneck_id_validity_staging_done_key,
+    DROP CONSTRAINT IF EXISTS bottlenecks_bottleneck_id_validity_staging_done_excl;
+
+ALTER TABLE waterway.bottlenecks
+    ADD CONSTRAINT bottlenecks_bottleneck_id_validity_staging_done_key
+        UNIQUE (bottleneck_id, validity, staging_done),
+    ADD CONSTRAINT bottlenecks_bottleneck_id_validity_staging_done_excl
+        EXCLUDE USING GiST (bottleneck_id WITH =,
+                            validity WITH &&,
+                            CAST(staging_done AS int) WITH =)
+            DEFERRABLE INITIALLY DEFERRED;
diff -r 4d7569cca5e6 -r 552ea22ed266 schema/updates/1103/01.fix_constraint_error_codes.sql
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/schema/updates/1103/01.fix_constraint_error_codes.sql	Fri Aug 02 13:30:29 2019 +0200
@@ -0,0 +1,128 @@
+-- This is Free Software under GNU Affero General Public License v >= 3.0
+-- without warranty, see README.md and license for details.
+
+-- SPDX-License-Identifier: AGPL-3.0-or-later
+-- License-Filename: LICENSES/AGPL-3.0.txt
+
+-- Copyright (C) 2019 by via donau
+--   – Österreichische Wasserstraßen-Gesellschaft mbH
+-- Software engineering by Intevation GmbH
+
+-- Author(s):
+--  * Sascha Wilde
+
+--
+-- CONSTRAINT FUNCTIONS
+--
+-- The only change in the following functions is the error code
+-- returned in case of failure: it should be
+--   23503: foreign_key_violation
+-- as what we are emulating is a kind of foreign key...
+
+CREATE OR REPLACE FUNCTION check_valid_gauge() RETURNS trigger AS
+$$
+DECLARE
+    -- FIXME: I'm using text for the isrs code and cast it on demand.
+    -- If someone is able to get it to work with isrs or isrs_base as
+    -- type, feel free to show me how it's done... ;-) [sw]
+    referenced_gauge text;
+    new_validity tstzrange;
+BEGIN
+    EXECUTE format('SELECT $1.%I', TG_ARGV[0])
+        INTO referenced_gauge
+        USING NEW;
+    EXECUTE format('SELECT $1.%I', TG_ARGV[1])
+        INTO new_validity
+        USING NEW;
+    IF EXISTS ( SELECT * FROM waterway.gauges
+                WHERE location = referenced_gauge::isrs
+                    AND validity && new_validity )
+    THEN
+        RETURN NEW;
+    ELSE
+        RAISE EXCEPTION
+            'new row for relation "%" violates constraint trigger "%"',
+            TG_TABLE_NAME, TG_NAME
+            USING
+                DETAIL = format('No matching gauge %s found.',
+                                isrs_AsText(referenced_gauge::isrs)),
+                ERRCODE = 23503,
+                SCHEMA = TG_TABLE_SCHEMA,
+                TABLE = TG_TABLE_NAME,
+                COLUMN = TG_ARGV[0],
+                CONSTRAINT = TG_NAME;
+    END IF;
+END;
+$$
+LANGUAGE plpgsql;
+
+CREATE OR REPLACE FUNCTION check_valid_gauge_ts() RETURNS trigger AS
+$$
+DECLARE
+    -- FIXME: I'm using text for the isrs code and cast it on demand.
+    -- If someone is able to get it to work with isrs or isrs_base as
+    -- type, feel free to show me how it's done... ;-) [sw]
+    referenced_gauge text;
+    new_tstz timestamptz;
+BEGIN
+    EXECUTE format('SELECT $1.%I', TG_ARGV[0])
+        INTO referenced_gauge
+        USING NEW;
+    EXECUTE format('SELECT $1.%I', TG_ARGV[1])
+        INTO new_tstz
+        USING NEW;
+    IF EXISTS ( SELECT * FROM waterway.gauges
+                WHERE location = referenced_gauge::isrs
+                    AND validity @> new_tstz )
+    THEN
+        RETURN NEW;
+    ELSE
+        RAISE EXCEPTION
+            'new row for relation "%" violates constraint trigger "%"',
+            TG_TABLE_NAME, TG_NAME
+            USING
+                DETAIL = format('No matching gauge %s for %s found.',
+                                (isrs_AsText(referenced_gauge::isrs)), new_tstz),
+                ERRCODE = 23503,
+                SCHEMA = TG_TABLE_SCHEMA,
+                TABLE = TG_TABLE_NAME,
+                COLUMN = TG_ARGV[0],
+                CONSTRAINT = TG_NAME;
+    END IF;
+END;
+$$
+LANGUAGE plpgsql;
+
+CREATE OR REPLACE FUNCTION check_valid_bottleneck_ts() RETURNS trigger AS
+$$
+DECLARE
+    referenced_bottleneck_id text;
+    new_tstz timestamptz;
+BEGIN
+    EXECUTE format('SELECT $1.%I', TG_ARGV[0])
+        INTO referenced_bottleneck_id
+        USING NEW;
+    EXECUTE format('SELECT $1.%I', TG_ARGV[1])
+        INTO new_tstz
+        USING NEW;
+    IF EXISTS ( SELECT * FROM waterway.bottlenecks
+                WHERE bottleneck_id = referenced_bottleneck_id
+                    AND validity @> new_tstz )
+    THEN
+        RETURN NEW;
+    ELSE
+        RAISE EXCEPTION
+            'new row for relation "%" violates constraint trigger "%"',
+            TG_TABLE_NAME, TG_NAME
+            USING
+                DETAIL = format('No matching bottleneck %s for %s found.',
+                                referenced_bottleneck_id, new_tstz),
+                ERRCODE = 23503,
+                SCHEMA = TG_TABLE_SCHEMA,
+                TABLE = TG_TABLE_NAME,
+                COLUMN = TG_ARGV[0],
+                CONSTRAINT = TG_NAME;
+    END IF;
+END;
+$$
+LANGUAGE plpgsql;
diff -r 4d7569cca5e6 -r 552ea22ed266 schema/version.sql
--- a/schema/version.sql	Thu Aug 01 12:48:33 2019 +0200
+++ b/schema/version.sql	Fri Aug 02 13:30:29 2019 +0200
@@ -1,1 +1,1 @@
-INSERT INTO gemma_schema_version(version) VALUES (1100);
+INSERT INTO gemma_schema_version(version) VALUES (1103);