diff pkg/imports/queue.go @ 1708:49e047c2106e

Imports: Made imports re-runnable if they fail.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Tue, 08 Jan 2019 13:35:44 +0100
parents 4a2fad8f57de
children 8ddbedf296d7
line wrap: on
line diff
--- a/pkg/imports/queue.go	Tue Jan 08 12:34:29 2019 +0100
+++ b/pkg/imports/queue.go	Tue Jan 08 13:35:44 2019 +0100
@@ -42,6 +42,15 @@
 		Error(fmt string, args ...interface{})
 	}
 
+	// RetryError is an error type to signal that
+	// the import should be tried again.
+	RetryError struct {
+		// Message is the error message.
+		Message string
+		// When is the new scheduled execution time.
+		When time.Time
+	}
+
 	// Job is the central abstraction of an import job
 	// run by the import queue.
 	Job interface {
@@ -88,6 +97,7 @@
 		id         int64
 		kind       JobKind
 		user       string
+		trysLeft   sql.NullInt64
 		sendEmail  bool
 		autoAccept bool
 		data       string
@@ -131,33 +141,39 @@
 	insertJobSQL = `
 INSERT INTO waterway.imports (
   kind,
+  due,
+  trys_left,
   username,
   send_email,
   auto_accept,
   data
 ) VALUES (
   $1,
-  $2,
+  COALESCE($2, CURRENT_TIMESTAMP),
   $3,
   $4,
-  $5
+  $5,
+  $6,
+  $7
 ) RETURNING id`
 
 	selectJobSQL = `
 SELECT
   id,
   kind,
+  trys_left,
   username,
   send_email,
   auto_accept,
   data
 FROM waterway.imports
-WHERE state = 'queued'::waterway.import_state AND enqueued IN (
-  SELECT min(enqueued)
-  FROM waterway.imports
-  WHERE state = 'queued'::waterway.import_state AND
-  kind = ANY($1)
-)
+WHERE
+  due <= CURRENT_TIMESTAMP AND
+  state = 'queued'::waterway.import_state AND enqueued IN (
+    SELECT min(enqueued)
+    FROM waterway.imports
+    WHERE state = 'queued'::waterway.import_state AND
+    kind = ANY($1))
 LIMIT 1`
 
 	updateStateSQL = `
@@ -186,6 +202,11 @@
 	go iqueue.importLoop()
 }
 
+// Error makes RetryError an error.
+func (re *RetryError) Error() string {
+	return re.Message
+}
+
 func (q *importQueue) registerJobCreator(kind JobKind, jc JobCreator) {
 	q.creatorsMu.Lock()
 	defer q.creatorsMu.Unlock()
@@ -236,6 +257,13 @@
 	return names
 }
 
+func (idj *idJob) trys() int {
+	if !idj.trysLeft.Valid {
+		return -1
+	}
+	return int(idj.trysLeft.Int64)
+}
+
 func (q *importQueue) jobCreator(kind JobKind) JobCreator {
 	q.creatorsMu.Lock()
 	defer q.creatorsMu.Unlock()
@@ -244,17 +272,28 @@
 
 func (q *importQueue) addJob(
 	kind JobKind,
+	due time.Time,
+	trysLeft int,
 	user string,
 	sendEmail, autoAccept bool,
 	data string,
 ) (int64, error) {
 	ctx := context.Background()
 	var id int64
+	if due.IsZero() {
+		due = time.Now()
+	}
+	var tl sql.NullInt64
+	if trysLeft >= 0 {
+		tl = sql.NullInt64{Int64: int64(trysLeft), Valid: true}
+	}
 	err := auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error {
 		return conn.QueryRowContext(
 			ctx,
 			insertJobSQL,
 			string(kind),
+			due,
+			tl,
 			user,
 			sendEmail,
 			autoAccept,
@@ -270,10 +309,18 @@
 }
 
 // AddJob adds a job to the global import queue to be executed
-// as soon as possible. This is gone in a separate Go routine
+// as soon as possible after due.
+// This is gone in a separate Go routine
 // so this will not block.
-func AddJob(kind JobKind, user string, sendEmail, autoAccept bool, data string) (int64, error) {
-	return iqueue.addJob(kind, user, sendEmail, autoAccept, data)
+func AddJob(
+	kind JobKind,
+	due time.Time,
+	trysLeft int,
+	user string,
+	sendEmail, autoAccept bool,
+	data string,
+) (int64, error) {
+	return iqueue.addJob(kind, due, trysLeft, user, sendEmail, autoAccept, data)
 }
 
 type logFeedback int64
@@ -357,6 +404,7 @@
 		if err = tx.QueryRowContext(ctx, selectJobSQL, &kinds).Scan(
 			&ji.id,
 			&ji.kind,
+			&ji.trysLeft,
 			&ji.user,
 			&ji.sendEmail,
 			&ji.autoAccept,
@@ -502,9 +550,21 @@
 			if errDo != nil {
 				feedback.Error("error do: %v", errDo)
 			}
-			errCleanup := survive(job.CleanUp)()
-			if errCleanup != nil {
-				feedback.Error("error cleanup: %v", errCleanup)
+			// Should we try again?
+			retry, shouldRetry := errDo.(*RetryError)
+
+			if shouldRetry && idj.trysLeft.Valid { // NULL -> limit less
+				if idj.trysLeft.Int64--; idj.trysLeft.Int64 <= 0 {
+					shouldRetry = false
+				}
+			}
+
+			var errCleanup error
+			if !shouldRetry { // cleanup debris
+				errCleanup = survive(job.CleanUp)()
+				if errCleanup != nil {
+					feedback.Error("error cleanup: %v", errCleanup)
+				}
 			}
 
 			var state string
@@ -524,6 +584,19 @@
 			if idj.sendEmail {
 				go sendNotificationMail(idj.user, jc.Description(), state, idj.id)
 			}
+
+			if shouldRetry {
+				nid, err := q.addJob(
+					idj.kind,
+					retry.When, idj.trys(),
+					idj.user, idj.sendEmail, idj.autoAccept,
+					idj.data)
+				if err != nil {
+					log.Printf("error: retry enqueue failed: %v\n", err)
+				} else {
+					log.Printf("info: re-enqueued job with id %d\n", nid)
+				}
+			}
 		}(jc, idj)
 	}
 }