comparison pkg/imports/queue.go @ 3221:899914a18d7e

Import queue: Implemented multiple reader / one writer strategy when locking ressources.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Thu, 09 May 2019 15:07:09 +0200
parents 4acbee65275d
children 1a0985083c06
comparison
equal deleted inserted replaced
3220:56b297592c0a 3221:899914a18d7e
105 105
106 type importQueue struct { 106 type importQueue struct {
107 signalChan chan struct{} 107 signalChan chan struct{}
108 creatorsMu sync.Mutex 108 creatorsMu sync.Mutex
109 creators map[JobKind]JobCreator 109 creators map[JobKind]JobCreator
110 usedDeps map[string]struct{} 110 usedDeps map[string]int
111 } 111 }
112 112
113 var iqueue = importQueue{ 113 var iqueue = importQueue{
114 signalChan: make(chan struct{}), 114 signalChan: make(chan struct{}),
115 creators: map[JobKind]JobCreator{}, 115 creators: map[JobKind]JobCreator{},
116 usedDeps: map[string]struct{}{}, 116 usedDeps: map[string]int{},
117 } 117 }
118 118
119 var ( 119 var (
120 // ImportStateNames is a list of the states a job can be in. 120 // ImportStateNames is a list of the states a job can be in.
121 ImportStateNames = []string{ 121 ImportStateNames = []string{
436 var which []string 436 var which []string
437 437
438 q.creatorsMu.Lock() 438 q.creatorsMu.Lock()
439 nextCreator: 439 nextCreator:
440 for kind, jc := range q.creators { 440 for kind, jc := range q.creators {
441 deps := jc.Depends() 441 // only test exclusive locks
442 for l := range deps { 442 for _, d := range jc.Depends()[0] {
443 for _, d := range deps[l] { 443 if q.usedDeps[d] > 0 {
444 if _, found := q.usedDeps[d]; found { 444 continue nextCreator
445 continue nextCreator
446 }
447 } 445 }
448 } 446 }
449 which = append(which, string(kind)) 447 which = append(which, string(kind))
450 } 448 }
451 q.creatorsMu.Unlock() 449 q.creatorsMu.Unlock()
593 continue 591 continue
594 } 592 }
595 593
596 // Lock dependencies. 594 // Lock dependencies.
597 q.creatorsMu.Lock() 595 q.creatorsMu.Lock()
598 deps := jc.Depends() 596 for _, deps := range jc.Depends() {
599 for l := range deps { 597 for _, d := range deps {
600 for _, d := range deps[l] { 598 q.usedDeps[d]++
601 q.usedDeps[d] = struct{}{}
602 } 599 }
603 } 600 }
604 q.creatorsMu.Unlock() 601 q.creatorsMu.Unlock()
605 602
606 go func(jc JobCreator, idj *idJob) { 603 go func(jc JobCreator, idj *idJob) {
607 604
608 // Unlock the dependencies. 605 // Unlock the dependencies.
609 defer func() { 606 defer func() {
610 q.creatorsMu.Lock() 607 q.creatorsMu.Lock()
611 deps := jc.Depends() 608 for _, deps := range jc.Depends() {
612 for l := range deps { 609 for _, d := range deps {
613 for _, d := range deps[l] { 610 q.usedDeps[d]--
614 delete(q.usedDeps, d)
615 } 611 }
616 } 612 }
617 q.creatorsMu.Unlock() 613 q.creatorsMu.Unlock()
618 select { 614 select {
619 case q.signalChan <- struct{}{}: 615 case q.signalChan <- struct{}{}: