comparison pkg/imports/bn.go @ 3666:db87f34805fb

Align bottleneck validity at gauges Ensuring the validity of a bottleneck version is always contained by the validity of the referenced gauge version allows to reliably determine matching reference values of the gauge at a point in time. Since this implies that a bottleneck version might be cut into more than one time ranges, the concept of having only one non-erased version is no longer applicable and replaced by using the 'current' version of a bottleneck. Fairway availability data are always kept with the 'current' bottleneck version to have them at hand alltogether for analyses over longer time ranges.
author Tom Gottfried <tom@intevation.de>
date Sat, 15 Jun 2019 14:36:50 +0200
parents 29ef6d41e4af
children 0227670dedd5
comparison
equal deleted inserted replaced
3665:29ef6d41e4af 3666:db87f34805fb
41 41
42 // BNJobKind is the import queue type identifier. 42 // BNJobKind is the import queue type identifier.
43 const BNJobKind JobKind = "bn" 43 const BNJobKind JobKind = "bn"
44 44
45 const ( 45 const (
46 hasBottleneckSQL = `
47 WITH upd AS (
48 UPDATE waterway.bottlenecks SET
49 erased = true
50 WHERE bottleneck_id = $1
51 AND NOT erased
52 -- Don't touch old entry if new validity contains old: will be updated
53 AND NOT validity <@ $2
54 RETURNING 1
55 )
56 -- Decide whether a new version will be INSERTed
57 SELECT EXISTS(SELECT 1 FROM upd)
58 OR NOT EXISTS(SELECT 1 FROM waterway.bottlenecks WHERE bottleneck_id = $1)
59 `
60
61 insertBottleneckSQL = ` 46 insertBottleneckSQL = `
62 WITH 47 WITH
63 bounds (b) AS (VALUES (isrs_fromText($6)), (isrs_fromText($7))), 48 bounds (b) AS (VALUES (isrs_fromText($6)), (isrs_fromText($7))),
64 r AS (SELECT isrsrange( 49 r AS (SELECT isrsrange(
65 (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY), 50 (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY),
78 responsible_country, 63 responsible_country,
79 revisiting_time, 64 revisiting_time,
80 limiting, 65 limiting,
81 date_info, 66 date_info,
82 source_organization 67 source_organization
83 ) VALUES ( 68 ) SELECT
84 $1, 69 $1,
85 $2, 70 validity * $2, -- intersections with gauge validity ranges
86 isrs_fromText($3), 71 location,
87 COALESCE( 72 validity,
88 (SELECT validity FROM waterway.gauges
89 WHERE location = isrs_fromText($3)
90 AND validity @> lower(CAST($2 AS tstzrange))),
91 tstzrange(NULL, NULL)),
92 $4, 73 $4,
93 $5, 74 $5,
94 (SELECT r FROM r), 75 (SELECT r FROM r),
95 ISRSrange_area( 76 ISRSrange_area(
96 ISRSrange_axis((SELECT r FROM r), 77 ISRSrange_axis((SELECT r FROM r),
102 $10, 83 $10,
103 $11, 84 $11,
104 $12, 85 $12,
105 $13, 86 $13,
106 $14 87 $14
107 ) 88 FROM waterway.gauges
108 RETURNING id` 89 WHERE location = isrs_fromText($3) AND validity && $2
90 ON CONFLICT (bottleneck_id, validity) DO UPDATE SET
91 gauge_location = EXCLUDED.gauge_location,
92 gauge_validity = EXCLUDED.gauge_validity,
93 objnam = EXCLUDED.objnam,
94 nobjnm = EXCLUDED.nobjnm,
95 stretch = EXCLUDED.stretch,
96 area = EXCLUDED.area,
97 rb = EXCLUDED.rb,
98 lb = EXCLUDED.lb,
99 responsible_country = EXCLUDED.responsible_country,
100 revisiting_time = EXCLUDED.revisiting_time,
101 limiting = EXCLUDED.limiting,
102 date_info = EXCLUDED.date_info,
103 source_organization = EXCLUDED.source_organization
104 RETURNING id
105 `
106
107 // Alignment with gauge validity might have generated new entries
108 // for the same time range. Thus, remove the old ones
109 deleteObsoleteBNSQL = `
110 DELETE FROM waterway.bottlenecks
111 WHERE bottleneck_id = $1 AND validity <@ $2 AND id <> ALL($3)
112 `
109 113
110 fixBNValiditySQL = ` 114 fixBNValiditySQL = `
111 UPDATE waterway.bottlenecks SET 115 UPDATE waterway.bottlenecks SET
112 -- Set enddate of old entry to new startdate in case of overlap: 116 -- Set enddate of old entry to new startdate in case of overlap:
113 validity = validity - $2 117 validity = validity - $2
114 WHERE bottleneck_id = $1 118 WHERE bottleneck_id = $1
115 AND validity && $2 119 AND validity && $2 AND NOT validity <@ $2
116 AND erased
117 ` 120 `
118 121
119 updateBottleneckSQL = `
120 WITH
121 bounds (b) AS (VALUES (isrs_fromText($5)), (isrs_fromText($6))),
122 r AS (SELECT isrsrange(
123 (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY),
124 (SELECT b FROM bounds ORDER BY b USING >~ FETCH FIRST ROW ONLY)) AS r)
125 UPDATE waterway.bottlenecks b SET
126 gauge_location = isrs_fromtext($2),
127 gauge_validity = COALESCE(
128 (SELECT validity FROM waterway.gauges g
129 WHERE g.location = isrs_fromText($2)
130 AND g.validity @> lower(b.validity)),
131 tstzrange(NULL, NULL)),
132 objnam = $3,
133 nobjnm = $4,
134 stretch = (SELECT r FROM r),
135 area = ISRSrange_area(
136 ISRSrange_axis((SELECT r FROM r), $14),
137 (SELECT ST_Collect(CAST(area AS geometry))
138 FROM waterway.waterway_area)),
139 rb = $7,
140 lb = $8,
141 responsible_country = $9,
142 revisiting_time = $10,
143 limiting = $11,
144 date_info = $12,
145 source_organization = $13,
146 validity = $15
147 WHERE bottleneck_id = $1
148 AND NOT erased
149 AND $12 > date_info
150 RETURNING id
151 `
152
153 deleteBottleneckMaterialSQL = ` 122 deleteBottleneckMaterialSQL = `
154 DELETE FROM waterway.bottlenecks_riverbed_materials 123 WITH del AS (
155 WHERE bottleneck_id = $1 124 DELETE FROM waterway.bottlenecks_riverbed_materials
156 AND riverbed <> ALL($2) 125 WHERE bottleneck_id = ANY($1)
157 RETURNING riverbed 126 AND riverbed <> ALL($2)
127 RETURNING riverbed)
128 SELECT DISTINCT riverbed FROM del
158 ` 129 `
159 130
160 insertBottleneckMaterialSQL = ` 131 insertBottleneckMaterialSQL = `
161 INSERT INTO waterway.bottlenecks_riverbed_materials ( 132 INSERT INTO waterway.bottlenecks_riverbed_materials (
162 bottleneck_id, 133 bottleneck_id,
163 riverbed 134 riverbed
164 ) VALUES ( 135 ) SELECT *
165 $1, 136 FROM unnest(CAST($1 AS int[])) AS bns,
166 $2 137 unnest(CAST($2 AS varchar[])) AS materials
167 ) ON CONFLICT (bottleneck_id, riverbed) DO NOTHING 138 ON CONFLICT (bottleneck_id, riverbed) DO NOTHING
168 ` 139 `
169 ) 140 )
170 141
171 type bnJobCreator struct{} 142 type bnJobCreator struct{}
172 143
263 return nil, err 234 return nil, err
264 } 235 }
265 236
266 feedback.Info("Found %d bottlenecks for import", len(bns)) 237 feedback.Info("Found %d bottlenecks for import", len(bns))
267 238
268 var hasStmt, insertStmt, fixValidityStmt, updateStmt, 239 var insertStmt, deleteObsoleteBNStmt, fixValidityStmt,
269 deleteMaterialStmt, insertMaterialStmt, trackStmt *sql.Stmt 240 deleteMaterialStmt, insertMaterialStmt, trackStmt *sql.Stmt
270 241
271 for _, x := range []struct { 242 for _, x := range []struct {
272 sql string 243 sql string
273 stmt **sql.Stmt 244 stmt **sql.Stmt
274 }{ 245 }{
275 {hasBottleneckSQL, &hasStmt},
276 {insertBottleneckSQL, &insertStmt}, 246 {insertBottleneckSQL, &insertStmt},
247 {deleteObsoleteBNSQL, &deleteObsoleteBNStmt},
277 {fixBNValiditySQL, &fixValidityStmt}, 248 {fixBNValiditySQL, &fixValidityStmt},
278 {updateBottleneckSQL, &updateStmt},
279 {deleteBottleneckMaterialSQL, &deleteMaterialStmt}, 249 {deleteBottleneckMaterialSQL, &deleteMaterialStmt},
280 {insertBottleneckMaterialSQL, &insertMaterialStmt}, 250 {insertBottleneckMaterialSQL, &insertMaterialStmt},
281 {trackImportSQL, &trackStmt}, 251 {trackImportSQL, &trackStmt},
282 } { 252 } {
283 var err error 253 var err error
292 feedback.Info("Tolerance used to snap waterway axis: %g", tolerance) 262 feedback.Info("Tolerance used to snap waterway axis: %g", tolerance)
293 263
294 for _, bn := range bns { 264 for _, bn := range bns {
295 if err := storeBottleneck( 265 if err := storeBottleneck(
296 ctx, importID, conn, feedback, bn, &nids, tolerance, 266 ctx, importID, conn, feedback, bn, &nids, tolerance,
297 hasStmt, insertStmt, fixValidityStmt, updateStmt, 267 insertStmt, deleteObsoleteBNStmt, fixValidityStmt,
298 deleteMaterialStmt, insertMaterialStmt, trackStmt); err != nil { 268 deleteMaterialStmt, insertMaterialStmt, trackStmt); err != nil {
299 return nil, err 269 return nil, err
300 } 270 }
301 } 271 }
302 if len(nids) == 0 { 272 if len(nids) == 0 {
319 conn *sql.Conn, 289 conn *sql.Conn,
320 feedback Feedback, 290 feedback Feedback,
321 bn *ifbn.BottleNeckType, 291 bn *ifbn.BottleNeckType,
322 nids *[]string, 292 nids *[]string,
323 tolerance float64, 293 tolerance float64,
324 hasStmt, insertStmt, fixValidityStmt, updateStmt, 294 insertStmt, deleteObsoleteBNStmt, fixValidityStmt,
325 deleteMaterialStmt, insertMaterialStmt, trackStmt *sql.Stmt, 295 deleteMaterialStmt, insertMaterialStmt, trackStmt *sql.Stmt,
326 ) error { 296 ) error {
327 feedback.Info("Processing %s (%s)", bn.OBJNAM, bn.Bottleneck_id) 297 feedback.Info("Processing %s (%s)", bn.OBJNAM, bn.Bottleneck_id)
328 298
329 if bn.AdditionalData == nil || bn.AdditionalData.KeyValuePair == nil { 299 if bn.AdditionalData == nil || bn.AdditionalData.KeyValuePair == nil {
406 if err != nil { 376 if err != nil {
407 return err 377 return err
408 } 378 }
409 defer tx.Rollback() 379 defer tx.Rollback()
410 380
411 var isNew bool 381 var bnIds []int64
412 var nid int64 382 bns, err := tx.StmtContext(ctx, insertStmt).QueryContext(ctx,
413 err = tx.StmtContext(ctx, hasStmt).QueryRowContext(ctx,
414 bn.Bottleneck_id, 383 bn.Bottleneck_id,
415 validity, 384 &validity,
416 ).Scan(&isNew) 385 bn.Fk_g_fid,
417 switch { 386 bn.OBJNAM,
418 case err != nil: 387 bn.NOBJNM,
388 bn.From_ISRS, bn.To_ISRS,
389 rb,
390 lb,
391 country,
392 revisitingTime,
393 limiting,
394 bn.Date_Info,
395 bn.Source,
396 tolerance,
397 )
398 if err != nil {
399 feedback.Warn(handleError(err).Error())
400 return nil
401 }
402 defer bns.Close()
403 for bns.Next() {
404 var nid int64
405 if err := bns.Scan(&nid); err != nil {
406 return err
407 }
408 bnIds = append(bnIds, nid)
409 }
410 if err := bns.Err(); err != nil {
411 return err
412 }
413 if len(bnIds) == 0 {
414 feedback.Warn(
415 "No gauge matching '%s' or given time available", bn.Fk_g_fid)
416 return nil
417 }
418
419 // Remove obsolete bottleneck version entries
420 var pgBnIds pgtype.Int8Array
421 pgBnIds.Set(bnIds)
422 if _, err = tx.StmtContext(ctx, deleteObsoleteBNStmt).ExecContext(ctx,
423 bn.Bottleneck_id,
424 &validity,
425 &pgBnIds,
426 ); err != nil {
419 feedback.Warn(handleError(err).Error()) 427 feedback.Warn(handleError(err).Error())
420 if err2 := tx.Rollback(); err2 != nil { 428 if err2 := tx.Rollback(); err2 != nil {
421 return err2 429 return err2
422 } 430 }
423 return nil 431 return nil
424 case isNew:
425 err = tx.StmtContext(ctx, insertStmt).QueryRowContext(
426 ctx,
427 bn.Bottleneck_id,
428 &validity,
429 bn.Fk_g_fid,
430 bn.OBJNAM,
431 bn.NOBJNM,
432 bn.From_ISRS, bn.To_ISRS,
433 rb,
434 lb,
435 country,
436 revisitingTime,
437 limiting,
438 bn.Date_Info,
439 bn.Source,
440 tolerance,
441 ).Scan(&nid)
442 if err != nil {
443 feedback.Warn(handleError(err).Error())
444 return nil
445 }
446 feedback.Info("insert new version")
447 case !isNew:
448 // try to update
449 err := tx.StmtContext(ctx, updateStmt).QueryRowContext(ctx,
450 bn.Bottleneck_id,
451 bn.Fk_g_fid,
452 bn.OBJNAM,
453 bn.NOBJNM,
454 bn.From_ISRS, bn.To_ISRS,
455 rb,
456 lb,
457 country,
458 revisitingTime,
459 limiting,
460 bn.Date_Info,
461 bn.Source,
462 tolerance,
463 &validity,
464 ).Scan(&nid)
465 switch {
466 case err == sql.ErrNoRows:
467 feedback.Info("unchanged")
468 if err := tx.Rollback(); err != nil {
469 return err
470 }
471 return nil
472 case err != nil:
473 feedback.Warn(handleError(err).Error())
474 if err := tx.Rollback(); err != nil {
475 return err
476 }
477 return nil
478 default:
479 feedback.Info("update")
480
481 // Remove obsolete riverbed materials
482 var pgMaterials pgtype.VarcharArray
483 pgMaterials.Set(materials)
484 mtls, err := tx.StmtContext(ctx,
485 deleteMaterialStmt).QueryContext(ctx,
486 nid,
487 &pgMaterials,
488 )
489 if err != nil {
490 return err
491 }
492 defer mtls.Close()
493 for mtls.Next() {
494 var delMat string
495 if err := mtls.Scan(&delMat); err != nil {
496 return err
497 }
498 feedback.Warn("Removed riverbed material %s", delMat)
499 }
500 if err := mtls.Err(); err != nil {
501 return err
502 }
503 }
504 } 432 }
505 433
506 // Set end of validity of old version to start of new version 434 // Set end of validity of old version to start of new version
507 // in case of overlap 435 // in case of overlap
508 if _, err = tx.StmtContext(ctx, fixValidityStmt).ExecContext(ctx, 436 if _, err = tx.StmtContext(ctx, fixValidityStmt).ExecContext(ctx,
514 return err2 442 return err2
515 } 443 }
516 return nil 444 return nil
517 } 445 }
518 446
519 // Insert riverbed materials
520 if materials != nil { 447 if materials != nil {
521 for _, mat := range materials { 448 // Remove obsolete riverbed materials
522 if _, err := tx.StmtContext(ctx, 449 var pgMaterials pgtype.VarcharArray
523 insertMaterialStmt).ExecContext( 450 pgMaterials.Set(materials)
524 ctx, nid, mat); err != nil { 451 mtls, err := tx.StmtContext(ctx,
525 feedback.Warn("Failed to insert riverbed material '%s'", mat) 452 deleteMaterialStmt).QueryContext(ctx,
526 feedback.Warn(handleError(err).Error()) 453 &pgBnIds,
454 &pgMaterials,
455 )
456 if err != nil {
457 return err
458 }
459 defer mtls.Close()
460 for mtls.Next() {
461 var delMat string
462 if err := mtls.Scan(&delMat); err != nil {
463 return err
527 } 464 }
528 } 465 feedback.Warn("Removed riverbed material %s", delMat)
529 } 466 }
530 467 if err := mtls.Err(); err != nil {
531 if _, err := tx.StmtContext(ctx, trackStmt).ExecContext( 468 return err
532 ctx, importID, "waterway.bottlenecks", nid, 469 }
533 ); err != nil { 470
534 return err 471 // Insert riverbed materials
472 if _, err := tx.StmtContext(ctx, insertMaterialStmt).ExecContext(ctx,
473 &pgBnIds,
474 &pgMaterials,
475 ); err != nil {
476 feedback.Warn("Failed to insert riverbed materials")
477 feedback.Warn(handleError(err).Error())
478 return nil
479 }
480 }
481
482 for _, nid := range bnIds {
483 if _, err := tx.StmtContext(ctx, trackStmt).ExecContext(
484 ctx, importID, "waterway.bottlenecks", nid,
485 ); err != nil {
486 return err
487 }
535 } 488 }
536 if err = tx.Commit(); err != nil { 489 if err = tx.Commit(); err != nil {
537 return err 490 return err
538 } 491 }
539 *nids = append(*nids, bn.Bottleneck_id) 492 *nids = append(*nids, bn.Bottleneck_id)