Mercurial > gemma
comparison pkg/imports/bn.go @ 3705:7006b92c0334
Handle updates (vs. historized and new versions) separately.
We need this distinction as updated data currently cannot be
reviewed. More precisely: it cannot be declined after review, as the
old data is updated in place.
The current exclusion from the review is a workaround and not meant to
be the final solution. Note that there are additional minor problems,
like the fact that the updated data is not counted as changed data for
the import.
author | Sascha Wilde <wilde@intevation.de> |
---|---|
date | Wed, 19 Jun 2019 17:00:08 +0200 |
parents | b07511ff859e |
children | aa7bede70b96 |
comparison
equal
deleted
inserted
replaced
3703:b07511ff859e | 3705:7006b92c0334 |
---|---|
9 // Software engineering by Intevation GmbH | 9 // Software engineering by Intevation GmbH |
10 // | 10 // |
11 // Author(s): | 11 // Author(s): |
12 // * Sascha L. Teichmann <sascha.teichmann@intevation.de> | 12 // * Sascha L. Teichmann <sascha.teichmann@intevation.de> |
13 // * Tom Gottfried <tom.gottfried@intevation.de> | 13 // * Tom Gottfried <tom.gottfried@intevation.de> |
14 // * Sascha Wilde <sascha.wilde@intevation.de> | |
14 | 15 |
15 package imports | 16 package imports |
16 | 17 |
17 import ( | 18 import ( |
18 "context" | 19 "context" |
85 $12, | 86 $12, |
86 $13, | 87 $13, |
87 $14 | 88 $14 |
88 FROM waterway.gauges | 89 FROM waterway.gauges |
89 WHERE location = isrs_fromText($3) AND validity && $2 | 90 WHERE location = isrs_fromText($3) AND validity && $2 |
90 ON CONFLICT (bottleneck_id, validity) DO UPDATE SET | 91 RETURNING id |
91 gauge_location = EXCLUDED.gauge_location, | 92 ` |
92 gauge_validity = EXCLUDED.gauge_validity, | 93 |
93 objnam = EXCLUDED.objnam, | 94 updateBottleneckSQL = ` |
94 nobjnm = EXCLUDED.nobjnm, | 95 WITH |
95 stretch = EXCLUDED.stretch, | 96 bounds (b) AS (VALUES (isrs_fromText($7)), (isrs_fromText($8))), |
96 area = EXCLUDED.area, | 97 r AS (SELECT isrsrange( |
97 rb = EXCLUDED.rb, | 98 (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY), |
98 lb = EXCLUDED.lb, | 99 (SELECT b FROM bounds ORDER BY b USING >~ FETCH FIRST ROW ONLY)) AS r) |
99 responsible_country = EXCLUDED.responsible_country, | 100 UPDATE waterway.bottlenecks SET ( |
100 revisiting_time = EXCLUDED.revisiting_time, | 101 bottleneck_id, |
101 limiting = EXCLUDED.limiting, | 102 validity, |
102 date_info = EXCLUDED.date_info, | 103 gauge_location, |
103 source_organization = EXCLUDED.source_organization | 104 gauge_validity, |
105 objnam, | |
106 nobjnm, | |
107 stretch, | |
108 area, | |
109 rb, | |
110 lb, | |
111 responsible_country, | |
112 revisiting_time, | |
113 limiting, | |
114 date_info, | |
115 source_organization | |
116 ) = ( SELECT | |
117 $2, | |
118 validity * $3, -- intersections with gauge validity ranges | |
119 location, | |
120 validity, | |
121 $5, | |
122 $6, | |
123 (SELECT r FROM r), | |
124 ISRSrange_area( | |
125 ISRSrange_axis((SELECT r FROM r), | |
126 $16), | |
127 (SELECT ST_Collect(CAST(area AS geometry)) | |
128 FROM waterway.waterway_area)), | |
129 $9, | |
130 $10, | |
131 $11, | |
132 $12::smallint, | |
133 $13, | |
134 $14::timestamptz, | |
135 $15 | |
136 FROM waterway.gauges | |
137 WHERE location = isrs_fromText($4) AND validity && $3 ) | |
138 WHERE id=$1 | |
104 RETURNING id | 139 RETURNING id |
105 ` | 140 ` |
106 | 141 |
107 findExactMatchBottleneckSQL = ` | 142 findExactMatchBottleneckSQL = ` |
108 WITH | 143 WITH |
146 FROM waterway.gauges | 181 FROM waterway.gauges |
147 WHERE location = isrs_fromText($3) AND validity && $2 | 182 WHERE location = isrs_fromText($3) AND validity && $2 |
148 ) | 183 ) |
149 ` | 184 ` |
150 | 185 |
186 findMatchBottleneckSQL = ` | |
187 SELECT id FROM waterway.bottlenecks | |
188 WHERE ( | |
189 bottleneck_id, | |
190 validity, | |
191 staging_done | |
192 ) = ( SELECT | |
193 $1, | |
194 validity * $2, -- intersections with gauge validity ranges | |
195 true | |
196 FROM waterway.gauges | |
197 WHERE location = isrs_fromText($3) AND validity && $2 | |
198 ) | |
199 ` | |
151 // Alignment with gauge validity might have generated new entries | 200 // Alignment with gauge validity might have generated new entries |
152 // for the same time range. Thus, remove the old ones | 201 // for the same time range. Thus, remove the old ones |
153 deleteObsoleteBNSQL = ` | 202 deleteObsoleteBNSQL = ` |
154 DELETE FROM waterway.bottlenecks | 203 DELETE FROM waterway.bottlenecks |
155 WHERE bottleneck_id = $1 AND validity <@ $2 AND id <> ALL($3) | 204 WHERE bottleneck_id = $1 AND validity <@ $2 AND id <> ALL($3) |
278 return nil, err | 327 return nil, err |
279 } | 328 } |
280 | 329 |
281 feedback.Info("Found %d bottlenecks for import", len(bns)) | 330 feedback.Info("Found %d bottlenecks for import", len(bns)) |
282 | 331 |
283 var insertStmt, findExactMatchingBNStmt, deleteObsoleteBNStmt, | 332 var insertStmt, updateStmt, findExactMatchingBNStmt, findMatchingBNStmt, |
284 fixValidityStmt, deleteMaterialStmt, insertMaterialStmt, | 333 deleteObsoleteBNStmt, fixValidityStmt, deleteMaterialStmt, |
285 trackStmt *sql.Stmt | 334 insertMaterialStmt, trackStmt *sql.Stmt |
286 | 335 |
287 for _, x := range []struct { | 336 for _, x := range []struct { |
288 sql string | 337 sql string |
289 stmt **sql.Stmt | 338 stmt **sql.Stmt |
290 }{ | 339 }{ |
291 {insertBottleneckSQL, &insertStmt}, | 340 {insertBottleneckSQL, &insertStmt}, |
341 {updateBottleneckSQL, &updateStmt}, | |
292 {findExactMatchBottleneckSQL, &findExactMatchingBNStmt}, | 342 {findExactMatchBottleneckSQL, &findExactMatchingBNStmt}, |
343 {findMatchBottleneckSQL, &findMatchingBNStmt}, | |
293 {deleteObsoleteBNSQL, &deleteObsoleteBNStmt}, | 344 {deleteObsoleteBNSQL, &deleteObsoleteBNStmt}, |
294 {fixBNValiditySQL, &fixValidityStmt}, | 345 {fixBNValiditySQL, &fixValidityStmt}, |
295 {deleteBottleneckMaterialSQL, &deleteMaterialStmt}, | 346 {deleteBottleneckMaterialSQL, &deleteMaterialStmt}, |
296 {insertBottleneckMaterialSQL, &insertMaterialStmt}, | 347 {insertBottleneckMaterialSQL, &insertMaterialStmt}, |
297 {trackImportSQL, &trackStmt}, | 348 {trackImportSQL, &trackStmt}, |
308 feedback.Info("Tolerance used to snap waterway axis: %g", tolerance) | 359 feedback.Info("Tolerance used to snap waterway axis: %g", tolerance) |
309 | 360 |
310 for _, bn := range bns { | 361 for _, bn := range bns { |
311 if err := storeBottleneck( | 362 if err := storeBottleneck( |
312 ctx, importID, conn, feedback, bn, &nids, tolerance, | 363 ctx, importID, conn, feedback, bn, &nids, tolerance, |
313 insertStmt, findExactMatchingBNStmt, deleteObsoleteBNStmt, | 364 insertStmt, updateStmt, |
314 fixValidityStmt, deleteMaterialStmt, insertMaterialStmt, | 365 findExactMatchingBNStmt, findMatchingBNStmt, |
366 deleteObsoleteBNStmt, fixValidityStmt, | |
367 deleteMaterialStmt, insertMaterialStmt, | |
315 trackStmt); err != nil { | 368 trackStmt); err != nil { |
316 return nil, err | 369 return nil, err |
317 } | 370 } |
318 } | 371 } |
319 if len(nids) == 0 { | 372 if len(nids) == 0 { |
336 conn *sql.Conn, | 389 conn *sql.Conn, |
337 feedback Feedback, | 390 feedback Feedback, |
338 bn *ifbn.BottleNeckType, | 391 bn *ifbn.BottleNeckType, |
339 nids *[]string, | 392 nids *[]string, |
340 tolerance float64, | 393 tolerance float64, |
341 insertStmt, findExactMatchingBNStmt, deleteObsoleteBNStmt, | 394 insertStmt, updateStmt, |
342 fixValidityStmt, deleteMaterialStmt, insertMaterialStmt, | 395 findExactMatchingBNStmt, findMatchingBNStmt, |
396 deleteObsoleteBNStmt, fixValidityStmt, | |
397 deleteMaterialStmt, insertMaterialStmt, | |
343 trackStmt *sql.Stmt, | 398 trackStmt *sql.Stmt, |
344 ) error { | 399 ) error { |
345 feedback.Info("Processing %s (%s)", bn.OBJNAM, bn.Bottleneck_id) | 400 feedback.Info("Processing %s (%s)", bn.OBJNAM, bn.Bottleneck_id) |
346 | 401 |
347 var tfrom, tto pgtype.Timestamptz | 402 var tfrom, tto pgtype.Timestamptz |
462 if bns.Next() { | 517 if bns.Next() { |
463 feedback.Info("unchanged") | 518 feedback.Info("unchanged") |
464 return nil | 519 return nil |
465 } | 520 } |
466 | 521 |
522 // Check if a bottleneck with the same identity | |
523 // (bottleneck_id, validity) already exists. If so, the existing | |
524 // row is updated in place below instead of inserting a new | |
525 // version: | |
526 var existing_bn_id *int64 | |
527 err = findMatchingBNStmt.QueryRowContext(ctx, | |
528 bn.Bottleneck_id, | |
529 &validity, | |
530 bn.Fk_g_fid, | |
531 ).Scan(&existing_bn_id) | |
532 switch { | |
533 case err == sql.ErrNoRows: | |
534 existing_bn_id = nil | |
535 case err != nil: | |
536 // This is unexpected and probably a serious error | |
537 return err | |
538 } | |
539 | |
467 tx, err := conn.BeginTx(ctx, nil) | 540 tx, err := conn.BeginTx(ctx, nil) |
468 if err != nil { | 541 if err != nil { |
469 return err | 542 return err |
470 } | 543 } |
471 defer tx.Rollback() | 544 defer tx.Rollback() |
472 | 545 |
473 var bnIds []int64 | 546 var bnIds []int64 |
474 bns, err = tx.StmtContext(ctx, insertStmt).QueryContext(ctx, | 547 if existing_bn_id != nil { |
475 bn.Bottleneck_id, | 548 feedback.Info("Bottelneck '%s' "+ |
476 &validity, | 549 "with matching validity already exists:"+ |
477 bn.Fk_g_fid, | 550 "UPDATING", bn.Bottleneck_id) |
478 bn.OBJNAM, | 551 // Updating existing BN data: |
479 bn.NOBJNM, | 552 bns, err = tx.StmtContext(ctx, updateStmt).QueryContext(ctx, |
480 bn.From_ISRS, bn.To_ISRS, | 553 existing_bn_id, |
481 rb, | 554 bn.Bottleneck_id, |
482 lb, | 555 &validity, |
483 country, | 556 bn.Fk_g_fid, |
484 revisitingTime, | 557 bn.OBJNAM, |
485 limiting, | 558 bn.NOBJNM, |
486 bn.Date_Info, | 559 bn.From_ISRS, bn.To_ISRS, |
487 bn.Source, | 560 rb, |
488 tolerance, | 561 lb, |
489 ) | 562 country, |
563 revisitingTime, | |
564 limiting, | |
565 bn.Date_Info, | |
566 bn.Source, | |
567 tolerance, | |
568 ) | |
569 } else { | |
570 // New BN data: | |
571 bns, err = tx.StmtContext(ctx, insertStmt).QueryContext(ctx, | |
572 bn.Bottleneck_id, | |
573 &validity, | |
574 bn.Fk_g_fid, | |
575 bn.OBJNAM, | |
576 bn.NOBJNM, | |
577 bn.From_ISRS, bn.To_ISRS, | |
578 rb, | |
579 lb, | |
580 country, | |
581 revisitingTime, | |
582 limiting, | |
583 bn.Date_Info, | |
584 bn.Source, | |
585 tolerance, | |
586 ) | |
587 } | |
490 if err != nil { | 588 if err != nil { |
491 feedback.Warn(handleError(err).Error()) | 589 feedback.Warn(handleError(err).Error()) |
492 return nil | 590 return nil |
493 } | 591 } |
494 defer bns.Close() | 592 defer bns.Close() |
570 feedback.Warn(handleError(err).Error()) | 668 feedback.Warn(handleError(err).Error()) |
571 return nil | 669 return nil |
572 } | 670 } |
573 } | 671 } |
574 | 672 |
575 for _, nid := range bnIds { | 673 // Only add new BN data to tracking for staging review. |
576 if _, err := tx.StmtContext(ctx, trackStmt).ExecContext( | 674 // |
577 ctx, importID, "waterway.bottlenecks", nid, | 675 // FIXME: Review for updated bottlenecks is currently not possible, as |
578 ); err != nil { | 676 // the update is done instantly in place. |
579 return err | 677 if existing_bn_id == nil { |
580 } | 678 for _, nid := range bnIds { |
581 } | 679 if _, err := tx.StmtContext(ctx, trackStmt).ExecContext( |
680 ctx, importID, "waterway.bottlenecks", nid, | |
681 ); err != nil { | |
682 return err | |
683 } | |
684 } | |
685 } | |
686 | |
582 if err = tx.Commit(); err != nil { | 687 if err = tx.Commit(); err != nil { |
583 return err | 688 return err |
584 } | 689 } |
585 *nids = append(*nids, bn.Bottleneck_id) | 690 // See above... |
691 if existing_bn_id == nil { | |
692 *nids = append(*nids, bn.Bottleneck_id) | |
693 } | |
586 return nil | 694 return nil |
587 } | 695 } |