Mercurial > gemma
comparison pkg/imports/queue.go @ 5153:adf7b9f1273b
Send the info if an import job was re-scheduled to sync callers.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Wed, 01 Apr 2020 15:18:12 +0200 |
parents | 6910c1cad1fb |
children | 4f0869b85038 |
comparison
equal
deleted
inserted
replaced
5152:482046e5b63b | 5153:adf7b9f1273b |
---|---|
15 | 15 |
16 import ( | 16 import ( |
17 "context" | 17 "context" |
18 "database/sql" | 18 "database/sql" |
19 "encoding/json" | 19 "encoding/json" |
20 "errors" | |
20 "fmt" | 21 "fmt" |
21 "log" | 22 "log" |
22 "runtime/debug" | 23 "runtime/debug" |
23 "sort" | 24 "sort" |
24 "strings" | 25 "strings" |
121 const ( | 122 const ( |
122 ReviewJobSuffix = "#review" | 123 ReviewJobSuffix = "#review" |
123 reviewJobRetries = 10 | 124 reviewJobRetries = 10 |
124 reviewJobWait = time.Minute | 125 reviewJobWait = time.Minute |
125 ) | 126 ) |
127 | |
128 var ErrRetrying = errors.New("retrying") | |
126 | 129 |
127 type importQueue struct { | 130 type importQueue struct { |
128 cmdCh chan func(*importQueue) | 131 cmdCh chan func(*importQueue) |
129 | 132 |
130 creatorsMu sync.Mutex | 133 creatorsMu sync.Mutex |
692 return err | 695 return err |
693 }); err != nil { | 696 }); err != nil { |
694 return err | 697 return err |
695 } | 698 } |
696 | 699 |
697 <-done | 700 _, retry := <-done |
701 if retry { | |
702 return ErrRetrying | |
703 } | |
698 return nil | 704 return nil |
699 } | 705 } |
700 | 706 |
701 func DecideImport( | 707 func DecideImport( |
702 ctx context.Context, | 708 ctx context.Context, |
929 // Lock dependencies. | 935 // Lock dependencies. |
930 q.lockDependencies(jc) | 936 q.lockDependencies(jc) |
931 | 937 |
932 go func(jc JobCreator, idj *idJob) { | 938 go func(jc JobCreator, idj *idJob) { |
933 | 939 |
940 var retry bool | |
941 | |
934 defer func() { | 942 defer func() { |
935 // Unlock the dependencies. | 943 // Unlock the dependencies. |
936 q.unlockDependencies(jc) | 944 q.unlockDependencies(jc) |
937 // Unlock waiting. | 945 // Unlock waiting. |
938 q.cmdCh <- func(q *importQueue) { | 946 q.cmdCh <- func(q *importQueue) { |
939 if w := q.waiting[idj.id]; w != nil { | 947 if w := q.waiting[idj.id]; w != nil { |
940 log.Printf("info: unlock waiting %d\n", idj.id) | 948 log.Printf("info: unlock waiting %d\n", idj.id) |
941 close(w) | 949 if retry { |
950 w <- struct{}{} | |
951 } else { | |
952 close(w) | |
953 } | |
942 delete(q.waiting, idj.id) | 954 delete(q.waiting, idj.id) |
943 } | 955 } |
944 } | 956 } |
945 }() | 957 }() |
946 | 958 |
968 summary, err = job.Do(ctx, idj.id, conn, feedback) | 980 summary, err = job.Do(ctx, idj.id, conn, feedback) |
969 return err | 981 return err |
970 }) | 982 }) |
971 })() | 983 })() |
972 | 984 |
973 var unchanged, retry bool | 985 var unchanged bool |
974 if v, ok := errDo.(UnchangedError); ok { | 986 if v, ok := errDo.(UnchangedError); ok { |
975 feedback.Info("unchanged: %s", v.Error()) | 987 feedback.Info("unchanged: %s", v.Error()) |
976 unchanged = true | 988 unchanged = true |
977 } else if errDo != nil { | 989 } else if errDo != nil { |
978 feedback.Error("error in import: %v", | 990 feedback.Error("error in import: %v", |