Mercurial > gemma
comparison pkg/controllers/importqueue.go @ 5123:eeb45e3e0a5a queued-stage-done
Added mechanism to have sync import jobs on import queue.
Review jobs are now sync with a controller waiting for 20 secs before returning.
If all reviews return earlier the controller extists earlier, too.
If one or more decisions took longer they are run in background till they are
decided and the the controller returns a error message for these imports
that the process is st still running.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Thu, 26 Mar 2020 22:24:45 +0100 |
parents | c0ceec7e6f85 |
children | 52e3980e3462 |
comparison
equal
deleted
inserted
replaced
5122:0b6b62d247e8 | 5123:eeb45e3e0a5a |
---|---|
21 "fmt" | 21 "fmt" |
22 "log" | 22 "log" |
23 "net/http" | 23 "net/http" |
24 "strconv" | 24 "strconv" |
25 "strings" | 25 "strings" |
26 "sync" | |
26 "time" | 27 "time" |
27 | 28 |
28 "github.com/gorilla/mux" | 29 "github.com/gorilla/mux" |
29 | 30 |
30 "gemma.intevation.de/gemma/pkg/auth" | 31 "gemma.intevation.de/gemma/pkg/auth" |
596 Error string `json:"error,omitempty"` | 597 Error string `json:"error,omitempty"` |
597 } | 598 } |
598 | 599 |
599 results := make([]reviewResult, len(rs)) | 600 results := make([]reviewResult, len(rs)) |
600 | 601 |
602 for i := range results { | |
603 results[i].ID = rs[i].ID | |
604 results[i].Message = fmt.Sprintf("Finalizing import #%d in progress.", rs[i].ID) | |
605 } | |
606 | |
607 var wg sync.WaitGroup | |
608 var mu sync.Mutex | |
609 | |
601 for i := range rs { | 610 for i := range rs { |
602 rev := &rs[i] | 611 wg.Add(1) |
603 msg, err := decideImport(req, rev.ID, string(rev.State)) | 612 go func(idx int) { |
604 var errString string | 613 defer wg.Done() |
605 if err != nil { | 614 rev := &rs[idx] |
606 errString = err.Error() | 615 msg, err := decideImport(req, rev.ID, string(rev.State)) |
607 } | 616 mu.Lock() |
608 results[i] = reviewResult{ | 617 if err != nil { |
609 ID: rev.ID, | 618 results[idx].Error = err.Error() |
610 Message: msg, | 619 } |
611 Error: errString, | 620 results[idx].Message = msg |
612 } | 621 mu.Unlock() |
613 } | 622 }(i) |
614 | 623 } |
615 return mw.JSONResult{Result: results}, nil | 624 |
625 done := make(chan struct{}) | |
626 go func() { | |
627 defer close(done) | |
628 wg.Wait() | |
629 }() | |
630 | |
631 select { | |
632 case <-time.After(20 * time.Second): | |
633 case <-done: | |
634 } | |
635 | |
636 out := make([]reviewResult, len(rs)) | |
637 mu.Lock() | |
638 copy(out, results) | |
639 mu.Unlock() | |
640 | |
641 return mw.JSONResult{Result: out}, nil | |
616 } | 642 } |
617 | 643 |
618 func reviewImport(req *http.Request) (jr mw.JSONResult, err error) { | 644 func reviewImport(req *http.Request) (jr mw.JSONResult, err error) { |
619 | 645 |
620 vars := mux.Vars(req) | 646 vars := mux.Vars(req) |
639 func decideImport( | 665 func decideImport( |
640 req *http.Request, | 666 req *http.Request, |
641 id int64, | 667 id int64, |
642 state string, | 668 state string, |
643 ) (message string, err error) { | 669 ) (message string, err error) { |
644 ctx := req.Context() | |
645 | |
646 accepted := state == "accepted" | |
647 | 670 |
648 session, _ := auth.GetSession(req) | 671 session, _ := auth.GetSession(req) |
649 reviewer := session.User | 672 reviewer := session.User |
650 | 673 |
674 ctx := req.Context() | |
675 accepted := state == "accepted" | |
676 | |
651 if err = imports.DecideImport(ctx, id, accepted, reviewer); err != nil { | 677 if err = imports.DecideImport(ctx, id, accepted, reviewer); err != nil { |
652 err = mw.JSONError{ | 678 return "", err |
653 Code: http.StatusBadRequest, | 679 } |
654 Message: err.Error(), | 680 |
655 } | 681 return fmt.Sprintf("Import #%d is %s.", id, state), nil |
656 return | 682 } |
657 } | |
658 | |
659 message = fmt.Sprintf( | |
660 "Requested import #%d to be %s.", id, state) | |
661 | |
662 return | |
663 } |