comparison pkg/imports/queue.go @ 5123:eeb45e3e0a5a queued-stage-done

Added a mechanism to have synchronous import jobs on the import queue. Review jobs are now synchronous, with a controller waiting up to 20 seconds before returning. If all reviews finish earlier, the controller exits earlier, too. If one or more decisions take longer, they keep running in the background until they are decided, and the controller returns an error message for these imports stating that the process is still running.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Thu, 26 Mar 2020 22:24:45 +0100
parents 0b6b62d247e8
children 6910c1cad1fb
comparison
equal deleted inserted replaced
5122:0b6b62d247e8 5123:eeb45e3e0a5a
123 reviewJobRetries = 10 123 reviewJobRetries = 10
124 reviewJobWait = time.Minute 124 reviewJobWait = time.Minute
125 ) 125 )
126 126
127 type importQueue struct { 127 type importQueue struct {
128 signalChan chan struct{} 128 cmdCh chan func(*importQueue)
129
129 creatorsMu sync.Mutex 130 creatorsMu sync.Mutex
130 creators map[JobKind]JobCreator 131 creators map[JobKind]JobCreator
131 usedDeps map[string]int 132 usedDeps map[string]int
133
134 waiting map[int64]chan struct{}
132 } 135 }
133 136
134 var iqueue = importQueue{ 137 var iqueue = importQueue{
135 signalChan: make(chan struct{}), 138 cmdCh: make(chan func(*importQueue)),
136 creators: map[JobKind]JobCreator{}, 139
137 usedDeps: map[string]int{}, 140 creators: map[JobKind]JobCreator{},
141 usedDeps: map[string]int{},
142 waiting: make(map[int64]chan struct{}),
138 } 143 }
139 144
140 var ( 145 var (
141 // ImportStateNames is a list of the states a job can be in. 146 // ImportStateNames is a list of the states a job can be in.
142 ImportStateNames = []string{ 147 ImportStateNames = []string{
491 triesLeft *int, 496 triesLeft *int,
492 waitRetry *time.Duration, 497 waitRetry *time.Duration,
493 user string, 498 user string,
494 sendEmail bool, 499 sendEmail bool,
495 data string, 500 data string,
496 ) (int64, error) { 501 sync bool,
502 ) (int64, chan struct{}, error) {
497 503
498 var id int64 504 var id int64
499 if due.IsZero() { 505 if due.IsZero() {
500 due = time.Now() 506 due = time.Now()
501 } 507 }
507 } 513 }
508 514
509 var wr pgtype.Interval 515 var wr pgtype.Interval
510 if waitRetry != nil { 516 if waitRetry != nil {
511 if err := wr.Set(*waitRetry); err != nil { 517 if err := wr.Set(*waitRetry); err != nil {
512 return 0, err 518 return 0, nil, err
513 } 519 }
514 } else { 520 } else {
515 wr = pgtype.Interval{Status: pgtype.Null} 521 wr = pgtype.Interval{Status: pgtype.Null}
516 } 522 }
517 523
518 ctx := context.Background() 524 errCh := make(chan error)
519 err := auth.RunAs(ctx, user, func(conn *sql.Conn) error { 525 var done chan struct{}
520 return conn.QueryRowContext( 526
521 ctx, 527 q.cmdCh <- func(q *importQueue) {
522 insertJobSQL, 528 ctx := context.Background()
523 string(kind), 529 errCh <- auth.RunAs(ctx, user, func(conn *sql.Conn) error {
524 due, 530 err := conn.QueryRowContext(
525 tl, 531 ctx,
526 &wr, 532 insertJobSQL,
527 user, 533 string(kind),
528 sendEmail, 534 due,
529 data).Scan(&id) 535 tl,
530 }) 536 &wr,
531 if err == nil { 537 user,
532 select { 538 sendEmail,
533 case q.signalChan <- struct{}{}: 539 data).Scan(&id)
534 default: 540
535 } 541 if err == nil && sync {
536 } 542 log.Printf("info: register wait for %d\n", id)
537 return id, err 543 done = make(chan struct{})
544 q.waiting[id] = done
545 }
546
547 return err
548 })
549 }
550
551 return id, done, <-errCh
538 } 552 }
539 553
540 // AddJob adds a job to the global import queue to be executed 554 // AddJob adds a job to the global import queue to be executed
541 // as soon as possible after due. 555 // as soon as possible after due.
542 // This is gone in a separate Go routine 556 // This is gone in a separate Go routine
548 waitRetry *time.Duration, 562 waitRetry *time.Duration,
549 user string, 563 user string,
550 sendEmail bool, 564 sendEmail bool,
551 data string, 565 data string,
552 ) (int64, error) { 566 ) (int64, error) {
553 return iqueue.addJob( 567 id, _, err := iqueue.addJob(
554 kind, 568 kind,
555 due, 569 due,
556 triesLeft, 570 triesLeft,
557 waitRetry, 571 waitRetry,
558 user, 572 user,
559 sendEmail, 573 sendEmail,
560 data) 574 data,
575 false)
576 return id, err
561 } 577 }
562 578
563 const ( 579 const (
564 isPendingSQL = ` 580 isPendingSQL = `
565 SELECT 581 SELECT
591 ctx context.Context, 607 ctx context.Context,
592 tx *sql.Tx, 608 tx *sql.Tx,
593 id int64, 609 id int64,
594 accepted bool, 610 accepted bool,
595 reviewer string, 611 reviewer string,
596 ) error { 612 ) (chan struct{}, error) {
597 var ( 613 var (
598 pending bool 614 pending bool
599 kind string 615 kind string
600 ) 616 )
601 617
602 switch err := tx.QueryRowContext(ctx, isPendingSQL, id).Scan(&pending, &kind); { 618 switch err := tx.QueryRowContext(ctx, isPendingSQL, id).Scan(&pending, &kind); {
603 case err == sql.ErrNoRows: 619 case err == sql.ErrNoRows:
604 return fmt.Errorf("cannot find import #%d", id) 620 return nil, fmt.Errorf("cannot find import #%d", id)
605 case err != nil: 621 case err != nil:
606 return err 622 return nil, err
607 case !pending: 623 case !pending:
608 return fmt.Errorf("#%d is not pending", id) 624 return nil, fmt.Errorf("#%d is not pending", id)
609 } 625 }
610 626
611 jc := q.jobCreator(JobKind(kind)) 627 jc := q.jobCreator(JobKind(kind))
612 if jc == nil { 628 if jc == nil {
613 return fmt.Errorf("no job creator for kind '%s'", kind) 629 return nil, fmt.Errorf("no job creator for kind '%s'", kind)
614 } 630 }
615 631
616 r := &reviewedJob{ 632 r := &reviewedJob{
617 ID: id, 633 ID: id,
618 Accepted: accepted, 634 Accepted: accepted,
619 } 635 }
620 serialized, err := common.ToJSONString(r) 636 serialized, err := common.ToJSONString(r)
621 if err != nil { 637 if err != nil {
622 return err 638 return nil, err
623 } 639 }
624 640
625 // Try a little harder to persist the decision. 641 // Try a little harder to persist the decision.
626 tries := reviewJobRetries 642 tries := reviewJobRetries
627 wait := reviewJobWait 643 wait := reviewJobWait
628 644
629 rID, err := q.addJob( 645 rID, done, err := q.addJob(
630 JobKind(kind+ReviewJobSuffix), 646 JobKind(kind+ReviewJobSuffix),
631 time.Now(), 647 time.Now(),
632 &tries, 648 &tries,
633 &wait, 649 &wait,
634 reviewer, 650 reviewer,
635 false, 651 false,
636 serialized) 652 serialized,
653 true)
654 if err != nil {
655 return nil, err
656 }
637 log.Printf("info: add review job %d\n", rID) 657 log.Printf("info: add review job %d\n", rID)
638 if err != nil {
639 return err
640 }
641 _, err = tx.ExecContext(ctx, updateStateSQL, "reviewed", id) 658 _, err = tx.ExecContext(ctx, updateStateSQL, "reviewed", id)
642 return err 659 if err != nil && done != nil {
660 go func() {
661 q.cmdCh <- func(q *importQueue) {
662 delete(q.waiting, rID)
663 }
664 }()
665 done = nil
666 }
667 return done, err
643 } 668 }
644 669
645 func (q *importQueue) decideImport( 670 func (q *importQueue) decideImport(
646 ctx context.Context, 671 ctx context.Context,
647 id int64, 672 id int64,
650 ) error { 675 ) error {
651 if ctx == nil { 676 if ctx == nil {
652 ctx = context.Background() 677 ctx = context.Background()
653 } 678 }
654 679
655 return auth.RunAs(ctx, reviewer, func(conn *sql.Conn) error { 680 var done chan struct{}
681
682 if err := auth.RunAs(ctx, reviewer, func(conn *sql.Conn) error {
656 tx, err := conn.BeginTx(ctx, nil) 683 tx, err := conn.BeginTx(ctx, nil)
657 if err != nil { 684 if err != nil {
658 return err 685 return err
659 } 686 }
660 defer tx.Rollback() 687 defer tx.Rollback()
661 err = q.decideImportTx(ctx, tx, id, accepted, reviewer) 688 done, err = q.decideImportTx(ctx, tx, id, accepted, reviewer)
662 if err == nil { 689 if err == nil {
663 err = tx.Commit() 690 err = tx.Commit()
664 } 691 }
665 return err 692 return err
666 }) 693 }); err != nil {
694 return err
695 }
696
697 <-done
698 return nil
667 } 699 }
668 700
669 func DecideImport( 701 func DecideImport(
670 ctx context.Context, 702 ctx context.Context,
671 id int64, 703 id int64,
877 } 909 }
878 if idj != nil { 910 if idj != nil {
879 break 911 break
880 } 912 }
881 select { 913 select {
882 case <-q.signalChan: 914 case cmd := <-q.cmdCh:
915 cmd(q)
916
883 case <-time.After(pollDuration): 917 case <-time.After(pollDuration):
884 } 918 }
885 } 919 }
886 920
887 log.Printf("info: starting import #%d\n", idj.id) 921 log.Printf("info: starting import #%d\n", idj.id)
895 // Lock dependencies. 929 // Lock dependencies.
896 q.lockDependencies(jc) 930 q.lockDependencies(jc)
897 931
898 go func(jc JobCreator, idj *idJob) { 932 go func(jc JobCreator, idj *idJob) {
899 933
900 // Unlock the dependencies.
901 defer func() { 934 defer func() {
935 // Unlock the dependencies.
902 q.unlockDependencies(jc) 936 q.unlockDependencies(jc)
903 select { 937 // Unlock waiting.
904 case q.signalChan <- struct{}{}: 938 q.cmdCh <- func(q *importQueue) {
905 default: 939 log.Printf("unlock waiting %d\n", idj.id)
940 if w := q.waiting[idj.id]; w != nil {
941 close(w)
942 delete(q.waiting, idj.id)
943 }
906 } 944 }
907 }() 945 }()
908 946
909 job := jc.Create() 947 job := jc.Create()
910 if err := common.FromJSONString(idj.data, job); err != nil { 948 if err := common.FromJSONString(idj.data, job); err != nil {
974 if idj.sendEmail { 1012 if idj.sendEmail {
975 go sendNotificationMail(idj.user, jc.Description(), state, idj.id) 1013 go sendNotificationMail(idj.user, jc.Description(), state, idj.id)
976 } 1014 }
977 1015
978 if retry { 1016 if retry {
979 nid, err := q.addJob( 1017 nid, _, err := q.addJob(
980 idj.kind, 1018 idj.kind,
981 idj.nextDue(), 1019 idj.nextDue(),
982 idj.triesLeftPointer(), 1020 idj.triesLeftPointer(),
983 idj.waitRetryPointer(), 1021 idj.waitRetryPointer(),
984 idj.user, idj.sendEmail, 1022 idj.user, idj.sendEmail,
985 idj.data) 1023 idj.data,
1024 false)
986 if err != nil { 1025 if err != nil {
987 log.Printf("error: retry enqueue failed: %v\n", err) 1026 log.Printf("error: retry enqueue failed: %v\n", err)
988 } else { 1027 } else {
989 log.Printf("info: re-enqueued job with id %d\n", nid) 1028 log.Printf("info: re-enqueued job with id %d\n", nid)
990 } 1029 }