comparison pkg/imports/bn.go @ 4113:f39d20427e89 request_hist_bns

WIP: Reworked bottleneck import: - All available bottlenecks from the service are loaded (including historic data) - kudos to Tom Gottfried - Bottlenecks which are identical to an existing (non-staged) one are recognized as "unchanged" and ignored. - All remaining bottlenecks are inserted into the staging area. - On "staging done" all existing bottlenecks with conflicting (bottleneck_id,validity) are deleted (replaced by the new bottlenecks). Known Issues: - Sometimes the identification of unchanged bottlenecks does not work; this needs debugging. - The front end does not handle historic data correctly (we need to define what "correctly" should be). - Savepoints should be used to distinguish ignored errors from fatal ones.
author Sascha Wilde <wilde@intevation.de>
date Wed, 31 Jul 2019 17:04:31 +0200
parents 861760675497
children 0cf0008070db
comparison
equal deleted inserted replaced
4111:692aba3e8b85 4113:f39d20427e89
88 $14 88 $14
89 ) 89 )
90 RETURNING id 90 RETURNING id
91 ` 91 `
92 92
93 updateBottleneckSQL = `
94 WITH
95 bounds (b) AS (VALUES (isrs_fromText($7)), (isrs_fromText($8))),
96 r AS (SELECT isrsrange(
97 (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY),
98 (SELECT b FROM bounds ORDER BY b USING >~ FETCH FIRST ROW ONLY)) AS r)
99 UPDATE waterway.bottlenecks SET (
100 bottleneck_id,
101 validity,
102 gauge_location,
103 objnam,
104 nobjnm,
105 stretch,
106 area,
107 rb,
108 lb,
109 responsible_country,
110 revisiting_time,
111 limiting,
112 date_info,
113 source_organization
114 ) = (
115 $2,
116 $3::tstzrange,
117 isrs_fromText($4),
118 $5,
119 $6,
120 (SELECT r FROM r),
121 ISRSrange_area(
122 ISRSrange_axis((SELECT r FROM r),
123 $16),
124 (SELECT ST_Collect(CAST(area AS geometry))
125 FROM waterway.waterway_area)),
126 $9,
127 $10,
128 $11,
129 $12::smallint,
130 $13,
131 $14::timestamptz,
132 $15
133 )
134 WHERE id=$1
135 RETURNING id
136 `
137
138 findExactMatchBottleneckSQL = ` 93 findExactMatchBottleneckSQL = `
139 WITH 94 WITH
140 bounds (b) AS (VALUES (isrs_fromText($6)), (isrs_fromText($7))), 95 bounds (b) AS (VALUES (isrs_fromText($6)), (isrs_fromText($7))),
141 r AS (SELECT isrsrange( 96 r AS (SELECT isrsrange(
142 (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY), 97 (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY),
173 $14, 128 $14,
174 true 129 true
175 ) 130 )
176 ` 131 `
177 132
178 findMatchBottleneckSQL = ` 133 findIntersectingBottleneckSQL = `
179 SELECT id FROM waterway.bottlenecks 134 SELECT id FROM waterway.bottlenecks
180 WHERE ( 135 WHERE (bottleneck_id, staging_done) = ($1, true)
181 bottleneck_id, 136 AND $2::tstzrange && validity
182 validity,
183 staging_done
184 ) = (
185 $1,
186 $2::tstzrange,
187 true
188 )
189 `
190 // FIXME: Is this still needed with the new simplified historization
191 // model? My intuition is: no it isn't and should be removed, but we
192 // should double check before doing so... [sw]
193 //
194 // Alignment with gauge validity might have generated new entries
195 // for the same time range. Thus, remove the old ones
196 deleteObsoleteBNSQL = `
197 DELETE FROM waterway.bottlenecks
198 WHERE bottleneck_id = $1 AND validity <@ $2 AND id <> ALL($3)
199 `
200
201 fixBNValiditySQL = `
202 UPDATE waterway.bottlenecks SET
203 -- Set enddate of old entry to new startdate in case of overlap:
204 validity = validity - $2
205 WHERE bottleneck_id = $1
206 AND validity && $2 AND NOT validity <@ $2
207 `
208
209 deleteBottleneckMaterialSQL = `
210 WITH del AS (
211 DELETE FROM waterway.bottlenecks_riverbed_materials
212 WHERE bottleneck_id = ANY($1)
213 AND riverbed <> ALL($2)
214 RETURNING riverbed)
215 SELECT DISTINCT riverbed FROM del
216 ` 137 `
217 138
218 insertBottleneckMaterialSQL = ` 139 insertBottleneckMaterialSQL = `
219 INSERT INTO waterway.bottlenecks_riverbed_materials ( 140 INSERT INTO waterway.bottlenecks_riverbed_materials (
220 bottleneck_id, 141 bottleneck_id,
222 ) SELECT * 143 ) SELECT *
223 FROM unnest(CAST($1 AS int[])) AS bns, 144 FROM unnest(CAST($1 AS int[])) AS bns,
224 unnest(CAST($2 AS varchar[])) AS materials 145 unnest(CAST($2 AS varchar[])) AS materials
225 ON CONFLICT (bottleneck_id, riverbed) DO NOTHING 146 ON CONFLICT (bottleneck_id, riverbed) DO NOTHING
226 ` 147 `
148
149 bnStageDoneDeleteSQL = `
150 DELETE FROM waterway.bottlenecks WHERE id IN (
151 SELECT key
152 FROM import.track_imports
153 WHERE import_id = $1
154 AND relation = 'waterway.bottlenecks'::regclass
155 AND deletion
156 )`
157
158 bnStageDoneSQL = `
159 UPDATE waterway.bottlenecks SET staging_done = true
160 WHERE id IN (
161 SELECT key
162 FROM import.track_imports
163 WHERE import_id = $1
164 AND relation = 'waterway.bottlenecks'::regclass
165 AND NOT deletion
166 )`
227 ) 167 )
228 168
229 type bnJobCreator struct{} 169 type bnJobCreator struct{}
230 170
231 func init() { 171 func init() {
242 return [2][]string{ 182 return [2][]string{
243 {"bottlenecks", "bottlenecks_riverbed_materials"}, 183 {"bottlenecks", "bottlenecks_riverbed_materials"},
244 {"gauges", "distance_marks_virtual", "waterway_axis", "waterway_area"}, 184 {"gauges", "distance_marks_virtual", "waterway_axis", "waterway_area"},
245 } 185 }
246 } 186 }
247
248 const (
249 bnStageDoneSQL = `
250 UPDATE waterway.bottlenecks SET staging_done = true
251 WHERE id IN (
252 SELECT key from import.track_imports
253 WHERE import_id = $1 AND
254 relation = 'waterway.bottlenecks'::regclass)`
255 )
256 187
257 // StageDone moves the imported bottleneck out of the staging area. 188 // StageDone moves the imported bottleneck out of the staging area.
258 func (bnJobCreator) StageDone( 189 func (bnJobCreator) StageDone(
259 ctx context.Context, 190 ctx context.Context,
260 tx *sql.Tx, 191 tx *sql.Tx,
261 id int64, 192 id int64,
262 ) error { 193 ) error {
263 _, err := tx.ExecContext(ctx, bnStageDoneSQL, id) 194 _, err := tx.ExecContext(ctx, bnStageDoneDeleteSQL, id)
195 if err == nil {
196 _, err = tx.ExecContext(ctx, bnStageDoneSQL, id)
197 }
264 return err 198 return err
265 } 199 }
266 200
267 // CleanUp of a bottleneck import is a NOP. 201 // CleanUp of a bottleneck import is a NOP.
268 func (*Bottleneck) CleanUp() error { return nil } 202 func (*Bottleneck) CleanUp() error { return nil }
323 return nil, err 257 return nil, err
324 } 258 }
325 259
326 feedback.Info("Found %d bottlenecks for import", len(bns)) 260 feedback.Info("Found %d bottlenecks for import", len(bns))
327 261
328 var insertStmt, updateStmt, findExactMatchingBNStmt, findMatchingBNStmt, 262 var insertStmt, findExactMatchingBNStmt, findIntersectingBNStmt,
329 deleteObsoleteBNStmt, fixValidityStmt, deleteMaterialStmt,
330 insertMaterialStmt, trackStmt *sql.Stmt 263 insertMaterialStmt, trackStmt *sql.Stmt
331 264
332 for _, x := range []struct { 265 for _, x := range []struct {
333 sql string 266 sql string
334 stmt **sql.Stmt 267 stmt **sql.Stmt
335 }{ 268 }{
336 {insertBottleneckSQL, &insertStmt}, 269 {insertBottleneckSQL, &insertStmt},
337 {updateBottleneckSQL, &updateStmt},
338 {findExactMatchBottleneckSQL, &findExactMatchingBNStmt}, 270 {findExactMatchBottleneckSQL, &findExactMatchingBNStmt},
339 {findMatchBottleneckSQL, &findMatchingBNStmt}, 271 {findIntersectingBottleneckSQL, &findIntersectingBNStmt},
340 {deleteObsoleteBNSQL, &deleteObsoleteBNStmt},
341 {fixBNValiditySQL, &fixValidityStmt},
342 {deleteBottleneckMaterialSQL, &deleteMaterialStmt},
343 {insertBottleneckMaterialSQL, &insertMaterialStmt}, 272 {insertBottleneckMaterialSQL, &insertMaterialStmt},
344 {trackImportSQL, &trackStmt}, 273 {trackImportDeletionSQL, &trackStmt},
345 } { 274 } {
346 var err error 275 var err error
347 if *x.stmt, err = conn.PrepareContext(ctx, x.sql); err != nil { 276 if *x.stmt, err = conn.PrepareContext(ctx, x.sql); err != nil {
348 return nil, err 277 return nil, err
349 } 278 }
350 defer (*x.stmt).Close() 279 defer (*x.stmt).Close()
351 } 280 }
352 281
353 var nids []string 282 var nids []string
283 seenOldBnIds := make(map[int64]bool)
354 284
355 feedback.Info("Tolerance used to snap waterway axis: %g", tolerance) 285 feedback.Info("Tolerance used to snap waterway axis: %g", tolerance)
356 286
357 for _, bn := range bns { 287 for _, bn := range bns {
358 if err := storeBottleneck( 288 if err := storeBottleneck(
359 ctx, importID, conn, feedback, bn, &nids, tolerance, 289 ctx, importID, conn, feedback, bn,
360 insertStmt, updateStmt, 290 &nids, &seenOldBnIds, tolerance,
361 findExactMatchingBNStmt, findMatchingBNStmt, 291 insertStmt,
362 deleteObsoleteBNStmt, fixValidityStmt, 292 findExactMatchingBNStmt,
363 deleteMaterialStmt, insertMaterialStmt, 293 findIntersectingBNStmt,
294 insertMaterialStmt,
364 trackStmt); err != nil { 295 trackStmt); err != nil {
365 return nil, err 296 return nil, err
366 } 297 }
367 } 298 }
368 if len(nids) == 0 { 299 if len(nids) == 0 {
384 importID int64, 315 importID int64,
385 conn *sql.Conn, 316 conn *sql.Conn,
386 feedback Feedback, 317 feedback Feedback,
387 bn *ifbn.BottleNeckType, 318 bn *ifbn.BottleNeckType,
388 nids *[]string, 319 nids *[]string,
320 seenOldBnIds *map[int64]bool,
389 tolerance float64, 321 tolerance float64,
390 insertStmt, updateStmt, 322 insertStmt,
391 findExactMatchingBNStmt, findMatchingBNStmt, 323 findExactMatchingBNStmt, findIntersectingBNStmt,
392 deleteObsoleteBNStmt, fixValidityStmt, 324 insertMaterialStmt,
393 deleteMaterialStmt, insertMaterialStmt,
394 trackStmt *sql.Stmt, 325 trackStmt *sql.Stmt,
395 ) error { 326 ) error {
396 feedback.Info("Processing %s (%s)", bn.OBJNAM, bn.Bottleneck_id) 327 feedback.Info("Processing %s (%s)", bn.OBJNAM, bn.Bottleneck_id)
397 328
398 var tfrom, tto pgtype.Timestamptz 329 var tfrom, tto pgtype.Timestamptz
408 // start date, and maybe even inspecting the details of changed 339 // start date, and maybe even inspecting the details of changed
409 // data for hints whether an update or historisation with a new 340 // data for hints whether an update or historisation with a new
410 // version is advisable. 341 // version is advisable.
411 // 342 //
412 // Never the less, the current solution "just works" for the 343 // Never the less, the current solution "just works" for the
413 // rtime being... -- sw 344 // time being and reflects the upstream systems... -- sw
414 feedback.Warn("No validity information, assuming infinite validity.") 345 feedback.Warn("No validity information, assuming infinite validity.")
415 tfrom.Set(time.Date(1970, time.January, 1, 0, 0, 0, 0, time.UTC)) 346 tfrom.Set(time.Date(1970, time.January, 1, 0, 0, 0, 0, time.UTC))
416 uBound = pgtype.Unbounded 347 uBound = pgtype.Unbounded
417 } else { 348 } else {
418 const ( 349 const (
508 revisitingTime, 439 revisitingTime,
509 limiting, 440 limiting,
510 bn.Date_Info, 441 bn.Date_Info,
511 bn.Source, 442 bn.Source,
512 ) 443 )
513
514 if err != nil { 444 if err != nil {
515 return err 445 return err
516 } 446 }
517 defer bns.Close() 447 defer bns.Close()
448 // We could check if the materials are also matching -- but per
449 // specification the Date_Info would have to change on that kind of
450 // change anyway. So actually we are already checking more in depth than
451 // required.
518 if bns.Next() { 452 if bns.Next() {
519 feedback.Info("unchanged") 453 feedback.Info("unchanged")
520 return nil 454 return nil
521 } 455 }
522 456
523 // Additional Information, only of interest when we change something, so 457 // Additional Information, only of interest when we change something, so
524 // it can be used for debugging if something goes wrong... 458 // it can be used for debugging if something goes wrong...
525 feedback.Info("Range: from %s to %s", bn.From_ISRS, bn.To_ISRS) 459 feedback.Info("Range: from %s to %s", bn.From_ISRS, bn.To_ISRS)
526 460
527 // Check if an bottleneck with the same identity
528 // (bottleneck_id,validity) already exists:
529 // Check if an bottleneck identical to the one we would insert already
530 // exists:
531 var existing_bn_id *int64
532 err = findMatchingBNStmt.QueryRowContext(ctx,
533 bn.Bottleneck_id,
534 &validity,
535 ).Scan(&existing_bn_id)
536 switch {
537 case err == sql.ErrNoRows:
538 existing_bn_id = nil
539 case err != nil:
540 // This is unexpected and probably a serious error
541 return err
542 }
543
544 tx, err := conn.BeginTx(ctx, nil) 461 tx, err := conn.BeginTx(ctx, nil)
545 if err != nil { 462 if err != nil {
546 return err 463 return err
547 } 464 }
548 defer tx.Rollback() 465 defer tx.Rollback()
549 466
467 // If the validity of the new bottleneck intersects with that of existing
468 // ones for the same bottleneck_id, we consider this an update and mark the
469 // old data for deletion.
470 bns, err = tx.StmtContext(ctx, findIntersectingBNStmt).QueryContext(
471 ctx, bn.Bottleneck_id, &validity,
472 )
473 if err != nil {
474 return err
475 }
476 defer bns.Close()
477
478 // Mark old intersecting bottleneck data for deletion. Don't worry about
479 // materials, they will be deleted via cascading.
480 var oldBnIds []int64
481 for bns.Next() {
482 var oldID int64
483 err := bns.Scan(&oldID)
484 if err != nil {
485 return err
486 }
487 oldBnIds = append(oldBnIds, oldID)
488 }
489
490 switch {
491 case len(oldBnIds) == 1:
492 feedback.Info("Bottelneck '%s' "+
493 "with intersecting validity already exists: "+
494 "UPDATING", bn.Bottleneck_id)
495 case len(oldBnIds) > 1:
496 // This case is unexpected and should only happen when historic
497 // data in the bottleneck service was changed subsequently...
498 // We handle it gracefully anyway, but warn.
499 feedback.Warn("More than one Bottelneck '%s' "+
500 "with intersecting validity already exists: "+
501 "REPLACING all of them!", bn.Bottleneck_id)
502 }
503
504 for _, oldID := range oldBnIds {
505 // It is possible that two new bottlenecks intersect with the
506 // same old one; therefore we have to handle duplicates in
507 // oldBnIds.
508 if !(*seenOldBnIds)[oldID] {
509 if _, err := tx.StmtContext(ctx, trackStmt).ExecContext(
510 ctx, importID, "waterway.bottlenecks", oldID, true,
511 ); err != nil {
512 return err
513 }
514 (*seenOldBnIds)[oldID] = true
515 }
516 }
517
550 var bnIds []int64 518 var bnIds []int64
551 if existing_bn_id != nil { 519 // Add new BN data:
552 feedback.Info("Bottelneck '%s' "+ 520 bns, err = tx.StmtContext(ctx, insertStmt).QueryContext(ctx,
553 "with matching validity already exists:"+ 521 bn.Bottleneck_id,
554 "UPDATING", bn.Bottleneck_id) 522 &validity,
555 // Updating existnig BN data: 523 bn.Fk_g_fid,
556 bns, err = tx.StmtContext(ctx, updateStmt).QueryContext(ctx, 524 bn.OBJNAM,
557 existing_bn_id, 525 bn.NOBJNM,
558 bn.Bottleneck_id, 526 bn.From_ISRS, bn.To_ISRS,
559 &validity, 527 rb,
560 bn.Fk_g_fid, 528 lb,
561 bn.OBJNAM, 529 country,
562 bn.NOBJNM, 530 revisitingTime,
563 bn.From_ISRS, bn.To_ISRS, 531 limiting,
564 rb, 532 bn.Date_Info,
565 lb, 533 bn.Source,
566 country, 534 tolerance)
567 revisitingTime,
568 limiting,
569 bn.Date_Info,
570 bn.Source,
571 tolerance,
572 )
573 } else {
574 // New BN data:
575 bns, err = tx.StmtContext(ctx, insertStmt).QueryContext(ctx,
576 bn.Bottleneck_id,
577 &validity,
578 bn.Fk_g_fid,
579 bn.OBJNAM,
580 bn.NOBJNM,
581 bn.From_ISRS, bn.To_ISRS,
582 rb,
583 lb,
584 country,
585 revisitingTime,
586 limiting,
587 bn.Date_Info,
588 bn.Source,
589 tolerance,
590 )
591 }
592 if err != nil { 535 if err != nil {
593 feedback.Warn(pgxutils.ReadableError{err}.Error()) 536 feedback.Warn(pgxutils.ReadableError{err}.Error())
594 return nil 537 return nil
595 } 538 }
596 defer bns.Close() 539 defer bns.Close()
609 feedback.Warn( 552 feedback.Warn(
610 "No gauge matching '%s' or given time available", bn.Fk_g_fid) 553 "No gauge matching '%s' or given time available", bn.Fk_g_fid)
611 return nil 554 return nil
612 } 555 }
613 556
614 // Remove obsolete bottleneck version entries 557 // Add new materials
615 var pgBnIds pgtype.Int8Array 558 var (
559 pgBnIds pgtype.Int8Array
560 pgMaterials pgtype.VarcharArray
561 )
616 pgBnIds.Set(bnIds) 562 pgBnIds.Set(bnIds)
617 if _, err = tx.StmtContext(ctx, deleteObsoleteBNStmt).ExecContext(ctx, 563 pgMaterials.Set(materials)
618 bn.Bottleneck_id,
619 &validity,
620 &pgBnIds,
621 ); err != nil {
622 feedback.Warn(pgxutils.ReadableError{err}.Error())
623 if err2 := tx.Rollback(); err2 != nil {
624 return err2
625 }
626 return nil
627 }
628
629 // Set end of validity of old version to start of new version
630 // in case of overlap
631 if _, err = tx.StmtContext(ctx, fixValidityStmt).ExecContext(ctx,
632 bn.Bottleneck_id,
633 validity,
634 ); err != nil {
635 feedback.Warn(pgxutils.ReadableError{err}.Error())
636 if err2 := tx.Rollback(); err2 != nil {
637 return err2
638 }
639 return nil
640 }
641 564
642 if materials != nil { 565 if materials != nil {
643 // Remove obsolete riverbed materials
644 var pgMaterials pgtype.VarcharArray
645 pgMaterials.Set(materials)
646 mtls, err := tx.StmtContext(ctx,
647 deleteMaterialStmt).QueryContext(ctx,
648 &pgBnIds,
649 &pgMaterials,
650 )
651 if err != nil {
652 return err
653 }
654 defer mtls.Close()
655 for mtls.Next() {
656 var delMat string
657 if err := mtls.Scan(&delMat); err != nil {
658 return err
659 }
660 feedback.Warn("Removed riverbed material %s", delMat)
661 }
662 if err := mtls.Err(); err != nil {
663 return err
664 }
665
666 // Insert riverbed materials 566 // Insert riverbed materials
667 if _, err := tx.StmtContext(ctx, insertMaterialStmt).ExecContext(ctx, 567 if _, err := tx.StmtContext(ctx, insertMaterialStmt).ExecContext(ctx,
668 &pgBnIds, 568 &pgBnIds,
669 &pgMaterials, 569 &pgMaterials,
670 ); err != nil { 570 ); err != nil {
673 return nil 573 return nil
674 } 574 }
675 } 575 }
676 576
677 // Only add new BN data to tracking for staging review. 577 // Only add new BN data to tracking for staging review.
678 // 578 for _, nid := range bnIds {
679 // FIXME: Review for updated bottlenecks is currently not possible, as 579 if _, err := tx.StmtContext(ctx, trackStmt).ExecContext(
680 // the update is done instantly in place. 580 ctx, importID, "waterway.bottlenecks", nid, false,
681 if existing_bn_id == nil { 581 ); err != nil {
682 for _, nid := range bnIds { 582 return err
683 if _, err := tx.StmtContext(ctx, trackStmt).ExecContext(
684 ctx, importID, "waterway.bottlenecks", nid,
685 ); err != nil {
686 return err
687 }
688 } 583 }
689 } 584 }
690 585
691 if err = tx.Commit(); err != nil { 586 if err = tx.Commit(); err != nil {
692 return err 587 return err
693 } 588 }
694 // See above... 589
695 if existing_bn_id == nil { 590 *nids = append(*nids, bn.Bottleneck_id)
696 *nids = append(*nids, bn.Bottleneck_id)
697 }
698 return nil 591 return nil
699 } 592 }