changeset 3225:1a0985083c06

Import queue: Fixed exclusive writers.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Thu, 09 May 2019 16:09:46 +0200
parents bfde4f8dd323
children d84c64c0f510
files pkg/imports/queue.go
diffstat 1 files changed, 38 insertions(+), 18 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/imports/queue.go	Thu May 09 15:50:07 2019 +0200
+++ b/pkg/imports/queue.go	Thu May 09 16:09:46 2019 +0200
@@ -101,7 +101,10 @@
 	}
 )
 
-const pollDuration = time.Second * 10
+const (
+	pollDuration = time.Second * 10
+	runExclusive = -66666
+)
 
 type importQueue struct {
 	signalChan chan struct{}
@@ -306,6 +309,30 @@
 	return d
 }
 
+func (q *importQueue) lockDependencies(jc JobCreator) {
+	deps := jc.Depends()
+	q.creatorsMu.Lock()
+	defer q.creatorsMu.Unlock()
+	for _, d := range deps[0] {
+		q.usedDeps[d] = runExclusive
+	}
+	for _, d := range deps[1] {
+		q.usedDeps[d]++
+	}
+}
+
+func (q *importQueue) unlockDependencies(jc JobCreator) {
+	deps := jc.Depends()
+	q.creatorsMu.Lock()
+	defer q.creatorsMu.Unlock()
+	for _, d := range deps[0] {
+		q.usedDeps[d] = 0
+	}
+	for _, d := range deps[1] {
+		q.usedDeps[d]--
+	}
+}
+
 func (q *importQueue) jobCreator(kind JobKind) JobCreator {
 	q.creatorsMu.Lock()
 	defer q.creatorsMu.Unlock()
@@ -438,9 +465,14 @@
 	q.creatorsMu.Lock()
 nextCreator:
 	for kind, jc := range q.creators {
-		// only test exclusive locks
-		for _, d := range jc.Depends()[0] {
-			if q.usedDeps[d] > 0 {
+		deps := jc.Depends()
+		for _, d := range deps[0] {
+			if q.usedDeps[d] != 0 {
+				continue nextCreator
+			}
+		}
+		for _, d := range deps[1] {
+			if q.usedDeps[d] == runExclusive {
 				continue nextCreator
 			}
 		}
@@ -592,25 +624,13 @@
 		}
 
 		// Lock dependencies.
-		q.creatorsMu.Lock()
-		for _, deps := range jc.Depends() {
-			for _, d := range deps {
-				q.usedDeps[d]++
-			}
-		}
-		q.creatorsMu.Unlock()
+		q.lockDependencies(jc)
 
 		go func(jc JobCreator, idj *idJob) {
 
 			// Unlock the dependencies.
 			defer func() {
-				q.creatorsMu.Lock()
-				for _, deps := range jc.Depends() {
-					for _, d := range deps {
-						q.usedDeps[d]--
-					}
-				}
-				q.creatorsMu.Unlock()
+				q.unlockDependencies(jc)
 				select {
 				case q.signalChan <- struct{}{}:
 				default: