changeset 5117:37784b70eea3 queued-stage-done

Enqueue review jobs in front of other 'queued' imports.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Wed, 25 Mar 2020 20:57:20 +0100
parents a4c8ed61e4df
children 32c1cb94e24a
files pkg/imports/queue.go
diffstat 1 files changed, 34 insertions(+), 1 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/imports/queue.go	Wed Mar 25 18:34:54 2020 +0100
+++ b/pkg/imports/queue.go	Wed Mar 25 20:57:20 2020 +0100
@@ -160,6 +160,29 @@
   changed = CURRENT_TIMESTAMP
 WHERE state = 'running'::import_state`
 
+	insertFrontJobSQL = `
+INSERT INTO import.imports (
+  enqueued,
+  kind,
+  due,
+  trys_left,
+  retry_wait,
+  username,
+  send_email,
+  data
+) VALUES (
+  (SELECT coalesce(min(enqueued), CURRENT_TIMESTAMP) - '1s'::interval
+     FROM import.imports
+     WHERE state = 'queued'::import_state),
+  $1,
+  COALESCE($2, CURRENT_TIMESTAMP),
+  $3,
+  $4,
+  $5,
+  $6,
+  $7
+) RETURNING id`
+
 	insertJobSQL = `
 INSERT INTO import.imports (
   kind,
@@ -487,6 +510,7 @@
 }
 
 func (q *importQueue) addJob(
+	front bool,
 	kind JobKind,
 	due time.Time,
 	triesLeft *int,
@@ -518,9 +542,15 @@
 
 	ctx := context.Background()
 	err := auth.RunAs(ctx, user, func(conn *sql.Conn) error {
+		var stmt string
+		if front {
+			stmt = insertFrontJobSQL
+		} else {
+			stmt = insertJobSQL
+		}
 		return conn.QueryRowContext(
 			ctx,
-			insertJobSQL,
+			stmt,
 			string(kind),
 			due,
 			tl,
@@ -552,6 +582,7 @@
 	data string,
 ) (int64, error) {
 	return iqueue.addJob(
+		false,
 		kind,
 		due,
 		triesLeft,
@@ -631,6 +662,7 @@
 	wait := reviewJobWait
 
 	rID, err := q.addJob(
+		true,
 		JobKind(kind+ReviewJobSuffix),
 		time.Now(),
 		&tries,
@@ -1008,6 +1040,7 @@
 
 			if retry {
 				nid, err := q.addJob(
+					false,
 					idj.kind,
 					idj.nextDue(),
 					idj.triesLeftPointer(),