# HG changeset patch # User Tom Gottfried # Date 1557940129 -7200 # Node ID 232fc90e6ee268b274d5d85ab78a84b51bfffb19 # Parent 75db3199f76eaefb0e65f1aff236268afdbe289a Disentangle gauge measurements and predictions Representing both in one table has led to the necessity to make the distinction at many places such as statements, definitions of partial indexes and application code. At least in one place in the AGM import the distinction in application code was too late and measurements matching an approved measurement could have been missed. diff -r 75db3199f76e -r 232fc90e6ee2 pkg/controllers/gauges.go --- a/pkg/controllers/gauges.go Wed May 15 17:55:38 2019 +0200 +++ b/pkg/controllers/gauges.go Wed May 15 19:08:49 2019 +0200 @@ -42,7 +42,23 @@ date_issue, predicted, water_level -FROM waterway.gauge_measurements +FROM ( + SELECT + fk_gauge_id, + measure_date, + date_issue, + false AS predicted, + water_level + FROM waterway.gauge_measurements + UNION ALL + SELECT + fk_gauge_id, + measure_date, + date_issue, + true AS predicted, + water_level + FROM waterway.gauge_predictions +) AS gmp WHERE fk_gauge_id = ( $1::char(2), @@ -62,7 +78,27 @@ value_min, value_max, predicted -FROM waterway.gauge_measurements +FROM ( + SELECT + fk_gauge_id, + measure_date, + date_issue, + water_level, + NULL AS value_min, + NULL AS value_max, + false AS predicted + FROM waterway.gauge_measurements + UNION ALL + SELECT + fk_gauge_id, + measure_date, + date_issue, + water_level, + lower(conf_interval) AS value_min, + upper(conf_interval) AS value_max, + true AS predicted + FROM waterway.gauge_predictions +) AS gmp WHERE ` @@ -78,7 +114,6 @@ $4::char(5), $5::int )::isrs - AND NOT predicted AND staging_done ` @@ -100,7 +135,6 @@ $4::char(5), $5::int )::isrs - AND NOT predicted AND staging_done GROUP BY extract(day from measure_date)::varchar || ':' || extract(month from measure_date)::varchar; @@ -111,7 +145,6 @@ water_level FROM waterway.gauge_measurements WHERE - NOT predicted AND staging_done AND fk_gauge_id = ( $1::char(2), diff -r 75db3199f76e -r 232fc90e6ee2 pkg/imports/agm.go --- a/pkg/imports/agm.go Wed May 15 17:55:38 2019 +0200 +++ b/pkg/imports/agm.go Wed May 15 19:08:49 2019 +0200 @@ -78,7 +78,6 @@ ON n.fk_gauge_id = o.fk_gauge_id AND n.measure_date = o.measure_date WHERE n.id IN (SELECT key FROM staged) AND o.id NOT IN (SELECT key FROM staged) - AND NOT o.predicted ) DELETE FROM waterway.gauge_measurements WHERE id IN (SELECT id from to_delete)` @@ -120,38 +119,23 @@ } type agmLine struct { - CountryCode string `json:"country-code"` - Sender string `json:"sender"` - LanguageCode string `json:"language-code"` - DateIssue timetz `json:"date-issue"` - ReferenceCode string `json:"reference-code"` - WaterLevel float64 `json:"water-level"` - Predicted bool `json:"predicted"` - ValueMin *float64 `json:"value-min"` - ValueMax *float64 `json:"value-max"` - DateInfo timetz `json:"date-info"` - SourceOrganization string `json:"source-organization"` + CountryCode string `json:"country-code"` + Sender string `json:"sender"` + LanguageCode string `json:"language-code"` + DateIssue timetz `json:"date-issue"` + ReferenceCode string `json:"reference-code"` + WaterLevel float64 `json:"water-level"` + DateInfo timetz `json:"date-info"` + SourceOrganization string `json:"source-organization"` } func (a *agmLine) hasDiff(b *agmLine) bool { const eps = 0.00001 - fdiff := func(x, y *float64) bool { - if x == nil && y == nil { - return false - } - if (x == nil && y != nil) || (x != nil && y == nil) { - return true - } - return math.Abs(*x-*y) > eps - } return a.CountryCode != b.CountryCode || a.Sender != b.Sender || a.LanguageCode != b.LanguageCode || a.ReferenceCode != b.ReferenceCode || math.Abs(a.WaterLevel-b.WaterLevel) > eps || - a.Predicted != b.Predicted || - fdiff(a.ValueMin, b.ValueMin) || - fdiff(a.ValueMax, b.ValueMax) || a.SourceOrganization != b.SourceOrganization } @@ -171,9 +155,6 @@ date_issue, reference_code, water_level, - predicted, - value_min, - value_max, date_info, source_organization FROM waterway.gauge_measurements @@ -191,9 +172,6 @@ date_issue, reference_code, water_level, - predicted, - value_min, - value_max, date_info, source_organization, is_waterlevel, @@ -209,9 +187,6 @@ $12, $13, $14, - $15, - $16, - $17, true, false ) @@ -382,9 +357,6 @@ oldDateIssue time.Time oldReferenceCode string oldValue float64 - oldPredicted bool - oldValueMin sql.NullFloat64 - oldValueMax sql.NullFloat64 oldDateInfo time.Time oldSourceOrganization string ) @@ -405,9 +377,6 @@ &oldDateIssue, &oldReferenceCode, &oldValue, - &oldPredicted, - &oldValueMin, - &oldValueMax, &oldDateInfo, &oldSourceOrganization, ) @@ -433,17 +402,6 @@ } newValue := value - newPredicted := false - - newValueMin := sql.NullFloat64{ - Float64: 0, - Valid: true, - } - newValueMax := sql.NullFloat64{ - Float64: 0, - Valid: true, - } - newDateInfo := newDateIssue newSourceOrganization := newSender @@ -470,9 +428,6 @@ newDateIssue, newReferenceCode, newValue, - newPredicted, - newValueMin, - newValueMax, newDateInfo, newSourceOrganization, ).Scan(&newID); err != nil { @@ -499,9 +454,6 @@ newDateIssue, newReferenceCode, newValue, - newPredicted, - newValueMin, - newValueMax, newDateInfo, newSourceOrganization, ) @@ -521,14 +473,11 @@ oldDateIssue, oldReferenceCode, oldValue, - oldPredicted, - oldValueMin, - oldValueMax, oldDateInfo, oldSourceOrganization, ) // Ignore if there is no diff. - if o.Predicted || !n.hasDiff(o) { + if !n.hasDiff(o) { continue } ase.Versions = []*agmLine{o, n} @@ -549,19 +498,9 @@ dateIssue time.Time, referenceCode string, waterLevel float64, - predicted bool, - valueMin sql.NullFloat64, - valueMax sql.NullFloat64, dateInfo time.Time, sourceOrganization string, ) *agmLine { - nilFloat := func(v sql.NullFloat64) *float64 { - var p *float64 - if v.Valid { - p = &v.Float64 - } - return p - } return &agmLine{ CountryCode: countryCode, Sender: sender, @@ -569,9 +508,6 @@ DateIssue: timetz{dateIssue}, ReferenceCode: referenceCode, WaterLevel: waterLevel, - Predicted: predicted, - ValueMin: nilFloat(valueMin), - ValueMax: nilFloat(valueMax), DateInfo: timetz{dateInfo}, SourceOrganization: sourceOrganization, } diff -r 75db3199f76e -r 232fc90e6ee2 pkg/imports/gm.go --- a/pkg/imports/gm.go Wed May 15 17:55:38 2019 +0200 +++ b/pkg/imports/gm.go Wed May 15 19:08:49 2019 +0200 @@ -24,6 +24,7 @@ "gemma.intevation.de/gemma/pkg/models" "gemma.intevation.de/gemma/pkg/soap/nts" + "github.com/jackc/pgx/pgtype" ) // GaugeMeasurement is an import job to import @@ -53,10 +54,7 @@ date_issue, reference_code, water_level, - predicted, is_waterlevel, - value_min, - value_max, date_info, source_organization, staging_done @@ -72,13 +70,42 @@ $13, $14, $15, - $16, - $17, - $18, - $19 + true ) ON CONFLICT DO NOTHING -RETURNING id +RETURNING 1 +` + + insertGPSQL = ` +INSERT INTO waterway.gauge_predictions ( + fk_gauge_id, + measure_date, + sender, + language_code, + country_code, + date_issue, + reference_code, + water_level, + is_waterlevel, + conf_interval, + date_info, + source_organization +) VALUES( + ($1, $2, $3, $4, $5), + $6, + $7, + $8, + $9, + $10, + $11, + $12, + $13, + $14, + $15, + $16 +) +ON CONFLICT DO NOTHING +RETURNING 1 ` ) @@ -224,11 +251,17 @@ feedback Feedback, ) ([]string, error) { - insertStmt, err := conn.PrepareContext(ctx, insertGMSQL) + insertGPStmt, err := conn.PrepareContext(ctx, insertGPSQL) if err != nil { return nil, err } - defer insertStmt.Close() + defer insertGPStmt.Close() + + insertGMStmt, err := conn.PrepareContext(ctx, insertGMSQL) + if err != nil { + return nil, err + } + defer insertGMStmt.Close() result, err := fetch() if err != nil { @@ -237,7 +270,7 @@ var gids []string for _, msg := range result { - var gid int64 + var dummy int for _, wrm := range msg.Wrm { curr := string(*wrm.Geo_object.Id) currIsrs, err := models.IsrsFromString(curr) @@ -245,7 +278,7 @@ feedback.Warn("Invalid ISRS code %v", err) continue } - feedback.Info("Found measurements for %s", curr) + feedback.Info("Found measurements/predictions for %s", curr) var referenceCode string if wrm.Reference_code == nil { @@ -255,7 +288,7 @@ referenceCode = string(*wrm.Reference_code) } - var newCnt int = 0 + newM, newP := 0, 0 for _, measure := range wrm.Measure { var unit string if measure.Unit == nil { @@ -273,39 +306,81 @@ convert(measure.Value_max) isWaterlevel := *measure.Measure_code == nts.Measure_code_enumWAL - err = insertStmt.QueryRowContext( - ctx, - currIsrs.CountryCode, - currIsrs.LoCode, - currIsrs.FairwaySection, - currIsrs.Orc, - currIsrs.Hectometre, - measure.Measuredate, - msg.Identification.From, - msg.Identification.Language_code, - msg.Identification.Country_code, - msg.Identification.Date_issue, - referenceCode, - measure.Value, - measure.Predicted, - isWaterlevel, - measure.Value_min, - measure.Value_max, - msg.Identification.Date_issue, - msg.Identification.Originator, - true, // staging_done - ).Scan(&gid) - switch { - case err == sql.ErrNoRows: - // thats expected, nothing to do - case err != nil: - feedback.Warn(handleError(err).Error()) - default: - newCnt++ + + if measure.Predicted { + var confInterval pgtype.Numrange + if measure.Value_min != nil && measure.Value_max != nil { + var valueMin, valueMax pgtype.Numeric + valueMin.Set(measure.Value_min) + valueMax.Set(measure.Value_max) + confInterval = pgtype.Numrange{ + Lower: valueMin, + Upper: valueMax, + LowerType: pgtype.Inclusive, + UpperType: pgtype.Inclusive, + Status: pgtype.Present, + } + } + err = insertGPStmt.QueryRowContext( + ctx, + currIsrs.CountryCode, + currIsrs.LoCode, + currIsrs.FairwaySection, + currIsrs.Orc, + currIsrs.Hectometre, + measure.Measuredate, + msg.Identification.From, + msg.Identification.Language_code, + msg.Identification.Country_code, + msg.Identification.Date_issue, + referenceCode, + measure.Value, + isWaterlevel, + &confInterval, + msg.Identification.Date_issue, + msg.Identification.Originator, + ).Scan(&dummy) + switch { + case err == sql.ErrNoRows: + // thats expected, nothing to do + case err != nil: + feedback.Warn(handleError(err).Error()) + default: + newP++ + } + } else { + err = insertGMStmt.QueryRowContext( + ctx, + currIsrs.CountryCode, + currIsrs.LoCode, + currIsrs.FairwaySection, + currIsrs.Orc, + currIsrs.Hectometre, + measure.Measuredate, + msg.Identification.From, + msg.Identification.Language_code, + msg.Identification.Country_code, + msg.Identification.Date_issue, + referenceCode, + measure.Value, + isWaterlevel, + msg.Identification.Date_issue, + msg.Identification.Originator, + ).Scan(&dummy) + switch { + case err == sql.ErrNoRows: + // thats expected, nothing to do + case err != nil: + feedback.Warn(handleError(err).Error()) + default: + newM++ + } } } feedback.Info("Inserted %d measurements for %s", - newCnt, curr) + newM, curr) + feedback.Info("Inserted %d predictions for %s", + newP, curr) gids = append(gids, curr) } } diff -r 75db3199f76e -r 232fc90e6ee2 schema/gemma.sql --- a/schema/gemma.sql Wed May 15 17:55:38 2019 +0200 +++ b/schema/gemma.sql Wed May 15 19:08:49 2019 +0200 @@ -306,38 +306,34 @@ fk_gauge_id isrs NOT NULL CONSTRAINT gauge_key REFERENCES gauges, measure_date timestamp with time zone NOT NULL, country_code char(2) NOT NULL REFERENCES countries, - -- TODO: add relations to stuff provided as enumerations - sender varchar NOT NULL, -- "from" attribute from DRC + sender varchar NOT NULL, -- "from" element from NtS response language_code varchar NOT NULL REFERENCES language_codes, date_issue timestamp with time zone NOT NULL, reference_code varchar(4) NOT NULL REFERENCES depth_references, water_level double precision NOT NULL, - predicted boolean NOT NULL, is_waterlevel boolean NOT NULL, - -- XXX: "measure_code" if really only W or Q - -- XXX: Do we need "unit" attribute or can we normalise on import? - value_min double precision, -- XXX: NOT NULL if predicted? - value_max double precision, -- XXX: NOT NULL if predicted? - --- TODO: Add a double range type for checking? date_info timestamp with time zone NOT NULL DEFAULT CURRENT_TIMESTAMP, - source_organization varchar NOT NULL, -- "originator" - staging_done boolean NOT NULL DEFAULT false + source_organization varchar NOT NULL, -- "originator" from NtS response + staging_done boolean NOT NULL DEFAULT false, + UNIQUE (fk_gauge_id, measure_date, staging_done) ) - -- Constraints are conditional for gauge_measurements, as they - -- differ between predicted values and measured ones. PG does not - -- have real conditional unique constraints, but we can use unique - -- indeces for that. - -- - -- So we can have a staged and a non-staged - -- fk_gauge_id/measure_date pairs in measured values. - CREATE UNIQUE INDEX gm_measured_unique_constraint - ON gauge_measurements (fk_gauge_id, measure_date, staging_done) - WHERE NOT predicted - -- And we can have multiple predictions for one point in time - -- (but they are never staged). - CREATE UNIQUE INDEX gm_predicted_unique_constraint - ON gauge_measurements (fk_gauge_id, measure_date, date_issue) - WHERE predicted + + CREATE TABLE gauge_predictions ( + fk_gauge_id isrs NOT NULL CONSTRAINT gauge_key REFERENCES gauges, + measure_date timestamp with time zone NOT NULL, + country_code char(2) NOT NULL REFERENCES countries, + sender varchar NOT NULL, -- "from" element from NtS response + language_code varchar NOT NULL REFERENCES language_codes, + date_issue timestamp with time zone NOT NULL, + reference_code varchar(4) NOT NULL REFERENCES depth_references, + water_level double precision NOT NULL, + is_waterlevel boolean NOT NULL, + conf_interval numrange + CHECK (conf_interval @> CAST(water_level AS numeric)), + date_info timestamp with time zone NOT NULL DEFAULT CURRENT_TIMESTAMP, + source_organization varchar NOT NULL, -- "originator" from NtS response + PRIMARY KEY (fk_gauge_id, measure_date, date_issue) + ) CREATE TABLE waterway_axis ( id int PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY, diff -r 75db3199f76e -r 232fc90e6ee2 schema/geoserver_views.sql --- a/schema/geoserver_views.sql Wed May 15 17:55:38 2019 +0200 +++ b/schema/geoserver_views.sql Wed May 15 19:08:49 2019 +0200 @@ -46,7 +46,7 @@ waterlevel_latest AS ( SELECT DISTINCT ON (fk_gauge_id) fk_gauge_id, water_level FROM waterway.gauge_measurements - WHERE is_waterlevel AND NOT predicted + WHERE is_waterlevel ORDER BY fk_gauge_id, measure_date DESC) SELECT b.id,