Mercurial > gemma
comparison pkg/imports/queue.go @ 3221:899914a18d7e
Import queue: Implemented multiple readers / one writer strategy when locking resources.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Thu, 09 May 2019 15:07:09 +0200 |
parents | 4acbee65275d |
children | 1a0985083c06 |
comparison
equal
deleted
inserted
replaced
3220:56b297592c0a | 3221:899914a18d7e |
---|---|
105 | 105 |
106 type importQueue struct { | 106 type importQueue struct { |
107 signalChan chan struct{} | 107 signalChan chan struct{} |
108 creatorsMu sync.Mutex | 108 creatorsMu sync.Mutex |
109 creators map[JobKind]JobCreator | 109 creators map[JobKind]JobCreator |
110 usedDeps map[string]struct{} | 110 usedDeps map[string]int |
111 } | 111 } |
112 | 112 |
113 var iqueue = importQueue{ | 113 var iqueue = importQueue{ |
114 signalChan: make(chan struct{}), | 114 signalChan: make(chan struct{}), |
115 creators: map[JobKind]JobCreator{}, | 115 creators: map[JobKind]JobCreator{}, |
116 usedDeps: map[string]struct{}{}, | 116 usedDeps: map[string]int{}, |
117 } | 117 } |
118 | 118 |
119 var ( | 119 var ( |
120 // ImportStateNames is a list of the states a job can be in. | 120 // ImportStateNames is a list of the states a job can be in. |
121 ImportStateNames = []string{ | 121 ImportStateNames = []string{ |
436 var which []string | 436 var which []string |
437 | 437 |
438 q.creatorsMu.Lock() | 438 q.creatorsMu.Lock() |
439 nextCreator: | 439 nextCreator: |
440 for kind, jc := range q.creators { | 440 for kind, jc := range q.creators { |
441 deps := jc.Depends() | 441 // only test exclusive locks |
442 for l := range deps { | 442 for _, d := range jc.Depends()[0] { |
443 for _, d := range deps[l] { | 443 if q.usedDeps[d] > 0 { |
444 if _, found := q.usedDeps[d]; found { | 444 continue nextCreator |
445 continue nextCreator | |
446 } | |
447 } | 445 } |
448 } | 446 } |
449 which = append(which, string(kind)) | 447 which = append(which, string(kind)) |
450 } | 448 } |
451 q.creatorsMu.Unlock() | 449 q.creatorsMu.Unlock() |
593 continue | 591 continue |
594 } | 592 } |
595 | 593 |
596 // Lock dependencies. | 594 // Lock dependencies. |
597 q.creatorsMu.Lock() | 595 q.creatorsMu.Lock() |
598 deps := jc.Depends() | 596 for _, deps := range jc.Depends() { |
599 for l := range deps { | 597 for _, d := range deps { |
600 for _, d := range deps[l] { | 598 q.usedDeps[d]++ |
601 q.usedDeps[d] = struct{}{} | |
602 } | 599 } |
603 } | 600 } |
604 q.creatorsMu.Unlock() | 601 q.creatorsMu.Unlock() |
605 | 602 |
606 go func(jc JobCreator, idj *idJob) { | 603 go func(jc JobCreator, idj *idJob) { |
607 | 604 |
608 // Unlock the dependencies. | 605 // Unlock the dependencies. |
609 defer func() { | 606 defer func() { |
610 q.creatorsMu.Lock() | 607 q.creatorsMu.Lock() |
611 deps := jc.Depends() | 608 for _, deps := range jc.Depends() { |
612 for l := range deps { | 609 for _, d := range deps { |
613 for _, d := range deps[l] { | 610 q.usedDeps[d]-- |
614 delete(q.usedDeps, d) | |
615 } | 611 } |
616 } | 612 } |
617 q.creatorsMu.Unlock() | 613 q.creatorsMu.Unlock() |
618 select { | 614 select { |
619 case q.signalChan <- struct{}{}: | 615 case q.signalChan <- struct{}{}: |