Mercurial > gemma
comparison pkg/imports/queue.go @ 1985:8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Wed, 23 Jan 2019 17:58:57 +0100 |
parents | d966f03ea819 |
children | 59055c8301df |
comparison
equal
deleted
inserted
replaced
1983:f9f1babe52ae | 1985:8eeb0b5eb340 |
---|---|
38 Info(fmt string, args ...interface{}) | 38 Info(fmt string, args ...interface{}) |
39 // Warn logs warnings. | 39 // Warn logs warnings. |
40 Warn(fmt string, args ...interface{}) | 40 Warn(fmt string, args ...interface{}) |
41 // Error logs errors. | 41 // Error logs errors. |
42 Error(fmt string, args ...interface{}) | 42 Error(fmt string, args ...interface{}) |
43 } | |
44 | |
45 // RetryError is an error type to signal that | |
46 // the import should be tried again. | |
47 RetryError struct { | |
48 // Message is the error message. | |
49 Message string | |
50 // When is the new scheduled execution time. | |
51 When time.Time | |
52 } | 43 } |
53 | 44 |
54 // UnchangedError may be issued by Do of a Job to indicate | 45 // UnchangedError may be issued by Do of a Job to indicate |
55 // that the database has not changed. | 46 // that the database has not changed. |
56 UnchangedError string | 47 UnchangedError string |
102 | 93 |
103 idJob struct { | 94 idJob struct { |
104 id int64 | 95 id int64 |
105 kind JobKind | 96 kind JobKind |
106 user string | 97 user string |
98 waitRetry pgtype.Interval | |
107 trysLeft sql.NullInt64 | 99 trysLeft sql.NullInt64 |
108 sendEmail bool | 100 sendEmail bool |
109 data string | 101 data string |
110 } | 102 } |
111 ) | 103 ) |
148 insertJobSQL = ` | 140 insertJobSQL = ` |
149 INSERT INTO waterway.imports ( | 141 INSERT INTO waterway.imports ( |
150 kind, | 142 kind, |
151 due, | 143 due, |
152 trys_left, | 144 trys_left, |
145 retry_wait, | |
153 username, | 146 username, |
154 send_email, | 147 send_email, |
155 data | 148 data |
156 ) VALUES ( | 149 ) VALUES ( |
157 $1, | 150 $1, |
158 COALESCE($2, CURRENT_TIMESTAMP), | 151 COALESCE($2, CURRENT_TIMESTAMP), |
159 $3, | 152 $3, |
160 $4, | 153 $4, |
161 $5, | 154 $5, |
162 $6 | 155 $6, |
156 $7 | |
163 ) RETURNING id` | 157 ) RETURNING id` |
164 | 158 |
165 selectJobSQL = ` | 159 selectJobSQL = ` |
166 SELECT | 160 SELECT |
167 id, | 161 id, |
168 kind, | 162 kind, |
169 trys_left, | 163 trys_left, |
164 retry_wait, | |
170 username, | 165 username, |
171 send_email, | 166 send_email, |
172 data | 167 data |
173 FROM waterway.imports | 168 FROM waterway.imports |
174 WHERE | 169 WHERE |
204 | 199 |
205 func init() { | 200 func init() { |
206 go iqueue.importLoop() | 201 go iqueue.importLoop() |
207 } | 202 } |
208 | 203 |
209 // Error makes RetryError an error. | |
210 func (re *RetryError) Error() string { | |
211 return re.Message | |
212 } | |
213 | |
214 // Error makes UnchangedError an error. | 204 // Error makes UnchangedError an error. |
215 func (ue UnchangedError) Error() string { | 205 func (ue UnchangedError) Error() string { |
216 return string(ue) | 206 return string(ue) |
217 } | 207 } |
218 | 208 |
264 } | 254 } |
265 // XXX: Consider using sort.Strings to make output deterministic. | 255 // XXX: Consider using sort.Strings to make output deterministic. |
266 return names | 256 return names |
267 } | 257 } |
268 | 258 |
269 func (idj *idJob) trys() int { | 259 func (idj *idJob) nextRetry(feedback Feedback) bool { |
260 switch { | |
261 case idj.waitRetry.Status != pgtype.Present && !idj.trysLeft.Valid: | |
262 return false | |
263 case idj.waitRetry.Status == pgtype.Present && !idj.trysLeft.Valid: | |
264 return true | |
265 case idj.trysLeft.Valid: | |
266 if idj.trysLeft.Int64 < 1 { | |
267 feedback.Warn("import should be retried, but no retrys left") | |
268 } else { | |
269 idj.trysLeft.Int64-- | |
270 feedback.Info("import failed but will be retried") | |
271 return true | |
272 } | |
273 } | |
274 return false | |
275 } | |
276 | |
277 func (idj *idJob) nextDue() time.Time { | |
278 now := time.Now() | |
279 if idj.waitRetry.Status == pgtype.Present { | |
280 var d time.Duration | |
281 if err := idj.waitRetry.AssignTo(&d); err != nil { | |
282 log.Printf("error: converting waitRetry failed: %v\n", err) | |
283 } else { | |
284 now = now.Add(d) | |
285 } | |
286 } | |
287 return now | |
288 } | |
289 | |
290 func (idj *idJob) trysLeftPointer() *int { | |
270 if !idj.trysLeft.Valid { | 291 if !idj.trysLeft.Valid { |
271 return -1 | 292 return nil |
272 } | 293 } |
273 return int(idj.trysLeft.Int64) | 294 t := int(idj.trysLeft.Int64) |
295 return &t | |
296 } | |
297 | |
298 func (idj *idJob) waitRetryPointer() *time.Duration { | |
299 if idj.waitRetry.Status != pgtype.Present { | |
300 return nil | |
301 } | |
302 d := new(time.Duration) | |
303 if err := idj.waitRetry.AssignTo(d); err != nil { | |
304 log.Printf("error: converting waitRetry failed: %v\n", err) | |
305 return nil | |
306 } | |
307 return d | |
274 } | 308 } |
275 | 309 |
276 func (q *importQueue) jobCreator(kind JobKind) JobCreator { | 310 func (q *importQueue) jobCreator(kind JobKind) JobCreator { |
277 q.creatorsMu.Lock() | 311 q.creatorsMu.Lock() |
278 defer q.creatorsMu.Unlock() | 312 defer q.creatorsMu.Unlock() |
280 } | 314 } |
281 | 315 |
282 func (q *importQueue) addJob( | 316 func (q *importQueue) addJob( |
283 kind JobKind, | 317 kind JobKind, |
284 due time.Time, | 318 due time.Time, |
285 trysLeft int, | 319 trysLeft *int, |
320 waitRetry *time.Duration, | |
286 user string, | 321 user string, |
287 sendEmail bool, | 322 sendEmail bool, |
288 data string, | 323 data string, |
289 ) (int64, error) { | 324 ) (int64, error) { |
290 ctx := context.Background() | 325 |
291 var id int64 | 326 var id int64 |
292 if due.IsZero() { | 327 if due.IsZero() { |
293 due = time.Now() | 328 due = time.Now() |
294 } | 329 } |
330 | |
295 var tl sql.NullInt64 | 331 var tl sql.NullInt64 |
296 if trysLeft >= 0 { | 332 if trysLeft != nil { |
297 tl = sql.NullInt64{Int64: int64(trysLeft), Valid: true} | 333 tl = sql.NullInt64{Int64: int64(*trysLeft), Valid: true} |
298 } | 334 } |
335 | |
336 var wr pgtype.Interval | |
337 if waitRetry != nil { | |
338 if err := wr.Set(*waitRetry); err != nil { | |
339 return 0, err | |
340 } | |
341 } else { | |
342 wr = pgtype.Interval{Status: pgtype.Null} | |
343 } | |
344 | |
345 ctx := context.Background() | |
299 err := auth.RunAs(ctx, user, func(conn *sql.Conn) error { | 346 err := auth.RunAs(ctx, user, func(conn *sql.Conn) error { |
300 return conn.QueryRowContext( | 347 return conn.QueryRowContext( |
301 ctx, | 348 ctx, |
302 insertJobSQL, | 349 insertJobSQL, |
303 string(kind), | 350 string(kind), |
304 due, | 351 due, |
305 tl, | 352 tl, |
353 &wr, | |
306 user, | 354 user, |
307 sendEmail, | 355 sendEmail, |
308 data).Scan(&id) | 356 data).Scan(&id) |
309 }) | 357 }) |
310 if err == nil { | 358 if err == nil { |
321 // This is done in a separate goroutine | 369 // This is done in a separate goroutine |
322 // so this will not block. | 370 // so this will not block. |
323 func AddJob( | 371 func AddJob( |
324 kind JobKind, | 372 kind JobKind, |
325 due time.Time, | 373 due time.Time, |
326 trysLeft int, | 374 trysLeft *int, |
375 waitRetry *time.Duration, | |
327 user string, | 376 user string, |
328 sendEmail bool, | 377 sendEmail bool, |
329 data string, | 378 data string, |
330 ) (int64, error) { | 379 ) (int64, error) { |
331 return iqueue.addJob(kind, due, trysLeft, user, sendEmail, data) | 380 return iqueue.addJob( |
381 kind, | |
382 due, | |
383 trysLeft, | |
384 waitRetry, | |
385 user, | |
386 sendEmail, | |
387 data) | |
332 } | 388 } |
333 | 389 |
334 type logFeedback int64 | 390 type logFeedback int64 |
335 | 391 |
336 func (lf logFeedback) log(kind, format string, args ...interface{}) { | 392 func (lf logFeedback) log(kind, format string, args ...interface{}) { |
411 defer tx.Rollback() | 467 defer tx.Rollback() |
412 if err = tx.QueryRowContext(ctx, selectJobSQL, &kinds).Scan( | 468 if err = tx.QueryRowContext(ctx, selectJobSQL, &kinds).Scan( |
413 &ji.id, | 469 &ji.id, |
414 &ji.kind, | 470 &ji.kind, |
415 &ji.trysLeft, | 471 &ji.trysLeft, |
472 &ji.waitRetry, | |
416 &ji.user, | 473 &ji.user, |
417 &ji.sendEmail, | 474 &ji.sendEmail, |
418 &ji.data, | 475 &ji.data, |
419 ); err != nil { | 476 ); err != nil { |
420 return err | 477 return err |
553 summary, err = job.Do(ctx, idj.id, conn, feedback) | 610 summary, err = job.Do(ctx, idj.id, conn, feedback) |
554 return err | 611 return err |
555 }) | 612 }) |
556 })() | 613 })() |
557 | 614 |
558 var retry *RetryError | 615 var unchanged, retry bool |
559 var unchanged bool | 616 if v, ok := errDo.(UnchangedError); ok { |
560 | |
561 switch v := errDo.(type) { | |
562 case *RetryError: | |
563 // NULL -> limit less | |
564 if idj.trysLeft.Valid && idj.trysLeft.Int64 <= 1 { | |
565 feedback.Warn("import should be retried, but no retrys left") | |
566 } else { | |
567 if idj.trysLeft.Valid { | |
568 idj.trysLeft.Int64-- | |
569 } | |
570 feedback.Info("import failed but will be retried") | |
571 retry = v | |
572 } | |
573 case UnchangedError: | |
574 feedback.Info("unchanged: %s", v.Error()) | 617 feedback.Info("unchanged: %s", v.Error()) |
575 unchanged = true | 618 unchanged = true |
576 default: | 619 } else if errDo != nil { |
577 if errDo != nil { | 620 feedback.Error("error in import: %v", errDo) |
578 feedback.Error("error in import: %v", errDo) | 621 retry = idj.nextRetry(feedback) |
579 } | |
580 } | 622 } |
581 | 623 |
582 var errCleanup error | 624 var errCleanup error |
583 if retry == nil { // cleanup debris | 625 if retry { // cleanup debris |
584 if errCleanup = survive(job.CleanUp)(); errCleanup != nil { | 626 if errCleanup = survive(job.CleanUp)(); errCleanup != nil { |
585 feedback.Error("error cleanup: %v", errCleanup) | 627 feedback.Error("error cleanup: %v", errCleanup) |
586 } | 628 } |
587 } | 629 } |
588 | 630 |
603 log.Printf("info: import #%d finished: %s\n", idj.id, state) | 645 log.Printf("info: import #%d finished: %s\n", idj.id, state) |
604 if idj.sendEmail { | 646 if idj.sendEmail { |
605 go sendNotificationMail(idj.user, jc.Description(), state, idj.id) | 647 go sendNotificationMail(idj.user, jc.Description(), state, idj.id) |
606 } | 648 } |
607 | 649 |
608 if retry != nil { | 650 if retry { |
609 nid, err := q.addJob( | 651 nid, err := q.addJob( |
610 idj.kind, | 652 idj.kind, |
611 retry.When, idj.trys(), | 653 idj.nextDue(), |
654 idj.trysLeftPointer(), | |
655 idj.waitRetryPointer(), | |
612 idj.user, idj.sendEmail, | 656 idj.user, idj.sendEmail, |
613 idj.data) | 657 idj.data) |
614 if err != nil { | 658 if err != nil { |
615 log.Printf("error: retry enqueue failed: %v\n", err) | 659 log.Printf("error: retry enqueue failed: %v\n", err) |
616 } else { | 660 } else { |