Mercurial > gemma
comparison pkg/imports/queue.go @ 1010:8f23ec811afb
Fixed and harmonized wording in importer queue a bit.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Tue, 23 Oct 2018 10:14:41 +0200 |
parents | fcf016ebdef4 |
children | a244b18cb916 |
comparison
equal
deleted
inserted
replaced
1009:76e9296d6191 | 1010:8f23ec811afb |
---|---|
239 func importLoop() { | 239 func importLoop() { |
240 config.WaitReady() | 240 config.WaitReady() |
241 // re-enqueue the jobs that are in state running. | 241 // re-enqueue the jobs that are in state running. |
242 // They were in progress when the server went down. | 242 // They were in progress when the server went down. |
243 if err := reEnqueueRunning(); err != nil { | 243 if err := reEnqueueRunning(); err != nil { |
244 log.Printf("re-enquing failed: %v", err) | 244 log.Printf("re-enqueuing failed: %v", err) |
245 } | 245 } |
246 | 246 |
247 for { | 247 for { |
248 var idj *idJob | 248 var idj *idJob |
249 var err error | 249 var err error |
259 case <-signalChan: | 259 case <-signalChan: |
260 case <-time.After(pollDuration): | 260 case <-time.After(pollDuration): |
261 } | 261 } |
262 } | 262 } |
263 | 263 |
264 log.Printf("starting import job %d\n", idj.id) | 264 log.Printf("starting import #%d\n", idj.id) |
265 | 265 |
266 jc := jobCreator(idj.kind) | 266 jc := jobCreator(idj.kind) |
267 if jc == nil { | 267 if jc == nil { |
268 errorAndFail(idj.id, "No creator for kind '%s' found", idj.kind) | 268 errorAndFail(idj.id, "no creator for kind '%s' found", idj.kind) |
269 continue | 269 continue |
270 } | 270 } |
271 | 271 |
272 job, err := jc(idj.kind, idj.data) | 272 job, err := jc(idj.kind, idj.data) |
273 if err != nil { | 273 if err != nil { |
274 errorAndFail(idj.id, "Faild to create job: %v", err) | 274 errorAndFail(idj.id, "failed to create job for import #%d: %v", |
275 idj.id, err) | |
275 continue | 276 continue |
276 } | 277 } |
277 | 278 |
278 feedback := logFeedback(idj.id) | 279 feedback := logFeedback(idj.id) |
280 | |
281 feedback.Info("import #%d started\n", idj.id) | |
279 | 282 |
280 errDo := survive(func() error { | 283 errDo := survive(func() error { |
281 return auth.RunAs(idj.user, context.Background(), | 284 return auth.RunAs(idj.user, context.Background(), |
282 func(conn *sql.Conn) error { return job.Do(conn, feedback) }) | 285 func(conn *sql.Conn) error { return job.Do(conn, feedback) }) |
283 })() | 286 })() |
296 state = "successful" | 299 state = "successful" |
297 } | 300 } |
298 if err := updateState(idj.id, state); err != nil { | 301 if err := updateState(idj.id, state); err != nil { |
299 log.Printf("setting state of job %d failed: %v\n", idj.id, err) | 302 log.Printf("setting state of job %d failed: %v\n", idj.id, err) |
300 } | 303 } |
301 log.Printf("job %d finished: %s\n", idj.id, state) | 304 log.Printf("import #%d finished: %s\n", idj.id, state) |
302 } | 305 } |
303 } | 306 } |