view pkg/imports/queue.go @ 5122:0b6b62d247e8 queued-stage-done

Prioritize review jobs on selection

This reverts rev. 37784b70eea3 and instead moves review jobs forward in the queue when fetching the next job to be run. Also optimized index setup for filtering by state but not enqueued.
author Tom Gottfried <tom@intevation.de>
date Thu, 26 Mar 2020 14:41:23 +0100
parents 8bfc71eecde1
children eeb45e3e0a5a

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"log"
	"runtime/debug"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/jackc/pgx/pgtype"

	"gemma.intevation.de/gemma/pkg/auth"
	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/config"
	"gemma.intevation.de/gemma/pkg/pgxutils"
)

type (
	// Feedback is passed to the Do method of a Job to log
	// information, warnings or errors.
	Feedback interface {
		// Info logs information.
		Info(fmt string, args ...interface{})
		// Warn logs warnings.
		Warn(fmt string, args ...interface{})
		// Error logs errors.
		Error(fmt string, args ...interface{})
	}

	// UnchangedError may be issued by Do of a Job to indicate
	// that the database has not changed.
	UnchangedError string

	// Job is the central abstraction of an import job
	// run by the import queue.
	Job interface {
		// Do is called to do the actual import.
		// Bind transactions to ctx and conn, please.
		// id is the number of the import job.
		// feedback can be used to log the import process.
		// If no error is returned the import is assumed to
		// be successful. The non-error return value is
		// serialized as a JSON string into the database as
		// a summary of the import to be used by the review process.
		Do(ctx context.Context, id int64, conn *sql.Conn, feedback Feedback) (interface{}, error)
		// CleanUp is called to clean up resources held by the import.
		// It is called whether the import succeeded or not.
		CleanUp() error
	}

	// FeedbackJob is a Job that provides its own Feedback for a
	// given import id instead of the default database-backed one.
	FeedbackJob interface {
		Job
		CreateFeedback(int64) Feedback
	}

	// JobKind is the type of an import.
	// Choose a unique name for every import.
	JobKind string

	// JobCreator is used to bring a job to life as it is stored
	// in pure meta-data form in the database.
	JobCreator interface {
		// Description is the long name of the import.
		Description() string
		// Create builds the actual job.
		Create() Job
		// Depends returns two lists of resources locked by this type of import.
		// Imports are run concurrently if they have disjoint sets
		// of dependencies.
		// Resources in the first list are locked exclusively.
		// Resources in the second list allow multiple readers but only one writer.
		Depends() [2][]string
		// StageDone is called if an import is positively reviewed
		// (state = accepted). This can be used to finalize the imported
		// data, e.g. to move it out of the staging area.
		StageDone(context.Context, *sql.Tx, int64, Feedback) error
		// AutoAccept indicates that imports of this kind
		// don't need a review.
		AutoAccept() bool
	}

	// JobRemover is a JobCreator whose finished jobs are deleted
	// from the import queue instead of being kept with a final state.
	JobRemover interface {
		JobCreator
		// RemoveJob reports whether the job should be deleted after it ran.
		RemoveJob() bool
	}

	idJob struct {
		id        int64
		kind      JobKind
		user      string
		waitRetry pgtype.Interval
		triesLeft sql.NullInt64
		sendEmail bool
		data      string
	}
)
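
// The following is a minimal, hypothetical sketch of an import wired
// into the interfaces above. It is illustrative only, uses made-up
// names and dependencies, and is deliberately not registered with the
// queue; real imports live in their own files and register themselves
// via RegisterJobCreator.
type exampleImport struct {
	// URL is a made-up payload field; it is filled from the JSON
	// data stored with the queued job.
	URL string `json:"url"`
}

type exampleImportCreator struct{}

// Description is the long name of the import.
func (exampleImportCreator) Description() string { return "example import" }

// Create builds an empty job; the queue deserializes the stored JSON
// data into it before calling Do.
func (exampleImportCreator) Create() Job { return new(exampleImport) }

// Depends locks the (made-up) resource "example_data" exclusively and
// takes a shared lock on "reference_data".
func (exampleImportCreator) Depends() [2][]string {
	return [2][]string{{"example_data"}, {"reference_data"}}
}

// StageDone would move accepted data out of the staging area.
func (exampleImportCreator) StageDone(context.Context, *sql.Tx, int64, Feedback) error {
	return nil
}

// AutoAccept is false, so imports of this kind need a review.
func (exampleImportCreator) AutoAccept() bool { return false }

// Do performs the actual import on the given connection.
func (e *exampleImport) Do(
	ctx context.Context,
	id int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {
	feedback.Info("import #%d would fetch %s", id, e.URL)
	// ... fetch, validate and write to a staging table via conn ...
	return struct {
		Source string `json:"source"`
	}{Source: e.URL}, nil
}

// CleanUp releases resources held by the import; nothing to do here.
func (*exampleImport) CleanUp() error { return nil }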

const (
	pollDuration = time.Second * 10
	runExclusive = -66666
)

const (
	ReviewJobSuffix  = "#review"
	reviewJobRetries = 10
	reviewJobWait    = time.Minute
)

type importQueue struct {
	signalChan chan struct{}
	creatorsMu sync.Mutex
	creators   map[JobKind]JobCreator
	// usedDeps tracks resources in use: runExclusive marks an
	// exclusive lock, positive values count shared users.
	usedDeps map[string]int
}

var iqueue = importQueue{
	signalChan: make(chan struct{}),
	creators:   map[JobKind]JobCreator{},
	usedDeps:   map[string]int{},
}

var (
	// ImportStateNames is a list of the states a job can be in.
	ImportStateNames = []string{
		"queued",
		"running",
		"failed",
		"unchanged",
		"pending",
		"accepted",
		"declined",
		"reviewed",
	}
)

const (
	queueUser = "sys_admin"

	reEnqueueRunningSQL = `
UPDATE import.imports SET
  state = 'queued'::import_state,
  changed = CURRENT_TIMESTAMP
WHERE state = 'running'::import_state`

	insertJobSQL = `
INSERT INTO import.imports (
  kind,
  due,
  trys_left,
  retry_wait,
  username,
  send_email,
  data
) VALUES (
  $1,
  COALESCE($2, CURRENT_TIMESTAMP),
  $3,
  $4,
  $5,
  $6,
  $7
) RETURNING id`

	// Select the oldest queued job, but prefer review jobs:
	// DISTINCT ON together with the ORDER BY picks the oldest
	// review job if there is one, otherwise the oldest other job.
	selectJobSQL = `
SELECT DISTINCT ON (kind LIKE '%` + ReviewJobSuffix + `')
  id,
  kind,
  trys_left,
  retry_wait,
  username,
  send_email,
  data
FROM import.imports
WHERE
  due <= CURRENT_TIMESTAMP + interval '5 seconds' AND
  state = 'queued'::import_state AND
  kind = ANY($1)
ORDER BY kind LIKE '%` + ReviewJobSuffix + `' DESC, enqueued
LIMIT 1`

	updateStateSQL = `
UPDATE import.imports SET
  state = $1::import_state,
  changed = CURRENT_TIMESTAMP
WHERE id = $2`

	updateStateSummarySQL = `
UPDATE import.imports SET
   state = $1::import_state,
   changed = CURRENT_TIMESTAMP,
   summary = $2
WHERE id = $3`

	deleteJobSQL = `
DELETE FROM import.imports WHERE id = $1`

	logMessageSQL = `
INSERT INTO import.import_logs (
  import_id,
  kind,
  msg
) VALUES (
  $1,
  $2::log_type,
  $3
)`
)

func init() {
	go iqueue.importLoop()
}

// Error makes UnchangedError an error.
func (ue UnchangedError) Error() string {
	return string(ue)
}
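
// A Do implementation can report "nothing to do" by returning an
// UnchangedError; the pattern below (hypothetical condition and
// message) makes the queue mark the import as "unchanged" instead
// of "failed":
//
//	if !newDataAvailable {
//		return nil, UnchangedError("data already imported")
//	}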

// reviewedJobCreator wraps the JobCreator of an import kind to build
// the follow-up jobs generated by review decisions.
type reviewedJobCreator struct {
	jobCreator JobCreator
}

func (*reviewedJobCreator) AutoAccept() bool {
	return true
}

func (*reviewedJobCreator) RemoveJob() bool {
	return true
}

func (rjc *reviewedJobCreator) Depends() [2][]string {
	return rjc.jobCreator.Depends()
}

func (rjc *reviewedJobCreator) Description() string {
	return rjc.jobCreator.Description() + ReviewJobSuffix
}

func (*reviewedJobCreator) StageDone(context.Context, *sql.Tx, int64, Feedback) error {
	return nil
}

// reviewedJob is the payload of a review job: the decision for the
// import with the given ID.
type reviewedJob struct {
	ID       int64 `json:"id"`
	Accepted bool  `json:"accepted"`
}

func (*reviewedJobCreator) Create() Job {
	return new(reviewedJob)
}

func (*reviewedJob) CleanUp() error { return nil }

func (r *reviewedJob) CreateFeedback(int64) Feedback {
	return logFeedback(r.ID)
}

func (rj *reviewedJob) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	defer tx.Rollback()

	var signer string
	if err := tx.QueryRowContext(ctx, selectUserSQL, importID).Scan(&signer); err != nil {
		return nil, err
	}

	var user, kind string
	if err := tx.QueryRowContext(ctx, selectUserKindSQL, rj.ID).Scan(&user, &kind); err != nil {
		return nil, err
	}

	jc := FindJobCreator(JobKind(kind))
	if jc == nil {
		return nil, fmt.Errorf("no job creator found for '%s'", kind)
	}

	importFeedback := logFeedback(rj.ID)

	if err := auth.RunAs(ctx, user, func(conn *sql.Conn) error {
		userTx, err := conn.BeginTx(ctx, nil)
		if err != nil {
			return err
		}
		defer userTx.Rollback()

		if rj.Accepted {
			err = jc.StageDone(ctx, userTx, rj.ID, importFeedback)
		} else {
			_, err = userTx.ExecContext(ctx, deleteImportDataSQL, rj.ID)
		}
		if err == nil {
			err = userTx.Commit()
		}
		return err
	}); err != nil {
		return nil, err
	}

	// Remove the import track
	if _, err := tx.ExecContext(ctx, deleteImportTrackSQL, rj.ID); err != nil {
		return nil, err
	}

	var state string
	if rj.Accepted {
		state = "accepted"
	} else {
		state = "declined"
	}

	if _, err := tx.ExecContext(ctx, reviewSQL, state, signer, rj.ID); err != nil {
		return nil, err
	}

	importFeedback.Info("User '%s' %s import %d.", signer, state, rj.ID)

	return nil, tx.Commit()
}

func (q *importQueue) registerJobCreator(kind JobKind, jc JobCreator) {
	q.creatorsMu.Lock()
	defer q.creatorsMu.Unlock()
	q.creators[kind] = jc
	q.creators[kind+ReviewJobSuffix] = &reviewedJobCreator{jobCreator: jc}
}

// FindJobCreator looks up a JobCreator in the global import queue.
func FindJobCreator(kind JobKind) JobCreator {
	return iqueue.jobCreator(kind)
}

// ImportKindNames is a list of the names of the imports the
// global import queue supports.
func ImportKindNames() []string {
	return iqueue.importKindNames()
}

// LogImportKindNames logs a list of importer types registered
// to the global import queue.
func LogImportKindNames() {
	kinds := ImportKindNames()
	sort.Strings(kinds)
	log.Printf("info: registered import kinds: %s",
		strings.Join(kinds, ", "))
}

// HasImportKindName checks if the import queue supports a given kind.
func HasImportKindName(kind string) bool {
	return iqueue.hasImportKindName(kind)
}

// hasImportKindName implements HasImportKindName on the queue.
func (q *importQueue) hasImportKindName(kind string) bool {
	q.creatorsMu.Lock()
	defer q.creatorsMu.Unlock()
	return q.creators[JobKind(kind)] != nil
}

// RegisterJobCreator adds a JobCreator to the global import queue.
// This is a good candidate to be called in an init function of
// a particular JobCreator.
func RegisterJobCreator(kind JobKind, jc JobCreator) {
	iqueue.registerJobCreator(kind, jc)
}
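
// A hedged sketch of the registration pattern described above, using
// the hypothetical exampleImportCreator defined near the top of this
// file (a real import would do this in its own package file):
//
//	func init() {
//		RegisterJobCreator("example", exampleImportCreator{})
//	}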

func (q *importQueue) importKindNames() []string {
	q.creatorsMu.Lock()
	defer q.creatorsMu.Unlock()
	names := make([]string, len(q.creators))
	var i int
	for kind := range q.creators {
		names[i] = string(kind)
		i++
	}
	// XXX: Consider using sort.Strings to make output deterministic.
	return names
}

func (idj *idJob) nextRetry(feedback Feedback) bool {
	switch {
	case idj.waitRetry.Status != pgtype.Present && !idj.triesLeft.Valid:
		return false
	case idj.waitRetry.Status == pgtype.Present && !idj.triesLeft.Valid:
		return true
	case idj.triesLeft.Valid:
		if idj.triesLeft.Int64 < 1 {
			feedback.Warn("no retries left")
		} else {
			idj.triesLeft.Int64--
			feedback.Info("failed but will retry")
			return true
		}
	}
	return false
}

func (idj *idJob) nextDue() time.Time {
	now := time.Now()
	if idj.waitRetry.Status == pgtype.Present {
		var d time.Duration
		if err := idj.waitRetry.AssignTo(&d); err != nil {
			log.Printf("error: converting waitRetry failed: %v\n", err)
		} else {
			now = now.Add(d)
		}
	}
	return now
}

func (idj *idJob) triesLeftPointer() *int {
	if !idj.triesLeft.Valid {
		return nil
	}
	t := int(idj.triesLeft.Int64)
	return &t
}

func (idj *idJob) waitRetryPointer() *time.Duration {
	if idj.waitRetry.Status != pgtype.Present {
		return nil
	}
	d := new(time.Duration)
	if err := idj.waitRetry.AssignTo(d); err != nil {
		log.Printf("error: converting waitRetry failed: %v\n", err)
		return nil
	}
	return d
}

func (q *importQueue) lockDependencies(jc JobCreator) {
	deps := jc.Depends()
	q.creatorsMu.Lock()
	defer q.creatorsMu.Unlock()
	for _, d := range deps[0] {
		q.usedDeps[d] = runExclusive
	}
	for _, d := range deps[1] {
		q.usedDeps[d]++
	}
}

func (q *importQueue) unlockDependencies(jc JobCreator) {
	deps := jc.Depends()
	q.creatorsMu.Lock()
	defer q.creatorsMu.Unlock()
	for _, d := range deps[0] {
		q.usedDeps[d] = 0
	}
	for _, d := range deps[1] {
		q.usedDeps[d]--
	}
}

func (q *importQueue) jobCreator(kind JobKind) JobCreator {
	q.creatorsMu.Lock()
	defer q.creatorsMu.Unlock()
	return q.creators[kind]
}

func (q *importQueue) addJob(
	kind JobKind,
	due time.Time,
	triesLeft *int,
	waitRetry *time.Duration,
	user string,
	sendEmail bool,
	data string,
) (int64, error) {

	var id int64
	if due.IsZero() {
		due = time.Now()
	}
	due = due.UTC()

	var tl sql.NullInt64
	if triesLeft != nil {
		tl = sql.NullInt64{Int64: int64(*triesLeft), Valid: true}
	}

	var wr pgtype.Interval
	if waitRetry != nil {
		if err := wr.Set(*waitRetry); err != nil {
			return 0, err
		}
	} else {
		wr = pgtype.Interval{Status: pgtype.Null}
	}

	ctx := context.Background()
	err := auth.RunAs(ctx, user, func(conn *sql.Conn) error {
		return conn.QueryRowContext(
			ctx,
			insertJobSQL,
			string(kind),
			due,
			tl,
			&wr,
			user,
			sendEmail,
			data).Scan(&id)
	})
	if err == nil {
		select {
		case q.signalChan <- struct{}{}:
		default:
		}
	}
	return id, err
}

// AddJob adds a job to the global import queue to be executed
// as soon as possible after due.
// The job is later run by the import loop in a separate Go routine,
// so this call does not block on the import itself.
func AddJob(
	kind JobKind,
	due time.Time,
	triesLeft *int,
	waitRetry *time.Duration,
	user string,
	sendEmail bool,
	data string,
) (int64, error) {
	return iqueue.addJob(
		kind,
		due,
		triesLeft,
		waitRetry,
		user,
		sendEmail,
		data)
}
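
// addExampleJob is a hypothetical sketch of how a request handler
// would enqueue an import: the job payload is passed as a JSON string
// and an optional retry policy is given via triesLeft/waitRetry.
// It is illustrative only and not called anywhere.
func addExampleJob(user string) (int64, error) {
	tries := 3
	wait := 5 * time.Minute
	return AddJob(
		"example",   // made-up JobKind
		time.Time{}, // zero due date means "run as soon as possible"
		&tries,      // retry up to three times ...
		&wait,       // ... waiting five minutes between attempts
		user,
		false, // do not send a notification mail
		`{"url": "https://example.com/data"}`)
}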

const (
	isPendingSQL = `
SELECT
	state = 'pending'::import_state,
	kind
FROM import.imports
WHERE id = $1`

	selectUserSQL = `
SELECT username from import.imports WHERE ID = $1`

	selectUserKindSQL = `
SELECT username, kind from import.imports WHERE ID = $1`

	reviewSQL = `
UPDATE import.imports SET
  state = $1::import_state,
  changed = CURRENT_TIMESTAMP,
  signer = $2
WHERE id = $3`

	deleteImportDataSQL = `SELECT import.del_import($1)`

	deleteImportTrackSQL = `
DELETE FROM import.track_imports WHERE import_id = $1`
)

func (q *importQueue) decideImportTx(
	ctx context.Context,
	tx *sql.Tx,
	id int64,
	accepted bool,
	reviewer string,
) error {
	var (
		pending bool
		kind    string
	)

	switch err := tx.QueryRowContext(ctx, isPendingSQL, id).Scan(&pending, &kind); {
	case err == sql.ErrNoRows:
		return fmt.Errorf("cannot find import #%d", id)
	case err != nil:
		return err
	case !pending:
		return fmt.Errorf("#%d is not pending", id)
	}

	jc := q.jobCreator(JobKind(kind))
	if jc == nil {
		return fmt.Errorf("no job creator for kind '%s'", kind)
	}

	r := &reviewedJob{
		ID:       id,
		Accepted: accepted,
	}
	serialized, err := common.ToJSONString(r)
	if err != nil {
		return err
	}

	// Try a little harder to persist the decision.
	tries := reviewJobRetries
	wait := reviewJobWait

	rID, err := q.addJob(
		JobKind(kind+ReviewJobSuffix),
		time.Now(),
		&tries,
		&wait,
		reviewer,
		false,
		serialized)
	log.Printf("info: add review job %d\n", rID)
	if err != nil {
		return err
	}
	_, err = tx.ExecContext(ctx, updateStateSQL, "reviewed", id)
	return err
}

func (q *importQueue) decideImport(
	ctx context.Context,
	id int64,
	accepted bool,
	reviewer string,
) error {
	if ctx == nil {
		ctx = context.Background()
	}

	return auth.RunAs(ctx, reviewer, func(conn *sql.Conn) error {
		tx, err := conn.BeginTx(ctx, nil)
		if err != nil {
			return err
		}
		defer tx.Rollback()
		err = q.decideImportTx(ctx, tx, id, accepted, reviewer)
		if err == nil {
			err = tx.Commit()
		}
		return err
	})
}

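// DecideImport accepts or declines a pending import in the global
// import queue on behalf of the given reviewer. The decision itself
// is carried out asynchronously by a generated review job.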
func DecideImport(
	ctx context.Context,
	id int64,
	accepted bool,
	reviewer string,
) error {
	return iqueue.decideImport(ctx, id, accepted, reviewer)
}
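
// A typical call from an HTTP review handler might look like this
// (hypothetical variables req, importID and reviewerName):
//
//	err := DecideImport(req.Context(), importID, true, reviewerName)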

type logFeedback int64

func (lf logFeedback) log(kind, format string, args ...interface{}) {
	ctx := context.Background()
	err := auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error {
		_, err := conn.ExecContext(
			ctx, logMessageSQL, int64(lf), kind, fmt.Sprintf(format, args...))
		return err
	})
	if err != nil {
		log.Printf("error: logging failed: %v\n", err)
	}
}

func (lf logFeedback) Info(format string, args ...interface{}) {
	lf.log("info", format, args...)
}

func (lf logFeedback) Warn(format string, args ...interface{}) {
	lf.log("warn", format, args...)
}

func (lf logFeedback) Error(format string, args ...interface{}) {
	lf.log("error", format, args...)
}

func survive(fn func() error) func() error {
	return func() (err error) {
		defer func() {
			if p := recover(); p != nil {
				err = fmt.Errorf("%v: %s", p, string(debug.Stack()))
			}
		}()
		return fn()
	}
}

func reEnqueueRunning() error {
	ctx := context.Background()
	return auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error {
		_, err := conn.ExecContext(ctx, reEnqueueRunningSQL)
		return err
	})
}

func (q *importQueue) fetchJob() (*idJob, error) {

	var which []string

	q.creatorsMu.Lock()
nextCreator:
	for kind, jc := range q.creators {
		deps := jc.Depends()
		for _, d := range deps[0] {
			if q.usedDeps[d] != 0 {
				continue nextCreator
			}
		}
		for _, d := range deps[1] {
			if q.usedDeps[d] == runExclusive {
				continue nextCreator
			}
		}
		which = append(which, string(kind))
	}
	q.creatorsMu.Unlock()

	if len(which) == 0 {
		return nil, sql.ErrNoRows
	}

	var kinds pgtype.TextArray
	if err := kinds.Set(which); err != nil {
		return nil, err
	}

	var ji idJob
	ctx := context.Background()
	err := auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error {
		tx, err := conn.BeginTx(ctx, nil)
		if err != nil {
			return err
		}
		defer tx.Rollback()
		if err = tx.QueryRowContext(ctx, selectJobSQL, &kinds).Scan(
			&ji.id,
			&ji.kind,
			&ji.triesLeft,
			&ji.waitRetry,
			&ji.user,
			&ji.sendEmail,
			&ji.data,
		); err != nil {
			return err
		}
		_, err = tx.ExecContext(ctx, updateStateSQL, "running", ji.id)
		if err == nil {
			err = tx.Commit()
		}
		return err
	})
	switch {
	case err == sql.ErrNoRows:
		return nil, nil
	case err != nil:
		return nil, err
	}
	return &ji, nil
}

func tryHardToStoreState(ctx context.Context, fn func(*sql.Conn) error) error {
	// As it is important to keep the persistent model
	// in sync with the in-memory model, try harder to store
	// the state.
	const maxTries = 10
	var sleep = time.Second

	for try := 1; ; try++ {
		var err error
		if err = auth.RunAs(ctx, queueUser, fn); err == nil || try == maxTries {
			return err
		}
		log.Printf("warn: [try %d/%d] Storing state failed: %v (try again in %s).\n",
			try, maxTries, err, sleep)

		time.Sleep(sleep)
		if sleep < time.Minute {
			if sleep *= 2; sleep > time.Minute {
				sleep = time.Minute
			}
		}
	}
}

func updateStateSummary(
	ctx context.Context,
	id int64,
	state string,
	summary interface{},
) error {
	var s sql.NullString
	if summary != nil {
		var b strings.Builder
		if err := json.NewEncoder(&b).Encode(summary); err != nil {
			return err
		}
		s = sql.NullString{String: b.String(), Valid: true}
	}

	return tryHardToStoreState(ctx, func(conn *sql.Conn) error {
		_, err := conn.ExecContext(ctx, updateStateSummarySQL, state, s, id)
		return err
	})
}

func deleteJob(ctx context.Context, id int64) error {
	return tryHardToStoreState(ctx, func(conn *sql.Conn) error {
		_, err := conn.ExecContext(ctx, deleteJobSQL, id)
		return err
	})
}

func errorAndFail(id int64, format string, args ...interface{}) error {
	ctx := context.Background()
	return tryHardToStoreState(ctx, func(conn *sql.Conn) error {
		tx, err := conn.BeginTx(ctx, nil)
		if err != nil {
			return err
		}
		defer tx.Rollback()
		// Run both statements on the transaction so they are
		// committed (or rolled back) together.
		_, err = tx.ExecContext(
			ctx, logMessageSQL, id, "error", fmt.Sprintf(format, args...))
		if err != nil {
			return err
		}
		_, err = tx.ExecContext(
			ctx, updateStateSQL, "failed", id)
		if err == nil {
			err = tx.Commit()
		}
		return err
	})
}

func (q *importQueue) importLoop() {
	config.WaitReady()
	// Re-enqueue the jobs that are in state running.
	// They were in progress when the server went down.
	if err := reEnqueueRunning(); err != nil {
		log.Printf("error: re-enqueuing failed: %v", err)
	}

	for {
		var idj *idJob
		var err error

		for {
			if idj, err = q.fetchJob(); err != nil && err != sql.ErrNoRows {
				log.Printf("error: db: %v\n", err)
			}
			if idj != nil {
				break
			}
			select {
			case <-q.signalChan:
			case <-time.After(pollDuration):
			}
		}

		log.Printf("info: starting import #%d\n", idj.id)

		jc := q.jobCreator(idj.kind)
		if jc == nil {
			errorAndFail(idj.id, "no creator for kind '%s' found", idj.kind)
			continue
		}

		// Lock dependencies.
		q.lockDependencies(jc)

		go func(jc JobCreator, idj *idJob) {

			// Unlock the dependencies.
			defer func() {
				q.unlockDependencies(jc)
				select {
				case q.signalChan <- struct{}{}:
				default:
				}
			}()

			job := jc.Create()
			if err := common.FromJSONString(idj.data, job); err != nil {
				errorAndFail(idj.id, "failed to create job for import #%d: %v",
					idj.id, err)
				return
			}

			var feedback Feedback
			if fc, ok := job.(FeedbackJob); ok {
				feedback = fc.CreateFeedback(idj.id)
			} else {
				feedback = logFeedback(idj.id)
			}

			ctx := context.Background()
			var summary interface{}

			errDo := survive(func() error {
				return auth.RunAs(ctx, idj.user,
					func(conn *sql.Conn) error {
						var err error
						summary, err = job.Do(ctx, idj.id, conn, feedback)
						return err
					})
			})()

			var unchanged, retry bool
			if v, ok := errDo.(UnchangedError); ok {
				feedback.Info("unchanged: %s", v.Error())
				unchanged = true
			} else if errDo != nil {
				feedback.Error("error in import: %v",
					pgxutils.ReadableError{Err: errDo})
				retry = idj.nextRetry(feedback)
			}

			var errCleanup error
			if !retry { // cleanup debris
				if errCleanup = survive(job.CleanUp)(); errCleanup != nil {
					feedback.Error("error cleanup: %v", errCleanup)
				}
			}

			var remove bool
			if remover, ok := jc.(JobRemover); ok {
				remove = remover.RemoveJob()
			}

			var state string
			switch {
			case unchanged:
				state = "unchanged"
			case errDo != nil || errCleanup != nil:
				state = "failed"
			case jc.AutoAccept():
				state = "accepted"
			default:
				state = "pending"
			}
			if !remove {
				if err := updateStateSummary(ctx, idj.id, state, summary); err != nil {
					log.Printf("error: setting state of job %d failed: %v\n", idj.id, err)
				}
				log.Printf("info: import #%d finished: %s\n", idj.id, state)
			}
			if idj.sendEmail {
				go sendNotificationMail(idj.user, jc.Description(), state, idj.id)
			}

			if retry {
				nid, err := q.addJob(
					idj.kind,
					idj.nextDue(),
					idj.triesLeftPointer(),
					idj.waitRetryPointer(),
					idj.user, idj.sendEmail,
					idj.data)
				if err != nil {
					log.Printf("error: retry enqueue failed: %v\n", err)
				} else {
					log.Printf("info: re-enqueued job with id %d\n", nid)
				}
			}
			if remove {
				if err := deleteJob(ctx, idj.id); err != nil {
					log.Printf("error: deleting job %d failed: %v\n", idj.id, err)
				}
			}
		}(jc, idj)
	}
}