comparison pkg/imports/queue.go @ 987:3841509f6e9e

Store the job id alongside the job in the job queue.
author Sascha L. Teichmann <teichmann@intevation.de>
date Fri, 19 Oct 2018 18:09:26 +0200
parents 7934b5c1a910
children 7dfd3db94e6d
comparison
equal deleted inserted replaced
986:c34a2a643f5e 987:3841509f6e9e
15 User() string 15 User() string
16 Do(conn *sql.Conn) error 16 Do(conn *sql.Conn) error
17 CleanUp() error 17 CleanUp() error
18 } 18 }
19 19
20 type idJob struct {
21 id int64
22 job Job
23 }
24
20 var ( 25 var (
21 queueCond = sync.NewCond(new(sync.Mutex)) 26 queueCond = sync.NewCond(new(sync.Mutex))
22 queue = list.New() 27 queue = list.New()
23 28
24 jobID int64 29 jobID int64
30 35
31 func AddJob(job Job) int64 { 36 func AddJob(job Job) int64 {
32 id := atomic.AddInt64(&jobID, 1) 37 id := atomic.AddInt64(&jobID, 1)
33 queueCond.L.Lock() 38 queueCond.L.Lock()
34 defer queueCond.L.Unlock() 39 defer queueCond.L.Unlock()
35 queue.PushBack(job) 40 queue.PushBack(idJob{id, job})
36 queueCond.Signal() 41 queueCond.Signal()
37 return id 42 return id
38 } 43 }
39 44
40 func importLoop() { 45 func importLoop() {
41 for { 46 for {
42 var job Job 47 var idj idJob
43 queueCond.L.Lock() 48 queueCond.L.Lock()
44 for queue.Len() == 0 { 49 for queue.Len() == 0 {
45 queueCond.Wait() 50 queueCond.Wait()
46 } 51 }
47 job = queue.Remove(queue.Front()).(Job) 52 idj = queue.Remove(queue.Front()).(idJob)
48 queueCond.L.Unlock() 53 queueCond.L.Unlock()
49 54
50 if err := auth.RunAs(job.User(), context.Background(), job.Do); err != nil { 55 log.Printf("starting import job %d\n", idj.id)
51 log.Printf("import error: %v\n", err) 56
57 if err := auth.RunAs(idj.job.User(), context.Background(), idj.job.Do); err != nil {
58 log.Printf("import error (job %d): %v\n", idj.id, err)
52 } 59 }
53 if err := job.CleanUp(); err != nil { 60 if err := idj.job.CleanUp(); err != nil {
54 log.Printf("cleanup error: %v\n", err) 61 log.Printf("cleanup error (job %d): %v\n", idj.id, err)
55 } 62 }
56 } 63 }
57 } 64 }