comparison pkg/imports/queue.go @ 1003:d789f19877f4 persistent-import-queue

Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Mon, 22 Oct 2018 17:38:07 +0200
parents 14425e35e3c2
children fcf016ebdef4
comparison
equal deleted inserted replaced
1001:e2860eff5d03 1003:d789f19877f4
5 "database/sql" 5 "database/sql"
6 "fmt" 6 "fmt"
7 "log" 7 "log"
8 "runtime/debug" 8 "runtime/debug"
9 "sync" 9 "sync"
10 "time"
10 11
11 "gemma.intevation.de/gemma/pkg/auth" 12 "gemma.intevation.de/gemma/pkg/auth"
12 "gemma.intevation.de/gemma/pkg/config" 13 "gemma.intevation.de/gemma/pkg/config"
13 ) 14 )
14 15
34 user string 35 user string
35 data string 36 data string
36 } 37 }
37 ) 38 )
38 39
40 const pollDuration = time.Second * 10
41
39 var ( 42 var (
40 queueCond = sync.NewCond(new(sync.Mutex)) 43 signalChan = make(chan struct{})
41 44
42 creatorsMu sync.Mutex 45 creatorsMu sync.Mutex
43 creators = map[JobKind]JobCreator{} 46 creators = map[JobKind]JobCreator{}
44 ) 47 )
45 48
104 return creators[kind] 107 return creators[kind]
105 } 108 }
106 109
107 func AddJob(kind JobKind, user, data string) (int64, error) { 110 func AddJob(kind JobKind, user, data string) (int64, error) {
108 ctx := context.Background() 111 ctx := context.Background()
109 queueCond.L.Lock()
110 defer queueCond.L.Unlock()
111 var id int64 112 var id int64
112 err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error { 113 err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error {
113 return conn.QueryRowContext(ctx, insertJobSQL, string(kind), user, data).Scan(&id) 114 return conn.QueryRowContext(ctx, insertJobSQL, string(kind), user, data).Scan(&id)
114 }) 115 })
115 if err == nil { 116 if err == nil {
116 queueCond.Signal() 117 select {
118 case signalChan <- struct{}{}:
119 default:
120 }
117 } 121 }
118 return id, err 122 return id, err
119 } 123 }
120 124
121 type logFeedback int64 125 type logFeedback int64
220 } 224 }
221 225
222 func importLoop() { 226 func importLoop() {
223 config.WaitReady() 227 config.WaitReady()
224 for { 228 for {
225 queueCond.L.Lock()
226
227 var idj *idJob 229 var idj *idJob
228 var err error 230 var err error
229 231
230 for idj == nil { 232 for {
231 if idj, err = fetchJob(); err != nil { 233 if idj, err = fetchJob(); err != nil {
232 log.Printf("db error: %v\n", err) 234 log.Printf("db error: %v\n", err)
233 queueCond.Wait()
234 } else if idj == nil {
235 queueCond.Wait()
236 } 235 }
237 } 236 if idj != nil {
238 237 break
239 queueCond.L.Unlock() 238 }
239 select {
240 case <-signalChan:
241 case <-time.After(pollDuration):
242 }
243 }
240 244
241 log.Printf("starting import job %d\n", idj.id) 245 log.Printf("starting import job %d\n", idj.id)
242 246
243 jc := jobCreator(idj.kind) 247 jc := jobCreator(idj.kind)
244 if jc == nil { 248 if jc == nil {
264 errCleanup := survive(job.CleanUp)() 268 errCleanup := survive(job.CleanUp)()
265 if errCleanup != nil { 269 if errCleanup != nil {
266 feedback.Error("error cleanup: %v\n", errCleanup) 270 feedback.Error("error cleanup: %v\n", errCleanup)
267 } 271 }
268 272
273 var state string
269 if errDo != nil || errCleanup != nil { 274 if errDo != nil || errCleanup != nil {
270 err = updateState(idj.id, "failed") 275 state = "failed"
271 } else { 276 } else {
272 err = updateState(idj.id, "successful") 277 state = "successful"
273 } 278 }
274 if err != nil { 279 if err := updateState(idj.id, state); err != nil {
275 log.Printf("setting state of job %d failed: %v\n", idj.id, err) 280 log.Printf("setting state of job %d failed: %v\n", idj.id, err)
276 } 281 }
277 } 282 log.Printf("job %d finished: %s\n", idj.id, state)
278 } 283 }
284 }