changeset 4681:2690933e404c

Limit the number of waiting diff calculations to 5. If this limit is exceeded a 429 Too Many Requests error code is returned.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Tue, 15 Oct 2019 20:49:24 +0200
parents 976aedc195e5
children 899bec1bd1c9
files pkg/controllers/diff.go
diffstat 1 files changed, 34 insertions(+), 12 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/controllers/diff.go	Tue Oct 15 18:25:40 2019 +0200
+++ b/pkg/controllers/diff.go	Tue Oct 15 20:49:24 2019 +0200
@@ -33,6 +33,14 @@
 )
 
 const (
+	// maxNumberWorkers are processing diffs at a time.
+	maxNumberWorkers = 2
+	// maxQueueLength are the number of diffs to
+	// be wait for when idle.
+	maxQueueLength = 5
+)
+
+const (
 	contourTolerance = 0.1
 	contourStep      = 0.1
 )
@@ -103,6 +111,8 @@
 	}
 )
 
+var errBufferFull = errors.New("buffer full")
+
 var diffCalcMng = startDiffCalculationManager()
 
 func diffCalcHash(dci *models.DiffCalculationInput) string {
@@ -115,7 +125,7 @@
 	dcm := diffCalculationManager{
 		waiting: make(map[string][]chan diffResult),
 		cmds:    make(chan func(*diffCalculationManager)),
-		jobs:    make(chan *models.DiffCalculationInput),
+		jobs:    make(chan *models.DiffCalculationInput, maxQueueLength),
 	}
 
 	go dcm.run()
@@ -124,8 +134,8 @@
 
 func (dcm *diffCalculationManager) run() {
 	n := runtime.NumCPU()
-	if n > 2 {
-		n = 2
+	if n > maxNumberWorkers {
+		n = maxNumberWorkers
 	}
 	for i := 0; i < n; i++ {
 		go dcm.calculate()
@@ -356,7 +366,8 @@
 }
 
 func (dcm *diffCalculationManager) enque(dci *models.DiffCalculationInput) chan diffResult {
-	ch := make(chan diffResult)
+	// buffer to prevent blocking even if receiver is dead.
+	ch := make(chan diffResult, 1)
 
 	dcm.cmds <- func(dcm *diffCalculationManager) {
 		// When already running add to list of waitings
@@ -383,17 +394,22 @@
 		case err == sql.ErrNoRows:
 			// We need to calculate it.
 		case err != nil: // error
-			go func() { ch <- diffResult{err: err} }()
+			ch <- diffResult{err: err}
 			return
 		default: // we have it
-			go func() { ch <- diffResult{id: id} }()
+			ch <- diffResult{id: id}
 			return
 		}
 
 		// add to waiting
 		dcm.waiting[key] = []chan diffResult{ch}
 		// feed to workers
-		dcm.jobs <- dci
+		select {
+		case dcm.jobs <- dci:
+		default: // buffer is full
+			delete(dcm.waiting, key)
+			ch <- diffResult{err: errBufferFull}
+		}
 	}
 
 	return ch
@@ -432,12 +448,18 @@
 
 	select {
 	case res := <-resultCh:
-		if res.err != nil {
+		switch {
+		case res.err == errBufferFull:
+			err = mw.JSONError{
+				Code:    http.StatusTooManyRequests,
+				Message: "Too many differences are waiting to be processed",
+			}
+		case res.err != nil:
 			err = res.err
-			return
-		}
-		jr = mw.JSONResult{
-			Result: map[string]int64{"id": res.id},
+		default:
+			jr = mw.JSONResult{
+				Result: map[string]int64{"id": res.id},
+			}
 		}
 	case <-ctx.Done():
 		log.Println("request canceled")