Mercurial > gemma
changeset 5122:0b6b62d247e8 queued-stage-done
Prioritize review jobs on selection
This reverts rev. 37784b70eea3 and instead moves review jobs forward
in the queue when fetching the next job to be run. Also optimized
index setup for filtering by state but not enqueued.
author | Tom Gottfried <tom@intevation.de> |
---|---|
date | Thu, 26 Mar 2020 14:41:23 +0100 |
parents | 8bfc71eecde1 |
children | eeb45e3e0a5a |
files | pkg/imports/queue.go schema/gemma.sql schema/updates/1434/01.add_import_state_idx.sql schema/version.sql |
diffstat | 4 files changed, 14 insertions(+), 43 deletions(-) [+] |
line wrap: on
line diff
--- a/pkg/imports/queue.go Thu Mar 26 13:53:05 2020 +0100 +++ b/pkg/imports/queue.go Thu Mar 26 14:41:23 2020 +0100 @@ -160,29 +160,6 @@ changed = CURRENT_TIMESTAMP WHERE state = 'running'::import_state` - insertFrontJobSQL = ` -INSERT INTO import.imports ( - enqueued, - kind, - due, - trys_left, - retry_wait, - username, - send_email, - data -) VALUES ( - (SELECT coalesce(min(enqueued), CURRENT_TIMESTAMP) - '1s'::interval - FROM import.imports - WHERE state = 'queued'::import_state), - $1, - COALESCE($2, CURRENT_TIMESTAMP), - $3, - $4, - $5, - $6, - $7 -) RETURNING id` - insertJobSQL = ` INSERT INTO import.imports ( kind, @@ -202,8 +179,9 @@ $7 ) RETURNING id` + // Select oldest queued job but prioritize review jobs selectJobSQL = ` -SELECT +SELECT DISTINCT ON (kind LIKE '%` + ReviewJobSuffix + `') id, kind, trys_left, @@ -214,11 +192,9 @@ FROM import.imports WHERE due <= CURRENT_TIMESTAMP + interval '5 seconds' AND - state = 'queued'::import_state AND enqueued IN ( - SELECT min(enqueued) - FROM import.imports - WHERE state = 'queued'::import_state AND - kind = ANY($1)) + state = 'queued'::import_state AND + kind = ANY($1) +ORDER BY kind LIKE '%` + ReviewJobSuffix + `' DESC, enqueued LIMIT 1` updateStateSQL = ` @@ -510,7 +486,6 @@ } func (q *importQueue) addJob( - front bool, kind JobKind, due time.Time, triesLeft *int, @@ -542,15 +517,9 @@ ctx := context.Background() err := auth.RunAs(ctx, user, func(conn *sql.Conn) error { - var stmt string - if front { - stmt = insertFrontJobSQL - } else { - stmt = insertJobSQL - } return conn.QueryRowContext( ctx, - stmt, + insertJobSQL, string(kind), due, tl, @@ -582,7 +551,6 @@ data string, ) (int64, error) { return iqueue.addJob( - false, kind, due, triesLeft, @@ -659,7 +627,6 @@ wait := reviewJobWait rID, err := q.addJob( - true, JobKind(kind+ReviewJobSuffix), time.Now(), &tries, @@ -1010,7 +977,6 @@ if retry { nid, err := q.addJob( - false, idj.kind, idj.nextDue(), idj.triesLeftPointer(),
--- a/schema/gemma.sql Thu Mar 26 13:53:05 2020 +0100 +++ b/schema/gemma.sql Thu Mar 26 14:41:23 2020 +0100 @@ -1267,8 +1267,10 @@ data TEXT, summary TEXT ) - - CREATE INDEX enqueued_idx ON imports(enqueued, state) + -- Mainly for listing imports in clients: + CREATE INDEX enqueued_idx ON imports(enqueued) + -- For fast retrieval of queued imports by the import queue in backend: + CREATE INDEX state_idx ON imports(state) CREATE TABLE import_logs ( import_id int NOT NULL REFERENCES imports(id)