Mercurial > gemma
comparison pkg/imports/queue.go @ 1003:d789f19877f4 persistent-import-queue
Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Mon, 22 Oct 2018 17:38:07 +0200 |
parents | 14425e35e3c2 |
children | fcf016ebdef4 |
comparison
equal
deleted
inserted
replaced
1001:e2860eff5d03 | 1003:d789f19877f4 |
---|---|
5 "database/sql" | 5 "database/sql" |
6 "fmt" | 6 "fmt" |
7 "log" | 7 "log" |
8 "runtime/debug" | 8 "runtime/debug" |
9 "sync" | 9 "sync" |
| 10 "time" |
10 | 11 |
11 "gemma.intevation.de/gemma/pkg/auth" | 12 "gemma.intevation.de/gemma/pkg/auth" |
12 "gemma.intevation.de/gemma/pkg/config" | 13 "gemma.intevation.de/gemma/pkg/config" |
13 ) | 14 ) |
14 | 15 |
34 user string | 35 user string |
35 data string | 36 data string |
36 } | 37 } |
37 ) | 38 ) |
38 | 39 |
| 40 const pollDuration = time.Second * 10 |
| 41 |
39 var ( | 42 var ( |
40 queueCond = sync.NewCond(new(sync.Mutex)) | 43 signalChan = make(chan struct{}) |
41 | 44 |
42 creatorsMu sync.Mutex | 45 creatorsMu sync.Mutex |
43 creators = map[JobKind]JobCreator{} | 46 creators = map[JobKind]JobCreator{} |
44 ) | 47 ) |
45 | 48 |
104 return creators[kind] | 107 return creators[kind] |
105 } | 108 } |
106 | 109 |
107 func AddJob(kind JobKind, user, data string) (int64, error) { | 110 func AddJob(kind JobKind, user, data string) (int64, error) { |
108 ctx := context.Background() | 111 ctx := context.Background() |
109 queueCond.L.Lock() | |
110 defer queueCond.L.Unlock() | |
111 var id int64 | 112 var id int64 |
112 err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error { | 113 err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error { |
113 return conn.QueryRowContext(ctx, insertJobSQL, string(kind), user, data).Scan(&id) | 114 return conn.QueryRowContext(ctx, insertJobSQL, string(kind), user, data).Scan(&id) |
114 }) | 115 }) |
115 if err == nil { | 116 if err == nil { |
116 queueCond.Signal() | 117 select { |
| 118 case signalChan <- struct{}{}: |
| 119 default: |
| 120 } |
117 } | 121 } |
118 return id, err | 122 return id, err |
119 } | 123 } |
120 | 124 |
121 type logFeedback int64 | 125 type logFeedback int64 |
220 } | 224 } |
221 | 225 |
222 func importLoop() { | 226 func importLoop() { |
223 config.WaitReady() | 227 config.WaitReady() |
224 for { | 228 for { |
225 queueCond.L.Lock() | |
226 | |
227 var idj *idJob | 229 var idj *idJob |
228 var err error | 230 var err error |
229 | 231 |
230 for idj == nil { | 232 for { |
231 if idj, err = fetchJob(); err != nil { | 233 if idj, err = fetchJob(); err != nil { |
232 log.Printf("db error: %v\n", err) | 234 log.Printf("db error: %v\n", err) |
233 queueCond.Wait() | |
234 } else if idj == nil { | |
235 queueCond.Wait() | |
236 } | 235 } |
237 } | 236 if idj != nil { |
238 | 237 break |
239 queueCond.L.Unlock() | 238 } |
| 239 select { |
| 240 case <-signalChan: |
| 241 case <-time.After(pollDuration): |
| 242 } |
| 243 } |
240 | 244 |
241 log.Printf("starting import job %d\n", idj.id) | 245 log.Printf("starting import job %d\n", idj.id) |
242 | 246 |
243 jc := jobCreator(idj.kind) | 247 jc := jobCreator(idj.kind) |
244 if jc == nil { | 248 if jc == nil { |
264 errCleanup := survive(job.CleanUp)() | 268 errCleanup := survive(job.CleanUp)() |
265 if errCleanup != nil { | 269 if errCleanup != nil { |
266 feedback.Error("error cleanup: %v\n", errCleanup) | 270 feedback.Error("error cleanup: %v\n", errCleanup) |
267 } | 271 } |
268 | 272 |
| 273 var state string |
269 if errDo != nil || errCleanup != nil { | 274 if errDo != nil || errCleanup != nil { |
270 err = updateState(idj.id, "failed") | 275 state = "failed" |
271 } else { | 276 } else { |
272 err = updateState(idj.id, "successful") | 277 state = "successful" |
273 } | 278 } |
274 if err != nil { | 279 if err := updateState(idj.id, state); err != nil { |
275 log.Printf("setting state of job %d failed: %v\n", idj.id, err) | 280 log.Printf("setting state of job %d failed: %v\n", idj.id, err) |
276 } | 281 } |
277 } | 282 log.Printf("job %d finished: %s\n", idj.id, state) |
278 } | 283 } |
| 284 } |