Mercurial > gemma
diff pkg/imports/wg.go @ 3302:ec6163c6687d
'Historicise' gauges on import
Gauge data sets will be updated or a new version will be inserted
depending on temporal validity and a timestamp marking the last
update in the RIS-Index of a data set. The trigger on date_info is
removed because the value is actually an attribut coming from the
RIS-Index.
Gauge measurements and predictions are associated to the version with
matching temporal validity. Bottlenecks are always associated to the
actual version of the gauge, although this might change as soon as
bottlenecks are 'historicised', too.
author | Tom Gottfried <tom@intevation.de> |
---|---|
date | Thu, 16 May 2019 18:41:43 +0200 |
parents | e640f51b5a4e |
children | 5932f9574493 |
line wrap: on
line diff
--- a/pkg/imports/wg.go Thu May 16 17:22:33 2019 +0200 +++ b/pkg/imports/wg.go Thu May 16 18:41:43 2019 +0200 @@ -65,11 +65,15 @@ func (*WaterwayGauge) CleanUp() error { return nil } const ( - deleteReferenceWaterLevelsSQL = ` -DELETE FROM waterway.gauges_reference_water_levels -WHERE gauge_id = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int) - AND depth_reference <> ALL($6) -RETURNING depth_reference + eraseGaugeSQL = ` +UPDATE waterway.gauges SET + erased = true, + -- Set enddate of old entry to new startdate in case of overlap: + validity = validity - $2 +WHERE isrs_astext(location) = $1 + AND NOT erased + -- Don't touch old entry if validity did not change: will be updated + AND validity <> $2 ` insertGaugeSQL = ` @@ -83,7 +87,8 @@ zero_point, geodref, date_info, - source_organization + source_organization, + lastupdate ) VALUES ( ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int), $6, @@ -94,17 +99,36 @@ $12, $13, $14, - $15 -) ON CONFLICT (location) DO UPDATE SET - objname = EXCLUDED.objname, - geom = EXCLUDED.geom, - applicability_from_km = EXCLUDED.applicability_from_km, - applicability_to_km = EXCLUDED.applicability_to_km, - validity = EXCLUDED.validity, - zero_point = EXCLUDED.zero_point, - geodref = EXCLUDED.geodref, - date_info = EXCLUDED.date_info, - source_organization = EXCLUDED.source_organization + $15, + $16 +-- Exclusion constraints are not supported as arbiters. +-- Thus we need to DO NOTHING here and use an extra UPDATE statement +) ON CONFLICT DO NOTHING +RETURNING 1 +` + updateGaugeSQL = ` +UPDATE waterway.gauges SET + objname = $6, + geom = ST_SetSRID(ST_MakePoint($7, $8), 4326), + applicability_from_km = $9, + applicability_to_km = $10, + zero_point = $11, + geodref = $12, + date_info = $13, + source_organization = $14, + lastupdate = $15 +WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int) + AND NOT erased + AND $15 > lastupdate +RETURNING 1 +` + + deleteReferenceWaterLevelsSQL = ` +DELETE FROM waterway.gauges_reference_water_levels +WHERE isrs_astext(location) = $1 + AND validity = $2 + AND depth_reference <> ALL($3) +RETURNING depth_reference ` isNtSDepthRefSQL = ` @@ -112,14 +136,16 @@ insertReferenceWaterLevelsSQL = ` INSERT INTO waterway.gauges_reference_water_levels ( - gauge_id, + location, + validity, depth_reference, value ) VALUES ( ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int), $6, - $7 -) ON CONFLICT (gauge_id, depth_reference) DO UPDATE SET + $7, + $8 +) ON CONFLICT (location, validity, depth_reference) DO UPDATE SET value = EXCLUDED.value ` ) @@ -190,21 +216,24 @@ gauges = append(gauges, idxCode{jdx: j, idx: i, code: code}) } } - feedback.Info("ignored gauges: %d", ignored) - feedback.Info("insert/update gauges: %d", len(gauges)) + feedback.Info("Ignored gauges: %d", ignored) + feedback.Info("Further process %d gauges", len(gauges)) if len(gauges) == 0 { - return nil, UnchangedError("nothing to do") + return nil, UnchangedError("Nothing to do") } // insert/update the gauges - var insertStmt, deleteReferenceWaterLevelsStmt, + var eraseGaugeStmt, insertStmt, updateStmt, + deleteReferenceWaterLevelsStmt, isNtSDepthRefStmt, insertWaterLevelStmt *sql.Stmt for _, x := range []struct { sql string stmt **sql.Stmt }{ + {eraseGaugeSQL, &eraseGaugeStmt}, {insertGaugeSQL, &insertStmt}, + {updateGaugeSQL, &updateStmt}, {deleteReferenceWaterLevelsSQL, &deleteReferenceWaterLevelsStmt}, {isNtSDepthRefSQL, &isNtSDepthRefStmt}, {insertReferenceWaterLevelsSQL, &insertWaterLevelStmt}, @@ -216,11 +245,13 @@ defer (*x.stmt).Close() } + var unchanged int + for i := range gauges { ic := &gauges[i] dr := responseData[ic.jdx].RisdataReturn[ic.idx] - feedback.Info("insert/update %s", ic.code) + feedback.Info("Processing %s", ic.code) var from, to sql.NullInt64 @@ -265,7 +296,7 @@ Lower: tfrom, Upper: tto, LowerType: pgtype.Inclusive, - UpperType: pgtype.Inclusive, + UpperType: pgtype.Exclusive, Status: pgtype.Present, } @@ -302,7 +333,22 @@ } defer tx.Rollback() - if _, err := tx.StmtContext(ctx, insertStmt).ExecContext(ctx, + // Mark old entries of gauge as erased, if applicable + if _, err := tx.StmtContext(ctx, eraseGaugeStmt).ExecContext(ctx, + ic.code.String(), + validity, + ); err != nil { + feedback.Warn(handleError(err).Error()) + if err2 := tx.Rollback(); err2 != nil { + return nil, err2 + } + unchanged++ + continue + } + + // Try to insert gauge entry + var dummy int + err = tx.StmtContext(ctx, insertStmt).QueryRowContext(ctx, ic.code.CountryCode, ic.code.LoCode, ic.code.FairwaySection, @@ -317,42 +363,83 @@ geodref, &dateInfo, source, - ); err != nil { + time.Time(*dr.Lastupdate), + ).Scan(&dummy) + switch { + case err == sql.ErrNoRows: + // Assume constraint conflict, try to update + err2 := tx.StmtContext(ctx, updateStmt).QueryRowContext(ctx, + ic.code.CountryCode, + ic.code.LoCode, + ic.code.FairwaySection, + ic.code.Orc, + ic.code.Hectometre, + string(*dr.Objname.Loc), + float64(*dr.Lon), float64(*dr.Lat), + from, + to, + float64(*dr.Zeropoint), + geodref, + &dateInfo, + source, + time.Time(*dr.Lastupdate), + ).Scan(&dummy) + switch { + case err2 == sql.ErrNoRows: + feedback.Info("unchanged") + if err3 := tx.Rollback(); err3 != nil { + return nil, err3 + } + unchanged++ + continue + case err2 != nil: + feedback.Warn(handleError(err2).Error()) + if err3 := tx.Rollback(); err3 != nil { + return nil, err3 + } + unchanged++ + continue + default: + feedback.Info("update") + } + + // Remove obsolete reference water levels + var currLevels pgtype.VarcharArray + currLevels.Set([]string{ + string(*dr.Reflevel1code), + string(*dr.Reflevel2code), + string(*dr.Reflevel3code), + }) + rwls, err2 := tx.StmtContext(ctx, + deleteReferenceWaterLevelsStmt).QueryContext(ctx, + ic.code.String(), + &validity, + &currLevels, + ) + if err2 != nil { + return nil, err2 + } + defer rwls.Close() + for rwls.Next() { + var delRef string + if err2 = rwls.Scan(&delRef); err2 != nil { + return nil, err2 + } + feedback.Warn("Removed reference water level %s from %s", + delRef, ic.code) + } + case err != nil: feedback.Warn(handleError(err).Error()) - tx.Rollback() + if err2 := tx.Rollback(); err2 != nil { + return nil, err2 + } + unchanged++ continue + default: + feedback.Info("insert new version") } - // Remove obsolete reference water levels - var currLevels pgtype.VarcharArray - currLevels.Set([]string{ - string(*dr.Reflevel1code), - string(*dr.Reflevel2code), - string(*dr.Reflevel3code), - }) - rwls, err := tx.StmtContext( - ctx, deleteReferenceWaterLevelsStmt).QueryContext(ctx, - ic.code.CountryCode, - ic.code.LoCode, - ic.code.FairwaySection, - ic.code.Orc, - ic.code.Hectometre, - &currLevels, - ) - if err != nil { - return nil, err - } - defer rwls.Close() - for rwls.Next() { - var delRef string - if err = rwls.Scan(&delRef); err != nil { - return nil, err - } - feedback.Warn("Removed reference water level %s from %s", - delRef, ic.code) - } - - // Insert/update reference water levels + // "Upsert" reference water levels for _, wl := range []struct { level **erdms.RisreflevelcodeType value **erdms.RisreflevelvalueType @@ -388,6 +475,7 @@ ic.code.FairwaySection, ic.code.Orc, ic.code.Hectometre, + &validity, string(**wl.level), int64(**wl.value), ); err != nil { @@ -402,8 +490,12 @@ } } - feedback.Info("Refreshing gauges took %s.", + feedback.Info("Importing gauges took %s", time.Since(start)) + if unchanged == len(gauges) { + return nil, UnchangedError("All gauges unchanged") + } + return nil, err }