comparison pkg/imports/queue.go @ 1005:fcf016ebdef4 persistent-import-queue

Re-enqueue import jobs in state running when the gemma server starts. These are jobs that were started but did not finish before the server went down.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Mon, 22 Oct 2018 18:14:16 +0200
parents d789f19877f4
children 8f23ec811afb
comparison
equal deleted inserted replaced
1004:7a4d233be077 1005:fcf016ebdef4
46 creators = map[JobKind]JobCreator{} 46 creators = map[JobKind]JobCreator{}
47 ) 47 )
48 48
49 const ( 49 const (
50 queueUser = "sys_admin" 50 queueUser = "sys_admin"
51
52 reEnqueueRunningSQL = `
53 UPDATE waterway.imports SET state = 'queued'::waterway.import_state
54 WHERE state = 'running'::waterway.import_state
55 `
51 56
52 insertJobSQL = ` 57 insertJobSQL = `
53 INSERT INTO waterway.imports ( 58 INSERT INTO waterway.imports (
54 kind, 59 kind,
55 username, 60 username,
162 }() 167 }()
163 return <-errCh 168 return <-errCh
164 } 169 }
165 } 170 }
166 171
172 func reEnqueueRunning() error {
173 ctx := context.Background()
174 return auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error {
175 _, err := conn.ExecContext(ctx, reEnqueueRunningSQL)
176 return err
177 })
178 }
179
167 func fetchJob() (*idJob, error) { 180 func fetchJob() (*idJob, error) {
168 var ji idJob 181 var ji idJob
169 ctx := context.Background() 182 ctx := context.Background()
170 err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error { 183 err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error {
171 tx, err := conn.BeginTx(ctx, nil) 184 tx, err := conn.BeginTx(ctx, nil)
223 return err 236 return err
224 } 237 }
225 238
226 func importLoop() { 239 func importLoop() {
227 config.WaitReady() 240 config.WaitReady()
241 // re-enqueue the jobs that are in state running.
242 // They were in progress when the server went down.
243 if err := reEnqueueRunning(); err != nil {
244 log.Printf("re-enquing failed: %v", err)
245 }
246
228 for { 247 for {
229 var idj *idJob 248 var idj *idJob
230 var err error 249 var err error
231 250
232 for { 251 for {