Mercurial > gemma
comparison pkg/imports/queue.go @ 5117:37784b70eea3 queued-stage-done
Enqueue review jobs in front of other 'queued' imports.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Wed, 25 Mar 2020 20:57:20 +0100 |
parents | a4c8ed61e4df |
children | 5f62ac3db148 |
comparison
equal
deleted
inserted
replaced
5116:a4c8ed61e4df | 5117:37784b70eea3 |
---|---|
158 UPDATE import.imports SET | 158 UPDATE import.imports SET |
159 state = 'queued'::import_state, | 159 state = 'queued'::import_state, |
160 changed = CURRENT_TIMESTAMP | 160 changed = CURRENT_TIMESTAMP |
161 WHERE state = 'running'::import_state` | 161 WHERE state = 'running'::import_state` |
162 | 162 |
163 insertFrontJobSQL = ` | |
164 INSERT INTO import.imports ( | |
165 enqueued, | |
166 kind, | |
167 due, | |
168 trys_left, | |
169 retry_wait, | |
170 username, | |
171 send_email, | |
172 data | |
173 ) VALUES ( | |
174 (SELECT coalesce(min(enqueued), CURRENT_TIMESTAMP) - '1s'::interval | |
175 FROM import.imports | |
176 WHERE state = 'queued'::import_state), | |
177 $1, | |
178 COALESCE($2, CURRENT_TIMESTAMP), | |
179 $3, | |
180 $4, | |
181 $5, | |
182 $6, | |
183 $7 | |
184 ) RETURNING id` | |
185 | |
163 insertJobSQL = ` | 186 insertJobSQL = ` |
164 INSERT INTO import.imports ( | 187 INSERT INTO import.imports ( |
165 kind, | 188 kind, |
166 due, | 189 due, |
167 trys_left, | 190 trys_left, |
485 defer q.creatorsMu.Unlock() | 508 defer q.creatorsMu.Unlock() |
486 return q.creators[kind] | 509 return q.creators[kind] |
487 } | 510 } |
488 | 511 |
489 func (q *importQueue) addJob( | 512 func (q *importQueue) addJob( |
513 front bool, | |
490 kind JobKind, | 514 kind JobKind, |
491 due time.Time, | 515 due time.Time, |
492 triesLeft *int, | 516 triesLeft *int, |
493 waitRetry *time.Duration, | 517 waitRetry *time.Duration, |
494 user string, | 518 user string, |
516 wr = pgtype.Interval{Status: pgtype.Null} | 540 wr = pgtype.Interval{Status: pgtype.Null} |
517 } | 541 } |
518 | 542 |
519 ctx := context.Background() | 543 ctx := context.Background() |
520 err := auth.RunAs(ctx, user, func(conn *sql.Conn) error { | 544 err := auth.RunAs(ctx, user, func(conn *sql.Conn) error { |
545 var stmt string | |
546 if front { | |
547 stmt = insertFrontJobSQL | |
548 } else { | |
549 stmt = insertJobSQL | |
550 } | |
521 return conn.QueryRowContext( | 551 return conn.QueryRowContext( |
522 ctx, | 552 ctx, |
523 insertJobSQL, | 553 stmt, |
524 string(kind), | 554 string(kind), |
525 due, | 555 due, |
526 tl, | 556 tl, |
527 &wr, | 557 &wr, |
528 user, | 558 user, |
550 user string, | 580 user string, |
551 sendEmail bool, | 581 sendEmail bool, |
552 data string, | 582 data string, |
553 ) (int64, error) { | 583 ) (int64, error) { |
554 return iqueue.addJob( | 584 return iqueue.addJob( |
585 false, | |
555 kind, | 586 kind, |
556 due, | 587 due, |
557 triesLeft, | 588 triesLeft, |
558 waitRetry, | 589 waitRetry, |
559 user, | 590 user, |
629 // Try a little harder to persist the decision. | 660 // Try a little harder to persist the decision. |
630 tries := reviewJobRetries | 661 tries := reviewJobRetries |
631 wait := reviewJobWait | 662 wait := reviewJobWait |
632 | 663 |
633 rID, err := q.addJob( | 664 rID, err := q.addJob( |
665 true, | |
634 JobKind(kind+ReviewJobSuffix), | 666 JobKind(kind+ReviewJobSuffix), |
635 time.Now(), | 667 time.Now(), |
636 &tries, | 668 &tries, |
637 &wait, | 669 &wait, |
638 reviewer, | 670 reviewer, |
1006 go sendNotificationMail(idj.user, jc.Description(), state, idj.id) | 1038 go sendNotificationMail(idj.user, jc.Description(), state, idj.id) |
1007 } | 1039 } |
1008 | 1040 |
1009 if retry { | 1041 if retry { |
1010 nid, err := q.addJob( | 1042 nid, err := q.addJob( |
1043 false, | |
1011 idj.kind, | 1044 idj.kind, |
1012 idj.nextDue(), | 1045 idj.nextDue(), |
1013 idj.triesLeftPointer(), | 1046 idj.triesLeftPointer(), |
1014 idj.waitRetryPointer(), | 1047 idj.waitRetryPointer(), |
1015 idj.user, idj.sendEmail, | 1048 idj.user, idj.sendEmail, |