comparison pkg/imports/wg.go @ 3302:ec6163c6687d

'Historicise' gauges on import. Gauge data sets will be updated, or a new version will be inserted, depending on temporal validity and a timestamp marking the last update of a data set in the RIS-Index. The trigger on date_info is removed because the value is actually an attribute coming from the RIS-Index. Gauge measurements and predictions are associated with the version whose temporal validity matches. Bottlenecks are always associated with the current version of the gauge, although this might change as soon as bottlenecks are 'historicised', too.
author Tom Gottfried <tom@intevation.de>
date Thu, 16 May 2019 18:41:43 +0200
parents e640f51b5a4e
children 5932f9574493
comparison
equal deleted inserted replaced
3301:6514b943654e 3302:ec6163c6687d
63 63
64 // CleanUp does nothing as there is nothing to cleanup with gauges. 64 // CleanUp does nothing as there is nothing to cleanup with gauges.
65 func (*WaterwayGauge) CleanUp() error { return nil } 65 func (*WaterwayGauge) CleanUp() error { return nil }
66 66
67 const ( 67 const (
68 deleteReferenceWaterLevelsSQL = ` 68 eraseGaugeSQL = `
69 DELETE FROM waterway.gauges_reference_water_levels 69 UPDATE waterway.gauges SET
70 WHERE gauge_id = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int) 70 erased = true,
71 AND depth_reference <> ALL($6) 71 -- Set enddate of old entry to new startdate in case of overlap:
72 RETURNING depth_reference 72 validity = validity - $2
73 WHERE isrs_astext(location) = $1
74 AND NOT erased
75 -- Don't touch old entry if validity did not change: will be updated
76 AND validity <> $2
73 ` 77 `
74 78
75 insertGaugeSQL = ` 79 insertGaugeSQL = `
76 INSERT INTO waterway.gauges ( 80 INSERT INTO waterway.gauges (
77 location, 81 location,
81 applicability_to_km, 85 applicability_to_km,
82 validity, 86 validity,
83 zero_point, 87 zero_point,
84 geodref, 88 geodref,
85 date_info, 89 date_info,
86 source_organization 90 source_organization,
91 lastupdate
87 ) VALUES ( 92 ) VALUES (
88 ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int), 93 ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int),
89 $6, 94 $6,
90 ST_SetSRID(ST_MakePoint($7, $8), 4326)::geography, 95 ST_SetSRID(ST_MakePoint($7, $8), 4326)::geography,
91 $9, 96 $9,
92 $10, 97 $10,
93 $11, 98 $11,
94 $12, 99 $12,
95 $13, 100 $13,
96 $14, 101 $14,
97 $15 102 $15,
98 ) ON CONFLICT (location) DO UPDATE SET 103 $16
99 objname = EXCLUDED.objname, 104 -- Exclusion constraints are not supported as arbiters.
100 geom = EXCLUDED.geom, 105 -- Thus we need to DO NOTHING here and use an extra UPDATE statement
101 applicability_from_km = EXCLUDED.applicability_from_km, 106 ) ON CONFLICT DO NOTHING
102 applicability_to_km = EXCLUDED.applicability_to_km, 107 RETURNING 1
103 validity = EXCLUDED.validity, 108 `
104 zero_point = EXCLUDED.zero_point, 109 updateGaugeSQL = `
105 geodref = EXCLUDED.geodref, 110 UPDATE waterway.gauges SET
106 date_info = EXCLUDED.date_info, 111 objname = $6,
107 source_organization = EXCLUDED.source_organization 112 geom = ST_SetSRID(ST_MakePoint($7, $8), 4326),
113 applicability_from_km = $9,
114 applicability_to_km = $10,
115 zero_point = $11,
116 geodref = $12,
117 date_info = $13,
118 source_organization = $14,
119 lastupdate = $15
120 WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)
121 AND NOT erased
122 AND $15 > lastupdate
123 RETURNING 1
124 `
125
126 deleteReferenceWaterLevelsSQL = `
127 DELETE FROM waterway.gauges_reference_water_levels
128 WHERE isrs_astext(location) = $1
129 AND validity = $2
130 AND depth_reference <> ALL($3)
131 RETURNING depth_reference
108 ` 132 `
109 133
110 isNtSDepthRefSQL = ` 134 isNtSDepthRefSQL = `
111 SELECT EXISTS(SELECT 1 FROM depth_references WHERE depth_reference = $1)` 135 SELECT EXISTS(SELECT 1 FROM depth_references WHERE depth_reference = $1)`
112 136
113 insertReferenceWaterLevelsSQL = ` 137 insertReferenceWaterLevelsSQL = `
114 INSERT INTO waterway.gauges_reference_water_levels ( 138 INSERT INTO waterway.gauges_reference_water_levels (
115 gauge_id, 139 location,
140 validity,
116 depth_reference, 141 depth_reference,
117 value 142 value
118 ) VALUES ( 143 ) VALUES (
119 ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int), 144 ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int),
120 $6, 145 $6,
121 $7 146 $7,
122 ) ON CONFLICT (gauge_id, depth_reference) DO UPDATE SET 147 $8
148 ) ON CONFLICT (location, validity, depth_reference) DO UPDATE SET
123 value = EXCLUDED.value 149 value = EXCLUDED.value
124 ` 150 `
125 ) 151 )
126 152
127 func (wg *WaterwayGauge) Do( 153 func (wg *WaterwayGauge) Do(
188 } 214 }
189 215
190 gauges = append(gauges, idxCode{jdx: j, idx: i, code: code}) 216 gauges = append(gauges, idxCode{jdx: j, idx: i, code: code})
191 } 217 }
192 } 218 }
193 feedback.Info("ignored gauges: %d", ignored) 219 feedback.Info("Ignored gauges: %d", ignored)
194 feedback.Info("insert/update gauges: %d", len(gauges)) 220 feedback.Info("Further process %d gauges", len(gauges))
195 221
196 if len(gauges) == 0 { 222 if len(gauges) == 0 {
197 return nil, UnchangedError("nothing to do") 223 return nil, UnchangedError("Nothing to do")
198 } 224 }
199 225
200 // insert/update the gauges 226 // insert/update the gauges
201 var insertStmt, deleteReferenceWaterLevelsStmt, 227 var eraseGaugeStmt, insertStmt, updateStmt,
228 deleteReferenceWaterLevelsStmt,
202 isNtSDepthRefStmt, insertWaterLevelStmt *sql.Stmt 229 isNtSDepthRefStmt, insertWaterLevelStmt *sql.Stmt
203 for _, x := range []struct { 230 for _, x := range []struct {
204 sql string 231 sql string
205 stmt **sql.Stmt 232 stmt **sql.Stmt
206 }{ 233 }{
234 {eraseGaugeSQL, &eraseGaugeStmt},
207 {insertGaugeSQL, &insertStmt}, 235 {insertGaugeSQL, &insertStmt},
236 {updateGaugeSQL, &updateStmt},
208 {deleteReferenceWaterLevelsSQL, &deleteReferenceWaterLevelsStmt}, 237 {deleteReferenceWaterLevelsSQL, &deleteReferenceWaterLevelsStmt},
209 {isNtSDepthRefSQL, &isNtSDepthRefStmt}, 238 {isNtSDepthRefSQL, &isNtSDepthRefStmt},
210 {insertReferenceWaterLevelsSQL, &insertWaterLevelStmt}, 239 {insertReferenceWaterLevelsSQL, &insertWaterLevelStmt},
211 } { 240 } {
212 var err error 241 var err error
214 return nil, err 243 return nil, err
215 } 244 }
216 defer (*x.stmt).Close() 245 defer (*x.stmt).Close()
217 } 246 }
218 247
248 var unchanged int
249
219 for i := range gauges { 250 for i := range gauges {
220 ic := &gauges[i] 251 ic := &gauges[i]
221 dr := responseData[ic.jdx].RisdataReturn[ic.idx] 252 dr := responseData[ic.jdx].RisdataReturn[ic.idx]
222 253
223 feedback.Info("insert/update %s", ic.code) 254 feedback.Info("Processing %s", ic.code)
224 255
225 var from, to sql.NullInt64 256 var from, to sql.NullInt64
226 257
227 if dr.Applicabilityfromkm != nil { 258 if dr.Applicabilityfromkm != nil {
228 from = sql.NullInt64{ 259 from = sql.NullInt64{
263 294
264 validity := pgtype.Tstzrange{ 295 validity := pgtype.Tstzrange{
265 Lower: tfrom, 296 Lower: tfrom,
266 Upper: tto, 297 Upper: tto,
267 LowerType: pgtype.Inclusive, 298 LowerType: pgtype.Inclusive,
268 UpperType: pgtype.Inclusive, 299 UpperType: pgtype.Exclusive,
269 Status: pgtype.Present, 300 Status: pgtype.Present,
270 } 301 }
271 302
272 if dr.Infodate != nil { 303 if dr.Infodate != nil {
273 dateInfo = pgtype.Timestamptz{ 304 dateInfo = pgtype.Timestamptz{
300 if err != nil { 331 if err != nil {
301 return nil, err 332 return nil, err
302 } 333 }
303 defer tx.Rollback() 334 defer tx.Rollback()
304 335
305 if _, err := tx.StmtContext(ctx, insertStmt).ExecContext(ctx, 336 // Mark old entries of gauge as erased, if applicable
337 if _, err := tx.StmtContext(ctx, eraseGaugeStmt).ExecContext(ctx,
338 ic.code.String(),
339 validity,
340 ); err != nil {
341 feedback.Warn(handleError(err).Error())
342 if err2 := tx.Rollback(); err2 != nil {
343 return nil, err2
344 }
345 unchanged++
346 continue
347 }
348
349 // Try to insert gauge entry
350 var dummy int
351 err = tx.StmtContext(ctx, insertStmt).QueryRowContext(ctx,
306 ic.code.CountryCode, 352 ic.code.CountryCode,
307 ic.code.LoCode, 353 ic.code.LoCode,
308 ic.code.FairwaySection, 354 ic.code.FairwaySection,
309 ic.code.Orc, 355 ic.code.Orc,
310 ic.code.Hectometre, 356 ic.code.Hectometre,
315 &validity, 361 &validity,
316 float64(*dr.Zeropoint), 362 float64(*dr.Zeropoint),
317 geodref, 363 geodref,
318 &dateInfo, 364 &dateInfo,
319 source, 365 source,
320 ); err != nil { 366 time.Time(*dr.Lastupdate),
367 ).Scan(&dummy)
368 switch {
369 case err == sql.ErrNoRows:
370 // Assume constraint conflict, try to update
371 err2 := tx.StmtContext(ctx, updateStmt).QueryRowContext(ctx,
372 ic.code.CountryCode,
373 ic.code.LoCode,
374 ic.code.FairwaySection,
375 ic.code.Orc,
376 ic.code.Hectometre,
377 string(*dr.Objname.Loc),
378 float64(*dr.Lon), float64(*dr.Lat),
379 from,
380 to,
381 float64(*dr.Zeropoint),
382 geodref,
383 &dateInfo,
384 source,
385 time.Time(*dr.Lastupdate),
386 ).Scan(&dummy)
387 switch {
388 case err2 == sql.ErrNoRows:
389 feedback.Info("unchanged")
390 if err3 := tx.Rollback(); err3 != nil {
391 return nil, err3
392 }
393 unchanged++
394 continue
395 case err2 != nil:
396 feedback.Warn(handleError(err2).Error())
397 if err3 := tx.Rollback(); err3 != nil {
398 return nil, err3
399 }
400 unchanged++
401 continue
402 default:
403 feedback.Info("update")
404 }
405
406 // Remove obsolete reference water levels
407 var currLevels pgtype.VarcharArray
408 currLevels.Set([]string{
409 string(*dr.Reflevel1code),
410 string(*dr.Reflevel2code),
411 string(*dr.Reflevel3code),
412 })
413 rwls, err2 := tx.StmtContext(ctx,
414 deleteReferenceWaterLevelsStmt).QueryContext(ctx,
415 ic.code.String(),
416 &validity,
417 &currLevels,
418 )
419 if err2 != nil {
420 return nil, err2
421 }
422 defer rwls.Close()
423 for rwls.Next() {
424 var delRef string
425 if err2 = rwls.Scan(&delRef); err2 != nil {
426 return nil, err2
427 }
428 feedback.Warn("Removed reference water level %s from %s",
429 delRef, ic.code)
430 }
431 case err != nil:
321 feedback.Warn(handleError(err).Error()) 432 feedback.Warn(handleError(err).Error())
322 tx.Rollback() 433 if err2 := tx.Rollback(); err2 != nil {
434 return nil, err2
435 }
436 unchanged++
323 continue 437 continue
324 } 438 default:
325 439 feedback.Info("insert new version")
326 // Remove obsolete reference water levels 440 }
327 var currLevels pgtype.VarcharArray 441
328 currLevels.Set([]string{ 442 // "Upsert" reference water levels
329 string(*dr.Reflevel1code),
330 string(*dr.Reflevel2code),
331 string(*dr.Reflevel3code),
332 })
333 rwls, err := tx.StmtContext(
334 ctx, deleteReferenceWaterLevelsStmt).QueryContext(ctx,
335 ic.code.CountryCode,
336 ic.code.LoCode,
337 ic.code.FairwaySection,
338 ic.code.Orc,
339 ic.code.Hectometre,
340 &currLevels,
341 )
342 if err != nil {
343 return nil, err
344 }
345 defer rwls.Close()
346 for rwls.Next() {
347 var delRef string
348 if err = rwls.Scan(&delRef); err != nil {
349 return nil, err
350 }
351 feedback.Warn("Removed reference water level %s from %s",
352 delRef, ic.code)
353 }
354
355 // Insert/update reference water levels
356 for _, wl := range []struct { 443 for _, wl := range []struct {
357 level **erdms.RisreflevelcodeType 444 level **erdms.RisreflevelcodeType
358 value **erdms.RisreflevelvalueType 445 value **erdms.RisreflevelvalueType
359 }{ 446 }{
360 {&dr.Reflevel1code, &dr.Reflevel1value}, 447 {&dr.Reflevel1code, &dr.Reflevel1value},
386 ic.code.CountryCode, 473 ic.code.CountryCode,
387 ic.code.LoCode, 474 ic.code.LoCode,
388 ic.code.FairwaySection, 475 ic.code.FairwaySection,
389 ic.code.Orc, 476 ic.code.Orc,
390 ic.code.Hectometre, 477 ic.code.Hectometre,
478 &validity,
391 string(**wl.level), 479 string(**wl.level),
392 int64(**wl.value), 480 int64(**wl.value),
393 ); err != nil { 481 ); err != nil {
394 feedback.Warn(handleError(err).Error()) 482 feedback.Warn(handleError(err).Error())
395 tx.Rollback() 483 tx.Rollback()
400 if err = tx.Commit(); err != nil { 488 if err = tx.Commit(); err != nil {
401 return nil, err 489 return nil, err
402 } 490 }
403 } 491 }
404 492
405 feedback.Info("Refreshing gauges took %s.", 493 feedback.Info("Importing gauges took %s",
406 time.Since(start)) 494 time.Since(start))
495
496 if unchanged == len(gauges) {
497 return nil, UnchangedError("All gauges unchanged")
498 }
407 499
408 return nil, err 500 return nil, err
409 } 501 }