Mercurial > gemma
comparison pkg/imports/wg.go @ 3402:c04b1409a596
Fix adaptation of gauge temporal validity
The necessity to update validity implies the new value has to cascade
through to referencing columns.
Because measurements falling into the validity of a new version might
have been already imported, a deadlock situation might occur with
the CHECK constraint on gauge_measurements preventing an update
of validity and the exclusion constraint on gauges preventing the
insertion of a new version before the update. Thus, the exclusion
constraint is now deferred and can no longer be used with ON CONFLICT
in the INSERT statement. Gauge measurements matching the validity of
a new gauge version are now 'moved' to that new version before the
validity of the old version is adapted.
author | Tom Gottfried <tom@intevation.de> |
---|---|
date | Thu, 23 May 2019 12:27:14 +0200 |
parents | 0f6b156cff55 |
children | aa3c83fb7018 |
comparison
equal
deleted
inserted
replaced
3401:9f4308edc70a | 3402:c04b1409a596 |
---|---|
70 WHERE NOT erased AND isrs_astext(location) <> ALL($1) | 70 WHERE NOT erased AND isrs_astext(location) <> ALL($1) |
71 RETURNING isrs_astext(location) | 71 RETURNING isrs_astext(location) |
72 ` | 72 ` |
73 | 73 |
74 eraseGaugeSQL = ` | 74 eraseGaugeSQL = ` |
75 UPDATE waterway.gauges SET | 75 WITH upd AS ( |
76 erased = true, | 76 UPDATE waterway.gauges SET |
77 -- Set enddate of old entry to new startdate in case of overlap: | 77 erased = true |
78 validity = validity - $2 | 78 WHERE isrs_astext(location) = $1 |
79 WHERE isrs_astext(location) = $1 | 79 AND NOT erased |
80 AND NOT erased | 80 -- Don't touch old entry if validity did not change: will be updated |
81 -- Don't touch old entry if validity did not change: will be updated | 81 AND validity <> $2 |
82 AND validity <> $2 | 82 RETURNING 1 |
83 ) | |
84 -- Decide whether a new version will be INSERTed | |
85 SELECT EXISTS(SELECT 1 FROM upd) | |
86 OR NOT EXISTS(SELECT 1 FROM waterway.gauges WHERE isrs_astext(location) = $1) | |
83 ` | 87 ` |
84 | 88 |
85 insertGaugeSQL = ` | 89 insertGaugeSQL = ` |
86 INSERT INTO waterway.gauges ( | 90 INSERT INTO waterway.gauges ( |
87 location, | 91 location, |
105 $12, | 109 $12, |
106 $13, | 110 $13, |
107 $14, | 111 $14, |
108 $15, | 112 $15, |
109 $16 | 113 $16 |
110 -- Exclusion constraints are not supported as arbiters. | 114 ) |
111 -- Thus we need to DO NOTHING here and use an extra UPDATE statement | 115 ` |
112 ) ON CONFLICT DO NOTHING | 116 |
113 RETURNING 1 | 117 moveGMSQL = ` |
114 ` | 118 UPDATE waterway.gauge_measurements |
119 -- Associate measurements to matching gauge version | |
120 SET validity = $2 | |
121 WHERE isrs_astext(location) = $1 | |
122 AND measure_date <@ CAST($2 AS tstzrange) | |
123 ` | |
124 | |
125 fixValiditySQL = ` | |
126 UPDATE waterway.gauges SET | |
127 -- Set enddate of old entry to new startdate in case of overlap: | |
128 validity = validity - $2 | |
129 WHERE isrs_astext(location) = $1 | |
130 AND validity && $2 | |
131 AND erased | |
132 ` | |
133 | |
115 updateGaugeSQL = ` | 134 updateGaugeSQL = ` |
116 UPDATE waterway.gauges SET | 135 UPDATE waterway.gauges SET |
117 objname = $6, | 136 objname = $6, |
118 geom = ST_SetSRID(ST_MakePoint($7, $8), 4326), | 137 geom = ST_SetSRID(ST_MakePoint($7, $8), 4326), |
119 applicability_from_km = $9, | 138 applicability_from_km = $9, |
176 "wtwgag") | 195 "wtwgag") |
177 if err != nil { | 196 if err != nil { |
178 return nil, err | 197 return nil, err |
179 } | 198 } |
180 | 199 |
181 var eraseGaugeStmt, insertStmt, updateStmt, | 200 var eraseGaugeStmt, insertStmt, moveGMStmt, fixValidityStmt, updateStmt, |
182 deleteReferenceWaterLevelsStmt, | 201 deleteReferenceWaterLevelsStmt, |
183 isNtSDepthRefStmt, insertWaterLevelStmt *sql.Stmt | 202 isNtSDepthRefStmt, insertWaterLevelStmt *sql.Stmt |
184 for _, x := range []struct { | 203 for _, x := range []struct { |
185 sql string | 204 sql string |
186 stmt **sql.Stmt | 205 stmt **sql.Stmt |
187 }{ | 206 }{ |
188 {eraseGaugeSQL, &eraseGaugeStmt}, | 207 {eraseGaugeSQL, &eraseGaugeStmt}, |
189 {insertGaugeSQL, &insertStmt}, | 208 {insertGaugeSQL, &insertStmt}, |
209 {moveGMSQL, &moveGMStmt}, | |
210 {fixValiditySQL, &fixValidityStmt}, | |
190 {updateGaugeSQL, &updateStmt}, | 211 {updateGaugeSQL, &updateStmt}, |
191 {deleteReferenceWaterLevelsSQL, &deleteReferenceWaterLevelsStmt}, | 212 {deleteReferenceWaterLevelsSQL, &deleteReferenceWaterLevelsStmt}, |
192 {isNtSDepthRefSQL, &isNtSDepthRefStmt}, | 213 {isNtSDepthRefSQL, &isNtSDepthRefStmt}, |
193 {insertReferenceWaterLevelsSQL, &insertWaterLevelStmt}, | 214 {insertReferenceWaterLevelsSQL, &insertWaterLevelStmt}, |
194 } { | 215 } { |
292 if err != nil { | 313 if err != nil { |
293 return nil, err | 314 return nil, err |
294 } | 315 } |
295 defer tx.Rollback() | 316 defer tx.Rollback() |
296 | 317 |
297 // Mark old entries of gauge as erased, if applicable | 318 // Mark old entry of gauge as erased, if applicable |
298 if _, err := tx.StmtContext(ctx, eraseGaugeStmt).ExecContext(ctx, | 319 var isNew bool |
320 err = tx.StmtContext(ctx, eraseGaugeStmt).QueryRowContext(ctx, | |
299 code.String(), | 321 code.String(), |
300 validity, | 322 validity, |
301 ); err != nil { | 323 ).Scan(&isNew) |
324 switch { | |
325 case err != nil: | |
302 feedback.Warn(handleError(err).Error()) | 326 feedback.Warn(handleError(err).Error()) |
303 if err2 := tx.Rollback(); err2 != nil { | 327 if err2 := tx.Rollback(); err2 != nil { |
304 return nil, err2 | 328 return nil, err2 |
305 } | 329 } |
306 unchanged++ | 330 unchanged++ |
307 continue | 331 continue |
308 } | 332 case isNew: |
309 | 333 // insert gauge version entry |
310 // Try to insert gauge entry | 334 if _, err = tx.StmtContext(ctx, insertStmt).ExecContext(ctx, |
311 var dummy int | 335 code.CountryCode, |
312 err = tx.StmtContext(ctx, insertStmt).QueryRowContext(ctx, | 336 code.LoCode, |
313 code.CountryCode, | 337 code.FairwaySection, |
314 code.LoCode, | 338 code.Orc, |
315 code.FairwaySection, | 339 code.Hectometre, |
316 code.Orc, | 340 dr.Objname.Loc, |
317 code.Hectometre, | 341 dr.Lon, dr.Lat, |
318 dr.Objname.Loc, | 342 from, |
319 dr.Lon, dr.Lat, | 343 to, |
320 from, | 344 &validity, |
321 to, | 345 dr.Zeropoint, |
322 &validity, | 346 geodref, |
323 dr.Zeropoint, | 347 &dateInfo, |
324 geodref, | 348 source, |
325 &dateInfo, | 349 time.Time(*dr.Lastupdate), |
326 source, | 350 ); err != nil { |
327 time.Time(*dr.Lastupdate), | 351 feedback.Warn(handleError(err).Error()) |
328 ).Scan(&dummy) | 352 if err2 := tx.Rollback(); err2 != nil { |
329 switch { | 353 return nil, err2 |
330 case err == sql.ErrNoRows: | 354 } |
331 // Assume constraint conflict, try to update | 355 unchanged++ |
356 continue | |
357 } | |
358 // Move gauge measurements to new matching gauge version, | |
359 // if applicable | |
360 if _, err = tx.StmtContext(ctx, moveGMStmt).ExecContext(ctx, | |
361 code.String(), | |
362 &validity, | |
363 ); err != nil { | |
364 feedback.Warn(handleError(err).Error()) | |
365 if err2 := tx.Rollback(); err2 != nil { | |
366 return nil, err2 | |
367 } | |
368 unchanged++ | |
369 continue | |
370 } | |
371 // Set end of validity of old version to start of new version | |
372 // in case of overlap | |
373 if _, err = tx.StmtContext(ctx, fixValidityStmt).ExecContext( | |
374 ctx, | |
375 code.String(), | |
376 &validity, | |
377 ); err != nil { | |
378 feedback.Warn(handleError(err).Error()) | |
379 if err2 := tx.Rollback(); err2 != nil { | |
380 return nil, err2 | |
381 } | |
382 unchanged++ | |
383 continue | |
384 } | |
385 feedback.Info("insert new version") | |
386 case !isNew: | |
387 // try to update | |
388 var dummy int | |
332 err2 := tx.StmtContext(ctx, updateStmt).QueryRowContext(ctx, | 389 err2 := tx.StmtContext(ctx, updateStmt).QueryRowContext(ctx, |
333 code.CountryCode, | 390 code.CountryCode, |
334 code.LoCode, | 391 code.LoCode, |
335 code.FairwaySection, | 392 code.FairwaySection, |
336 code.Orc, | 393 code.Orc, |
390 delRef, code) | 447 delRef, code) |
391 } | 448 } |
392 if err := rwls.Err(); err != nil { | 449 if err := rwls.Err(); err != nil { |
393 return nil, err | 450 return nil, err |
394 } | 451 } |
395 case err != nil: | |
396 feedback.Warn(handleError(err).Error()) | |
397 if err2 := tx.Rollback(); err2 != nil { | |
398 return nil, err2 | |
399 } | |
400 unchanged++ | |
401 continue | |
402 default: | |
403 feedback.Info("insert new version") | |
404 } | 452 } |
405 | 453 |
406 // "Upsert" reference water levels | 454 // "Upsert" reference water levels |
407 for _, wl := range []struct { | 455 for _, wl := range []struct { |
408 level **erdms.RisreflevelcodeType | 456 level **erdms.RisreflevelcodeType |