Mercurial > gemma
comparison pkg/imports/bn.go @ 4119:0cf0008070db request_hist_bns
bottleneck import: Use a global transaction with savepoints.
author | Sascha Wilde <wilde@intevation.de> |
---|---|
date | Wed, 31 Jul 2019 19:08:35 +0200 |
parents | f39d20427e89 |
children | 14706384a464 |
comparison
equal
deleted
inserted
replaced
4114:b9ddc6cdc871 | 4119:0cf0008070db |
---|---|
282 var nids []string | 282 var nids []string |
283 seenOldBnIds := make(map[int64]bool) | 283 seenOldBnIds := make(map[int64]bool) |
284 | 284 |
285 feedback.Info("Tolerance used to snap waterway axis: %g", tolerance) | 285 feedback.Info("Tolerance used to snap waterway axis: %g", tolerance) |
286 | 286 |
287 tx, err := conn.BeginTx(ctx, nil) | |
288 if err != nil { | |
289 return nil, err | |
290 } | |
291 defer tx.Rollback() | |
292 | |
287 for _, bn := range bns { | 293 for _, bn := range bns { |
288 if err := storeBottleneck( | 294 if err := storeBottleneck( |
289 ctx, importID, conn, feedback, bn, | 295 tx, ctx, importID, feedback, bn, |
290 &nids, &seenOldBnIds, tolerance, | 296 &nids, &seenOldBnIds, tolerance, |
291 insertStmt, | 297 insertStmt, |
292 findExactMatchingBNStmt, | 298 findExactMatchingBNStmt, |
293 findIntersectingBNStmt, | 299 findIntersectingBNStmt, |
294 insertMaterialStmt, | 300 insertMaterialStmt, |
295 trackStmt); err != nil { | 301 trackStmt); err != nil { |
296 return nil, err | 302 return nil, err |
297 } | 303 } |
298 } | 304 } |
305 if err = tx.Commit(); err != nil { | |
306 return nil, err | |
307 } | |
308 | |
299 if len(nids) == 0 { | 309 if len(nids) == 0 { |
300 return nil, UnchangedError("No new bottlenecks inserted") | 310 return nil, UnchangedError("No new bottlenecks inserted") |
301 } | 311 } |
302 | 312 |
303 feedback.Info("Storing %d bottlenecks took %s", len(nids), time.Since(start)) | 313 feedback.Info("Storing %d bottlenecks took %s", len(nids), time.Since(start)) |
309 } | 319 } |
310 return &summary, nil | 320 return &summary, nil |
311 } | 321 } |
312 | 322 |
313 func storeBottleneck( | 323 func storeBottleneck( |
324 tx *sql.Tx, | |
314 ctx context.Context, | 325 ctx context.Context, |
315 importID int64, | 326 importID int64, |
316 conn *sql.Conn, | |
317 feedback Feedback, | 327 feedback Feedback, |
318 bn *ifbn.BottleNeckType, | 328 bn *ifbn.BottleNeckType, |
319 nids *[]string, | 329 nids *[]string, |
320 seenOldBnIds *map[int64]bool, | 330 seenOldBnIds *map[int64]bool, |
321 tolerance float64, | 331 tolerance float64, |
424 } | 434 } |
425 } | 435 } |
426 | 436 |
427 // Check if a bottleneck identical to the one we would insert already | 437 // Check if a bottleneck identical to the one we would insert already |
428 // exists: | 438 // exists: |
429 bns, err := findExactMatchingBNStmt.QueryContext(ctx, | 439 bns, err := tx.StmtContext(ctx, findExactMatchingBNStmt).QueryContext( |
440 ctx, | |
430 bn.Bottleneck_id, | 441 bn.Bottleneck_id, |
431 &validity, | 442 &validity, |
432 bn.Fk_g_fid, | 443 bn.Fk_g_fid, |
433 bn.OBJNAM, | 444 bn.OBJNAM, |
434 bn.NOBJNM, | 445 bn.NOBJNM, |
456 | 467 |
457 // Additional Information, only of interest when we change something, so | 468 // Additional Information, only of interest when we change something, so |
458 // it can be used for debugging if something goes wrong... | 469 // it can be used for debugging if something goes wrong... |
459 feedback.Info("Range: from %s to %s", bn.From_ISRS, bn.To_ISRS) | 470 feedback.Info("Range: from %s to %s", bn.From_ISRS, bn.To_ISRS) |
460 | 471 |
461 tx, err := conn.BeginTx(ctx, nil) | |
462 if err != nil { | |
463 return err | |
464 } | |
465 defer tx.Rollback() | |
466 | |
467 // Check if the new bottleneck intersects with the validity of existing | 472 // Check if the new bottleneck intersects with the validity of existing |
468 // entries for the same bottleneck_id; if so, we consider this an update | 473 // entries for the same bottleneck_id; if so, we consider this an update |
469 // and mark the old data for deletion. | 474 // and mark the old data for deletion. |
470 bns, err = tx.StmtContext(ctx, findIntersectingBNStmt).QueryContext( | 475 bns, err = tx.StmtContext(ctx, findIntersectingBNStmt).QueryContext( |
471 ctx, bn.Bottleneck_id, &validity, | 476 ctx, bn.Bottleneck_id, &validity, |
515 } | 520 } |
516 } | 521 } |
517 | 522 |
518 var bnIds []int64 | 523 var bnIds []int64 |
519 // Add new BN data: | 524 // Add new BN data: |
520 bns, err = tx.StmtContext(ctx, insertStmt).QueryContext(ctx, | 525 savepoint := Savepoint(ctx, tx, "insert_bottlenck") |
521 bn.Bottleneck_id, | 526 |
522 &validity, | 527 err = savepoint(func() error { |
523 bn.Fk_g_fid, | 528 bns, err := tx.StmtContext(ctx, insertStmt).QueryContext(ctx, |
524 bn.OBJNAM, | 529 bn.Bottleneck_id, |
525 bn.NOBJNM, | 530 &validity, |
526 bn.From_ISRS, bn.To_ISRS, | 531 bn.Fk_g_fid, |
527 rb, | 532 bn.OBJNAM, |
528 lb, | 533 bn.NOBJNM, |
529 country, | 534 bn.From_ISRS, bn.To_ISRS, |
530 revisitingTime, | 535 rb, |
531 limiting, | 536 lb, |
532 bn.Date_Info, | 537 country, |
533 bn.Source, | 538 revisitingTime, |
534 tolerance) | 539 limiting, |
540 bn.Date_Info, | |
541 bn.Source, | |
542 tolerance) | |
543 if err != nil { | |
544 return err | |
545 } | |
546 defer bns.Close() | |
547 for bns.Next() { | |
548 var nid int64 | |
549 if err := bns.Scan(&nid); err != nil { | |
550 return err | |
551 } | |
552 bnIds = append(bnIds, nid) | |
553 } | |
554 if err := bns.Err(); err != nil { | |
555 return err | |
556 } | |
557 | |
558 // Add new materials | |
559 if len(bnIds) > 0 && materials != nil { | |
560 var ( | |
561 pgBnIds pgtype.Int8Array | |
562 pgMaterials pgtype.VarcharArray | |
563 ) | |
564 pgBnIds.Set(bnIds) | |
565 pgMaterials.Set(materials) | |
566 | |
567 // Insert riverbed materials | |
568 if _, err := tx.StmtContext(ctx, insertMaterialStmt).ExecContext(ctx, | |
569 | |
570 &pgBnIds, | |
571 &pgMaterials, | |
572 ); err != nil { | |
573 return err | |
574 } | |
575 } | |
576 return nil | |
577 }) | |
535 if err != nil { | 578 if err != nil { |
536 feedback.Warn(pgxutils.ReadableError{err}.Error()) | 579 feedback.Warn(pgxutils.ReadableError{err}.Error()) |
537 return nil | 580 return nil |
538 } | |
539 defer bns.Close() | |
540 for bns.Next() { | |
541 var nid int64 | |
542 if err := bns.Scan(&nid); err != nil { | |
543 return err | |
544 } | |
545 bnIds = append(bnIds, nid) | |
546 } | |
547 if err := bns.Err(); err != nil { | |
548 feedback.Warn(pgxutils.ReadableError{err}.Error()) | |
549 return nil | |
550 } | |
551 if len(bnIds) == 0 { | |
552 feedback.Warn( | |
553 "No gauge matching '%s' or given time available", bn.Fk_g_fid) | |
554 return nil | |
555 } | |
556 | |
557 // Add new materials | |
558 var ( | |
559 pgBnIds pgtype.Int8Array | |
560 pgMaterials pgtype.VarcharArray | |
561 ) | |
562 pgBnIds.Set(bnIds) | |
563 pgMaterials.Set(materials) | |
564 | |
565 if materials != nil { | |
566 // Insert riverbed materials | |
567 if _, err := tx.StmtContext(ctx, insertMaterialStmt).ExecContext(ctx, | |
568 &pgBnIds, | |
569 &pgMaterials, | |
570 ); err != nil { | |
571 feedback.Warn("Failed to insert riverbed materials") | |
572 feedback.Warn(pgxutils.ReadableError{err}.Error()) | |
573 return nil | |
574 } | |
575 } | 581 } |
576 | 582 |
577 // Only add new BN data to tracking for staging review. | 583 // Only add new BN data to tracking for staging review. |
578 for _, nid := range bnIds { | 584 for _, nid := range bnIds { |
579 if _, err := tx.StmtContext(ctx, trackStmt).ExecContext( | 585 if _, err := tx.StmtContext(ctx, trackStmt).ExecContext( |
581 ); err != nil { | 587 ); err != nil { |
582 return err | 588 return err |
583 } | 589 } |
584 } | 590 } |
585 | 591 |
586 if err = tx.Commit(); err != nil { | |
587 return err | |
588 } | |
589 | |
590 *nids = append(*nids, bn.Bottleneck_id) | 592 *nids = append(*nids, bn.Bottleneck_id) |
591 return nil | 593 return nil |
592 } | 594 } |