comparison pkg/controllers/importqueue.go @ 5123:eeb45e3e0a5a queued-stage-done

Added mechanism to have sync import jobs on import queue. Review jobs are now sync with a controller waiting for 20 secs before returning. If all reviews return earlier the controller exits earlier, too. If one or more decisions took longer they are run in background till they are decided and the controller returns an error message for these imports that the process is still running.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Thu, 26 Mar 2020 22:24:45 +0100
parents c0ceec7e6f85
children 52e3980e3462
comparison
equal deleted inserted replaced
5122:0b6b62d247e8 5123:eeb45e3e0a5a
21 "fmt" 21 "fmt"
22 "log" 22 "log"
23 "net/http" 23 "net/http"
24 "strconv" 24 "strconv"
25 "strings" 25 "strings"
26 "sync"
26 "time" 27 "time"
27 28
28 "github.com/gorilla/mux" 29 "github.com/gorilla/mux"
29 30
30 "gemma.intevation.de/gemma/pkg/auth" 31 "gemma.intevation.de/gemma/pkg/auth"
596 Error string `json:"error,omitempty"` 597 Error string `json:"error,omitempty"`
597 } 598 }
598 599
599 results := make([]reviewResult, len(rs)) 600 results := make([]reviewResult, len(rs))
600 601
602 for i := range results {
603 results[i].ID = rs[i].ID
604 results[i].Message = fmt.Sprintf("Finalizing import #%d in progress.", rs[i].ID)
605 }
606
607 var wg sync.WaitGroup
608 var mu sync.Mutex
609
601 for i := range rs { 610 for i := range rs {
602 rev := &rs[i] 611 wg.Add(1)
603 msg, err := decideImport(req, rev.ID, string(rev.State)) 612 go func(idx int) {
604 var errString string 613 defer wg.Done()
605 if err != nil { 614 rev := &rs[idx]
606 errString = err.Error() 615 msg, err := decideImport(req, rev.ID, string(rev.State))
607 } 616 mu.Lock()
608 results[i] = reviewResult{ 617 if err != nil {
609 ID: rev.ID, 618 results[idx].Error = err.Error()
610 Message: msg, 619 }
611 Error: errString, 620 results[idx].Message = msg
612 } 621 mu.Unlock()
613 } 622 }(i)
614 623 }
615 return mw.JSONResult{Result: results}, nil 624
625 done := make(chan struct{})
626 go func() {
627 defer close(done)
628 wg.Wait()
629 }()
630
631 select {
632 case <-time.After(20 * time.Second):
633 case <-done:
634 }
635
636 out := make([]reviewResult, len(rs))
637 mu.Lock()
638 copy(out, results)
639 mu.Unlock()
640
641 return mw.JSONResult{Result: out}, nil
616 } 642 }
617 643
618 func reviewImport(req *http.Request) (jr mw.JSONResult, err error) { 644 func reviewImport(req *http.Request) (jr mw.JSONResult, err error) {
619 645
620 vars := mux.Vars(req) 646 vars := mux.Vars(req)
639 func decideImport( 665 func decideImport(
640 req *http.Request, 666 req *http.Request,
641 id int64, 667 id int64,
642 state string, 668 state string,
643 ) (message string, err error) { 669 ) (message string, err error) {
644 ctx := req.Context()
645
646 accepted := state == "accepted"
647 670
648 session, _ := auth.GetSession(req) 671 session, _ := auth.GetSession(req)
649 reviewer := session.User 672 reviewer := session.User
650 673
674 ctx := req.Context()
675 accepted := state == "accepted"
676
651 if err = imports.DecideImport(ctx, id, accepted, reviewer); err != nil { 677 if err = imports.DecideImport(ctx, id, accepted, reviewer); err != nil {
652 err = mw.JSONError{ 678 return "", err
653 Code: http.StatusBadRequest, 679 }
654 Message: err.Error(), 680
655 } 681 return fmt.Sprintf("Import #%d is %s.", id, state), nil
656 return 682 }
657 }
658
659 message = fmt.Sprintf(
660 "Requested import #%d to be %s.", id, state)
661
662 return
663 }