view pkg/imports/queue.go @ 5711:2dd155cc95ec revive-cleanup

Fix all revive issue (w/o machine generated stuff).
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Tue, 20 Feb 2024 22:22:57 +0100

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"runtime/debug"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/jackc/pgx/pgtype"

	"gemma.intevation.de/gemma/pkg/auth"
	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/config"
	"gemma.intevation.de/gemma/pkg/log"
	"gemma.intevation.de/gemma/pkg/pgxutils"
)

type (
	// Feedback is passed to the Do method of a Job to log
	// information, warnings, or errors.
	Feedback interface {
		// Info logs information.
		Info(fmt string, args ...interface{})
		// Warn logs warnings.
		Warn(fmt string, args ...interface{})
		// Error logs errors.
		Error(fmt string, args ...interface{})
	}

	// UnchangedError may be issued by Do of a Job to indicate
	// that the database has not changed.
	UnchangedError string

	// Job is the central abstraction of an import job
	// run by the import queue.
	// An illustrative implementation sketch follows this type block.
	Job interface {
		// Do is called to do the actual import.
		// Please bind transactions to ctx and conn.
		// id is the number of the import job.
		// feedback can be used to log the import process.
		// If no error is returned, the import is assumed to
		// be successful. The non-error return value is
		// serialized as a JSON string and stored in the database as
		// a summary of the import to be used by the review process.
		Do(ctx context.Context, id int64, conn *sql.Conn, feedback Feedback) (interface{}, error)
		// CleanUp is called to clean up resources held by the import.
		// It is called whether the import succeeded or not.
		CleanUp() error
	}

	// FeedbackJob is a Job that provides its own Feedback.
	FeedbackJob interface {
		Job
		CreateFeedback(int64) Feedback
	}

	// JobKind is the type of an import.
	// Choose a unique name for every import.
	JobKind string

	// JobCreator is used to bring a job back to life,
	// as it is stored in pure meta-data form in the database.
	JobCreator interface {
		// Description is the long name of the import.
		Description() string
		// Create builds the actual job.
		Create() Job
		// Depends returns two lists of resources locked by this type of import.
		// Imports are run concurrently if they have disjoint sets
		// of dependencies.
		// The resources in the first list are locked exclusively.
		// The second list allows multiple readers but only one writer.
		Depends() [2][]string
		// StageDone is called if an import is positively reviewed
		// (state = accepted). This can be used to finalize the imported
		// data, e.g. to move it out of the staging area.
		StageDone(context.Context, *sql.Tx, int64, Feedback) error
		// AutoAccept indicates that imports of this kind
		// don't need a review.
		AutoAccept() bool
	}

	// JobRemover is an extended JobCreator which decides whether
	// its jobs are removed from the queue after they have run.
	JobRemover interface {
		JobCreator
		RemoveJob() bool
	}

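	// idJob is the in-memory form of a job fetched from the
	// import.imports table together with its queue meta data.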
	idJob struct {
		id        int64
		kind      JobKind
		user      string
		waitRetry pgtype.Interval
		triesLeft sql.NullInt64
		sendEmail bool
		data      string
	}
)
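
// The sketch below (all names are hypothetical and for illustration only)
// shows how a concrete import could implement Job and JobCreator. Real
// imports live in their own files and register themselves with
// RegisterJobCreator.
//
//	type exampleJob struct {
//		URL string `json:"url"` // decoded from the stored job data
//	}
//
//	func (*exampleJob) CleanUp() error { return nil }
//
//	func (j *exampleJob) Do(
//		ctx context.Context,
//		id int64,
//		conn *sql.Conn,
//		feedback Feedback,
//	) (interface{}, error) {
//		feedback.Info("importing from %s", j.URL)
//		// ... do the actual work with transactions bound to ctx and conn ...
//		return struct {
//			URL string `json:"url"`
//		}{j.URL}, nil // serialized as the JSON summary for the review
//	}
//
//	type exampleJobCreator struct{}
//
//	func (*exampleJobCreator) Description() string { return "example import" }
//	func (*exampleJobCreator) Create() Job         { return new(exampleJob) }
//	func (*exampleJobCreator) AutoAccept() bool    { return false }
//
//	func (*exampleJobCreator) Depends() [2][]string {
//		// First list: exclusive locks, second list: shared locks.
//		return [2][]string{{"example_data"}, {"reference_tables"}}
//	}
//
//	func (*exampleJobCreator) StageDone(context.Context, *sql.Tx, int64, Feedback) error {
//		return nil
//	}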

const (
	pollDuration = time.Second * 10
	// runExclusive is the sentinel value in usedDeps that marks
	// a dependency as exclusively locked.
	runExclusive = -66666
)

const (
	// ReviewJobSuffix is the suffix appended to the kind of review jobs.
	ReviewJobSuffix  = "#review"
	reviewJobRetries = 200
	reviewJobWait    = 10 * time.Minute
)

const (
	hardMaxTries = 200
	minWaitRetry = 5 * time.Second
)

// ErrRetrying is used to signal that a job failed but is being retried.
var ErrRetrying = errors.New("retrying")

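// importQueue is the in-memory side of the import queue. cmdCh serializes
// commands into the import loop, creators maps job kinds to their
// JobCreators, usedDeps tracks the currently locked dependencies and
// waiting holds channels of callers blocked on synchronously run jobs.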
type importQueue struct {
	cmdCh chan func(*importQueue)

	creatorsMu sync.Mutex
	creators   map[JobKind]JobCreator
	usedDeps   map[string]int

	waiting map[int64]chan struct{}
}

var iqueue = importQueue{
	cmdCh: make(chan func(*importQueue)),

	creators: map[JobKind]JobCreator{},
	usedDeps: map[string]int{},
	waiting:  make(map[int64]chan struct{}),
}

var (
	// ImportStateNames is a list of the states a job can be in.
	ImportStateNames = []string{
		"queued",
		"running",
		"failed",
		"unchanged",
		"pending",
		"accepted",
		"declined",
		"reviewed",
	}
)

const (
	queueUser = "sys_admin"

	reEnqueueRunningSQL = `
UPDATE import.imports SET
  state = 'queued'::import_state,
  changed = CURRENT_TIMESTAMP
WHERE state = 'running'::import_state`

	insertJobSQL = `
INSERT INTO import.imports (
  kind,
  due,
  trys_left,
  retry_wait,
  username,
  send_email,
  data
) VALUES (
  $1,
  COALESCE($2, CURRENT_TIMESTAMP),
  $3,
  $4,
  $5,
  $6,
  $7
) RETURNING id`

	// Select the oldest queued job but prioritize review jobs.
	selectJobSQL = `
SELECT DISTINCT ON (kind LIKE '%` + ReviewJobSuffix + `')
  id,
  kind,
  trys_left,
  retry_wait,
  username,
  send_email,
  data
FROM import.imports
WHERE
  due <= CURRENT_TIMESTAMP + interval '5 seconds' AND
  state = 'queued'::import_state AND
  kind = ANY($1)
ORDER BY kind LIKE '%` + ReviewJobSuffix + `' DESC, enqueued
LIMIT 1`

	updateStateSQL = `
UPDATE import.imports SET
  state = $1::import_state,
  changed = CURRENT_TIMESTAMP
WHERE id = $2`

	updateStateSummarySQL = `
UPDATE import.imports SET
   state = $1::import_state,
   changed = CURRENT_TIMESTAMP,
   summary = $2
WHERE id = $3`

	deleteJobSQL = `
DELETE FROM import.imports WHERE id = $1`

	logMessageSQL = `
INSERT INTO import.import_logs (
  import_id,
  kind,
  msg
) VALUES (
  $1,
  $2::log_type,
  $3
)`
)

func init() {
	go iqueue.importLoop()
}

// Error makes UnchangedError an error.
func (ue UnchangedError) Error() string {
	return string(ue)
}

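// reviewedJobCreator wraps the JobCreator of a reviewed import: the
// resulting review job (kind + ReviewJobSuffix) shares its dependencies,
// is accepted automatically and is removed from the queue after it ran.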
type reviewedJobCreator struct {
	jobCreator JobCreator
}

func (*reviewedJobCreator) AutoAccept() bool {
	return true
}

func (*reviewedJobCreator) RemoveJob() bool {
	return true
}

func (rjc *reviewedJobCreator) Depends() [2][]string {
	return rjc.jobCreator.Depends()
}

func (rjc *reviewedJobCreator) Description() string {
	return rjc.jobCreator.Description() + ReviewJobSuffix
}

func (*reviewedJobCreator) StageDone(context.Context, *sql.Tx, int64, Feedback) error {
	return nil
}

type reviewedJob struct {
	ID       int64 `json:"id"`
	Accepted bool  `json:"accepted"`
}

func (*reviewedJobCreator) Create() Job {
	return new(reviewedJob)
}

func (*reviewedJob) CleanUp() error { return nil }

func (rj *reviewedJob) CreateFeedback(int64) Feedback {
	return logFeedback(rj.ID)
}

func (rj *reviewedJob) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	_ Feedback,
) (interface{}, error) {

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	defer tx.Rollback()

	var signer string
	if err := tx.QueryRowContext(ctx, selectUserSQL, importID).Scan(&signer); err != nil {
		return nil, err
	}

	var user, kind string
	if err := tx.QueryRowContext(ctx, selectUserKindSQL, rj.ID).Scan(&user, &kind); err != nil {
		return nil, err
	}

	jc := FindJobCreator(JobKind(kind))
	if jc == nil {
		return nil, fmt.Errorf("no job creator found for '%s'", kind)
	}

	importFeedback := logFeedback(rj.ID)

	if err := auth.RunAs(ctx, user, func(conn *sql.Conn) error {
		userTx, err := conn.BeginTx(ctx, nil)
		if err != nil {
			return err
		}
		defer userTx.Rollback()

		if rj.Accepted {
			err = jc.StageDone(ctx, userTx, rj.ID, importFeedback)
		} else {
			_, err = userTx.ExecContext(ctx, deleteImportDataSQL, rj.ID)
		}
		if err == nil {
			err = userTx.Commit()
		}
		return err
	}); err != nil {
		return nil, err
	}

	// Remove the import track
	if _, err := tx.ExecContext(ctx, deleteImportTrackSQL, rj.ID); err != nil {
		return nil, err
	}

	var state string
	if rj.Accepted {
		state = "accepted"
	} else {
		state = "declined"
	}

	if _, err := tx.ExecContext(ctx, reviewSQL, state, signer, rj.ID); err != nil {
		return nil, err
	}

	importFeedback.Info("User '%s' %s import %d.", signer, state, rj.ID)

	return nil, tx.Commit()
}

func (q *importQueue) registerJobCreator(kind JobKind, jc JobCreator) {
	q.creatorsMu.Lock()
	defer q.creatorsMu.Unlock()
	q.creators[kind] = jc
	q.creators[kind+ReviewJobSuffix] = &reviewedJobCreator{jobCreator: jc}
}

// FindJobCreator looks up a JobCreator in the global import queue.
func FindJobCreator(kind JobKind) JobCreator {
	return iqueue.jobCreator(kind)
}

// ImportKindNames returns a list of the names of the import kinds
// the global import queue supports.
func ImportKindNames() []string {
	return iqueue.importKindNames()
}

// LogImportKindNames logs a list of importer types registered
// to the global import queue.
func LogImportKindNames() {
	kinds := ImportKindNames()
	sort.Strings(kinds)
	log.Infof("registered import kinds: %s",
		strings.Join(kinds, ", "))
}

// HasImportKindName checks if the import queue supports a given kind.
func HasImportKindName(kind string) bool {
	return iqueue.hasImportKindName(kind)
}

func (q *importQueue) hasImportKindName(kind string) bool {
	q.creatorsMu.Lock()
	defer q.creatorsMu.Unlock()
	return q.creators[JobKind(kind)] != nil
}

// RegisterJobCreator adds a JobCreator to the global import queue.
// This is a good candidate to be called in an init function of
// a particular JobCreator, as sketched below.
func RegisterJobCreator(kind JobKind, jc JobCreator) {
	iqueue.registerJobCreator(kind, jc)
}
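
// For illustration only (kind and creator are the hypothetical ones from
// the sketch above): a concrete import would typically register itself in
// an init function of its own file. Registering "example" implicitly
// registers "example" + ReviewJobSuffix as well.
//
//	const exampleJobKind JobKind = "example"
//
//	func init() {
//		RegisterJobCreator(exampleJobKind, &exampleJobCreator{})
//	}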

func (q *importQueue) importKindNames() []string {
	q.creatorsMu.Lock()
	defer q.creatorsMu.Unlock()
	names := make([]string, len(q.creators))
	var i int
	for kind := range q.creators {
		names[i] = string(kind)
		i++
	}
	// XXX: Consider using sort.Strings to make output deterministic.
	return names
}

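// nextRetry decides whether a failed job should be re-enqueued, based on
// its retry interval and the number of tries left. If a bounded number of
// tries is configured, the counter is decremented when a retry is granted.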
func (idj *idJob) nextRetry(feedback Feedback) bool {
	switch {
	case idj.waitRetry.Status != pgtype.Present && !idj.triesLeft.Valid:
		return false
	case idj.waitRetry.Status == pgtype.Present && !idj.triesLeft.Valid:
		return true
	case idj.triesLeft.Valid:
		if idj.triesLeft.Int64 < 1 {
			feedback.Warn("no retries left")
		} else {
			idj.triesLeft.Int64--
			feedback.Info("failed but will retry")
			return true
		}
	}
	return false
}

func (idj *idJob) nextDue() time.Time {
	now := time.Now()
	if idj.waitRetry.Status == pgtype.Present {
		var d time.Duration
		if err := idj.waitRetry.AssignTo(&d); err != nil {
			log.Errorf("converting waitRetry failed: %v\n", err)
		} else {
			now = now.Add(d)
		}
	}
	return now
}

func (idj *idJob) triesLeftPointer() *int {
	if !idj.triesLeft.Valid {
		return nil
	}
	t := int(idj.triesLeft.Int64)
	return &t
}

func (idj *idJob) waitRetryPointer() *time.Duration {
	if idj.waitRetry.Status != pgtype.Present {
		return nil
	}
	d := new(time.Duration)
	if err := idj.waitRetry.AssignTo(d); err != nil {
		log.Errorf("converting waitRetry failed: %v\n", err)
		return nil
	}
	return d
}

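// lockDependencies marks the resources declared by the JobCreator as in
// use: entries of the first Depends list are locked exclusively (via the
// runExclusive sentinel), entries of the second list get their reader
// count increased. unlockDependencies reverses this.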
func (q *importQueue) lockDependencies(jc JobCreator) {
	deps := jc.Depends()
	q.creatorsMu.Lock()
	defer q.creatorsMu.Unlock()
	for _, d := range deps[0] {
		q.usedDeps[d] = runExclusive
	}
	for _, d := range deps[1] {
		q.usedDeps[d]++
	}
}

func (q *importQueue) unlockDependencies(jc JobCreator) {
	deps := jc.Depends()
	q.creatorsMu.Lock()
	defer q.creatorsMu.Unlock()
	for _, d := range deps[0] {
		q.usedDeps[d] = 0
	}
	for _, d := range deps[1] {
		q.usedDeps[d]--
	}
}

func (q *importQueue) jobCreator(kind JobKind) JobCreator {
	q.creatorsMu.Lock()
	defer q.creatorsMu.Unlock()
	return q.creators[kind]
}

func (q *importQueue) addJob(
	kind JobKind,
	due time.Time,
	triesLeft *int,
	waitRetry *time.Duration,
	user string,
	sendEmail bool,
	data string,
	sync bool,
) (int64, chan struct{}, error) {

	var id int64
	if due.IsZero() {
		due = time.Now()
	}
	due = due.UTC()

	var tl sql.NullInt64
	if triesLeft != nil {
		var many int64
		if *triesLeft > hardMaxTries || *triesLeft < 0 {
			many = hardMaxTries
		} else {
			many = int64(*triesLeft)
		}
		tl = sql.NullInt64{Int64: many, Valid: true}
	}

	var wr pgtype.Interval
	if waitRetry != nil {
		var howLong time.Duration
		if minWaitRetry > *waitRetry {
			howLong = minWaitRetry
		} else {
			howLong = *waitRetry
		}

		if err := wr.Set(howLong); err != nil {
			return 0, nil, err
		}
	} else {
		wr = pgtype.Interval{Status: pgtype.Null}
	}

	errCh := make(chan error)
	var done chan struct{}

	q.cmdCh <- func(q *importQueue) {
		ctx := context.Background()
		errCh <- auth.RunAs(ctx, user, func(conn *sql.Conn) error {
			err := conn.QueryRowContext(
				ctx,
				insertJobSQL,
				string(kind),
				due,
				tl,
				&wr,
				user,
				sendEmail,
				data).Scan(&id)

			if err == nil && sync {
				log.Infof("register wait for %d\n", id)
				done = make(chan struct{})
				q.waiting[id] = done
			}

			return err
		})
	}

	return id, done, <-errCh
}

// AddJob adds a job to the global import queue to be executed
// as soon as possible after due.
// The job itself runs in a separate goroutine, so this call does
// not wait for the import to finish (see the example below).
func AddJob(
	kind JobKind,
	due time.Time,
	triesLeft *int,
	waitRetry *time.Duration,
	user string,
	sendEmail bool,
	data string,
) (int64, error) {
	id, _, err := iqueue.addJob(
		kind,
		due,
		triesLeft,
		waitRetry,
		user,
		sendEmail,
		data,
		false)
	return id, err
}
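
// Illustrative call (kind, user and data are hypothetical): enqueue an
// "example" import due immediately, retried up to three times with at
// least a minute between attempts. Passing nil for both retry parameters
// disables retrying.
//
//	tries := 3
//	wait := time.Minute
//	id, err := AddJob(
//		"example",   // kind registered via RegisterJobCreator
//		time.Time{}, // zero time means "as soon as possible"
//		&tries,
//		&wait,
//		"someuser", // the job is run as this user
//		true,       // send a notification mail when done
//		`{"url": "https://example.com/data"}`)
//	if err != nil {
//		log.Errorf("enqueue failed: %v\n", err)
//	} else {
//		log.Infof("enqueued import #%d\n", id)
//	}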

const (
	isPendingSQL = `
SELECT
	state = 'pending'::import_state,
	kind
FROM import.imports
WHERE id = $1`

	selectUserSQL = `
SELECT username from import.imports WHERE ID = $1`

	selectUserKindSQL = `
SELECT username, kind from import.imports WHERE ID = $1`

	reviewSQL = `
UPDATE import.imports SET
  state = $1::import_state,
  changed = CURRENT_TIMESTAMP,
  signer = $2
WHERE id = $3`

	deleteImportDataSQL = `SELECT import.del_import($1)`

	deleteImportTrackSQL = `
DELETE FROM import.track_imports WHERE import_id = $1`
)

func (q *importQueue) decideImportTx(
	ctx context.Context,
	tx *sql.Tx,
	id int64,
	accepted bool,
	reviewer string,
) (chan struct{}, error) {
	var (
		pending bool
		kind    string
	)

	switch err := tx.QueryRowContext(ctx, isPendingSQL, id).Scan(&pending, &kind); {
	case err == sql.ErrNoRows:
		return nil, fmt.Errorf("cannot find import #%d", id)
	case err != nil:
		return nil, err
	case !pending:
		return nil, fmt.Errorf("#%d is not pending", id)
	}

	jc := q.jobCreator(JobKind(kind))
	if jc == nil {
		return nil, fmt.Errorf("no job creator for kind '%s'", kind)
	}

	r := &reviewedJob{
		ID:       id,
		Accepted: accepted,
	}
	serialized, err := common.ToJSONString(r)
	if err != nil {
		return nil, err
	}

	// Try a little harder to persist the decision.
	tries := reviewJobRetries
	wait := reviewJobWait

	rID, done, err := q.addJob(
		JobKind(kind+ReviewJobSuffix),
		time.Now(),
		&tries,
		&wait,
		reviewer,
		false,
		serialized,
		true)
	if err != nil {
		return nil, err
	}
	log.Infof("add review job %d\n", rID)
	_, err = tx.ExecContext(ctx, updateStateSQL, "reviewed", id)
	if err != nil && done != nil {
		go func() {
			q.cmdCh <- func(q *importQueue) {
				delete(q.waiting, rID)
			}
		}()
		done = nil
	}
	return done, err
}

func (q *importQueue) decideImport(
	ctx context.Context,
	id int64,
	accepted bool,
	reviewer string,
) error {
	if ctx == nil {
		ctx = context.Background()
	}

	var done chan struct{}

	if err := auth.RunAs(ctx, reviewer, func(conn *sql.Conn) error {
		tx, err := conn.BeginTx(ctx, nil)
		if err != nil {
			return err
		}
		defer tx.Rollback()
		done, err = q.decideImportTx(ctx, tx, id, accepted, reviewer)
		if err == nil {
			err = tx.Commit()
		}
		return err
	}); err != nil {
		return err
	}

	_, retry := <-done
	if retry {
		return ErrRetrying
	}
	return nil
}

// DecideImport decides if a given import is accepted or not.
// It blocks until the resulting review job has run.
func DecideImport(
	ctx context.Context,
	id int64,
	accepted bool,
	reviewer string,
) error {
	return iqueue.decideImport(ctx, id, accepted, reviewer)
}
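
// Illustrative call (import id and reviewer are hypothetical): accept
// import 4711 on behalf of "reviewer".
//
//	switch err := DecideImport(context.Background(), 4711, true, "reviewer"); {
//	case err == ErrRetrying:
//		// The review job failed and will be retried automatically.
//	case err != nil:
//		// Handle the error.
//	}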

func (q *importQueue) All(fn func(JobKind, JobCreator)) {
	q.creatorsMu.Lock()
	defer q.creatorsMu.Unlock()
	for k, v := range q.creators {
		fn(k, v)
	}
}

// All reports all configured job creators and their kinds
// to the given function.
func All(fn func(JobKind, JobCreator)) { iqueue.All(fn) }

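// logFeedback implements Feedback by writing the messages into the
// import.import_logs table under the import id it represents.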
type logFeedback int64

func (lf logFeedback) log(kind, format string, args ...interface{}) {
	ctx := context.Background()
	err := auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error {
		_, err := conn.ExecContext(
			ctx, logMessageSQL, int64(lf), kind, fmt.Sprintf(format, args...))
		return err
	})
	if err != nil {
		log.Errorf("logging failed: %v\n", err)
	}
}

func (lf logFeedback) Info(format string, args ...interface{}) {
	lf.log("info", format, args...)
}

func (lf logFeedback) Warn(format string, args ...interface{}) {
	lf.log("warn", format, args...)
}

func (lf logFeedback) Error(format string, args ...interface{}) {
	lf.log("error", format, args...)
}

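// survive wraps fn so that a panic inside it is recovered and reported
// as an error containing the panic value and a stack trace.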
func survive(fn func() error) func() error {
	return func() (err error) {
		defer func() {
			if p := recover(); p != nil {
				err = fmt.Errorf("%v: %s", p, string(debug.Stack()))
			}
		}()
		return fn()
	}
}

func reEnqueueRunning() error {
	ctx := context.Background()
	return auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error {
		_, err := conn.ExecContext(ctx, reEnqueueRunningSQL)
		return err
	})
}

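// fetchJob picks the oldest due job (review jobs first) whose kind is
// currently runnable, i.e. none of its exclusive dependencies are in use
// and none of its shared dependencies are locked exclusively, and marks
// it as running. It returns (nil, nil) if no such job is queued.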
func (q *importQueue) fetchJob() (*idJob, error) {

	var which []string

	q.creatorsMu.Lock()
nextCreator:
	for kind, jc := range q.creators {
		deps := jc.Depends()
		for _, d := range deps[0] {
			if q.usedDeps[d] != 0 {
				continue nextCreator
			}
		}
		for _, d := range deps[1] {
			if q.usedDeps[d] == runExclusive {
				continue nextCreator
			}
		}
		which = append(which, string(kind))
	}
	q.creatorsMu.Unlock()

	if len(which) == 0 {
		return nil, sql.ErrNoRows
	}

	var kinds pgtype.TextArray
	if err := kinds.Set(which); err != nil {
		return nil, err
	}

	var ji idJob
	ctx := context.Background()
	err := auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error {
		tx, err := conn.BeginTx(ctx, nil)
		if err != nil {
			return err
		}
		defer tx.Rollback()
		if err = tx.QueryRowContext(ctx, selectJobSQL, &kinds).Scan(
			&ji.id,
			&ji.kind,
			&ji.triesLeft,
			&ji.waitRetry,
			&ji.user,
			&ji.sendEmail,
			&ji.data,
		); err != nil {
			return err
		}
		_, err = tx.ExecContext(ctx, updateStateSQL, "running", ji.id)
		if err == nil {
			if err = tx.Commit(); err != nil {
				return err
			}
		}
		// Clip retry parameters back to allowed values.
		if ji.waitRetry.Status == pgtype.Present {
			var d time.Duration
			ji.waitRetry.AssignTo(&d)
			if d < minWaitRetry {
				ji.waitRetry.Set(minWaitRetry)
			}
		}
		if ji.triesLeft.Valid {
			if ji.triesLeft.Int64 < 0 || ji.triesLeft.Int64 > hardMaxTries {
				ji.triesLeft.Int64 = hardMaxTries
			}
		}
		return nil
	})
	switch {
	case err == sql.ErrNoRows:
		return nil, nil
	case err != nil:
		return nil, err
	}
	return &ji, nil
}

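// tryHardToStoreState runs fn as the queue user and retries it up to ten
// times with exponential backoff capped at one minute.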
func tryHardToStoreState(ctx context.Context, fn func(*sql.Conn) error) error {
	// As it is important to keep the persistent model
	// in sync with the in-memory model, try harder to store
	// the state.
	const maxTries = 10
	var sleep = time.Second

	for try := 1; ; try++ {
		var err error
		if err = auth.RunAs(ctx, queueUser, fn); err == nil || try == maxTries {
			return err
		}
		log.Warnf("[try %d/%d] Storing state failed: %v (try again in %s).\n",
			try, maxTries, err, sleep)

		time.Sleep(sleep)
		if sleep < time.Minute {
			if sleep *= 2; sleep > time.Minute {
				sleep = time.Minute
			}
		}
	}
}

func updateStateSummary(
	ctx context.Context,
	id int64,
	state string,
	summary interface{},
) error {
	var s sql.NullString
	if summary != nil {
		var b strings.Builder
		if err := json.NewEncoder(&b).Encode(summary); err != nil {
			return err
		}
		s = sql.NullString{String: b.String(), Valid: true}
	}

	return tryHardToStoreState(ctx, func(conn *sql.Conn) error {
		_, err := conn.ExecContext(ctx, updateStateSummarySQL, state, s, id)
		return err
	})
}

func deleteJob(ctx context.Context, id int64) error {
	return tryHardToStoreState(ctx, func(conn *sql.Conn) error {
		_, err := conn.ExecContext(ctx, deleteJobSQL, id)
		return err
	})
}

func errorAndFail(id int64, format string, args ...interface{}) error {
	ctx := context.Background()
	return tryHardToStoreState(ctx, func(conn *sql.Conn) error {
		tx, err := conn.BeginTx(ctx, nil)
		if err != nil {
			return err
		}
		defer tx.Rollback()
		_, err = tx.ExecContext(
			ctx, logMessageSQL, id, "error", fmt.Sprintf(format, args...))
		if err != nil {
			return err
		}
		_, err = tx.ExecContext(
			ctx, updateStateSQL, "failed", id)
		if err == nil {
			err = tx.Commit()
		}
		return err
	})
}

func (q *importQueue) importLoop() {
	config.WaitReady()
	// Re-enqueue the jobs that are in state running.
	// They were in progress when the server went down.
	if err := reEnqueueRunning(); err != nil {
		log.Errorf("re-enqueuing failed: %v", err)
	}

	for {
		var idj *idJob
		var err error

		for {
			if idj, err = q.fetchJob(); err != nil && err != sql.ErrNoRows {
				log.Errorf("error: db: %v\n", err)
			}
			if idj != nil {
				break
			}
			select {
			case cmd := <-q.cmdCh:
				cmd(q)

			case <-time.After(pollDuration):
			}
		}

		log.Infof("starting import #%d\n", idj.id)

		jc := q.jobCreator(idj.kind)
		if jc == nil {
			errorAndFail(idj.id, "no creator for kind '%s' found", idj.kind)
			continue
		}

		// Lock dependencies.
		q.lockDependencies(jc)

		go func(jc JobCreator, idj *idJob) {

			var retry bool

			defer func() {
				// Unlock the dependencies.
				q.unlockDependencies(jc)
				// Unlock waiting.
				q.cmdCh <- func(q *importQueue) {
					if w := q.waiting[idj.id]; w != nil {
						log.Infof("unlock waiting %d\n", idj.id)
						if retry {
							w <- struct{}{}
						} else {
							close(w)
						}
						delete(q.waiting, idj.id)
					}
				}
			}()

			job := jc.Create()
			if err := common.FromJSONString(idj.data, job); err != nil {
				errorAndFail(idj.id, "failed to create job for import #%d: %v",
					idj.id, err)
				return
			}

			var feedback Feedback
			if fc, ok := job.(FeedbackJob); ok {
				feedback = fc.CreateFeedback(idj.id)
			} else {
				feedback = logFeedback(idj.id)
			}

			ctx := context.Background()
			var summary interface{}

			errDo := survive(func() error {
				return auth.RunAs(ctx, idj.user,
					func(conn *sql.Conn) error {
						var err error
						summary, err = job.Do(ctx, idj.id, conn, feedback)
						return err
					})
			})()

			var unchanged bool
			if v, ok := errDo.(UnchangedError); ok {
				feedback.Info("unchanged: %s", v.Error())
				unchanged = true
			} else if errDo != nil {
				feedback.Error("error in import: %v",
					pgxutils.ReadableError{Err: errDo})
				retry = idj.nextRetry(feedback)
			}

			var errCleanup error
			if !retry { // cleanup debris
				if errCleanup = survive(job.CleanUp)(); errCleanup != nil {
					feedback.Error("error cleanup: %v", errCleanup)
				}
			}

			var remove bool
			if remover, ok := jc.(JobRemover); ok {
				remove = remover.RemoveJob()
			}

			var state string
			switch {
			case unchanged:
				state = "unchanged"
			case errDo != nil || errCleanup != nil:
				state = "failed"
			case jc.AutoAccept():
				state = "accepted"
			default:
				state = "pending"
			}
			if !remove {
				if err := updateStateSummary(ctx, idj.id, state, summary); err != nil {
					log.Errorf("setting state of job %d failed: %v\n", idj.id, err)
				}
				log.Infof("info: import #%d finished: %s\n", idj.id, state)
			}
			if idj.sendEmail {
				go sendNotificationMail(idj.user, jc.Description(), state, idj.id)
			}

			if retry {
				nid, _, err := q.addJob(
					idj.kind,
					idj.nextDue(),
					idj.triesLeftPointer(),
					idj.waitRetryPointer(),
					idj.user, idj.sendEmail,
					idj.data,
					false)
				if err != nil {
					log.Errorf("retry enqueue failed: %v\n", err)
				} else {
					log.Infof("re-enqueued job with id %d\n", nid)
				}
			}
			if remove {
				if err := deleteJob(ctx, idj.id); err != nil {
					log.Errorf("deleting job %d failed: %v\n", idj.id, err)
				}
			}
		}(jc, idj)
	}
}