comparison pkg/imports/queue.go @ 5153:adf7b9f1273b

Send the info if an import job was re-scheduled to sync callers.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Wed, 01 Apr 2020 15:18:12 +0200
parents 6910c1cad1fb
children 4f0869b85038
comparison
equal deleted inserted replaced
5152:482046e5b63b 5153:adf7b9f1273b
15 15
16 import ( 16 import (
17 "context" 17 "context"
18 "database/sql" 18 "database/sql"
19 "encoding/json" 19 "encoding/json"
20 "errors"
20 "fmt" 21 "fmt"
21 "log" 22 "log"
22 "runtime/debug" 23 "runtime/debug"
23 "sort" 24 "sort"
24 "strings" 25 "strings"
121 const ( 122 const (
122 ReviewJobSuffix = "#review" 123 ReviewJobSuffix = "#review"
123 reviewJobRetries = 10 124 reviewJobRetries = 10
124 reviewJobWait = time.Minute 125 reviewJobWait = time.Minute
125 ) 126 )
127
128 var ErrRetrying = errors.New("retrying")
126 129
127 type importQueue struct { 130 type importQueue struct {
128 cmdCh chan func(*importQueue) 131 cmdCh chan func(*importQueue)
129 132
130 creatorsMu sync.Mutex 133 creatorsMu sync.Mutex
692 return err 695 return err
693 }); err != nil { 696 }); err != nil {
694 return err 697 return err
695 } 698 }
696 699
697 <-done 700 _, retry := <-done
701 if retry {
702 return ErrRetrying
703 }
698 return nil 704 return nil
699 } 705 }
700 706
701 func DecideImport( 707 func DecideImport(
702 ctx context.Context, 708 ctx context.Context,
929 // Lock dependencies. 935 // Lock dependencies.
930 q.lockDependencies(jc) 936 q.lockDependencies(jc)
931 937
932 go func(jc JobCreator, idj *idJob) { 938 go func(jc JobCreator, idj *idJob) {
933 939
940 var retry bool
941
934 defer func() { 942 defer func() {
935 // Unlock the dependencies. 943 // Unlock the dependencies.
936 q.unlockDependencies(jc) 944 q.unlockDependencies(jc)
937 // Unlock waiting. 945 // Unlock waiting.
938 q.cmdCh <- func(q *importQueue) { 946 q.cmdCh <- func(q *importQueue) {
939 if w := q.waiting[idj.id]; w != nil { 947 if w := q.waiting[idj.id]; w != nil {
940 log.Printf("info: unlock waiting %d\n", idj.id) 948 log.Printf("info: unlock waiting %d\n", idj.id)
941 close(w) 949 if retry {
950 w <- struct{}{}
951 } else {
952 close(w)
953 }
942 delete(q.waiting, idj.id) 954 delete(q.waiting, idj.id)
943 } 955 }
944 } 956 }
945 }() 957 }()
946 958
968 summary, err = job.Do(ctx, idj.id, conn, feedback) 980 summary, err = job.Do(ctx, idj.id, conn, feedback)
969 return err 981 return err
970 }) 982 })
971 })() 983 })()
972 984
973 var unchanged, retry bool 985 var unchanged bool
974 if v, ok := errDo.(UnchangedError); ok { 986 if v, ok := errDo.(UnchangedError); ok {
975 feedback.Info("unchanged: %s", v.Error()) 987 feedback.Info("unchanged: %s", v.Error())
976 unchanged = true 988 unchanged = true
977 } else if errDo != nil { 989 } else if errDo != nil {
978 feedback.Error("error in import: %v", 990 feedback.Error("error in import: %v",