Mercurial > gemma
comparison pkg/imports/wg.go @ 3302:ec6163c6687d
'Historicise' gauges on import
Gauge data sets will be updated or a new version will be inserted
depending on temporal validity and a timestamp marking the last
update in the RIS-Index of a data set. The trigger on date_info is
removed because the value is actually an attribut coming from the
RIS-Index.
Gauge measurements and predictions are associated to the version with
matching temporal validity. Bottlenecks are always associated to the
actual version of the gauge, although this might change as soon as
bottlenecks are 'historicised', too.
author | Tom Gottfried <tom@intevation.de> |
---|---|
date | Thu, 16 May 2019 18:41:43 +0200 |
parents | e640f51b5a4e |
children | 5932f9574493 |
comparison
equal
deleted
inserted
replaced
3301:6514b943654e | 3302:ec6163c6687d |
---|---|
63 | 63 |
64 // CleanUp does nothing as there is nothing to cleanup with gauges. | 64 // CleanUp does nothing as there is nothing to cleanup with gauges. |
65 func (*WaterwayGauge) CleanUp() error { return nil } | 65 func (*WaterwayGauge) CleanUp() error { return nil } |
66 | 66 |
67 const ( | 67 const ( |
68 deleteReferenceWaterLevelsSQL = ` | 68 eraseGaugeSQL = ` |
69 DELETE FROM waterway.gauges_reference_water_levels | 69 UPDATE waterway.gauges SET |
70 WHERE gauge_id = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int) | 70 erased = true, |
71 AND depth_reference <> ALL($6) | 71 -- Set enddate of old entry to new startdate in case of overlap: |
72 RETURNING depth_reference | 72 validity = validity - $2 |
73 WHERE isrs_astext(location) = $1 | |
74 AND NOT erased | |
75 -- Don't touch old entry if validity did not change: will be updated | |
76 AND validity <> $2 | |
73 ` | 77 ` |
74 | 78 |
75 insertGaugeSQL = ` | 79 insertGaugeSQL = ` |
76 INSERT INTO waterway.gauges ( | 80 INSERT INTO waterway.gauges ( |
77 location, | 81 location, |
81 applicability_to_km, | 85 applicability_to_km, |
82 validity, | 86 validity, |
83 zero_point, | 87 zero_point, |
84 geodref, | 88 geodref, |
85 date_info, | 89 date_info, |
86 source_organization | 90 source_organization, |
91 lastupdate | |
87 ) VALUES ( | 92 ) VALUES ( |
88 ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int), | 93 ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int), |
89 $6, | 94 $6, |
90 ST_SetSRID(ST_MakePoint($7, $8), 4326)::geography, | 95 ST_SetSRID(ST_MakePoint($7, $8), 4326)::geography, |
91 $9, | 96 $9, |
92 $10, | 97 $10, |
93 $11, | 98 $11, |
94 $12, | 99 $12, |
95 $13, | 100 $13, |
96 $14, | 101 $14, |
97 $15 | 102 $15, |
98 ) ON CONFLICT (location) DO UPDATE SET | 103 $16 |
99 objname = EXCLUDED.objname, | 104 -- Exclusion constraints are not supported as arbiters. |
100 geom = EXCLUDED.geom, | 105 -- Thus we need to DO NOTHING here and use an extra UPDATE statement |
101 applicability_from_km = EXCLUDED.applicability_from_km, | 106 ) ON CONFLICT DO NOTHING |
102 applicability_to_km = EXCLUDED.applicability_to_km, | 107 RETURNING 1 |
103 validity = EXCLUDED.validity, | 108 ` |
104 zero_point = EXCLUDED.zero_point, | 109 updateGaugeSQL = ` |
105 geodref = EXCLUDED.geodref, | 110 UPDATE waterway.gauges SET |
106 date_info = EXCLUDED.date_info, | 111 objname = $6, |
107 source_organization = EXCLUDED.source_organization | 112 geom = ST_SetSRID(ST_MakePoint($7, $8), 4326), |
113 applicability_from_km = $9, | |
114 applicability_to_km = $10, | |
115 zero_point = $11, | |
116 geodref = $12, | |
117 date_info = $13, | |
118 source_organization = $14, | |
119 lastupdate = $15 | |
120 WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int) | |
121 AND NOT erased | |
122 AND $15 > lastupdate | |
123 RETURNING 1 | |
124 ` | |
125 | |
126 deleteReferenceWaterLevelsSQL = ` | |
127 DELETE FROM waterway.gauges_reference_water_levels | |
128 WHERE isrs_astext(location) = $1 | |
129 AND validity = $2 | |
130 AND depth_reference <> ALL($3) | |
131 RETURNING depth_reference | |
108 ` | 132 ` |
109 | 133 |
110 isNtSDepthRefSQL = ` | 134 isNtSDepthRefSQL = ` |
111 SELECT EXISTS(SELECT 1 FROM depth_references WHERE depth_reference = $1)` | 135 SELECT EXISTS(SELECT 1 FROM depth_references WHERE depth_reference = $1)` |
112 | 136 |
113 insertReferenceWaterLevelsSQL = ` | 137 insertReferenceWaterLevelsSQL = ` |
114 INSERT INTO waterway.gauges_reference_water_levels ( | 138 INSERT INTO waterway.gauges_reference_water_levels ( |
115 gauge_id, | 139 location, |
140 validity, | |
116 depth_reference, | 141 depth_reference, |
117 value | 142 value |
118 ) VALUES ( | 143 ) VALUES ( |
119 ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int), | 144 ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int), |
120 $6, | 145 $6, |
121 $7 | 146 $7, |
122 ) ON CONFLICT (gauge_id, depth_reference) DO UPDATE SET | 147 $8 |
148 ) ON CONFLICT (location, validity, depth_reference) DO UPDATE SET | |
123 value = EXCLUDED.value | 149 value = EXCLUDED.value |
124 ` | 150 ` |
125 ) | 151 ) |
126 | 152 |
127 func (wg *WaterwayGauge) Do( | 153 func (wg *WaterwayGauge) Do( |
188 } | 214 } |
189 | 215 |
190 gauges = append(gauges, idxCode{jdx: j, idx: i, code: code}) | 216 gauges = append(gauges, idxCode{jdx: j, idx: i, code: code}) |
191 } | 217 } |
192 } | 218 } |
193 feedback.Info("ignored gauges: %d", ignored) | 219 feedback.Info("Ignored gauges: %d", ignored) |
194 feedback.Info("insert/update gauges: %d", len(gauges)) | 220 feedback.Info("Further process %d gauges", len(gauges)) |
195 | 221 |
196 if len(gauges) == 0 { | 222 if len(gauges) == 0 { |
197 return nil, UnchangedError("nothing to do") | 223 return nil, UnchangedError("Nothing to do") |
198 } | 224 } |
199 | 225 |
200 // insert/update the gauges | 226 // insert/update the gauges |
201 var insertStmt, deleteReferenceWaterLevelsStmt, | 227 var eraseGaugeStmt, insertStmt, updateStmt, |
228 deleteReferenceWaterLevelsStmt, | |
202 isNtSDepthRefStmt, insertWaterLevelStmt *sql.Stmt | 229 isNtSDepthRefStmt, insertWaterLevelStmt *sql.Stmt |
203 for _, x := range []struct { | 230 for _, x := range []struct { |
204 sql string | 231 sql string |
205 stmt **sql.Stmt | 232 stmt **sql.Stmt |
206 }{ | 233 }{ |
234 {eraseGaugeSQL, &eraseGaugeStmt}, | |
207 {insertGaugeSQL, &insertStmt}, | 235 {insertGaugeSQL, &insertStmt}, |
236 {updateGaugeSQL, &updateStmt}, | |
208 {deleteReferenceWaterLevelsSQL, &deleteReferenceWaterLevelsStmt}, | 237 {deleteReferenceWaterLevelsSQL, &deleteReferenceWaterLevelsStmt}, |
209 {isNtSDepthRefSQL, &isNtSDepthRefStmt}, | 238 {isNtSDepthRefSQL, &isNtSDepthRefStmt}, |
210 {insertReferenceWaterLevelsSQL, &insertWaterLevelStmt}, | 239 {insertReferenceWaterLevelsSQL, &insertWaterLevelStmt}, |
211 } { | 240 } { |
212 var err error | 241 var err error |
214 return nil, err | 243 return nil, err |
215 } | 244 } |
216 defer (*x.stmt).Close() | 245 defer (*x.stmt).Close() |
217 } | 246 } |
218 | 247 |
248 var unchanged int | |
249 | |
219 for i := range gauges { | 250 for i := range gauges { |
220 ic := &gauges[i] | 251 ic := &gauges[i] |
221 dr := responseData[ic.jdx].RisdataReturn[ic.idx] | 252 dr := responseData[ic.jdx].RisdataReturn[ic.idx] |
222 | 253 |
223 feedback.Info("insert/update %s", ic.code) | 254 feedback.Info("Processing %s", ic.code) |
224 | 255 |
225 var from, to sql.NullInt64 | 256 var from, to sql.NullInt64 |
226 | 257 |
227 if dr.Applicabilityfromkm != nil { | 258 if dr.Applicabilityfromkm != nil { |
228 from = sql.NullInt64{ | 259 from = sql.NullInt64{ |
263 | 294 |
264 validity := pgtype.Tstzrange{ | 295 validity := pgtype.Tstzrange{ |
265 Lower: tfrom, | 296 Lower: tfrom, |
266 Upper: tto, | 297 Upper: tto, |
267 LowerType: pgtype.Inclusive, | 298 LowerType: pgtype.Inclusive, |
268 UpperType: pgtype.Inclusive, | 299 UpperType: pgtype.Exclusive, |
269 Status: pgtype.Present, | 300 Status: pgtype.Present, |
270 } | 301 } |
271 | 302 |
272 if dr.Infodate != nil { | 303 if dr.Infodate != nil { |
273 dateInfo = pgtype.Timestamptz{ | 304 dateInfo = pgtype.Timestamptz{ |
300 if err != nil { | 331 if err != nil { |
301 return nil, err | 332 return nil, err |
302 } | 333 } |
303 defer tx.Rollback() | 334 defer tx.Rollback() |
304 | 335 |
305 if _, err := tx.StmtContext(ctx, insertStmt).ExecContext(ctx, | 336 // Mark old entries of gauge as erased, if applicable |
337 if _, err := tx.StmtContext(ctx, eraseGaugeStmt).ExecContext(ctx, | |
338 ic.code.String(), | |
339 validity, | |
340 ); err != nil { | |
341 feedback.Warn(handleError(err).Error()) | |
342 if err2 := tx.Rollback(); err2 != nil { | |
343 return nil, err2 | |
344 } | |
345 unchanged++ | |
346 continue | |
347 } | |
348 | |
349 // Try to insert gauge entry | |
350 var dummy int | |
351 err = tx.StmtContext(ctx, insertStmt).QueryRowContext(ctx, | |
306 ic.code.CountryCode, | 352 ic.code.CountryCode, |
307 ic.code.LoCode, | 353 ic.code.LoCode, |
308 ic.code.FairwaySection, | 354 ic.code.FairwaySection, |
309 ic.code.Orc, | 355 ic.code.Orc, |
310 ic.code.Hectometre, | 356 ic.code.Hectometre, |
315 &validity, | 361 &validity, |
316 float64(*dr.Zeropoint), | 362 float64(*dr.Zeropoint), |
317 geodref, | 363 geodref, |
318 &dateInfo, | 364 &dateInfo, |
319 source, | 365 source, |
320 ); err != nil { | 366 time.Time(*dr.Lastupdate), |
367 ).Scan(&dummy) | |
368 switch { | |
369 case err == sql.ErrNoRows: | |
370 // Assume constraint conflict, try to update | |
371 err2 := tx.StmtContext(ctx, updateStmt).QueryRowContext(ctx, | |
372 ic.code.CountryCode, | |
373 ic.code.LoCode, | |
374 ic.code.FairwaySection, | |
375 ic.code.Orc, | |
376 ic.code.Hectometre, | |
377 string(*dr.Objname.Loc), | |
378 float64(*dr.Lon), float64(*dr.Lat), | |
379 from, | |
380 to, | |
381 float64(*dr.Zeropoint), | |
382 geodref, | |
383 &dateInfo, | |
384 source, | |
385 time.Time(*dr.Lastupdate), | |
386 ).Scan(&dummy) | |
387 switch { | |
388 case err2 == sql.ErrNoRows: | |
389 feedback.Info("unchanged") | |
390 if err3 := tx.Rollback(); err3 != nil { | |
391 return nil, err3 | |
392 } | |
393 unchanged++ | |
394 continue | |
395 case err2 != nil: | |
396 feedback.Warn(handleError(err2).Error()) | |
397 if err3 := tx.Rollback(); err3 != nil { | |
398 return nil, err3 | |
399 } | |
400 unchanged++ | |
401 continue | |
402 default: | |
403 feedback.Info("update") | |
404 } | |
405 | |
406 // Remove obsolete reference water levels | |
407 var currLevels pgtype.VarcharArray | |
408 currLevels.Set([]string{ | |
409 string(*dr.Reflevel1code), | |
410 string(*dr.Reflevel2code), | |
411 string(*dr.Reflevel3code), | |
412 }) | |
413 rwls, err2 := tx.StmtContext(ctx, | |
414 deleteReferenceWaterLevelsStmt).QueryContext(ctx, | |
415 ic.code.String(), | |
416 &validity, | |
417 &currLevels, | |
418 ) | |
419 if err2 != nil { | |
420 return nil, err2 | |
421 } | |
422 defer rwls.Close() | |
423 for rwls.Next() { | |
424 var delRef string | |
425 if err2 = rwls.Scan(&delRef); err2 != nil { | |
426 return nil, err2 | |
427 } | |
428 feedback.Warn("Removed reference water level %s from %s", | |
429 delRef, ic.code) | |
430 } | |
431 case err != nil: | |
321 feedback.Warn(handleError(err).Error()) | 432 feedback.Warn(handleError(err).Error()) |
322 tx.Rollback() | 433 if err2 := tx.Rollback(); err2 != nil { |
434 return nil, err2 | |
435 } | |
436 unchanged++ | |
323 continue | 437 continue |
324 } | 438 default: |
325 | 439 feedback.Info("insert new version") |
326 // Remove obsolete reference water levels | 440 } |
327 var currLevels pgtype.VarcharArray | 441 |
328 currLevels.Set([]string{ | 442 // "Upsert" reference water levels |
329 string(*dr.Reflevel1code), | |
330 string(*dr.Reflevel2code), | |
331 string(*dr.Reflevel3code), | |
332 }) | |
333 rwls, err := tx.StmtContext( | |
334 ctx, deleteReferenceWaterLevelsStmt).QueryContext(ctx, | |
335 ic.code.CountryCode, | |
336 ic.code.LoCode, | |
337 ic.code.FairwaySection, | |
338 ic.code.Orc, | |
339 ic.code.Hectometre, | |
340 &currLevels, | |
341 ) | |
342 if err != nil { | |
343 return nil, err | |
344 } | |
345 defer rwls.Close() | |
346 for rwls.Next() { | |
347 var delRef string | |
348 if err = rwls.Scan(&delRef); err != nil { | |
349 return nil, err | |
350 } | |
351 feedback.Warn("Removed reference water level %s from %s", | |
352 delRef, ic.code) | |
353 } | |
354 | |
355 // Insert/update reference water levels | |
356 for _, wl := range []struct { | 443 for _, wl := range []struct { |
357 level **erdms.RisreflevelcodeType | 444 level **erdms.RisreflevelcodeType |
358 value **erdms.RisreflevelvalueType | 445 value **erdms.RisreflevelvalueType |
359 }{ | 446 }{ |
360 {&dr.Reflevel1code, &dr.Reflevel1value}, | 447 {&dr.Reflevel1code, &dr.Reflevel1value}, |
386 ic.code.CountryCode, | 473 ic.code.CountryCode, |
387 ic.code.LoCode, | 474 ic.code.LoCode, |
388 ic.code.FairwaySection, | 475 ic.code.FairwaySection, |
389 ic.code.Orc, | 476 ic.code.Orc, |
390 ic.code.Hectometre, | 477 ic.code.Hectometre, |
478 &validity, | |
391 string(**wl.level), | 479 string(**wl.level), |
392 int64(**wl.value), | 480 int64(**wl.value), |
393 ); err != nil { | 481 ); err != nil { |
394 feedback.Warn(handleError(err).Error()) | 482 feedback.Warn(handleError(err).Error()) |
395 tx.Rollback() | 483 tx.Rollback() |
400 if err = tx.Commit(); err != nil { | 488 if err = tx.Commit(); err != nil { |
401 return nil, err | 489 return nil, err |
402 } | 490 } |
403 } | 491 } |
404 | 492 |
405 feedback.Info("Refreshing gauges took %s.", | 493 feedback.Info("Importing gauges took %s", |
406 time.Since(start)) | 494 time.Since(start)) |
495 | |
496 if unchanged == len(gauges) { | |
497 return nil, UnchangedError("All gauges unchanged") | |
498 } | |
407 | 499 |
408 return nil, err | 500 return nil, err |
409 } | 501 } |