Mercurial > gemma
comparison pkg/imports/queue.go @ 987:3841509f6e9e
Store job id alongside to job in job queue.
author | Sascha L. Teichmann <teichmann@intevation.de> |
---|---|
date | Fri, 19 Oct 2018 18:09:26 +0200 |
parents | 7934b5c1a910 |
children | 7dfd3db94e6d |
comparison
equal
deleted
inserted
replaced
986:c34a2a643f5e | 987:3841509f6e9e |
---|---|
15 User() string | 15 User() string |
16 Do(conn *sql.Conn) error | 16 Do(conn *sql.Conn) error |
17 CleanUp() error | 17 CleanUp() error |
18 } | 18 } |
19 | 19 |
20 type idJob struct { | |
21 id int64 | |
22 job Job | |
23 } | |
24 | |
20 var ( | 25 var ( |
21 queueCond = sync.NewCond(new(sync.Mutex)) | 26 queueCond = sync.NewCond(new(sync.Mutex)) |
22 queue = list.New() | 27 queue = list.New() |
23 | 28 |
24 jobID int64 | 29 jobID int64 |
30 | 35 |
31 func AddJob(job Job) int64 { | 36 func AddJob(job Job) int64 { |
32 id := atomic.AddInt64(&jobID, 1) | 37 id := atomic.AddInt64(&jobID, 1) |
33 queueCond.L.Lock() | 38 queueCond.L.Lock() |
34 defer queueCond.L.Unlock() | 39 defer queueCond.L.Unlock() |
35 queue.PushBack(job) | 40 queue.PushBack(idJob{id, job}) |
36 queueCond.Signal() | 41 queueCond.Signal() |
37 return id | 42 return id |
38 } | 43 } |
39 | 44 |
40 func importLoop() { | 45 func importLoop() { |
41 for { | 46 for { |
42 var job Job | 47 var idj idJob |
43 queueCond.L.Lock() | 48 queueCond.L.Lock() |
44 for queue.Len() == 0 { | 49 for queue.Len() == 0 { |
45 queueCond.Wait() | 50 queueCond.Wait() |
46 } | 51 } |
47 job = queue.Remove(queue.Front()).(Job) | 52 idj = queue.Remove(queue.Front()).(idJob) |
48 queueCond.L.Unlock() | 53 queueCond.L.Unlock() |
49 | 54 |
50 if err := auth.RunAs(job.User(), context.Background(), job.Do); err != nil { | 55 log.Printf("starting import job %d\n", idj.id) |
51 log.Printf("import error: %v\n", err) | 56 |
57 if err := auth.RunAs(idj.job.User(), context.Background(), idj.job.Do); err != nil { | |
58 log.Printf("import error (job %d): %v\n", idj.id, err) | |
52 } | 59 } |
53 if err := job.CleanUp(); err != nil { | 60 if err := idj.job.CleanUp(); err != nil { |
54 log.Printf("cleanup error: %v\n", err) | 61 log.Printf("cleanup error (job %d): %v\n", idj.id, err) |
55 } | 62 } |
56 } | 63 } |
57 } | 64 } |