Mercurial > gemma
diff pkg/imports/queue.go @ 1708:49e047c2106e
Imports: Made imports re-runnable if they fail.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Tue, 08 Jan 2019 13:35:44 +0100 |
parents | 4a2fad8f57de |
children | 8ddbedf296d7 |
line wrap: on
line diff
--- a/pkg/imports/queue.go Tue Jan 08 12:34:29 2019 +0100 +++ b/pkg/imports/queue.go Tue Jan 08 13:35:44 2019 +0100 @@ -42,6 +42,15 @@ Error(fmt string, args ...interface{}) } + // RetryError is an error type to signal that + // the import should be tried again. + RetryError struct { + // Message is the error message. + Message string + // When is the new scheduled execution time. + When time.Time + } + // Job is the central abstraction of an import job // run by the import queue. Job interface { @@ -88,6 +97,7 @@ id int64 kind JobKind user string + trysLeft sql.NullInt64 sendEmail bool autoAccept bool data string @@ -131,33 +141,39 @@ insertJobSQL = ` INSERT INTO waterway.imports ( kind, + due, + trys_left, username, send_email, auto_accept, data ) VALUES ( $1, - $2, + COALESCE($2, CURRENT_TIMESTAMP), $3, $4, - $5 + $5, + $6, + $7 ) RETURNING id` selectJobSQL = ` SELECT id, kind, + trys_left, username, send_email, auto_accept, data FROM waterway.imports -WHERE state = 'queued'::waterway.import_state AND enqueued IN ( - SELECT min(enqueued) - FROM waterway.imports - WHERE state = 'queued'::waterway.import_state AND - kind = ANY($1) -) +WHERE + due <= CURRENT_TIMESTAMP AND + state = 'queued'::waterway.import_state AND enqueued IN ( + SELECT min(enqueued) + FROM waterway.imports + WHERE state = 'queued'::waterway.import_state AND + kind = ANY($1)) LIMIT 1` updateStateSQL = ` @@ -186,6 +202,11 @@ go iqueue.importLoop() } +// Error makes RetryError an error. +func (re *RetryError) Error() string { + return re.Message +} + func (q *importQueue) registerJobCreator(kind JobKind, jc JobCreator) { q.creatorsMu.Lock() defer q.creatorsMu.Unlock() @@ -236,6 +257,13 @@ return names } +func (idj *idJob) trys() int { + if !idj.trysLeft.Valid { + return -1 + } + return int(idj.trysLeft.Int64) +} + func (q *importQueue) jobCreator(kind JobKind) JobCreator { q.creatorsMu.Lock() defer q.creatorsMu.Unlock() @@ -244,17 +272,28 @@ func (q *importQueue) addJob( kind JobKind, + due time.Time, + trysLeft int, user string, sendEmail, autoAccept bool, data string, ) (int64, error) { ctx := context.Background() var id int64 + if due.IsZero() { + due = time.Now() + } + var tl sql.NullInt64 + if trysLeft >= 0 { + tl = sql.NullInt64{Int64: int64(trysLeft), Valid: true} + } err := auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error { return conn.QueryRowContext( ctx, insertJobSQL, string(kind), + due, + tl, user, sendEmail, autoAccept, @@ -270,10 +309,18 @@ } // AddJob adds a job to the global import queue to be executed -// as soon as possible. This is gone in a separate Go routine +// as soon as possible after due. +// This is gone in a separate Go routine // so this will not block. -func AddJob(kind JobKind, user string, sendEmail, autoAccept bool, data string) (int64, error) { - return iqueue.addJob(kind, user, sendEmail, autoAccept, data) +func AddJob( + kind JobKind, + due time.Time, + trysLeft int, + user string, + sendEmail, autoAccept bool, + data string, +) (int64, error) { + return iqueue.addJob(kind, due, trysLeft, user, sendEmail, autoAccept, data) } type logFeedback int64 @@ -357,6 +404,7 @@ if err = tx.QueryRowContext(ctx, selectJobSQL, &kinds).Scan( &ji.id, &ji.kind, + &ji.trysLeft, &ji.user, &ji.sendEmail, &ji.autoAccept, @@ -502,9 +550,21 @@ if errDo != nil { feedback.Error("error do: %v", errDo) } - errCleanup := survive(job.CleanUp)() - if errCleanup != nil { - feedback.Error("error cleanup: %v", errCleanup) + // Should we try again? + retry, shouldRetry := errDo.(*RetryError) + + if shouldRetry && idj.trysLeft.Valid { // NULL -> limit less + if idj.trysLeft.Int64--; idj.trysLeft.Int64 <= 0 { + shouldRetry = false + } + } + + var errCleanup error + if !shouldRetry { // cleanup debris + errCleanup = survive(job.CleanUp)() + if errCleanup != nil { + feedback.Error("error cleanup: %v", errCleanup) + } } var state string @@ -524,6 +584,19 @@ if idj.sendEmail { go sendNotificationMail(idj.user, jc.Description(), state, idj.id) } + + if shouldRetry { + nid, err := q.addJob( + idj.kind, + retry.When, idj.trys(), + idj.user, idj.sendEmail, idj.autoAccept, + idj.data) + if err != nil { + log.Printf("error: retry enqueue failed: %v\n", err) + } else { + log.Printf("info: re-enqueued job with id %d\n", nid) + } + } }(jc, idj) } }