# HG changeset patch # User Sascha L. Teichmann # Date 1571165364 -7200 # Node ID 2690933e404cd8396fb402ccb1d3d4d4783ab373 # Parent 976aedc195e5dca030879aa35946d7e3ebab6553 Limit the number of waiting diff calculations to 5. If this limit is exceeded a 429 Too Many Requests error code is returned. diff -r 976aedc195e5 -r 2690933e404c pkg/controllers/diff.go --- a/pkg/controllers/diff.go Tue Oct 15 18:25:40 2019 +0200 +++ b/pkg/controllers/diff.go Tue Oct 15 20:49:24 2019 +0200 @@ -33,6 +33,14 @@ ) const ( + // maxNumberWorkers are processing diffs at a time. + maxNumberWorkers = 2 + // maxQueueLength are the number of diffs to + // be wait for when idle. + maxQueueLength = 5 +) + +const ( contourTolerance = 0.1 contourStep = 0.1 ) @@ -103,6 +111,8 @@ } ) +var errBufferFull = errors.New("buffer full") + var diffCalcMng = startDiffCalculationManager() func diffCalcHash(dci *models.DiffCalculationInput) string { @@ -115,7 +125,7 @@ dcm := diffCalculationManager{ waiting: make(map[string][]chan diffResult), cmds: make(chan func(*diffCalculationManager)), - jobs: make(chan *models.DiffCalculationInput), + jobs: make(chan *models.DiffCalculationInput, maxQueueLength), } go dcm.run() @@ -124,8 +134,8 @@ func (dcm *diffCalculationManager) run() { n := runtime.NumCPU() - if n > 2 { - n = 2 + if n > maxNumberWorkers { + n = maxNumberWorkers } for i := 0; i < n; i++ { go dcm.calculate() @@ -356,7 +366,8 @@ } func (dcm *diffCalculationManager) enque(dci *models.DiffCalculationInput) chan diffResult { - ch := make(chan diffResult) + // buffer to prevent blocking even if receiver is dead. + ch := make(chan diffResult, 1) dcm.cmds <- func(dcm *diffCalculationManager) { // When already running add to list of waitings @@ -383,17 +394,22 @@ case err == sql.ErrNoRows: // We need to calculate it. case err != nil: // error - go func() { ch <- diffResult{err: err} }() + ch <- diffResult{err: err} return default: // we have it - go func() { ch <- diffResult{id: id} }() + ch <- diffResult{id: id} return } // add to waiting dcm.waiting[key] = []chan diffResult{ch} // feed to workers - dcm.jobs <- dci + select { + case dcm.jobs <- dci: + default: // buffer is full + delete(dcm.waiting, key) + ch <- diffResult{err: errBufferFull} + } } return ch @@ -432,12 +448,18 @@ select { case res := <-resultCh: - if res.err != nil { + switch { + case res.err == errBufferFull: + err = mw.JSONError{ + Code: http.StatusTooManyRequests, + Message: "Too many differences are waiting to be processed", + } + case res.err != nil: err = res.err - return - } - jr = mw.JSONResult{ - Result: map[string]int64{"id": res.id}, + default: + jr = mw.JSONResult{ + Result: map[string]int64{"id": res.id}, + } } case <-ctx.Done(): log.Println("request canceled")