comparison pkg/imports/bn.go @ 3705:7006b92c0334

Handle updates (vs. historized and new versions) separately. We need this distinction as updated data currently can not be reviewed. More precisely: it can not be declined after review, as the old data is updated in place. The current exclusion from the review is a workaround and not meant to be the final solution. Note that there are additional minor problems, like the fact that the updated data is not counted as changed data for the import.
author Sascha Wilde <wilde@intevation.de>
date Wed, 19 Jun 2019 17:00:08 +0200
parents b07511ff859e
children aa7bede70b96
comparison
equal deleted inserted replaced
3703:b07511ff859e 3705:7006b92c0334
9 // Software engineering by Intevation GmbH 9 // Software engineering by Intevation GmbH
10 // 10 //
11 // Author(s): 11 // Author(s):
12 // * Sascha L. Teichmann <sascha.teichmann@intevation.de> 12 // * Sascha L. Teichmann <sascha.teichmann@intevation.de>
13 // * Tom Gottfried <tom.gottfried@intevation.de> 13 // * Tom Gottfried <tom.gottfried@intevation.de>
14 // * Sascha Wilde <sascha.wilde@intevation.de>
14 15
15 package imports 16 package imports
16 17
17 import ( 18 import (
18 "context" 19 "context"
85 $12, 86 $12,
86 $13, 87 $13,
87 $14 88 $14
88 FROM waterway.gauges 89 FROM waterway.gauges
89 WHERE location = isrs_fromText($3) AND validity && $2 90 WHERE location = isrs_fromText($3) AND validity && $2
90 ON CONFLICT (bottleneck_id, validity) DO UPDATE SET 91 RETURNING id
91 gauge_location = EXCLUDED.gauge_location, 92 `
92 gauge_validity = EXCLUDED.gauge_validity, 93
93 objnam = EXCLUDED.objnam, 94 updateBottleneckSQL = `
94 nobjnm = EXCLUDED.nobjnm, 95 WITH
95 stretch = EXCLUDED.stretch, 96 bounds (b) AS (VALUES (isrs_fromText($7)), (isrs_fromText($8))),
96 area = EXCLUDED.area, 97 r AS (SELECT isrsrange(
97 rb = EXCLUDED.rb, 98 (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY),
98 lb = EXCLUDED.lb, 99 (SELECT b FROM bounds ORDER BY b USING >~ FETCH FIRST ROW ONLY)) AS r)
99 responsible_country = EXCLUDED.responsible_country, 100 UPDATE waterway.bottlenecks SET (
100 revisiting_time = EXCLUDED.revisiting_time, 101 bottleneck_id,
101 limiting = EXCLUDED.limiting, 102 validity,
102 date_info = EXCLUDED.date_info, 103 gauge_location,
103 source_organization = EXCLUDED.source_organization 104 gauge_validity,
105 objnam,
106 nobjnm,
107 stretch,
108 area,
109 rb,
110 lb,
111 responsible_country,
112 revisiting_time,
113 limiting,
114 date_info,
115 source_organization
116 ) = ( SELECT
117 $2,
118 validity * $3, -- intersections with gauge validity ranges
119 location,
120 validity,
121 $5,
122 $6,
123 (SELECT r FROM r),
124 ISRSrange_area(
125 ISRSrange_axis((SELECT r FROM r),
126 $16),
127 (SELECT ST_Collect(CAST(area AS geometry))
128 FROM waterway.waterway_area)),
129 $9,
130 $10,
131 $11,
132 $12::smallint,
133 $13,
134 $14::timestamptz,
135 $15
136 FROM waterway.gauges
137 WHERE location = isrs_fromText($4) AND validity && $3 )
138 WHERE id=$1
104 RETURNING id 139 RETURNING id
105 ` 140 `
106 141
107 findExactMatchBottleneckSQL = ` 142 findExactMatchBottleneckSQL = `
108 WITH 143 WITH
146 FROM waterway.gauges 181 FROM waterway.gauges
147 WHERE location = isrs_fromText($3) AND validity && $2 182 WHERE location = isrs_fromText($3) AND validity && $2
148 ) 183 )
149 ` 184 `
150 185
186 findMatchBottleneckSQL = `
187 SELECT id FROM waterway.bottlenecks
188 WHERE (
189 bottleneck_id,
190 validity,
191 staging_done
192 ) = ( SELECT
193 $1,
194 validity * $2, -- intersections with gauge validity ranges
195 true
196 FROM waterway.gauges
197 WHERE location = isrs_fromText($3) AND validity && $2
198 )
199 `
151 // Alignment with gauge validity might have generated new entries 200 // Alignment with gauge validity might have generated new entries
152 // for the same time range. Thus, remove the old ones 201 // for the same time range. Thus, remove the old ones
153 deleteObsoleteBNSQL = ` 202 deleteObsoleteBNSQL = `
154 DELETE FROM waterway.bottlenecks 203 DELETE FROM waterway.bottlenecks
155 WHERE bottleneck_id = $1 AND validity <@ $2 AND id <> ALL($3) 204 WHERE bottleneck_id = $1 AND validity <@ $2 AND id <> ALL($3)
278 return nil, err 327 return nil, err
279 } 328 }
280 329
281 feedback.Info("Found %d bottlenecks for import", len(bns)) 330 feedback.Info("Found %d bottlenecks for import", len(bns))
282 331
283 var insertStmt, findExactMatchingBNStmt, deleteObsoleteBNStmt, 332 var insertStmt, updateStmt, findExactMatchingBNStmt, findMatchingBNStmt,
284 fixValidityStmt, deleteMaterialStmt, insertMaterialStmt, 333 deleteObsoleteBNStmt, fixValidityStmt, deleteMaterialStmt,
285 trackStmt *sql.Stmt 334 insertMaterialStmt, trackStmt *sql.Stmt
286 335
287 for _, x := range []struct { 336 for _, x := range []struct {
288 sql string 337 sql string
289 stmt **sql.Stmt 338 stmt **sql.Stmt
290 }{ 339 }{
291 {insertBottleneckSQL, &insertStmt}, 340 {insertBottleneckSQL, &insertStmt},
341 {updateBottleneckSQL, &updateStmt},
292 {findExactMatchBottleneckSQL, &findExactMatchingBNStmt}, 342 {findExactMatchBottleneckSQL, &findExactMatchingBNStmt},
343 {findMatchBottleneckSQL, &findMatchingBNStmt},
293 {deleteObsoleteBNSQL, &deleteObsoleteBNStmt}, 344 {deleteObsoleteBNSQL, &deleteObsoleteBNStmt},
294 {fixBNValiditySQL, &fixValidityStmt}, 345 {fixBNValiditySQL, &fixValidityStmt},
295 {deleteBottleneckMaterialSQL, &deleteMaterialStmt}, 346 {deleteBottleneckMaterialSQL, &deleteMaterialStmt},
296 {insertBottleneckMaterialSQL, &insertMaterialStmt}, 347 {insertBottleneckMaterialSQL, &insertMaterialStmt},
297 {trackImportSQL, &trackStmt}, 348 {trackImportSQL, &trackStmt},
308 feedback.Info("Tolerance used to snap waterway axis: %g", tolerance) 359 feedback.Info("Tolerance used to snap waterway axis: %g", tolerance)
309 360
310 for _, bn := range bns { 361 for _, bn := range bns {
311 if err := storeBottleneck( 362 if err := storeBottleneck(
312 ctx, importID, conn, feedback, bn, &nids, tolerance, 363 ctx, importID, conn, feedback, bn, &nids, tolerance,
313 insertStmt, findExactMatchingBNStmt, deleteObsoleteBNStmt, 364 insertStmt, updateStmt,
314 fixValidityStmt, deleteMaterialStmt, insertMaterialStmt, 365 findExactMatchingBNStmt, findMatchingBNStmt,
366 deleteObsoleteBNStmt, fixValidityStmt,
367 deleteMaterialStmt, insertMaterialStmt,
315 trackStmt); err != nil { 368 trackStmt); err != nil {
316 return nil, err 369 return nil, err
317 } 370 }
318 } 371 }
319 if len(nids) == 0 { 372 if len(nids) == 0 {
336 conn *sql.Conn, 389 conn *sql.Conn,
337 feedback Feedback, 390 feedback Feedback,
338 bn *ifbn.BottleNeckType, 391 bn *ifbn.BottleNeckType,
339 nids *[]string, 392 nids *[]string,
340 tolerance float64, 393 tolerance float64,
341 insertStmt, findExactMatchingBNStmt, deleteObsoleteBNStmt, 394 insertStmt, updateStmt,
342 fixValidityStmt, deleteMaterialStmt, insertMaterialStmt, 395 findExactMatchingBNStmt, findMatchingBNStmt,
396 deleteObsoleteBNStmt, fixValidityStmt,
397 deleteMaterialStmt, insertMaterialStmt,
343 trackStmt *sql.Stmt, 398 trackStmt *sql.Stmt,
344 ) error { 399 ) error {
345 feedback.Info("Processing %s (%s)", bn.OBJNAM, bn.Bottleneck_id) 400 feedback.Info("Processing %s (%s)", bn.OBJNAM, bn.Bottleneck_id)
346 401
347 var tfrom, tto pgtype.Timestamptz 402 var tfrom, tto pgtype.Timestamptz
462 if bns.Next() { 517 if bns.Next() {
463 feedback.Info("unchanged") 518 feedback.Info("unchanged")
464 return nil 519 return nil
465 } 520 }
466 521
522 // Check if an bottleneck with the same identity
523 // (bottleneck_id,validity) already exists:
524 // Check if an bottleneck identical to the one we would insert already
525 // exists:
526 var existing_bn_id *int64
527 err = findMatchingBNStmt.QueryRowContext(ctx,
528 bn.Bottleneck_id,
529 &validity,
530 bn.Fk_g_fid,
531 ).Scan(&existing_bn_id)
532 switch {
533 case err == sql.ErrNoRows:
534 existing_bn_id = nil
535 case err != nil:
536 // This is unexpected and propably a serious error
537 return err
538 }
539
467 tx, err := conn.BeginTx(ctx, nil) 540 tx, err := conn.BeginTx(ctx, nil)
468 if err != nil { 541 if err != nil {
469 return err 542 return err
470 } 543 }
471 defer tx.Rollback() 544 defer tx.Rollback()
472 545
473 var bnIds []int64 546 var bnIds []int64
474 bns, err = tx.StmtContext(ctx, insertStmt).QueryContext(ctx, 547 if existing_bn_id != nil {
475 bn.Bottleneck_id, 548 feedback.Info("Bottelneck '%s' "+
476 &validity, 549 "with matching validity already exists:"+
477 bn.Fk_g_fid, 550 "UPDATING", bn.Bottleneck_id)
478 bn.OBJNAM, 551 // Updating existnig BN data:
479 bn.NOBJNM, 552 bns, err = tx.StmtContext(ctx, updateStmt).QueryContext(ctx,
480 bn.From_ISRS, bn.To_ISRS, 553 existing_bn_id,
481 rb, 554 bn.Bottleneck_id,
482 lb, 555 &validity,
483 country, 556 bn.Fk_g_fid,
484 revisitingTime, 557 bn.OBJNAM,
485 limiting, 558 bn.NOBJNM,
486 bn.Date_Info, 559 bn.From_ISRS, bn.To_ISRS,
487 bn.Source, 560 rb,
488 tolerance, 561 lb,
489 ) 562 country,
563 revisitingTime,
564 limiting,
565 bn.Date_Info,
566 bn.Source,
567 tolerance,
568 )
569 } else {
570 // New BN data:
571 bns, err = tx.StmtContext(ctx, insertStmt).QueryContext(ctx,
572 bn.Bottleneck_id,
573 &validity,
574 bn.Fk_g_fid,
575 bn.OBJNAM,
576 bn.NOBJNM,
577 bn.From_ISRS, bn.To_ISRS,
578 rb,
579 lb,
580 country,
581 revisitingTime,
582 limiting,
583 bn.Date_Info,
584 bn.Source,
585 tolerance,
586 )
587 }
490 if err != nil { 588 if err != nil {
491 feedback.Warn(handleError(err).Error()) 589 feedback.Warn(handleError(err).Error())
492 return nil 590 return nil
493 } 591 }
494 defer bns.Close() 592 defer bns.Close()
570 feedback.Warn(handleError(err).Error()) 668 feedback.Warn(handleError(err).Error())
571 return nil 669 return nil
572 } 670 }
573 } 671 }
574 672
575 for _, nid := range bnIds { 673 // Only add new BN data to tracking for staging review.
576 if _, err := tx.StmtContext(ctx, trackStmt).ExecContext( 674 //
577 ctx, importID, "waterway.bottlenecks", nid, 675 // FIXME: Review for updated bottlenecks is currently not possible, as
578 ); err != nil { 676 // the update is done instantly in place.
579 return err 677 if existing_bn_id == nil {
580 } 678 for _, nid := range bnIds {
581 } 679 if _, err := tx.StmtContext(ctx, trackStmt).ExecContext(
680 ctx, importID, "waterway.bottlenecks", nid,
681 ); err != nil {
682 return err
683 }
684 }
685 }
686
582 if err = tx.Commit(); err != nil { 687 if err = tx.Commit(); err != nil {
583 return err 688 return err
584 } 689 }
585 *nids = append(*nids, bn.Bottleneck_id) 690 // See above...
691 if existing_bn_id == nil {
692 *nids = append(*nids, bn.Bottleneck_id)
693 }
586 return nil 694 return nil
587 } 695 }