comparison pkg/imports/wg.go @ 3402:c04b1409a596

Fix adaptation of gauge temporal validity The necessity to update validity implies the new value has to cascade through to referencing columns. Because measurements falling into the validity of a new version might have been already imported, a deadlock situation might occur with the CHECK constraint on gauge_measurements preventing an update of validity and the exclusion constraint on gauges preventing the insertion of a new version before the update. Thus, the exclusion constraint is now deferred and cannot longer be used with ON CONFLICT in the INSERT statement. Gauge measurements matching the validity of a new gauge version are now 'moved' to that new version before the validity of the old version is adapted.
author Tom Gottfried <tom@intevation.de>
date Thu, 23 May 2019 12:27:14 +0200
parents 0f6b156cff55
children aa3c83fb7018
comparison
equal deleted inserted replaced
3401:9f4308edc70a 3402:c04b1409a596
70 WHERE NOT erased AND isrs_astext(location) <> ALL($1) 70 WHERE NOT erased AND isrs_astext(location) <> ALL($1)
71 RETURNING isrs_astext(location) 71 RETURNING isrs_astext(location)
72 ` 72 `
73 73
74 eraseGaugeSQL = ` 74 eraseGaugeSQL = `
75 UPDATE waterway.gauges SET 75 WITH upd AS (
76 erased = true, 76 UPDATE waterway.gauges SET
77 -- Set enddate of old entry to new startdate in case of overlap: 77 erased = true
78 validity = validity - $2 78 WHERE isrs_astext(location) = $1
79 WHERE isrs_astext(location) = $1 79 AND NOT erased
80 AND NOT erased 80 -- Don't touch old entry if validity did not change: will be updated
81 -- Don't touch old entry if validity did not change: will be updated 81 AND validity <> $2
82 AND validity <> $2 82 RETURNING 1
83 )
84 -- Decide whether a new version will be INSERTed
85 SELECT EXISTS(SELECT 1 FROM upd)
86 OR NOT EXISTS(SELECT 1 FROM waterway.gauges WHERE isrs_astext(location) = $1)
83 ` 87 `
84 88
85 insertGaugeSQL = ` 89 insertGaugeSQL = `
86 INSERT INTO waterway.gauges ( 90 INSERT INTO waterway.gauges (
87 location, 91 location,
105 $12, 109 $12,
106 $13, 110 $13,
107 $14, 111 $14,
108 $15, 112 $15,
109 $16 113 $16
110 -- Exclusion constraints are not supported as arbiters. 114 )
111 -- Thus we need to DO NOTHING here and use an extra UPDATE statement 115 `
112 ) ON CONFLICT DO NOTHING 116
113 RETURNING 1 117 moveGMSQL = `
114 ` 118 UPDATE waterway.gauge_measurements
119 -- Associate measurements to matching gauge version
120 SET validity = $2
121 WHERE isrs_astext(location) = $1
122 AND measure_date <@ CAST($2 AS tstzrange)
123 `
124
125 fixValiditySQL = `
126 UPDATE waterway.gauges SET
127 -- Set enddate of old entry to new startdate in case of overlap:
128 validity = validity - $2
129 WHERE isrs_astext(location) = $1
130 AND validity && $2
131 AND erased
132 `
133
115 updateGaugeSQL = ` 134 updateGaugeSQL = `
116 UPDATE waterway.gauges SET 135 UPDATE waterway.gauges SET
117 objname = $6, 136 objname = $6,
118 geom = ST_SetSRID(ST_MakePoint($7, $8), 4326), 137 geom = ST_SetSRID(ST_MakePoint($7, $8), 4326),
119 applicability_from_km = $9, 138 applicability_from_km = $9,
176 "wtwgag") 195 "wtwgag")
177 if err != nil { 196 if err != nil {
178 return nil, err 197 return nil, err
179 } 198 }
180 199
181 var eraseGaugeStmt, insertStmt, updateStmt, 200 var eraseGaugeStmt, insertStmt, moveGMStmt, fixValidityStmt, updateStmt,
182 deleteReferenceWaterLevelsStmt, 201 deleteReferenceWaterLevelsStmt,
183 isNtSDepthRefStmt, insertWaterLevelStmt *sql.Stmt 202 isNtSDepthRefStmt, insertWaterLevelStmt *sql.Stmt
184 for _, x := range []struct { 203 for _, x := range []struct {
185 sql string 204 sql string
186 stmt **sql.Stmt 205 stmt **sql.Stmt
187 }{ 206 }{
188 {eraseGaugeSQL, &eraseGaugeStmt}, 207 {eraseGaugeSQL, &eraseGaugeStmt},
189 {insertGaugeSQL, &insertStmt}, 208 {insertGaugeSQL, &insertStmt},
209 {moveGMSQL, &moveGMStmt},
210 {fixValiditySQL, &fixValidityStmt},
190 {updateGaugeSQL, &updateStmt}, 211 {updateGaugeSQL, &updateStmt},
191 {deleteReferenceWaterLevelsSQL, &deleteReferenceWaterLevelsStmt}, 212 {deleteReferenceWaterLevelsSQL, &deleteReferenceWaterLevelsStmt},
192 {isNtSDepthRefSQL, &isNtSDepthRefStmt}, 213 {isNtSDepthRefSQL, &isNtSDepthRefStmt},
193 {insertReferenceWaterLevelsSQL, &insertWaterLevelStmt}, 214 {insertReferenceWaterLevelsSQL, &insertWaterLevelStmt},
194 } { 215 } {
292 if err != nil { 313 if err != nil {
293 return nil, err 314 return nil, err
294 } 315 }
295 defer tx.Rollback() 316 defer tx.Rollback()
296 317
297 // Mark old entries of gauge as erased, if applicable 318 // Mark old entry of gauge as erased, if applicable
298 if _, err := tx.StmtContext(ctx, eraseGaugeStmt).ExecContext(ctx, 319 var isNew bool
320 err = tx.StmtContext(ctx, eraseGaugeStmt).QueryRowContext(ctx,
299 code.String(), 321 code.String(),
300 validity, 322 validity,
301 ); err != nil { 323 ).Scan(&isNew)
324 switch {
325 case err != nil:
302 feedback.Warn(handleError(err).Error()) 326 feedback.Warn(handleError(err).Error())
303 if err2 := tx.Rollback(); err2 != nil { 327 if err2 := tx.Rollback(); err2 != nil {
304 return nil, err2 328 return nil, err2
305 } 329 }
306 unchanged++ 330 unchanged++
307 continue 331 continue
308 } 332 case isNew:
309 333 // insert gauge version entry
310 // Try to insert gauge entry 334 if _, err = tx.StmtContext(ctx, insertStmt).ExecContext(ctx,
311 var dummy int 335 code.CountryCode,
312 err = tx.StmtContext(ctx, insertStmt).QueryRowContext(ctx, 336 code.LoCode,
313 code.CountryCode, 337 code.FairwaySection,
314 code.LoCode, 338 code.Orc,
315 code.FairwaySection, 339 code.Hectometre,
316 code.Orc, 340 dr.Objname.Loc,
317 code.Hectometre, 341 dr.Lon, dr.Lat,
318 dr.Objname.Loc, 342 from,
319 dr.Lon, dr.Lat, 343 to,
320 from, 344 &validity,
321 to, 345 dr.Zeropoint,
322 &validity, 346 geodref,
323 dr.Zeropoint, 347 &dateInfo,
324 geodref, 348 source,
325 &dateInfo, 349 time.Time(*dr.Lastupdate),
326 source, 350 ); err != nil {
327 time.Time(*dr.Lastupdate), 351 feedback.Warn(handleError(err).Error())
328 ).Scan(&dummy) 352 if err2 := tx.Rollback(); err2 != nil {
329 switch { 353 return nil, err2
330 case err == sql.ErrNoRows: 354 }
331 // Assume constraint conflict, try to update 355 unchanged++
356 continue
357 }
358 // Move gauge measurements to new matching gauge version,
359 // if applicable
360 if _, err = tx.StmtContext(ctx, moveGMStmt).ExecContext(ctx,
361 code.String(),
362 &validity,
363 ); err != nil {
364 feedback.Warn(handleError(err).Error())
365 if err2 := tx.Rollback(); err2 != nil {
366 return nil, err2
367 }
368 unchanged++
369 continue
370 }
371 // Set end of validity of old version to start of new version
372 // in case of overlap
373 if _, err = tx.StmtContext(ctx, fixValidityStmt).ExecContext(
374 ctx,
375 code.String(),
376 &validity,
377 ); err != nil {
378 feedback.Warn(handleError(err).Error())
379 if err2 := tx.Rollback(); err2 != nil {
380 return nil, err2
381 }
382 unchanged++
383 continue
384 }
385 feedback.Info("insert new version")
386 case !isNew:
387 // try to update
388 var dummy int
332 err2 := tx.StmtContext(ctx, updateStmt).QueryRowContext(ctx, 389 err2 := tx.StmtContext(ctx, updateStmt).QueryRowContext(ctx,
333 code.CountryCode, 390 code.CountryCode,
334 code.LoCode, 391 code.LoCode,
335 code.FairwaySection, 392 code.FairwaySection,
336 code.Orc, 393 code.Orc,
390 delRef, code) 447 delRef, code)
391 } 448 }
392 if err := rwls.Err(); err != nil { 449 if err := rwls.Err(); err != nil {
393 return nil, err 450 return nil, err
394 } 451 }
395 case err != nil:
396 feedback.Warn(handleError(err).Error())
397 if err2 := tx.Rollback(); err2 != nil {
398 return nil, err2
399 }
400 unchanged++
401 continue
402 default:
403 feedback.Info("insert new version")
404 } 452 }
405 453
406 // "Upsert" reference water levels 454 // "Upsert" reference water levels
407 for _, wl := range []struct { 455 for _, wl := range []struct {
408 level **erdms.RisreflevelcodeType 456 level **erdms.RisreflevelcodeType