comparison pkg/imports/bn.go @ 4119:0cf0008070db request_hist_bns

bottleneck import: Use a global transaction with savepoints.
author Sascha Wilde <wilde@intevation.de>
date Wed, 31 Jul 2019 19:08:35 +0200
parents f39d20427e89
children 14706384a464
comparison
equal deleted inserted replaced
4114:b9ddc6cdc871 4119:0cf0008070db
282 var nids []string 282 var nids []string
283 seenOldBnIds := make(map[int64]bool) 283 seenOldBnIds := make(map[int64]bool)
284 284
285 feedback.Info("Tolerance used to snap waterway axis: %g", tolerance) 285 feedback.Info("Tolerance used to snap waterway axis: %g", tolerance)
286 286
287 tx, err := conn.BeginTx(ctx, nil)
288 if err != nil {
289 return nil, err
290 }
291 defer tx.Rollback()
292
287 for _, bn := range bns { 293 for _, bn := range bns {
288 if err := storeBottleneck( 294 if err := storeBottleneck(
289 ctx, importID, conn, feedback, bn, 295 tx, ctx, importID, feedback, bn,
290 &nids, &seenOldBnIds, tolerance, 296 &nids, &seenOldBnIds, tolerance,
291 insertStmt, 297 insertStmt,
292 findExactMatchingBNStmt, 298 findExactMatchingBNStmt,
293 findIntersectingBNStmt, 299 findIntersectingBNStmt,
294 insertMaterialStmt, 300 insertMaterialStmt,
295 trackStmt); err != nil { 301 trackStmt); err != nil {
296 return nil, err 302 return nil, err
297 } 303 }
298 } 304 }
305 if err = tx.Commit(); err != nil {
306 return nil, err
307 }
308
299 if len(nids) == 0 { 309 if len(nids) == 0 {
300 return nil, UnchangedError("No new bottlenecks inserted") 310 return nil, UnchangedError("No new bottlenecks inserted")
301 } 311 }
302 312
303 feedback.Info("Storing %d bottlenecks took %s", len(nids), time.Since(start)) 313 feedback.Info("Storing %d bottlenecks took %s", len(nids), time.Since(start))
309 } 319 }
310 return &summary, nil 320 return &summary, nil
311 } 321 }
312 322
313 func storeBottleneck( 323 func storeBottleneck(
324 tx *sql.Tx,
314 ctx context.Context, 325 ctx context.Context,
315 importID int64, 326 importID int64,
316 conn *sql.Conn,
317 feedback Feedback, 327 feedback Feedback,
318 bn *ifbn.BottleNeckType, 328 bn *ifbn.BottleNeckType,
319 nids *[]string, 329 nids *[]string,
320 seenOldBnIds *map[int64]bool, 330 seenOldBnIds *map[int64]bool,
321 tolerance float64, 331 tolerance float64,
424 } 434 }
425 } 435 }
426 436
427 // Check if a bottleneck identical to the one we would insert already 437 // Check if a bottleneck identical to the one we would insert already
428 // exists: 438 // exists:
429 bns, err := findExactMatchingBNStmt.QueryContext(ctx, 439 bns, err := tx.StmtContext(ctx, findExactMatchingBNStmt).QueryContext(
440 ctx,
430 bn.Bottleneck_id, 441 bn.Bottleneck_id,
431 &validity, 442 &validity,
432 bn.Fk_g_fid, 443 bn.Fk_g_fid,
433 bn.OBJNAM, 444 bn.OBJNAM,
434 bn.NOBJNM, 445 bn.NOBJNM,
456 467
457 // Additional Information, only of interest when we change something, so 468 // Additional Information, only of interest when we change something, so
458 // it can be used for debugging if something goes wrong... 469 // it can be used for debugging if something goes wrong...
459 feedback.Info("Range: from %s to %s", bn.From_ISRS, bn.To_ISRS) 470 feedback.Info("Range: from %s to %s", bn.From_ISRS, bn.To_ISRS)
460 471
461 tx, err := conn.BeginTx(ctx, nil)
462 if err != nil {
463 return err
464 }
465 defer tx.Rollback()
466
467 // Check whether the new bottleneck intersects with the validity of existing 472 // Check whether the new bottleneck intersects with the validity of existing
468 // data for the same bottleneck_id; if so, we consider this an update and 473 // data for the same bottleneck_id; if so, we consider this an update and
469 // mark the old data for deletion. 474 // mark the old data for deletion.
470 bns, err = tx.StmtContext(ctx, findIntersectingBNStmt).QueryContext( 475 bns, err = tx.StmtContext(ctx, findIntersectingBNStmt).QueryContext(
471 ctx, bn.Bottleneck_id, &validity, 476 ctx, bn.Bottleneck_id, &validity,
515 } 520 }
516 } 521 }
517 522
518 var bnIds []int64 523 var bnIds []int64
519 // Add new BN data: 524 // Add new BN data:
520 bns, err = tx.StmtContext(ctx, insertStmt).QueryContext(ctx, 525 savepoint := Savepoint(ctx, tx, "insert_bottlenck")
521 bn.Bottleneck_id, 526
522 &validity, 527 err = savepoint(func() error {
523 bn.Fk_g_fid, 528 bns, err := tx.StmtContext(ctx, insertStmt).QueryContext(ctx,
524 bn.OBJNAM, 529 bn.Bottleneck_id,
525 bn.NOBJNM, 530 &validity,
526 bn.From_ISRS, bn.To_ISRS, 531 bn.Fk_g_fid,
527 rb, 532 bn.OBJNAM,
528 lb, 533 bn.NOBJNM,
529 country, 534 bn.From_ISRS, bn.To_ISRS,
530 revisitingTime, 535 rb,
531 limiting, 536 lb,
532 bn.Date_Info, 537 country,
533 bn.Source, 538 revisitingTime,
534 tolerance) 539 limiting,
540 bn.Date_Info,
541 bn.Source,
542 tolerance)
543 if err != nil {
544 return err
545 }
546 defer bns.Close()
547 for bns.Next() {
548 var nid int64
549 if err := bns.Scan(&nid); err != nil {
550 return err
551 }
552 bnIds = append(bnIds, nid)
553 }
554 if err := bns.Err(); err != nil {
555 return err
556 }
557
558 // Add new materials
559 if len(bnIds) > 0 && materials != nil {
560 var (
561 pgBnIds pgtype.Int8Array
562 pgMaterials pgtype.VarcharArray
563 )
564 pgBnIds.Set(bnIds)
565 pgMaterials.Set(materials)
566
567 // Insert riverbed materials
568 if _, err := tx.StmtContext(ctx, insertMaterialStmt).ExecContext(ctx,
569
570 &pgBnIds,
571 &pgMaterials,
572 ); err != nil {
573 return err
574 }
575 }
576 return nil
577 })
535 if err != nil { 578 if err != nil {
536 feedback.Warn(pgxutils.ReadableError{err}.Error()) 579 feedback.Warn(pgxutils.ReadableError{err}.Error())
537 return nil 580 return nil
538 }
539 defer bns.Close()
540 for bns.Next() {
541 var nid int64
542 if err := bns.Scan(&nid); err != nil {
543 return err
544 }
545 bnIds = append(bnIds, nid)
546 }
547 if err := bns.Err(); err != nil {
548 feedback.Warn(pgxutils.ReadableError{err}.Error())
549 return nil
550 }
551 if len(bnIds) == 0 {
552 feedback.Warn(
553 "No gauge matching '%s' or given time available", bn.Fk_g_fid)
554 return nil
555 }
556
557 // Add new materials
558 var (
559 pgBnIds pgtype.Int8Array
560 pgMaterials pgtype.VarcharArray
561 )
562 pgBnIds.Set(bnIds)
563 pgMaterials.Set(materials)
564
565 if materials != nil {
566 // Insert riverbed materials
567 if _, err := tx.StmtContext(ctx, insertMaterialStmt).ExecContext(ctx,
568 &pgBnIds,
569 &pgMaterials,
570 ); err != nil {
571 feedback.Warn("Failed to insert riverbed materials")
572 feedback.Warn(pgxutils.ReadableError{err}.Error())
573 return nil
574 }
575 } 581 }
576 582
577 // Only add new BN data to tracking for staging review. 583 // Only add new BN data to tracking for staging review.
578 for _, nid := range bnIds { 584 for _, nid := range bnIds {
579 if _, err := tx.StmtContext(ctx, trackStmt).ExecContext( 585 if _, err := tx.StmtContext(ctx, trackStmt).ExecContext(
581 ); err != nil { 587 ); err != nil {
582 return err 588 return err
583 } 589 }
584 } 590 }
585 591
586 if err = tx.Commit(); err != nil {
587 return err
588 }
589
590 *nids = append(*nids, bn.Bottleneck_id) 592 *nids = append(*nids, bn.Bottleneck_id)
591 return nil 593 return nil
592 } 594 }