changeset 5122:0b6b62d247e8 queued-stage-done

Prioritize review jobs on selection This reverts rev. 37784b70eea3 and instead moves review jobs forward in the queue when fetching the next job to be run. Also optimized index setup for filtering by state but not enqueued.
author Tom Gottfried <tom@intevation.de>
date Thu, 26 Mar 2020 14:41:23 +0100
parents 8bfc71eecde1
children eeb45e3e0a5a
files pkg/imports/queue.go schema/gemma.sql schema/updates/1434/01.add_import_state_idx.sql schema/version.sql
diffstat 4 files changed, 14 insertions(+), 43 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/imports/queue.go	Thu Mar 26 13:53:05 2020 +0100
+++ b/pkg/imports/queue.go	Thu Mar 26 14:41:23 2020 +0100
@@ -160,29 +160,6 @@
   changed = CURRENT_TIMESTAMP
 WHERE state = 'running'::import_state`
 
-	insertFrontJobSQL = `
-INSERT INTO import.imports (
-  enqueued,
-  kind,
-  due,
-  trys_left,
-  retry_wait,
-  username,
-  send_email,
-  data
-) VALUES (
-  (SELECT coalesce(min(enqueued), CURRENT_TIMESTAMP) - '1s'::interval
-     FROM import.imports
-     WHERE state = 'queued'::import_state),
-  $1,
-  COALESCE($2, CURRENT_TIMESTAMP),
-  $3,
-  $4,
-  $5,
-  $6,
-  $7
-) RETURNING id`
-
 	insertJobSQL = `
 INSERT INTO import.imports (
   kind,
@@ -202,8 +179,9 @@
   $7
 ) RETURNING id`
 
+	// Select oldest queued job but prioritize review jobs
 	selectJobSQL = `
-SELECT
+SELECT DISTINCT ON (kind LIKE '%` + ReviewJobSuffix + `')
   id,
   kind,
   trys_left,
@@ -214,11 +192,9 @@
 FROM import.imports
 WHERE
   due <= CURRENT_TIMESTAMP + interval '5 seconds' AND
-  state = 'queued'::import_state AND enqueued IN (
-    SELECT min(enqueued)
-    FROM import.imports
-    WHERE state = 'queued'::import_state AND
-    kind = ANY($1))
+  state = 'queued'::import_state AND
+  kind = ANY($1)
+ORDER BY kind LIKE '%` + ReviewJobSuffix + `' DESC, enqueued
 LIMIT 1`
 
 	updateStateSQL = `
@@ -510,7 +486,6 @@
 }
 
 func (q *importQueue) addJob(
-	front bool,
 	kind JobKind,
 	due time.Time,
 	triesLeft *int,
@@ -542,15 +517,9 @@
 
 	ctx := context.Background()
 	err := auth.RunAs(ctx, user, func(conn *sql.Conn) error {
-		var stmt string
-		if front {
-			stmt = insertFrontJobSQL
-		} else {
-			stmt = insertJobSQL
-		}
 		return conn.QueryRowContext(
 			ctx,
-			stmt,
+			insertJobSQL,
 			string(kind),
 			due,
 			tl,
@@ -582,7 +551,6 @@
 	data string,
 ) (int64, error) {
 	return iqueue.addJob(
-		false,
 		kind,
 		due,
 		triesLeft,
@@ -659,7 +627,6 @@
 	wait := reviewJobWait
 
 	rID, err := q.addJob(
-		true,
 		JobKind(kind+ReviewJobSuffix),
 		time.Now(),
 		&tries,
@@ -1010,7 +977,6 @@
 
 			if retry {
 				nid, err := q.addJob(
-					false,
 					idj.kind,
 					idj.nextDue(),
 					idj.triesLeftPointer(),
--- a/schema/gemma.sql	Thu Mar 26 13:53:05 2020 +0100
+++ b/schema/gemma.sql	Thu Mar 26 14:41:23 2020 +0100
@@ -1267,8 +1267,10 @@
         data       TEXT,
         summary    TEXT
     )
-
-    CREATE INDEX enqueued_idx ON imports(enqueued, state)
+    -- Mainly for listing imports in clients:
+    CREATE INDEX enqueued_idx ON imports(enqueued)
+    -- For fast retrieval of queued imports by the import queue in backend:
+    CREATE INDEX state_idx ON imports(state)
 
     CREATE TABLE import_logs (
         import_id int NOT NULL REFERENCES imports(id)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/schema/updates/1434/01.add_import_state_idx.sql	Thu Mar 26 14:41:23 2020 +0100
@@ -0,0 +1,3 @@
+DROP INDEX import.enqueued_idx;
+CREATE INDEX enqueued_idx ON import.imports(enqueued);
+CREATE INDEX state_idx ON import.imports(state);
--- a/schema/version.sql	Thu Mar 26 13:53:05 2020 +0100
+++ b/schema/version.sql	Thu Mar 26 14:41:23 2020 +0100
@@ -1,1 +1,1 @@
-INSERT INTO gemma_schema_version(version) VALUES (1433);
+INSERT INTO gemma_schema_version(version) VALUES (1434);