changeset 5123:eeb45e3e0a5a queued-stage-done

Added mechanism to have sync import jobs on import queue. Review jobs are now sync with a controller waiting for 20 secs before returning. If all reviews return earlier the controller extists earlier, too. If one or more decisions took longer they are run in background till they are decided and the the controller returns a error message for these imports that the process is st still running.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Thu, 26 Mar 2020 22:24:45 +0100
parents 0b6b62d247e8
children 6910c1cad1fb
files pkg/controllers/importqueue.go pkg/imports/queue.go
diffstat 2 files changed, 131 insertions(+), 73 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/controllers/importqueue.go	Thu Mar 26 14:41:23 2020 +0100
+++ b/pkg/controllers/importqueue.go	Thu Mar 26 22:24:45 2020 +0100
@@ -23,6 +23,7 @@
 	"net/http"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/gorilla/mux"
@@ -598,21 +599,46 @@
 
 	results := make([]reviewResult, len(rs))
 
-	for i := range rs {
-		rev := &rs[i]
-		msg, err := decideImport(req, rev.ID, string(rev.State))
-		var errString string
-		if err != nil {
-			errString = err.Error()
-		}
-		results[i] = reviewResult{
-			ID:      rev.ID,
-			Message: msg,
-			Error:   errString,
-		}
+	for i := range results {
+		results[i].ID = rs[i].ID
+		results[i].Message = fmt.Sprintf("Finalizing import #%d in progress.", rs[i].ID)
 	}
 
-	return mw.JSONResult{Result: results}, nil
+	var wg sync.WaitGroup
+	var mu sync.Mutex
+
+	for i := range rs {
+		wg.Add(1)
+		go func(idx int) {
+			defer wg.Done()
+			rev := &rs[idx]
+			msg, err := decideImport(req, rev.ID, string(rev.State))
+			mu.Lock()
+			if err != nil {
+				results[idx].Error = err.Error()
+			}
+			results[idx].Message = msg
+			mu.Unlock()
+		}(i)
+	}
+
+	done := make(chan struct{})
+	go func() {
+		defer close(done)
+		wg.Wait()
+	}()
+
+	select {
+	case <-time.After(20 * time.Second):
+	case <-done:
+	}
+
+	out := make([]reviewResult, len(rs))
+	mu.Lock()
+	copy(out, results)
+	mu.Unlock()
+
+	return mw.JSONResult{Result: out}, nil
 }
 
 func reviewImport(req *http.Request) (jr mw.JSONResult, err error) {
@@ -641,23 +667,16 @@
 	id int64,
 	state string,
 ) (message string, err error) {
-	ctx := req.Context()
-
-	accepted := state == "accepted"
 
 	session, _ := auth.GetSession(req)
 	reviewer := session.User
 
+	ctx := req.Context()
+	accepted := state == "accepted"
+
 	if err = imports.DecideImport(ctx, id, accepted, reviewer); err != nil {
-		err = mw.JSONError{
-			Code:    http.StatusBadRequest,
-			Message: err.Error(),
-		}
-		return
+		return "", err
 	}
 
-	message = fmt.Sprintf(
-		"Requested import #%d to be %s.", id, state)
-
-	return
+	return fmt.Sprintf("Import #%d is %s.", id, state), nil
 }
--- a/pkg/imports/queue.go	Thu Mar 26 14:41:23 2020 +0100
+++ b/pkg/imports/queue.go	Thu Mar 26 22:24:45 2020 +0100
@@ -125,16 +125,21 @@
 )
 
 type importQueue struct {
-	signalChan chan struct{}
+	cmdCh chan func(*importQueue)
+
 	creatorsMu sync.Mutex
 	creators   map[JobKind]JobCreator
 	usedDeps   map[string]int
+
+	waiting map[int64]chan struct{}
 }
 
 var iqueue = importQueue{
-	signalChan: make(chan struct{}),
-	creators:   map[JobKind]JobCreator{},
-	usedDeps:   map[string]int{},
+	cmdCh: make(chan func(*importQueue)),
+
+	creators: map[JobKind]JobCreator{},
+	usedDeps: map[string]int{},
+	waiting:  make(map[int64]chan struct{}),
 }
 
 var (
@@ -493,7 +498,8 @@
 	user string,
 	sendEmail bool,
 	data string,
-) (int64, error) {
+	sync bool,
+) (int64, chan struct{}, error) {
 
 	var id int64
 	if due.IsZero() {
@@ -509,32 +515,40 @@
 	var wr pgtype.Interval
 	if waitRetry != nil {
 		if err := wr.Set(*waitRetry); err != nil {
-			return 0, err
+			return 0, nil, err
 		}
 	} else {
 		wr = pgtype.Interval{Status: pgtype.Null}
 	}
 
-	ctx := context.Background()
-	err := auth.RunAs(ctx, user, func(conn *sql.Conn) error {
-		return conn.QueryRowContext(
-			ctx,
-			insertJobSQL,
-			string(kind),
-			due,
-			tl,
-			&wr,
-			user,
-			sendEmail,
-			data).Scan(&id)
-	})
-	if err == nil {
-		select {
-		case q.signalChan <- struct{}{}:
-		default:
-		}
+	errCh := make(chan error)
+	var done chan struct{}
+
+	q.cmdCh <- func(q *importQueue) {
+		ctx := context.Background()
+		errCh <- auth.RunAs(ctx, user, func(conn *sql.Conn) error {
+			err := conn.QueryRowContext(
+				ctx,
+				insertJobSQL,
+				string(kind),
+				due,
+				tl,
+				&wr,
+				user,
+				sendEmail,
+				data).Scan(&id)
+
+			if err == nil && sync {
+				log.Printf("info: register wait for %d\n", id)
+				done = make(chan struct{})
+				q.waiting[id] = done
+			}
+
+			return err
+		})
 	}
-	return id, err
+
+	return id, done, <-errCh
 }
 
 // AddJob adds a job to the global import queue to be executed
@@ -550,14 +564,16 @@
 	sendEmail bool,
 	data string,
 ) (int64, error) {
-	return iqueue.addJob(
+	id, _, err := iqueue.addJob(
 		kind,
 		due,
 		triesLeft,
 		waitRetry,
 		user,
 		sendEmail,
-		data)
+		data,
+		false)
+	return id, err
 }
 
 const (
@@ -593,7 +609,7 @@
 	id int64,
 	accepted bool,
 	reviewer string,
-) error {
+) (chan struct{}, error) {
 	var (
 		pending bool
 		kind    string
@@ -601,16 +617,16 @@
 
 	switch err := tx.QueryRowContext(ctx, isPendingSQL, id).Scan(&pending, &kind); {
 	case err == sql.ErrNoRows:
-		return fmt.Errorf("cannot find import #%d", id)
+		return nil, fmt.Errorf("cannot find import #%d", id)
 	case err != nil:
-		return err
+		return nil, err
 	case !pending:
-		return fmt.Errorf("#%d is not pending", id)
+		return nil, fmt.Errorf("#%d is not pending", id)
 	}
 
 	jc := q.jobCreator(JobKind(kind))
 	if jc == nil {
-		return fmt.Errorf("no job creator for kind '%s'", kind)
+		return nil, fmt.Errorf("no job creator for kind '%s'", kind)
 	}
 
 	r := &reviewedJob{
@@ -619,27 +635,36 @@
 	}
 	serialized, err := common.ToJSONString(r)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	// Try a little harder to persist the decision.
 	tries := reviewJobRetries
 	wait := reviewJobWait
 
-	rID, err := q.addJob(
+	rID, done, err := q.addJob(
 		JobKind(kind+ReviewJobSuffix),
 		time.Now(),
 		&tries,
 		&wait,
 		reviewer,
 		false,
-		serialized)
-	log.Printf("info: add review job %d\n", rID)
+		serialized,
+		true)
 	if err != nil {
-		return err
+		return nil, err
 	}
+	log.Printf("info: add review job %d\n", rID)
 	_, err = tx.ExecContext(ctx, updateStateSQL, "reviewed", id)
-	return err
+	if err != nil && done != nil {
+		go func() {
+			q.cmdCh <- func(q *importQueue) {
+				delete(q.waiting, rID)
+			}
+		}()
+		done = nil
+	}
+	return done, err
 }
 
 func (q *importQueue) decideImport(
@@ -652,18 +677,25 @@
 		ctx = context.Background()
 	}
 
-	return auth.RunAs(ctx, reviewer, func(conn *sql.Conn) error {
+	var done chan struct{}
+
+	if err := auth.RunAs(ctx, reviewer, func(conn *sql.Conn) error {
 		tx, err := conn.BeginTx(ctx, nil)
 		if err != nil {
 			return err
 		}
 		defer tx.Rollback()
-		err = q.decideImportTx(ctx, tx, id, accepted, reviewer)
+		done, err = q.decideImportTx(ctx, tx, id, accepted, reviewer)
 		if err == nil {
 			err = tx.Commit()
 		}
 		return err
-	})
+	}); err != nil {
+		return err
+	}
+
+	<-done
+	return nil
 }
 
 func DecideImport(
@@ -879,7 +911,9 @@
 				break
 			}
 			select {
-			case <-q.signalChan:
+			case cmd := <-q.cmdCh:
+				cmd(q)
+
 			case <-time.After(pollDuration):
 			}
 		}
@@ -897,12 +931,16 @@
 
 		go func(jc JobCreator, idj *idJob) {
 
-			// Unlock the dependencies.
 			defer func() {
+				// Unlock the dependencies.
 				q.unlockDependencies(jc)
-				select {
-				case q.signalChan <- struct{}{}:
-				default:
+				// Unlock waiting.
+				q.cmdCh <- func(q *importQueue) {
+					log.Printf("unlock waiting %d\n", idj.id)
+					if w := q.waiting[idj.id]; w != nil {
+						close(w)
+						delete(q.waiting, idj.id)
+					}
 				}
 			}()
 
@@ -976,13 +1014,14 @@
 			}
 
 			if retry {
-				nid, err := q.addJob(
+				nid, _, err := q.addJob(
 					idj.kind,
 					idj.nextDue(),
 					idj.triesLeftPointer(),
 					idj.waitRetryPointer(),
 					idj.user, idj.sendEmail,
-					idj.data)
+					idj.data,
+					false)
 				if err != nil {
 					log.Printf("error: retry enqueue failed: %v\n", err)
 				} else {