Mercurial > gemma
comparison pkg/imports/bn.go @ 3666:db87f34805fb
Align bottleneck validity at gauges
Ensuring that the validity of a bottleneck version is always contained
in the validity of the referenced gauge version makes it possible to
reliably determine the matching reference values of the gauge at a
point in time.
Since this implies that a bottleneck version might be cut into more
than one time range, the concept of having only one non-erased
version is no longer applicable and is replaced by using the 'current'
version of a bottleneck.
Fairway availability data are always kept with the 'current'
bottleneck version to have them at hand all together for analyses
over longer time ranges.
author | Tom Gottfried <tom@intevation.de> |
---|---|
date | Sat, 15 Jun 2019 14:36:50 +0200 |
parents | 29ef6d41e4af |
children | 0227670dedd5 |
comparison
equal
deleted
inserted
replaced
3665:29ef6d41e4af | 3666:db87f34805fb |
---|---|
41 | 41 |
42 // BNJobKind is the import queue type identifier. | 42 // BNJobKind is the import queue type identifier. |
43 const BNJobKind JobKind = "bn" | 43 const BNJobKind JobKind = "bn" |
44 | 44 |
45 const ( | 45 const ( |
46 hasBottleneckSQL = ` | |
47 WITH upd AS ( | |
48 UPDATE waterway.bottlenecks SET | |
49 erased = true | |
50 WHERE bottleneck_id = $1 | |
51 AND NOT erased | |
52 -- Don't touch old entry if new validity contains old: will be updated | |
53 AND NOT validity <@ $2 | |
54 RETURNING 1 | |
55 ) | |
56 -- Decide whether a new version will be INSERTed | |
57 SELECT EXISTS(SELECT 1 FROM upd) | |
58 OR NOT EXISTS(SELECT 1 FROM waterway.bottlenecks WHERE bottleneck_id = $1) | |
59 ` | |
60 | |
61 insertBottleneckSQL = ` | 46 insertBottleneckSQL = ` |
62 WITH | 47 WITH |
63 bounds (b) AS (VALUES (isrs_fromText($6)), (isrs_fromText($7))), | 48 bounds (b) AS (VALUES (isrs_fromText($6)), (isrs_fromText($7))), |
64 r AS (SELECT isrsrange( | 49 r AS (SELECT isrsrange( |
65 (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY), | 50 (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY), |
78 responsible_country, | 63 responsible_country, |
79 revisiting_time, | 64 revisiting_time, |
80 limiting, | 65 limiting, |
81 date_info, | 66 date_info, |
82 source_organization | 67 source_organization |
83 ) VALUES ( | 68 ) SELECT |
84 $1, | 69 $1, |
85 $2, | 70 validity * $2, -- intersections with gauge validity ranges |
86 isrs_fromText($3), | 71 location, |
87 COALESCE( | 72 validity, |
88 (SELECT validity FROM waterway.gauges | |
89 WHERE location = isrs_fromText($3) | |
90 AND validity @> lower(CAST($2 AS tstzrange))), | |
91 tstzrange(NULL, NULL)), | |
92 $4, | 73 $4, |
93 $5, | 74 $5, |
94 (SELECT r FROM r), | 75 (SELECT r FROM r), |
95 ISRSrange_area( | 76 ISRSrange_area( |
96 ISRSrange_axis((SELECT r FROM r), | 77 ISRSrange_axis((SELECT r FROM r), |
102 $10, | 83 $10, |
103 $11, | 84 $11, |
104 $12, | 85 $12, |
105 $13, | 86 $13, |
106 $14 | 87 $14 |
107 ) | 88 FROM waterway.gauges |
108 RETURNING id` | 89 WHERE location = isrs_fromText($3) AND validity && $2 |
90 ON CONFLICT (bottleneck_id, validity) DO UPDATE SET | |
91 gauge_location = EXCLUDED.gauge_location, | |
92 gauge_validity = EXCLUDED.gauge_validity, | |
93 objnam = EXCLUDED.objnam, | |
94 nobjnm = EXCLUDED.nobjnm, | |
95 stretch = EXCLUDED.stretch, | |
96 area = EXCLUDED.area, | |
97 rb = EXCLUDED.rb, | |
98 lb = EXCLUDED.lb, | |
99 responsible_country = EXCLUDED.responsible_country, | |
100 revisiting_time = EXCLUDED.revisiting_time, | |
101 limiting = EXCLUDED.limiting, | |
102 date_info = EXCLUDED.date_info, | |
103 source_organization = EXCLUDED.source_organization | |
104 RETURNING id | |
105 ` | |
106 | |
107 // Alignment with gauge validity might have generated new entries | |
108 // for the same time range. Thus, remove the old ones | |
109 deleteObsoleteBNSQL = ` | |
110 DELETE FROM waterway.bottlenecks | |
111 WHERE bottleneck_id = $1 AND validity <@ $2 AND id <> ALL($3) | |
112 ` | |
109 | 113 |
110 fixBNValiditySQL = ` | 114 fixBNValiditySQL = ` |
111 UPDATE waterway.bottlenecks SET | 115 UPDATE waterway.bottlenecks SET |
112 -- Set enddate of old entry to new startdate in case of overlap: | 116 -- Set enddate of old entry to new startdate in case of overlap: |
113 validity = validity - $2 | 117 validity = validity - $2 |
114 WHERE bottleneck_id = $1 | 118 WHERE bottleneck_id = $1 |
115 AND validity && $2 | 119 AND validity && $2 AND NOT validity <@ $2 |
116 AND erased | |
117 ` | 120 ` |
118 | 121 |
119 updateBottleneckSQL = ` | |
120 WITH | |
121 bounds (b) AS (VALUES (isrs_fromText($5)), (isrs_fromText($6))), | |
122 r AS (SELECT isrsrange( | |
123 (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY), | |
124 (SELECT b FROM bounds ORDER BY b USING >~ FETCH FIRST ROW ONLY)) AS r) | |
125 UPDATE waterway.bottlenecks b SET | |
126 gauge_location = isrs_fromtext($2), | |
127 gauge_validity = COALESCE( | |
128 (SELECT validity FROM waterway.gauges g | |
129 WHERE g.location = isrs_fromText($2) | |
130 AND g.validity @> lower(b.validity)), | |
131 tstzrange(NULL, NULL)), | |
132 objnam = $3, | |
133 nobjnm = $4, | |
134 stretch = (SELECT r FROM r), | |
135 area = ISRSrange_area( | |
136 ISRSrange_axis((SELECT r FROM r), $14), | |
137 (SELECT ST_Collect(CAST(area AS geometry)) | |
138 FROM waterway.waterway_area)), | |
139 rb = $7, | |
140 lb = $8, | |
141 responsible_country = $9, | |
142 revisiting_time = $10, | |
143 limiting = $11, | |
144 date_info = $12, | |
145 source_organization = $13, | |
146 validity = $15 | |
147 WHERE bottleneck_id = $1 | |
148 AND NOT erased | |
149 AND $12 > date_info | |
150 RETURNING id | |
151 ` | |
152 | |
153 deleteBottleneckMaterialSQL = ` | 122 deleteBottleneckMaterialSQL = ` |
154 DELETE FROM waterway.bottlenecks_riverbed_materials | 123 WITH del AS ( |
155 WHERE bottleneck_id = $1 | 124 DELETE FROM waterway.bottlenecks_riverbed_materials |
156 AND riverbed <> ALL($2) | 125 WHERE bottleneck_id = ANY($1) |
157 RETURNING riverbed | 126 AND riverbed <> ALL($2) |
127 RETURNING riverbed) | |
128 SELECT DISTINCT riverbed FROM del | |
158 ` | 129 ` |
159 | 130 |
160 insertBottleneckMaterialSQL = ` | 131 insertBottleneckMaterialSQL = ` |
161 INSERT INTO waterway.bottlenecks_riverbed_materials ( | 132 INSERT INTO waterway.bottlenecks_riverbed_materials ( |
162 bottleneck_id, | 133 bottleneck_id, |
163 riverbed | 134 riverbed |
164 ) VALUES ( | 135 ) SELECT * |
165 $1, | 136 FROM unnest(CAST($1 AS int[])) AS bns, |
166 $2 | 137 unnest(CAST($2 AS varchar[])) AS materials |
167 ) ON CONFLICT (bottleneck_id, riverbed) DO NOTHING | 138 ON CONFLICT (bottleneck_id, riverbed) DO NOTHING |
168 ` | 139 ` |
169 ) | 140 ) |
170 | 141 |
171 type bnJobCreator struct{} | 142 type bnJobCreator struct{} |
172 | 143 |
263 return nil, err | 234 return nil, err |
264 } | 235 } |
265 | 236 |
266 feedback.Info("Found %d bottlenecks for import", len(bns)) | 237 feedback.Info("Found %d bottlenecks for import", len(bns)) |
267 | 238 |
268 var hasStmt, insertStmt, fixValidityStmt, updateStmt, | 239 var insertStmt, deleteObsoleteBNStmt, fixValidityStmt, |
269 deleteMaterialStmt, insertMaterialStmt, trackStmt *sql.Stmt | 240 deleteMaterialStmt, insertMaterialStmt, trackStmt *sql.Stmt |
270 | 241 |
271 for _, x := range []struct { | 242 for _, x := range []struct { |
272 sql string | 243 sql string |
273 stmt **sql.Stmt | 244 stmt **sql.Stmt |
274 }{ | 245 }{ |
275 {hasBottleneckSQL, &hasStmt}, | |
276 {insertBottleneckSQL, &insertStmt}, | 246 {insertBottleneckSQL, &insertStmt}, |
247 {deleteObsoleteBNSQL, &deleteObsoleteBNStmt}, | |
277 {fixBNValiditySQL, &fixValidityStmt}, | 248 {fixBNValiditySQL, &fixValidityStmt}, |
278 {updateBottleneckSQL, &updateStmt}, | |
279 {deleteBottleneckMaterialSQL, &deleteMaterialStmt}, | 249 {deleteBottleneckMaterialSQL, &deleteMaterialStmt}, |
280 {insertBottleneckMaterialSQL, &insertMaterialStmt}, | 250 {insertBottleneckMaterialSQL, &insertMaterialStmt}, |
281 {trackImportSQL, &trackStmt}, | 251 {trackImportSQL, &trackStmt}, |
282 } { | 252 } { |
283 var err error | 253 var err error |
292 feedback.Info("Tolerance used to snap waterway axis: %g", tolerance) | 262 feedback.Info("Tolerance used to snap waterway axis: %g", tolerance) |
293 | 263 |
294 for _, bn := range bns { | 264 for _, bn := range bns { |
295 if err := storeBottleneck( | 265 if err := storeBottleneck( |
296 ctx, importID, conn, feedback, bn, &nids, tolerance, | 266 ctx, importID, conn, feedback, bn, &nids, tolerance, |
297 hasStmt, insertStmt, fixValidityStmt, updateStmt, | 267 insertStmt, deleteObsoleteBNStmt, fixValidityStmt, |
298 deleteMaterialStmt, insertMaterialStmt, trackStmt); err != nil { | 268 deleteMaterialStmt, insertMaterialStmt, trackStmt); err != nil { |
299 return nil, err | 269 return nil, err |
300 } | 270 } |
301 } | 271 } |
302 if len(nids) == 0 { | 272 if len(nids) == 0 { |
319 conn *sql.Conn, | 289 conn *sql.Conn, |
320 feedback Feedback, | 290 feedback Feedback, |
321 bn *ifbn.BottleNeckType, | 291 bn *ifbn.BottleNeckType, |
322 nids *[]string, | 292 nids *[]string, |
323 tolerance float64, | 293 tolerance float64, |
324 hasStmt, insertStmt, fixValidityStmt, updateStmt, | 294 insertStmt, deleteObsoleteBNStmt, fixValidityStmt, |
325 deleteMaterialStmt, insertMaterialStmt, trackStmt *sql.Stmt, | 295 deleteMaterialStmt, insertMaterialStmt, trackStmt *sql.Stmt, |
326 ) error { | 296 ) error { |
327 feedback.Info("Processing %s (%s)", bn.OBJNAM, bn.Bottleneck_id) | 297 feedback.Info("Processing %s (%s)", bn.OBJNAM, bn.Bottleneck_id) |
328 | 298 |
329 if bn.AdditionalData == nil || bn.AdditionalData.KeyValuePair == nil { | 299 if bn.AdditionalData == nil || bn.AdditionalData.KeyValuePair == nil { |
406 if err != nil { | 376 if err != nil { |
407 return err | 377 return err |
408 } | 378 } |
409 defer tx.Rollback() | 379 defer tx.Rollback() |
410 | 380 |
411 var isNew bool | 381 var bnIds []int64 |
412 var nid int64 | 382 bns, err := tx.StmtContext(ctx, insertStmt).QueryContext(ctx, |
413 err = tx.StmtContext(ctx, hasStmt).QueryRowContext(ctx, | |
414 bn.Bottleneck_id, | 383 bn.Bottleneck_id, |
415 validity, | 384 &validity, |
416 ).Scan(&isNew) | 385 bn.Fk_g_fid, |
417 switch { | 386 bn.OBJNAM, |
418 case err != nil: | 387 bn.NOBJNM, |
388 bn.From_ISRS, bn.To_ISRS, | |
389 rb, | |
390 lb, | |
391 country, | |
392 revisitingTime, | |
393 limiting, | |
394 bn.Date_Info, | |
395 bn.Source, | |
396 tolerance, | |
397 ) | |
398 if err != nil { | |
399 feedback.Warn(handleError(err).Error()) | |
400 return nil | |
401 } | |
402 defer bns.Close() | |
403 for bns.Next() { | |
404 var nid int64 | |
405 if err := bns.Scan(&nid); err != nil { | |
406 return err | |
407 } | |
408 bnIds = append(bnIds, nid) | |
409 } | |
410 if err := bns.Err(); err != nil { | |
411 return err | |
412 } | |
413 if len(bnIds) == 0 { | |
414 feedback.Warn( | |
415 "No gauge matching '%s' or given time available", bn.Fk_g_fid) | |
416 return nil | |
417 } | |
418 | |
419 // Remove obsolete bottleneck version entries | |
420 var pgBnIds pgtype.Int8Array | |
421 pgBnIds.Set(bnIds) | |
422 if _, err = tx.StmtContext(ctx, deleteObsoleteBNStmt).ExecContext(ctx, | |
423 bn.Bottleneck_id, | |
424 &validity, | |
425 &pgBnIds, | |
426 ); err != nil { | |
419 feedback.Warn(handleError(err).Error()) | 427 feedback.Warn(handleError(err).Error()) |
420 if err2 := tx.Rollback(); err2 != nil { | 428 if err2 := tx.Rollback(); err2 != nil { |
421 return err2 | 429 return err2 |
422 } | 430 } |
423 return nil | 431 return nil |
424 case isNew: | |
425 err = tx.StmtContext(ctx, insertStmt).QueryRowContext( | |
426 ctx, | |
427 bn.Bottleneck_id, | |
428 &validity, | |
429 bn.Fk_g_fid, | |
430 bn.OBJNAM, | |
431 bn.NOBJNM, | |
432 bn.From_ISRS, bn.To_ISRS, | |
433 rb, | |
434 lb, | |
435 country, | |
436 revisitingTime, | |
437 limiting, | |
438 bn.Date_Info, | |
439 bn.Source, | |
440 tolerance, | |
441 ).Scan(&nid) | |
442 if err != nil { | |
443 feedback.Warn(handleError(err).Error()) | |
444 return nil | |
445 } | |
446 feedback.Info("insert new version") | |
447 case !isNew: | |
448 // try to update | |
449 err := tx.StmtContext(ctx, updateStmt).QueryRowContext(ctx, | |
450 bn.Bottleneck_id, | |
451 bn.Fk_g_fid, | |
452 bn.OBJNAM, | |
453 bn.NOBJNM, | |
454 bn.From_ISRS, bn.To_ISRS, | |
455 rb, | |
456 lb, | |
457 country, | |
458 revisitingTime, | |
459 limiting, | |
460 bn.Date_Info, | |
461 bn.Source, | |
462 tolerance, | |
463 &validity, | |
464 ).Scan(&nid) | |
465 switch { | |
466 case err == sql.ErrNoRows: | |
467 feedback.Info("unchanged") | |
468 if err := tx.Rollback(); err != nil { | |
469 return err | |
470 } | |
471 return nil | |
472 case err != nil: | |
473 feedback.Warn(handleError(err).Error()) | |
474 if err := tx.Rollback(); err != nil { | |
475 return err | |
476 } | |
477 return nil | |
478 default: | |
479 feedback.Info("update") | |
480 | |
481 // Remove obsolete riverbed materials | |
482 var pgMaterials pgtype.VarcharArray | |
483 pgMaterials.Set(materials) | |
484 mtls, err := tx.StmtContext(ctx, | |
485 deleteMaterialStmt).QueryContext(ctx, | |
486 nid, | |
487 &pgMaterials, | |
488 ) | |
489 if err != nil { | |
490 return err | |
491 } | |
492 defer mtls.Close() | |
493 for mtls.Next() { | |
494 var delMat string | |
495 if err := mtls.Scan(&delMat); err != nil { | |
496 return err | |
497 } | |
498 feedback.Warn("Removed riverbed material %s", delMat) | |
499 } | |
500 if err := mtls.Err(); err != nil { | |
501 return err | |
502 } | |
503 } | |
504 } | 432 } |
505 | 433 |
506 // Set end of validity of old version to start of new version | 434 // Set end of validity of old version to start of new version |
507 // in case of overlap | 435 // in case of overlap |
508 if _, err = tx.StmtContext(ctx, fixValidityStmt).ExecContext(ctx, | 436 if _, err = tx.StmtContext(ctx, fixValidityStmt).ExecContext(ctx, |
514 return err2 | 442 return err2 |
515 } | 443 } |
516 return nil | 444 return nil |
517 } | 445 } |
518 | 446 |
519 // Insert riverbed materials | |
520 if materials != nil { | 447 if materials != nil { |
521 for _, mat := range materials { | 448 // Remove obsolete riverbed materials |
522 if _, err := tx.StmtContext(ctx, | 449 var pgMaterials pgtype.VarcharArray |
523 insertMaterialStmt).ExecContext( | 450 pgMaterials.Set(materials) |
524 ctx, nid, mat); err != nil { | 451 mtls, err := tx.StmtContext(ctx, |
525 feedback.Warn("Failed to insert riverbed material '%s'", mat) | 452 deleteMaterialStmt).QueryContext(ctx, |
526 feedback.Warn(handleError(err).Error()) | 453 &pgBnIds, |
454 &pgMaterials, | |
455 ) | |
456 if err != nil { | |
457 return err | |
458 } | |
459 defer mtls.Close() | |
460 for mtls.Next() { | |
461 var delMat string | |
462 if err := mtls.Scan(&delMat); err != nil { | |
463 return err | |
527 } | 464 } |
528 } | 465 feedback.Warn("Removed riverbed material %s", delMat) |
529 } | 466 } |
530 | 467 if err := mtls.Err(); err != nil { |
531 if _, err := tx.StmtContext(ctx, trackStmt).ExecContext( | 468 return err |
532 ctx, importID, "waterway.bottlenecks", nid, | 469 } |
533 ); err != nil { | 470 |
534 return err | 471 // Insert riverbed materials |
472 if _, err := tx.StmtContext(ctx, insertMaterialStmt).ExecContext(ctx, | |
473 &pgBnIds, | |
474 &pgMaterials, | |
475 ); err != nil { | |
476 feedback.Warn("Failed to insert riverbed materials") | |
477 feedback.Warn(handleError(err).Error()) | |
478 return nil | |
479 } | |
480 } | |
481 | |
482 for _, nid := range bnIds { | |
483 if _, err := tx.StmtContext(ctx, trackStmt).ExecContext( | |
484 ctx, importID, "waterway.bottlenecks", nid, | |
485 ); err != nil { | |
486 return err | |
487 } | |
535 } | 488 } |
536 if err = tx.Commit(); err != nil { | 489 if err = tx.Commit(); err != nil { |
537 return err | 490 return err |
538 } | 491 } |
539 *nids = append(*nids, bn.Bottleneck_id) | 492 *nids = append(*nids, bn.Bottleneck_id) |