Mercurial > gemma
comparison pkg/imports/queue.go @ 1708:49e047c2106e
Imports: Made imports re-runnable if they fail.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Tue, 08 Jan 2019 13:35:44 +0100 |
parents | 4a2fad8f57de |
children | 8ddbedf296d7 |
comparison
equal
deleted
inserted
replaced
1707:74b66527ae81 | 1708:49e047c2106e |
---|---|
38 Info(fmt string, args ...interface{}) | 38 Info(fmt string, args ...interface{}) |
39 // Warn logs warnings. | 39 // Warn logs warnings. |
40 Warn(fmt string, args ...interface{}) | 40 Warn(fmt string, args ...interface{}) |
41 // Error logs errors. | 41 // Error logs errors. |
42 Error(fmt string, args ...interface{}) | 42 Error(fmt string, args ...interface{}) |
43 } | |
44 | |
45 // RetryError is an error type to signal that | |
46 // the import should be tried again. | |
47 RetryError struct { | |
48 // Message is the error message. | |
49 Message string | |
50 // When is the new scheduled execution time. | |
51 When time.Time | |
43 } | 52 } |
44 | 53 |
45 // Job is the central abstraction of an import job | 54 // Job is the central abstraction of an import job |
46 // run by the import queue. | 55 // run by the import queue. |
47 Job interface { | 56 Job interface { |
86 | 95 |
87 idJob struct { | 96 idJob struct { |
88 id int64 | 97 id int64 |
89 kind JobKind | 98 kind JobKind |
90 user string | 99 user string |
100 trysLeft sql.NullInt64 | |
91 sendEmail bool | 101 sendEmail bool |
92 autoAccept bool | 102 autoAccept bool |
93 data string | 103 data string |
94 } | 104 } |
95 ) | 105 ) |
129 WHERE state = 'running'::waterway.import_state` | 139 WHERE state = 'running'::waterway.import_state` |
130 | 140 |
131 insertJobSQL = ` | 141 insertJobSQL = ` |
132 INSERT INTO waterway.imports ( | 142 INSERT INTO waterway.imports ( |
133 kind, | 143 kind, |
144 due, | |
145 trys_left, | |
134 username, | 146 username, |
135 send_email, | 147 send_email, |
136 auto_accept, | 148 auto_accept, |
137 data | 149 data |
138 ) VALUES ( | 150 ) VALUES ( |
139 $1, | 151 $1, |
140 $2, | 152 COALESCE($2, CURRENT_TIMESTAMP), |
141 $3, | 153 $3, |
142 $4, | 154 $4, |
143 $5 | 155 $5, |
156 $6, | |
157 $7 | |
144 ) RETURNING id` | 158 ) RETURNING id` |
145 | 159 |
146 selectJobSQL = ` | 160 selectJobSQL = ` |
147 SELECT | 161 SELECT |
148 id, | 162 id, |
149 kind, | 163 kind, |
164 trys_left, | |
150 username, | 165 username, |
151 send_email, | 166 send_email, |
152 auto_accept, | 167 auto_accept, |
153 data | 168 data |
154 FROM waterway.imports | 169 FROM waterway.imports |
155 WHERE state = 'queued'::waterway.import_state AND enqueued IN ( | 170 WHERE |
156 SELECT min(enqueued) | 171 due <= CURRENT_TIMESTAMP AND |
157 FROM waterway.imports | 172 state = 'queued'::waterway.import_state AND enqueued IN ( |
158 WHERE state = 'queued'::waterway.import_state AND | 173 SELECT min(enqueued) |
159 kind = ANY($1) | 174 FROM waterway.imports |
160 ) | 175 WHERE state = 'queued'::waterway.import_state AND |
176 kind = ANY($1)) | |
161 LIMIT 1` | 177 LIMIT 1` |
162 | 178 |
163 updateStateSQL = ` | 179 updateStateSQL = ` |
164 UPDATE waterway.imports SET state = $1::waterway.import_state | 180 UPDATE waterway.imports SET state = $1::waterway.import_state |
165 WHERE id = $2` | 181 WHERE id = $2` |
184 | 200 |
185 func init() { | 201 func init() { |
186 go iqueue.importLoop() | 202 go iqueue.importLoop() |
187 } | 203 } |
188 | 204 |
205 // Error makes RetryError an error. | |
206 func (re *RetryError) Error() string { | |
207 return re.Message | |
208 } | |
209 | |
189 func (q *importQueue) registerJobCreator(kind JobKind, jc JobCreator) { | 210 func (q *importQueue) registerJobCreator(kind JobKind, jc JobCreator) { |
190 q.creatorsMu.Lock() | 211 q.creatorsMu.Lock() |
191 defer q.creatorsMu.Unlock() | 212 defer q.creatorsMu.Unlock() |
192 q.creators[kind] = jc | 213 q.creators[kind] = jc |
193 } | 214 } |
234 } | 255 } |
235 // XXX: Consider using sort.Strings to make output deterministic. | 256 // XXX: Consider using sort.Strings to make output deterministic. |
236 return names | 257 return names |
237 } | 258 } |
238 | 259 |
260 func (idj *idJob) trys() int { | |
261 if !idj.trysLeft.Valid { | |
262 return -1 | |
263 } | |
264 return int(idj.trysLeft.Int64) | |
265 } | |
266 | |
239 func (q *importQueue) jobCreator(kind JobKind) JobCreator { | 267 func (q *importQueue) jobCreator(kind JobKind) JobCreator { |
240 q.creatorsMu.Lock() | 268 q.creatorsMu.Lock() |
241 defer q.creatorsMu.Unlock() | 269 defer q.creatorsMu.Unlock() |
242 return q.creators[kind] | 270 return q.creators[kind] |
243 } | 271 } |
244 | 272 |
245 func (q *importQueue) addJob( | 273 func (q *importQueue) addJob( |
246 kind JobKind, | 274 kind JobKind, |
275 due time.Time, | |
276 trysLeft int, | |
247 user string, | 277 user string, |
248 sendEmail, autoAccept bool, | 278 sendEmail, autoAccept bool, |
249 data string, | 279 data string, |
250 ) (int64, error) { | 280 ) (int64, error) { |
251 ctx := context.Background() | 281 ctx := context.Background() |
252 var id int64 | 282 var id int64 |
283 if due.IsZero() { | |
284 due = time.Now() | |
285 } | |
286 var tl sql.NullInt64 | |
287 if trysLeft >= 0 { | |
288 tl = sql.NullInt64{Int64: int64(trysLeft), Valid: true} | |
289 } | |
253 err := auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error { | 290 err := auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error { |
254 return conn.QueryRowContext( | 291 return conn.QueryRowContext( |
255 ctx, | 292 ctx, |
256 insertJobSQL, | 293 insertJobSQL, |
257 string(kind), | 294 string(kind), |
295 due, | |
296 tl, | |
258 user, | 297 user, |
259 sendEmail, | 298 sendEmail, |
260 autoAccept, | 299 autoAccept, |
261 data).Scan(&id) | 300 data).Scan(&id) |
262 }) | 301 }) |
268 } | 307 } |
269 return id, err | 308 return id, err |
270 } | 309 } |
271 | 310 |
272 // AddJob adds a job to the global import queue to be executed | 311 // AddJob adds a job to the global import queue to be executed |
273 // as soon as possible. This is gone in a separate Go routine | 312 // as soon as possible after due. |
313 // This is gone in a separate Go routine | |
274 // so this will not block. | 314 // so this will not block. |
275 func AddJob(kind JobKind, user string, sendEmail, autoAccept bool, data string) (int64, error) { | 315 func AddJob( |
276 return iqueue.addJob(kind, user, sendEmail, autoAccept, data) | 316 kind JobKind, |
317 due time.Time, | |
318 trysLeft int, | |
319 user string, | |
320 sendEmail, autoAccept bool, | |
321 data string, | |
322 ) (int64, error) { | |
323 return iqueue.addJob(kind, due, trysLeft, user, sendEmail, autoAccept, data) | |
277 } | 324 } |
278 | 325 |
279 type logFeedback int64 | 326 type logFeedback int64 |
280 | 327 |
281 func (lf logFeedback) log(kind, format string, args ...interface{}) { | 328 func (lf logFeedback) log(kind, format string, args ...interface{}) { |
355 } | 402 } |
356 defer tx.Rollback() | 403 defer tx.Rollback() |
357 if err = tx.QueryRowContext(ctx, selectJobSQL, &kinds).Scan( | 404 if err = tx.QueryRowContext(ctx, selectJobSQL, &kinds).Scan( |
358 &ji.id, | 405 &ji.id, |
359 &ji.kind, | 406 &ji.kind, |
407 &ji.trysLeft, | |
360 &ji.user, | 408 &ji.user, |
361 &ji.sendEmail, | 409 &ji.sendEmail, |
362 &ji.autoAccept, | 410 &ji.autoAccept, |
363 &ji.data, | 411 &ji.data, |
364 ); err != nil { | 412 ); err != nil { |
500 }) | 548 }) |
501 })() | 549 })() |
502 if errDo != nil { | 550 if errDo != nil { |
503 feedback.Error("error do: %v", errDo) | 551 feedback.Error("error do: %v", errDo) |
504 } | 552 } |
505 errCleanup := survive(job.CleanUp)() | 553 // Should we try again? |
506 if errCleanup != nil { | 554 retry, shouldRetry := errDo.(*RetryError) |
507 feedback.Error("error cleanup: %v", errCleanup) | 555 |
556 if shouldRetry && idj.trysLeft.Valid { // NULL -> limit less | |
557 if idj.trysLeft.Int64--; idj.trysLeft.Int64 <= 0 { | |
558 shouldRetry = false | |
559 } | |
560 } | |
561 | |
562 var errCleanup error | |
563 if !shouldRetry { // cleanup debris | |
564 errCleanup = survive(job.CleanUp)() | |
565 if errCleanup != nil { | |
566 feedback.Error("error cleanup: %v", errCleanup) | |
567 } | |
508 } | 568 } |
509 | 569 |
510 var state string | 570 var state string |
511 switch { | 571 switch { |
512 case errDo != nil || errCleanup != nil: | 572 case errDo != nil || errCleanup != nil: |
522 // TODO: Send email if sendEmail is set. | 582 // TODO: Send email if sendEmail is set. |
523 log.Printf("import #%d finished: %s\n", idj.id, state) | 583 log.Printf("import #%d finished: %s\n", idj.id, state) |
524 if idj.sendEmail { | 584 if idj.sendEmail { |
525 go sendNotificationMail(idj.user, jc.Description(), state, idj.id) | 585 go sendNotificationMail(idj.user, jc.Description(), state, idj.id) |
526 } | 586 } |
587 | |
588 if shouldRetry { | |
589 nid, err := q.addJob( | |
590 idj.kind, | |
591 retry.When, idj.trys(), | |
592 idj.user, idj.sendEmail, idj.autoAccept, | |
593 idj.data) | |
594 if err != nil { | |
595 log.Printf("error: retry enqueue failed: %v\n", err) | |
596 } else { | |
597 log.Printf("info: re-enqueued job with id %d\n", nid) | |
598 } | |
599 } | |
527 }(jc, idj) | 600 }(jc, idj) |
528 } | 601 } |
529 } | 602 } |