Mercurial > gemma
diff pkg/controllers/importqueue.go @ 5123:eeb45e3e0a5a queued-stage-done
Added mechanism to have sync import jobs on import queue.
Review jobs are now sync with a controller waiting for 20 secs before returning.
If all reviews return earlier the controller extists earlier, too.
If one or more decisions took longer they are run in background till they are
decided and the the controller returns a error message for these imports
that the process is st still running.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Thu, 26 Mar 2020 22:24:45 +0100 |
parents | c0ceec7e6f85 |
children | 52e3980e3462 |
line wrap: on
line diff
--- a/pkg/controllers/importqueue.go Thu Mar 26 14:41:23 2020 +0100 +++ b/pkg/controllers/importqueue.go Thu Mar 26 22:24:45 2020 +0100 @@ -23,6 +23,7 @@ "net/http" "strconv" "strings" + "sync" "time" "github.com/gorilla/mux" @@ -598,21 +599,46 @@ results := make([]reviewResult, len(rs)) - for i := range rs { - rev := &rs[i] - msg, err := decideImport(req, rev.ID, string(rev.State)) - var errString string - if err != nil { - errString = err.Error() - } - results[i] = reviewResult{ - ID: rev.ID, - Message: msg, - Error: errString, - } + for i := range results { + results[i].ID = rs[i].ID + results[i].Message = fmt.Sprintf("Finalizing import #%d in progress.", rs[i].ID) } - return mw.JSONResult{Result: results}, nil + var wg sync.WaitGroup + var mu sync.Mutex + + for i := range rs { + wg.Add(1) + go func(idx int) { + defer wg.Done() + rev := &rs[idx] + msg, err := decideImport(req, rev.ID, string(rev.State)) + mu.Lock() + if err != nil { + results[idx].Error = err.Error() + } + results[idx].Message = msg + mu.Unlock() + }(i) + } + + done := make(chan struct{}) + go func() { + defer close(done) + wg.Wait() + }() + + select { + case <-time.After(20 * time.Second): + case <-done: + } + + out := make([]reviewResult, len(rs)) + mu.Lock() + copy(out, results) + mu.Unlock() + + return mw.JSONResult{Result: out}, nil } func reviewImport(req *http.Request) (jr mw.JSONResult, err error) { @@ -641,23 +667,16 @@ id int64, state string, ) (message string, err error) { - ctx := req.Context() - - accepted := state == "accepted" session, _ := auth.GetSession(req) reviewer := session.User + ctx := req.Context() + accepted := state == "accepted" + if err = imports.DecideImport(ctx, id, accepted, reviewer); err != nil { - err = mw.JSONError{ - Code: http.StatusBadRequest, - Message: err.Error(), - } - return + return "", err } - message = fmt.Sprintf( - "Requested import #%d to be %s.", id, state) - - return + return fmt.Sprintf("Import #%d is %s.", id, state), nil }