comparison pkg/imports/queue.go @ 5117:37784b70eea3 queued-stage-done

Enqueue review jobs in front of other 'queued' imports.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Wed, 25 Mar 2020 20:57:20 +0100
parents a4c8ed61e4df
children 5f62ac3db148
comparison
legend: equal | deleted | inserted | replaced
left (old): 5116:a4c8ed61e4df | right (new): 5117:37784b70eea3
158 UPDATE import.imports SET 158 UPDATE import.imports SET
159 state = 'queued'::import_state, 159 state = 'queued'::import_state,
160 changed = CURRENT_TIMESTAMP 160 changed = CURRENT_TIMESTAMP
161 WHERE state = 'running'::import_state` 161 WHERE state = 'running'::import_state`
162 162
163 insertFrontJobSQL = `
164 INSERT INTO import.imports (
165 enqueued,
166 kind,
167 due,
168 trys_left,
169 retry_wait,
170 username,
171 send_email,
172 data
173 ) VALUES (
174 (SELECT coalesce(min(enqueued), CURRENT_TIMESTAMP) - '1s'::interval
175 FROM import.imports
176 WHERE state = 'queued'::import_state),
177 $1,
178 COALESCE($2, CURRENT_TIMESTAMP),
179 $3,
180 $4,
181 $5,
182 $6,
183 $7
184 ) RETURNING id`
185
163 insertJobSQL = ` 186 insertJobSQL = `
164 INSERT INTO import.imports ( 187 INSERT INTO import.imports (
165 kind, 188 kind,
166 due, 189 due,
167 trys_left, 190 trys_left,
485 defer q.creatorsMu.Unlock() 508 defer q.creatorsMu.Unlock()
486 return q.creators[kind] 509 return q.creators[kind]
487 } 510 }
488 511
489 func (q *importQueue) addJob( 512 func (q *importQueue) addJob(
513 front bool,
490 kind JobKind, 514 kind JobKind,
491 due time.Time, 515 due time.Time,
492 triesLeft *int, 516 triesLeft *int,
493 waitRetry *time.Duration, 517 waitRetry *time.Duration,
494 user string, 518 user string,
516 wr = pgtype.Interval{Status: pgtype.Null} 540 wr = pgtype.Interval{Status: pgtype.Null}
517 } 541 }
518 542
519 ctx := context.Background() 543 ctx := context.Background()
520 err := auth.RunAs(ctx, user, func(conn *sql.Conn) error { 544 err := auth.RunAs(ctx, user, func(conn *sql.Conn) error {
545 var stmt string
546 if front {
547 stmt = insertFrontJobSQL
548 } else {
549 stmt = insertJobSQL
550 }
521 return conn.QueryRowContext( 551 return conn.QueryRowContext(
522 ctx, 552 ctx,
523 insertJobSQL, 553 stmt,
524 string(kind), 554 string(kind),
525 due, 555 due,
526 tl, 556 tl,
527 &wr, 557 &wr,
528 user, 558 user,
550 user string, 580 user string,
551 sendEmail bool, 581 sendEmail bool,
552 data string, 582 data string,
553 ) (int64, error) { 583 ) (int64, error) {
554 return iqueue.addJob( 584 return iqueue.addJob(
585 false,
555 kind, 586 kind,
556 due, 587 due,
557 triesLeft, 588 triesLeft,
558 waitRetry, 589 waitRetry,
559 user, 590 user,
629 // Try a little harder to persist the decision. 660 // Try a little harder to persist the decision.
630 tries := reviewJobRetries 661 tries := reviewJobRetries
631 wait := reviewJobWait 662 wait := reviewJobWait
632 663
633 rID, err := q.addJob( 664 rID, err := q.addJob(
665 true,
634 JobKind(kind+ReviewJobSuffix), 666 JobKind(kind+ReviewJobSuffix),
635 time.Now(), 667 time.Now(),
636 &tries, 668 &tries,
637 &wait, 669 &wait,
638 reviewer, 670 reviewer,
1006 go sendNotificationMail(idj.user, jc.Description(), state, idj.id) 1038 go sendNotificationMail(idj.user, jc.Description(), state, idj.id)
1007 } 1039 }
1008 1040
1009 if retry { 1041 if retry {
1010 nid, err := q.addJob( 1042 nid, err := q.addJob(
1043 false,
1011 idj.kind, 1044 idj.kind,
1012 idj.nextDue(), 1045 idj.nextDue(),
1013 idj.triesLeftPointer(), 1046 idj.triesLeftPointer(),
1014 idj.waitRetryPointer(), 1047 idj.waitRetryPointer(),
1015 idj.user, idj.sendEmail, 1048 idj.user, idj.sendEmail,