Mercurial > gemma
comparison pkg/imports/queue.go @ 1005:fcf016ebdef4 persistent-import-queue
Re-enqueue import jobs in state running when the gemma server starts.
These are jobs that were started but did not finish before the server
died.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Mon, 22 Oct 2018 18:14:16 +0200 |
parents | d789f19877f4 |
children | 8f23ec811afb |
comparison
equal
deleted
inserted
replaced
1004:7a4d233be077 | 1005:fcf016ebdef4 |
---|---|
46 creators = map[JobKind]JobCreator{} | 46 creators = map[JobKind]JobCreator{} |
47 ) | 47 ) |
48 | 48 |
49 const ( | 49 const ( |
50 queueUser = "sys_admin" | 50 queueUser = "sys_admin" |
51 | |
52 reEnqueueRunningSQL = ` | |
53 UPDATE waterway.imports SET state = 'queued'::waterway.import_state | |
54 WHERE state = 'running'::waterway.import_state | |
55 ` | |
51 | 56 |
52 insertJobSQL = ` | 57 insertJobSQL = ` |
53 INSERT INTO waterway.imports ( | 58 INSERT INTO waterway.imports ( |
54 kind, | 59 kind, |
55 username, | 60 username, |
162 }() | 167 }() |
163 return <-errCh | 168 return <-errCh |
164 } | 169 } |
165 } | 170 } |
166 | 171 |
172 func reEnqueueRunning() error { | |
173 ctx := context.Background() | |
174 return auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error { | |
175 _, err := conn.ExecContext(ctx, reEnqueueRunningSQL) | |
176 return err | |
177 }) | |
178 } | |
179 | |
167 func fetchJob() (*idJob, error) { | 180 func fetchJob() (*idJob, error) { |
168 var ji idJob | 181 var ji idJob |
169 ctx := context.Background() | 182 ctx := context.Background() |
170 err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error { | 183 err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error { |
171 tx, err := conn.BeginTx(ctx, nil) | 184 tx, err := conn.BeginTx(ctx, nil) |
223 return err | 236 return err |
224 } | 237 } |
225 | 238 |
226 func importLoop() { | 239 func importLoop() { |
227 config.WaitReady() | 240 config.WaitReady() |
241 // re-enqueue the jobs that are in state running. | |
242 // They were in progress when the server went down. | |
243 if err := reEnqueueRunning(); err != nil { | |
244 log.Printf("re-enquing failed: %v", err) | |
245 } | |
246 | |
228 for { | 247 for { |
229 var idj *idJob | 248 var idj *idJob |
230 var err error | 249 var err error |
231 | 250 |
232 for { | 251 for { |