Mercurial > gemma
changeset 3225:1a0985083c06
Import queue: Fixed exclusive writers.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Thu, 09 May 2019 16:09:46 +0200 |
parents | bfde4f8dd323 |
children | d84c64c0f510 |
files | pkg/imports/queue.go |
diffstat | 1 files changed, 38 insertions(+), 18 deletions(-) [+] |
line wrap: on
line diff
--- a/pkg/imports/queue.go Thu May 09 15:50:07 2019 +0200 +++ b/pkg/imports/queue.go Thu May 09 16:09:46 2019 +0200 @@ -101,7 +101,10 @@ } ) -const pollDuration = time.Second * 10 +const ( + pollDuration = time.Second * 10 + runExclusive = -66666 +) type importQueue struct { signalChan chan struct{} @@ -306,6 +309,30 @@ return d } +func (q *importQueue) lockDependencies(jc JobCreator) { + deps := jc.Depends() + q.creatorsMu.Lock() + defer q.creatorsMu.Unlock() + for _, d := range deps[0] { + q.usedDeps[d] = runExclusive + } + for _, d := range deps[1] { + q.usedDeps[d]++ + } +} + +func (q *importQueue) unlockDependencies(jc JobCreator) { + deps := jc.Depends() + q.creatorsMu.Lock() + defer q.creatorsMu.Unlock() + for _, d := range deps[0] { + q.usedDeps[d] = 0 + } + for _, d := range deps[1] { + q.usedDeps[d]-- + } +} + func (q *importQueue) jobCreator(kind JobKind) JobCreator { q.creatorsMu.Lock() defer q.creatorsMu.Unlock() @@ -438,9 +465,14 @@ q.creatorsMu.Lock() nextCreator: for kind, jc := range q.creators { - // only test exclusive locks - for _, d := range jc.Depends()[0] { - if q.usedDeps[d] > 0 { + deps := jc.Depends() + for _, d := range deps[0] { + if q.usedDeps[d] != 0 { + continue nextCreator + } + } + for _, d := range deps[1] { + if q.usedDeps[d] == runExclusive { continue nextCreator } } @@ -592,25 +624,13 @@ } // Lock dependencies. - q.creatorsMu.Lock() - for _, deps := range jc.Depends() { - for _, d := range deps { - q.usedDeps[d]++ - } - } - q.creatorsMu.Unlock() + q.lockDependencies(jc) go func(jc JobCreator, idj *idJob) { // Unlock the dependencies. defer func() { - q.creatorsMu.Lock() - for _, deps := range jc.Depends() { - for _, d := range deps { - q.usedDeps[d]-- - } - } - q.creatorsMu.Unlock() + q.unlockDependencies(jc) select { case q.signalChan <- struct{}{}: default: