diff pkg/controllers/importqueue.go @ 5123:eeb45e3e0a5a queued-stage-done

Added mechanism to have sync import jobs on import queue. Review jobs are now sync with a controller waiting for 20 secs before returning. If all reviews return earlier the controller extists earlier, too. If one or more decisions took longer they are run in background till they are decided and the the controller returns a error message for these imports that the process is st still running.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Thu, 26 Mar 2020 22:24:45 +0100
parents c0ceec7e6f85
children 52e3980e3462
line wrap: on
line diff
--- a/pkg/controllers/importqueue.go	Thu Mar 26 14:41:23 2020 +0100
+++ b/pkg/controllers/importqueue.go	Thu Mar 26 22:24:45 2020 +0100
@@ -23,6 +23,7 @@
 	"net/http"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/gorilla/mux"
@@ -598,21 +599,46 @@
 
 	results := make([]reviewResult, len(rs))
 
-	for i := range rs {
-		rev := &rs[i]
-		msg, err := decideImport(req, rev.ID, string(rev.State))
-		var errString string
-		if err != nil {
-			errString = err.Error()
-		}
-		results[i] = reviewResult{
-			ID:      rev.ID,
-			Message: msg,
-			Error:   errString,
-		}
+	for i := range results {
+		results[i].ID = rs[i].ID
+		results[i].Message = fmt.Sprintf("Finalizing import #%d in progress.", rs[i].ID)
 	}
 
-	return mw.JSONResult{Result: results}, nil
+	var wg sync.WaitGroup
+	var mu sync.Mutex
+
+	for i := range rs {
+		wg.Add(1)
+		go func(idx int) {
+			defer wg.Done()
+			rev := &rs[idx]
+			msg, err := decideImport(req, rev.ID, string(rev.State))
+			mu.Lock()
+			if err != nil {
+				results[idx].Error = err.Error()
+			}
+			results[idx].Message = msg
+			mu.Unlock()
+		}(i)
+	}
+
+	done := make(chan struct{})
+	go func() {
+		defer close(done)
+		wg.Wait()
+	}()
+
+	select {
+	case <-time.After(20 * time.Second):
+	case <-done:
+	}
+
+	out := make([]reviewResult, len(rs))
+	mu.Lock()
+	copy(out, results)
+	mu.Unlock()
+
+	return mw.JSONResult{Result: out}, nil
 }
 
 func reviewImport(req *http.Request) (jr mw.JSONResult, err error) {
@@ -641,23 +667,16 @@
 	id int64,
 	state string,
 ) (message string, err error) {
-	ctx := req.Context()
-
-	accepted := state == "accepted"
 
 	session, _ := auth.GetSession(req)
 	reviewer := session.User
 
+	ctx := req.Context()
+	accepted := state == "accepted"
+
 	if err = imports.DecideImport(ctx, id, accepted, reviewer); err != nil {
-		err = mw.JSONError{
-			Code:    http.StatusBadRequest,
-			Message: err.Error(),
-		}
-		return
+		return "", err
 	}
 
-	message = fmt.Sprintf(
-		"Requested import #%d to be %s.", id, state)
-
-	return
+	return fmt.Sprintf("Import #%d is %s.", id, state), nil
 }