comparison pkg/imports/bn.go @ 4128:14706384a464 request_hist_bns

Cleanup by Sascha L. Teichmann applied.
author Sascha Wilde <wilde@intevation.de>
date Thu, 01 Aug 2019 17:27:40 +0200
parents 0cf0008070db
children 980f12d3c766
comparison
equal deleted inserted replaced
4127:8c62809ea87e 4128:14706384a464
240 } 240 }
241 241
242 return storeBottlenecks(ctx, fetch, importID, conn, feedback, bn.Tolerance) 242 return storeBottlenecks(ctx, fetch, importID, conn, feedback, bn.Tolerance)
243 } 243 }
244 244
// bnStmts bundles the prepared statements that are needed while storing
// bottlenecks. The zero value holds no statements; prepare fills the
// fields and close releases them again.
type bnStmts struct {
	// Statements working on the bottleneck rows themselves: the insert
	// and the look-ups for exact matches and intersecting entries.
	insert,
	findExactMatch,
	findIntersecting *sql.Stmt

	// insertMaterial stores bottleneck materials, track records
	// entries in the import tracking.
	insertMaterial,
	track *sql.Stmt
}
252
253 func (bs *bnStmts) close() {
254 for _, s := range []**sql.Stmt{
255 &bs.insert,
256 &bs.findExactMatch,
257 &bs.findIntersecting,
258 &bs.insertMaterial,
259 &bs.track,
260 } {
261 if *s != nil {
262 (*s).Close()
263 *s = nil
264 }
265 }
266 }
267
268 func (bs *bnStmts) prepare(ctx context.Context, conn *sql.Conn) error {
269 for _, x := range []struct {
270 sql string
271 stmt **sql.Stmt
272 }{
273 {insertBottleneckSQL, &bs.insert},
274 {findExactMatchBottleneckSQL, &bs.findExactMatch},
275 {findIntersectingBottleneckSQL, &bs.findIntersecting},
276 {insertBottleneckMaterialSQL, &bs.insertMaterial},
277 {trackImportDeletionSQL, &bs.track},
278 } {
279 var err error
280 if *x.stmt, err = conn.PrepareContext(ctx, x.sql); err != nil {
281 return err
282 }
283 }
284 return nil
285 }
286
245 func storeBottlenecks( 287 func storeBottlenecks(
246 ctx context.Context, 288 ctx context.Context,
247 fetch func() ([]*ifbn.BottleNeckType, error), 289 fetch func() ([]*ifbn.BottleNeckType, error),
248 importID int64, 290 importID int64,
249 conn *sql.Conn, 291 conn *sql.Conn,
257 return nil, err 299 return nil, err
258 } 300 }
259 301
260 feedback.Info("Found %d bottlenecks for import", len(bns)) 302 feedback.Info("Found %d bottlenecks for import", len(bns))
261 303
262 var insertStmt, findExactMatchingBNStmt, findIntersectingBNStmt, 304 var bs bnStmts
263 insertMaterialStmt, trackStmt *sql.Stmt 305 defer bs.close()
264 306
265 for _, x := range []struct { 307 if err := bs.prepare(ctx, conn); err != nil {
266 sql string 308 return nil, err
267 stmt **sql.Stmt
268 }{
269 {insertBottleneckSQL, &insertStmt},
270 {findExactMatchBottleneckSQL, &findExactMatchingBNStmt},
271 {findIntersectingBottleneckSQL, &findIntersectingBNStmt},
272 {insertBottleneckMaterialSQL, &insertMaterialStmt},
273 {trackImportDeletionSQL, &trackStmt},
274 } {
275 var err error
276 if *x.stmt, err = conn.PrepareContext(ctx, x.sql); err != nil {
277 return nil, err
278 }
279 defer (*x.stmt).Close()
280 } 309 }
281 310
282 var nids []string 311 var nids []string
283 seenOldBnIds := make(map[int64]bool) 312 seenOldBnIds := make(map[int64]bool)
284 313
291 defer tx.Rollback() 320 defer tx.Rollback()
292 321
293 for _, bn := range bns { 322 for _, bn := range bns {
294 if err := storeBottleneck( 323 if err := storeBottleneck(
295 tx, ctx, importID, feedback, bn, 324 tx, ctx, importID, feedback, bn,
296 &nids, &seenOldBnIds, tolerance, 325 &nids, seenOldBnIds, tolerance,
297 insertStmt, 326 &bs,
298 findExactMatchingBNStmt, 327 ); err != nil {
299 findIntersectingBNStmt,
300 insertMaterialStmt,
301 trackStmt); err != nil {
302 return nil, err 328 return nil, err
303 } 329 }
304 } 330 }
305 if err = tx.Commit(); err != nil { 331 if err = tx.Commit(); err != nil {
306 return nil, err 332 return nil, err
325 ctx context.Context, 351 ctx context.Context,
326 importID int64, 352 importID int64,
327 feedback Feedback, 353 feedback Feedback,
328 bn *ifbn.BottleNeckType, 354 bn *ifbn.BottleNeckType,
329 nids *[]string, 355 nids *[]string,
330 seenOldBnIds *map[int64]bool, 356 seenOldBnIds map[int64]bool,
331 tolerance float64, 357 tolerance float64,
332 insertStmt, 358 bs *bnStmts,
333 findExactMatchingBNStmt, findIntersectingBNStmt,
334 insertMaterialStmt,
335 trackStmt *sql.Stmt,
336 ) error { 359 ) error {
337 feedback.Info("Processing %s (%s)", bn.OBJNAM, bn.Bottleneck_id) 360 feedback.Info("Processing %s (%s)", bn.OBJNAM, bn.Bottleneck_id)
338 361
339 var tfrom, tto pgtype.Timestamptz 362 var tfrom, tto pgtype.Timestamptz
340 var uBound pgtype.BoundType 363 var uBound pgtype.BoundType
434 } 457 }
435 } 458 }
436 459
437 // Check if a bottleneck identical to the one we would insert already 460 // Check if a bottleneck identical to the one we would insert already
438 // exists: 461 // exists:
439 bns, err := tx.StmtContext(ctx, findExactMatchingBNStmt).QueryContext( 462 var old int64
463 err := tx.StmtContext(ctx, bs.findExactMatch).QueryRowContext(
440 ctx, 464 ctx,
441 bn.Bottleneck_id, 465 bn.Bottleneck_id,
442 &validity, 466 &validity,
443 bn.Fk_g_fid, 467 bn.Fk_g_fid,
444 bn.OBJNAM, 468 bn.OBJNAM,
449 country, 473 country,
450 revisitingTime, 474 revisitingTime,
451 limiting, 475 limiting,
452 bn.Date_Info, 476 bn.Date_Info,
453 bn.Source, 477 bn.Source,
454 ) 478 ).Scan(&old)
455 if err != nil { 479 switch {
480 case err == sql.ErrNoRows:
481 // We don't have a matching old one.
482 case err != nil:
456 return err 483 return err
457 } 484 default:
458 defer bns.Close() 485 // We could check if the materials are also matching -- but per
459 // We could check if the materials are also matching -- but per 486 // specification the Date_Info would have to change on that kind of
460 // specification the Date_Info would have to change on that kind of 487 // change anyway. So actually we are already checking more in depth than
461 // change anyway. So actually we are already checking more in depth than 488 // required.
462 // required.
463 if bns.Next() {
464 feedback.Info("unchanged") 489 feedback.Info("unchanged")
465 return nil 490 return nil
466 } 491 }
467 492
468 // Additional Information, only of interest when we change something, so 493 // Additional Information, only of interest when we change something, so
470 feedback.Info("Range: from %s to %s", bn.From_ISRS, bn.To_ISRS) 495 feedback.Info("Range: from %s to %s", bn.From_ISRS, bn.To_ISRS)
471 496
472 // Check if the new bottleneck intersects with the validity of existing 497 // Check if the new bottleneck intersects with the validity of existing
473 // for the same bottleneck_id we consider this an update and mark the 498 // for the same bottleneck_id we consider this an update and mark the
474 // old data for deletion. 499 // old data for deletion.
475 bns, err = tx.StmtContext(ctx, findIntersectingBNStmt).QueryContext( 500 bns, err := tx.StmtContext(ctx, bs.findIntersecting).QueryContext(
476 ctx, bn.Bottleneck_id, &validity, 501 ctx, bn.Bottleneck_id, &validity,
477 ) 502 )
478 if err != nil { 503 if err != nil {
479 return err 504 return err
480 } 505 }
488 err := bns.Scan(&oldID) 513 err := bns.Scan(&oldID)
489 if err != nil { 514 if err != nil {
490 return err 515 return err
491 } 516 }
492 oldBnIds = append(oldBnIds, oldID) 517 oldBnIds = append(oldBnIds, oldID)
518 }
519
520 if err := bns.Err(); err != nil {
521 return err
493 } 522 }
494 523
495 switch { 524 switch {
496 case len(oldBnIds) == 1: 525 case len(oldBnIds) == 1:
497 feedback.Info("Bottelneck '%s' "+ 526 feedback.Info("Bottelneck '%s' "+
508 537
509 for _, oldID := range oldBnIds { 538 for _, oldID := range oldBnIds {
510 // It is possible, that two new bottlenecks intersect with the 539 // It is possible, that two new bottlenecks intersect with the
511 // same old one, therefore we have to handle duplicates in 540 // same old one, therefore we have to handle duplicates in
512 // oldBnIds. 541 // oldBnIds.
513 if !(*seenOldBnIds)[oldID] { 542 if !seenOldBnIds[oldID] {
514 if _, err := tx.StmtContext(ctx, trackStmt).ExecContext( 543 if _, err := tx.StmtContext(ctx, bs.track).ExecContext(
515 ctx, importID, "waterway.bottlenecks", oldID, true, 544 ctx, importID, "waterway.bottlenecks", oldID, true,
516 ); err != nil { 545 ); err != nil {
517 return err 546 return err
518 } 547 }
519 (*seenOldBnIds)[oldID] = true 548 seenOldBnIds[oldID] = true
520 } 549 }
521 } 550 }
522 551
523 var bnIds []int64 552 var bnIds []int64
524 // Add new BN data: 553 // Add new BN data:
525 savepoint := Savepoint(ctx, tx, "insert_bottlenck") 554 savepoint := Savepoint(ctx, tx, "insert_bottlenck")
526 555
527 err = savepoint(func() error { 556 err = savepoint(func() error {
528 bns, err := tx.StmtContext(ctx, insertStmt).QueryContext(ctx, 557 bns, err := tx.StmtContext(ctx, bs.insert).QueryContext(ctx,
529 bn.Bottleneck_id, 558 bn.Bottleneck_id,
530 &validity, 559 &validity,
531 bn.Fk_g_fid, 560 bn.Fk_g_fid,
532 bn.OBJNAM, 561 bn.OBJNAM,
533 bn.NOBJNM, 562 bn.NOBJNM,
563 ) 592 )
564 pgBnIds.Set(bnIds) 593 pgBnIds.Set(bnIds)
565 pgMaterials.Set(materials) 594 pgMaterials.Set(materials)
566 595
567 // Insert riverbed materials 596 // Insert riverbed materials
568 if _, err := tx.StmtContext(ctx, insertMaterialStmt).ExecContext(ctx, 597 if _, err := tx.StmtContext(ctx, bs.insertMaterial).ExecContext(ctx,
569 598
570 &pgBnIds, 599 &pgBnIds,
571 &pgMaterials, 600 &pgMaterials,
572 ); err != nil { 601 ); err != nil {
573 return err 602 return err
580 return nil 609 return nil
581 } 610 }
582 611
583 // Only add new BN data to tracking for staging review. 612 // Only add new BN data to tracking for staging review.
584 for _, nid := range bnIds { 613 for _, nid := range bnIds {
585 if _, err := tx.StmtContext(ctx, trackStmt).ExecContext( 614 if _, err := tx.StmtContext(ctx, bs.track).ExecContext(
586 ctx, importID, "waterway.bottlenecks", nid, false, 615 ctx, importID, "waterway.bottlenecks", nid, false,
587 ); err != nil { 616 ); err != nil {
588 return err 617 return err
589 } 618 }
590 } 619 }