view pkg/imports/queue.go @ 5560:f2204f91d286

Join the log lines of imports to the log exports to recover data from them. Used in SR export to extract information that where in the meta json but now are only found in the log.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Wed, 09 Feb 2022 18:34:40 +0100
parents 35966741e45e
children aaa9e658cabd
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"runtime/debug"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/jackc/pgx/pgtype"

	"gemma.intevation.de/gemma/pkg/auth"
	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/config"
	"gemma.intevation.de/gemma/pkg/log"
	"gemma.intevation.de/gemma/pkg/pgxutils"
)

type (
	// Feedback is passed to the Do method of a Job to log
	// informations, warnings or errors.
	Feedback interface {
		// Info logs informations.
		Info(fmt string, args ...interface{})
		// Warn logs warnings.
		Warn(fmt string, args ...interface{})
		// Error logs errors.
		Error(fmt string, args ...interface{})
	}

	// UnchangedError may be issued by Do of a Job to indicate
	// That the database has not changed.
	UnchangedError string

	// Job is the central abstraction of an import job
	// run by the import queue.
	Job interface {
		// Do is called to do the actual import.
		// Bind transactions to ctx and conn, please-
		// id is the number of the import job.
		// feedback can be used to log the import process.
		// If no error is return the import is assumed to
		// be successfull. The non-error return value is
		// serialized as a JSON string into the database as
		// a summary to the import to be used by the review process.
		Do(ctx context.Context, id int64, conn *sql.Conn, feedback Feedback) (interface{}, error)
		// CleanUp is called to clean up ressources hold by the import.
		// It is called whether the import succeeded or not.
		CleanUp() error
	}

	FeedbackJob interface {
		Job
		CreateFeedback(int64) Feedback
	}

	// JobKind is the type of an import.
	// Choose a unique name for every import.
	JobKind string

	// JobCreator is used to bring a job to life as it is stored
	// in pure meta-data form to the database.
	JobCreator interface {
		// Description is the long name of the import.
		Description() string
		// Create build the actual job.
		Create() Job
		// Depends returns two lists of ressources locked by this type of import.
		// Imports are run concurrently if they have disjoint sets
		// of dependencies.
		// The first list are locked exclusively.
		// The second allows multiple read users but only one writing one.
		Depends() [2][]string
		// StageDone is called if an import is positively reviewed
		// (state = accepted). This can be used to finalize the imported
		// data to move it e.g from the staging area.
		StageDone(context.Context, *sql.Tx, int64, Feedback) error
		// AutoAccept indicates that imports of this kind
		// don't need a review.
		AutoAccept() bool
	}

	JobRemover interface {
		JobCreator
		RemoveJob() bool
	}

	idJob struct {
		id        int64
		kind      JobKind
		user      string
		waitRetry pgtype.Interval
		triesLeft sql.NullInt64
		sendEmail bool
		data      string
	}
)

const (
	pollDuration = time.Second * 10
	runExclusive = -66666
)

const (
	ReviewJobSuffix  = "#review"
	reviewJobRetries = 200
	reviewJobWait    = 10 * time.Minute
)

const (
	hardMaxTries = 200
	minWaitRetry = 5 * time.Second
)

var ErrRetrying = errors.New("retrying")

type importQueue struct {
	cmdCh chan func(*importQueue)

	creatorsMu sync.Mutex
	creators   map[JobKind]JobCreator
	usedDeps   map[string]int

	waiting map[int64]chan struct{}
}

var iqueue = importQueue{
	cmdCh: make(chan func(*importQueue)),

	creators: map[JobKind]JobCreator{},
	usedDeps: map[string]int{},
	waiting:  make(map[int64]chan struct{}),
}

var (
	// ImportStateNames is a list of the states a job can be in.
	ImportStateNames = []string{
		"queued",
		"running",
		"failed",
		"unchanged",
		"pending",
		"accepted",
		"declined",
		"reviewed",
	}
)

const (
	queueUser = "sys_admin"

	reEnqueueRunningSQL = `
UPDATE import.imports SET
  state = 'queued'::import_state,
  changed = CURRENT_TIMESTAMP
WHERE state = 'running'::import_state`

	insertJobSQL = `
INSERT INTO import.imports (
  kind,
  due,
  trys_left,
  retry_wait,
  username,
  send_email,
  data
) VALUES (
  $1,
  COALESCE($2, CURRENT_TIMESTAMP),
  $3,
  $4,
  $5,
  $6,
  $7
) RETURNING id`

	// Select oldest queued job but prioritize review jobs
	selectJobSQL = `
SELECT DISTINCT ON (kind LIKE '%` + ReviewJobSuffix + `')
  id,
  kind,
  trys_left,
  retry_wait,
  username,
  send_email,
  data
FROM import.imports
WHERE
  due <= CURRENT_TIMESTAMP + interval '5 seconds' AND
  state = 'queued'::import_state AND
  kind = ANY($1)
ORDER BY kind LIKE '%` + ReviewJobSuffix + `' DESC, enqueued
LIMIT 1`

	updateStateSQL = `
UPDATE import.imports SET
  state = $1::import_state,
  changed = CURRENT_TIMESTAMP
WHERE id = $2`

	updateStateSummarySQL = `
UPDATE import.imports SET
   state = $1::import_state,
   changed = CURRENT_TIMESTAMP,
   summary = $2
WHERE id = $3`

	deleteJobSQL = `
DELETE FROM import.imports WHERE id = $1`

	logMessageSQL = `
INSERT INTO import.import_logs (
  import_id,
  kind,
  msg
) VALUES (
  $1,
  $2::log_type,
  $3
)`
)

func init() {
	go iqueue.importLoop()
}

// Error makes UnchangedError an error.
func (ue UnchangedError) Error() string {
	return string(ue)
}

type reviewedJobCreator struct {
	jobCreator JobCreator
}

func (*reviewedJobCreator) AutoAccept() bool {
	return true
}

func (*reviewedJobCreator) RemoveJob() bool {
	return true
}

func (rjc *reviewedJobCreator) Depends() [2][]string {
	return rjc.jobCreator.Depends()
}

func (rjc *reviewedJobCreator) Description() string {
	return rjc.jobCreator.Description() + ReviewJobSuffix
}

func (*reviewedJobCreator) StageDone(context.Context, *sql.Tx, int64, Feedback) error {
	return nil
}

type reviewedJob struct {
	ID       int64 `json:"id"`
	Accepted bool  `json:"accepted"`
}

func (*reviewedJobCreator) Create() Job {
	return new(reviewedJob)
}

func (*reviewedJob) CleanUp() error { return nil }

func (r *reviewedJob) CreateFeedback(int64) Feedback {
	return logFeedback(r.ID)
}

func (rj *reviewedJob) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	defer tx.Rollback()

	var signer string
	if err := tx.QueryRowContext(ctx, selectUserSQL, importID).Scan(&signer); err != nil {
		return nil, err
	}

	var user, kind string
	if err := tx.QueryRowContext(ctx, selectUserKindSQL, rj.ID).Scan(&user, &kind); err != nil {
		return nil, err
	}

	jc := FindJobCreator(JobKind(kind))
	if jc == nil {
		return nil, fmt.Errorf("no job creator found for '%s'", kind)
	}

	importFeedback := logFeedback(rj.ID)

	if err := auth.RunAs(ctx, user, func(conn *sql.Conn) error {
		userTx, err := conn.BeginTx(ctx, nil)
		if err != nil {
			return err
		}
		defer userTx.Rollback()

		if rj.Accepted {
			err = jc.StageDone(ctx, userTx, rj.ID, importFeedback)
		} else {
			_, err = userTx.ExecContext(ctx, deleteImportDataSQL, rj.ID)
		}
		if err == nil {
			err = userTx.Commit()
		}
		return err
	}); err != nil {
		return nil, err
	}

	// Remove the import track
	if _, err := tx.ExecContext(ctx, deleteImportTrackSQL, rj.ID); err != nil {
		return nil, err
	}

	var state string
	if rj.Accepted {
		state = "accepted"
	} else {
		state = "declined"
	}

	if _, err := tx.ExecContext(ctx, reviewSQL, state, signer, rj.ID); err != nil {
		return nil, err
	}

	importFeedback.Info("User '%s' %s import %d.", signer, state, rj.ID)

	return nil, tx.Commit()
}

func (q *importQueue) registerJobCreator(kind JobKind, jc JobCreator) {
	q.creatorsMu.Lock()
	defer q.creatorsMu.Unlock()
	q.creators[kind] = jc
	q.creators[kind+ReviewJobSuffix] = &reviewedJobCreator{jobCreator: jc}

}

// FindJobCreator looks up a JobCreator in the global import queue.
func FindJobCreator(kind JobKind) JobCreator {
	return iqueue.jobCreator(kind)
}

// ImportKindNames is a list of the names of the imports the
// global import queue supports.
func ImportKindNames() []string {
	return iqueue.importKindNames()
}

// LogImportKindNames logs a list of importer types registered
// to the global import queue.
func LogImportKindNames() {
	kinds := ImportKindNames()
	sort.Strings(kinds)
	log.Infof("registered import kinds: %s",
		strings.Join(kinds, ", "))
}

// HasImportKindName checks if the import queue supports a given kind.
func HasImportKindName(kind string) bool {
	return iqueue.hasImportKindName(kind)
}

//
func (q *importQueue) hasImportKindName(kind string) bool {
	q.creatorsMu.Lock()
	defer q.creatorsMu.Unlock()
	return q.creators[JobKind(kind)] != nil
}

// RegisterJobCreator adds a JobCreator to the global import queue.
// This a good candidate to be called in a init function for
// a particular JobCreator.
func RegisterJobCreator(kind JobKind, jc JobCreator) {
	iqueue.registerJobCreator(kind, jc)
}

func (q *importQueue) importKindNames() []string {
	q.creatorsMu.Lock()
	defer q.creatorsMu.Unlock()
	names := make([]string, len(q.creators))
	var i int
	for kind := range q.creators {
		names[i] = string(kind)
		i++
	}
	// XXX: Consider using sort.Strings to make output deterministic.
	return names
}

func (idj *idJob) nextRetry(feedback Feedback) bool {
	switch {
	case idj.waitRetry.Status != pgtype.Present && !idj.triesLeft.Valid:
		return false
	case idj.waitRetry.Status == pgtype.Present && !idj.triesLeft.Valid:
		return true
	case idj.triesLeft.Valid:
		if idj.triesLeft.Int64 < 1 {
			feedback.Warn("no retries left")
		} else {
			idj.triesLeft.Int64--
			feedback.Info("failed but will retry")
			return true
		}
	}
	return false
}

func (idj *idJob) nextDue() time.Time {
	now := time.Now()
	if idj.waitRetry.Status == pgtype.Present {
		var d time.Duration
		if err := idj.waitRetry.AssignTo(&d); err != nil {
			log.Errorf("converting waitRetry failed: %v\n", err)
		} else {
			now = now.Add(d)
		}
	}
	return now
}

func (idj *idJob) triesLeftPointer() *int {
	if !idj.triesLeft.Valid {
		return nil
	}
	t := int(idj.triesLeft.Int64)
	return &t
}

func (idj *idJob) waitRetryPointer() *time.Duration {
	if idj.waitRetry.Status != pgtype.Present {
		return nil
	}
	d := new(time.Duration)
	if err := idj.waitRetry.AssignTo(d); err != nil {
		log.Errorf("converting waitRetry failed: %v\n", err)
		return nil
	}
	return d
}

func (q *importQueue) lockDependencies(jc JobCreator) {
	deps := jc.Depends()
	q.creatorsMu.Lock()
	defer q.creatorsMu.Unlock()
	for _, d := range deps[0] {
		q.usedDeps[d] = runExclusive
	}
	for _, d := range deps[1] {
		q.usedDeps[d]++
	}
}

func (q *importQueue) unlockDependencies(jc JobCreator) {
	deps := jc.Depends()
	q.creatorsMu.Lock()
	defer q.creatorsMu.Unlock()
	for _, d := range deps[0] {
		q.usedDeps[d] = 0
	}
	for _, d := range deps[1] {
		q.usedDeps[d]--
	}
}

func (q *importQueue) jobCreator(kind JobKind) JobCreator {
	q.creatorsMu.Lock()
	defer q.creatorsMu.Unlock()
	return q.creators[kind]
}

func (q *importQueue) addJob(
	kind JobKind,
	due time.Time,
	triesLeft *int,
	waitRetry *time.Duration,
	user string,
	sendEmail bool,
	data string,
	sync bool,
) (int64, chan struct{}, error) {

	var id int64
	if due.IsZero() {
		due = time.Now()
	}
	due = due.UTC()

	var tl sql.NullInt64
	if triesLeft != nil {
		var many int64
		if *triesLeft > hardMaxTries || *triesLeft < 0 {
			many = hardMaxTries
		} else {
			many = int64(*triesLeft)
		}
		tl = sql.NullInt64{Int64: many, Valid: true}
	}

	var wr pgtype.Interval
	if waitRetry != nil {
		var howLong time.Duration
		if minWaitRetry > *waitRetry {
			howLong = minWaitRetry
		} else {
			howLong = *waitRetry
		}

		if err := wr.Set(howLong); err != nil {
			return 0, nil, err
		}
	} else {
		wr = pgtype.Interval{Status: pgtype.Null}
	}

	errCh := make(chan error)
	var done chan struct{}

	q.cmdCh <- func(q *importQueue) {
		ctx := context.Background()
		errCh <- auth.RunAs(ctx, user, func(conn *sql.Conn) error {
			err := conn.QueryRowContext(
				ctx,
				insertJobSQL,
				string(kind),
				due,
				tl,
				&wr,
				user,
				sendEmail,
				data).Scan(&id)

			if err == nil && sync {
				log.Infof("register wait for %d\n", id)
				done = make(chan struct{})
				q.waiting[id] = done
			}

			return err
		})
	}

	return id, done, <-errCh
}

// AddJob adds a job to the global import queue to be executed
// as soon as possible after due.
// This is gone in a separate Go routine
// so this will not block.
func AddJob(
	kind JobKind,
	due time.Time,
	triesLeft *int,
	waitRetry *time.Duration,
	user string,
	sendEmail bool,
	data string,
) (int64, error) {
	id, _, err := iqueue.addJob(
		kind,
		due,
		triesLeft,
		waitRetry,
		user,
		sendEmail,
		data,
		false)
	return id, err
}

const (
	isPendingSQL = `
SELECT
	state = 'pending'::import_state,
	kind
FROM import.imports
WHERE id = $1`

	selectUserSQL = `
SELECT username from import.imports WHERE ID = $1`

	selectUserKindSQL = `
SELECT username, kind from import.imports WHERE ID = $1`

	reviewSQL = `
UPDATE import.imports SET
  state = $1::import_state,
  changed = CURRENT_TIMESTAMP,
  signer = $2
WHERE id = $3`

	deleteImportDataSQL = `SELECT import.del_import($1)`

	deleteImportTrackSQL = `
DELETE FROM import.track_imports WHERE import_id = $1`
)

func (q *importQueue) decideImportTx(
	ctx context.Context,
	tx *sql.Tx,
	id int64,
	accepted bool,
	reviewer string,
) (chan struct{}, error) {
	var (
		pending bool
		kind    string
	)

	switch err := tx.QueryRowContext(ctx, isPendingSQL, id).Scan(&pending, &kind); {
	case err == sql.ErrNoRows:
		return nil, fmt.Errorf("cannot find import #%d", id)
	case err != nil:
		return nil, err
	case !pending:
		return nil, fmt.Errorf("#%d is not pending", id)
	}

	jc := q.jobCreator(JobKind(kind))
	if jc == nil {
		return nil, fmt.Errorf("no job creator for kind '%s'", kind)
	}

	r := &reviewedJob{
		ID:       id,
		Accepted: accepted,
	}
	serialized, err := common.ToJSONString(r)
	if err != nil {
		return nil, err
	}

	// Try a little harder to persist the decision.
	tries := reviewJobRetries
	wait := reviewJobWait

	rID, done, err := q.addJob(
		JobKind(kind+ReviewJobSuffix),
		time.Now(),
		&tries,
		&wait,
		reviewer,
		false,
		serialized,
		true)
	if err != nil {
		return nil, err
	}
	log.Infof("add review job %d\n", rID)
	_, err = tx.ExecContext(ctx, updateStateSQL, "reviewed", id)
	if err != nil && done != nil {
		go func() {
			q.cmdCh <- func(q *importQueue) {
				delete(q.waiting, rID)
			}
		}()
		done = nil
	}
	return done, err
}

func (q *importQueue) decideImport(
	ctx context.Context,
	id int64,
	accepted bool,
	reviewer string,
) error {
	if ctx == nil {
		ctx = context.Background()
	}

	var done chan struct{}

	if err := auth.RunAs(ctx, reviewer, func(conn *sql.Conn) error {
		tx, err := conn.BeginTx(ctx, nil)
		if err != nil {
			return err
		}
		defer tx.Rollback()
		done, err = q.decideImportTx(ctx, tx, id, accepted, reviewer)
		if err == nil {
			err = tx.Commit()
		}
		return err
	}); err != nil {
		return err
	}

	_, retry := <-done
	if retry {
		return ErrRetrying
	}
	return nil
}

func DecideImport(
	ctx context.Context,
	id int64,
	accepted bool,
	reviewer string,
) error {
	return iqueue.decideImport(ctx, id, accepted, reviewer)
}

type logFeedback int64

func (lf logFeedback) log(kind, format string, args ...interface{}) {
	ctx := context.Background()
	err := auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error {
		_, err := conn.ExecContext(
			ctx, logMessageSQL, int64(lf), kind, fmt.Sprintf(format, args...))
		return err
	})
	if err != nil {
		log.Errorf("logging failed: %v\n", err)
	}
}

func (lf logFeedback) Info(format string, args ...interface{}) {
	lf.log("info", format, args...)
}

func (lf logFeedback) Warn(format string, args ...interface{}) {
	lf.log("warn", format, args...)
}

func (lf logFeedback) Error(format string, args ...interface{}) {
	lf.log("error", format, args...)
}

func survive(fn func() error) func() error {
	return func() (err error) {
		defer func() {
			if p := recover(); p != nil {
				err = fmt.Errorf("%v: %s", p, string(debug.Stack()))
			}
		}()
		return fn()
	}
}

func reEnqueueRunning() error {
	ctx := context.Background()
	return auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error {
		_, err := conn.ExecContext(ctx, reEnqueueRunningSQL)
		return err
	})
}

func (q *importQueue) fetchJob() (*idJob, error) {

	var which []string

	q.creatorsMu.Lock()
nextCreator:
	for kind, jc := range q.creators {
		deps := jc.Depends()
		for _, d := range deps[0] {
			if q.usedDeps[d] != 0 {
				continue nextCreator
			}
		}
		for _, d := range deps[1] {
			if q.usedDeps[d] == runExclusive {
				continue nextCreator
			}
		}
		which = append(which, string(kind))
	}
	q.creatorsMu.Unlock()

	if len(which) == 0 {
		return nil, sql.ErrNoRows
	}

	var kinds pgtype.TextArray
	if err := kinds.Set(which); err != nil {
		return nil, err
	}

	var ji idJob
	ctx := context.Background()
	err := auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error {
		tx, err := conn.BeginTx(ctx, nil)
		if err != nil {
			return err
		}
		defer tx.Rollback()
		if err = tx.QueryRowContext(ctx, selectJobSQL, &kinds).Scan(
			&ji.id,
			&ji.kind,
			&ji.triesLeft,
			&ji.waitRetry,
			&ji.user,
			&ji.sendEmail,
			&ji.data,
		); err != nil {
			return err
		}
		_, err = tx.ExecContext(ctx, updateStateSQL, "running", ji.id)
		if err == nil {
			if err = tx.Commit(); err != nil {
				return err
			}
		}
		// Clip repetition back to allowd values.
		if ji.waitRetry.Status == pgtype.Present {
			var d time.Duration
			ji.waitRetry.AssignTo(&d)
			if d < minWaitRetry {
				ji.waitRetry.Set(minWaitRetry)
			}
		}
		if ji.triesLeft.Valid {
			if ji.triesLeft.Int64 < 0 || ji.triesLeft.Int64 > hardMaxTries {
				ji.triesLeft.Int64 = hardMaxTries
			}
		}
		return nil
	})
	switch {
	case err == sql.ErrNoRows:
		return nil, nil
	case err != nil:
		return nil, err
	}
	return &ji, nil
}

func tryHardToStoreState(ctx context.Context, fn func(*sql.Conn) error) error {
	// As it is important to keep the persistent model
	// in sync with the in-memory model try harder to store
	// the state.
	const maxTries = 10
	var sleep = time.Second

	for try := 1; ; try++ {
		var err error
		if err = auth.RunAs(ctx, queueUser, fn); err == nil || try == maxTries {
			return err
		}
		log.Warnf("[try %d/%d] Storing state failed: %v (try again in %s).\n",
			try, maxTries, err, sleep)

		time.Sleep(sleep)
		if sleep < time.Minute {
			if sleep *= 2; sleep > time.Minute {
				sleep = time.Minute
			}
		}
	}
}

func updateStateSummary(
	ctx context.Context,
	id int64,
	state string,
	summary interface{},
) error {
	var s sql.NullString
	if summary != nil {
		var b strings.Builder
		if err := json.NewEncoder(&b).Encode(summary); err != nil {
			return err
		}
		s = sql.NullString{String: b.String(), Valid: true}
	}

	return tryHardToStoreState(ctx, func(conn *sql.Conn) error {
		_, err := conn.ExecContext(ctx, updateStateSummarySQL, state, s, id)
		return err
	})
}

func deleteJob(ctx context.Context, id int64) error {
	return tryHardToStoreState(ctx, func(conn *sql.Conn) error {
		_, err := conn.ExecContext(ctx, deleteJobSQL, id)
		return err
	})
}

func errorAndFail(id int64, format string, args ...interface{}) error {
	ctx := context.Background()
	return tryHardToStoreState(ctx, func(conn *sql.Conn) error {
		tx, err := conn.BeginTx(ctx, nil)
		if err != nil {
			return err
		}
		defer tx.Rollback()
		_, err = conn.ExecContext(
			ctx, logMessageSQL, id, "error", fmt.Sprintf(format, args...))
		if err != nil {
			return err
		}
		_, err = conn.ExecContext(
			ctx, updateStateSQL, "failed", id)
		if err == nil {
			err = tx.Commit()
		}
		return err
	})
}

func (q *importQueue) importLoop() {
	config.WaitReady()
	// re-enqueue the jobs that are in state running.
	// They where in progess when the server went down.
	if err := reEnqueueRunning(); err != nil {
		log.Errorf("re-enqueuing failed: %v", err)
	}

	for {
		var idj *idJob
		var err error

		for {
			if idj, err = q.fetchJob(); err != nil && err != sql.ErrNoRows {
				log.Errorf("error: db: %v\n", err)
			}
			if idj != nil {
				break
			}
			select {
			case cmd := <-q.cmdCh:
				cmd(q)

			case <-time.After(pollDuration):
			}
		}

		log.Infof("starting import #%d\n", idj.id)

		jc := q.jobCreator(idj.kind)
		if jc == nil {
			errorAndFail(idj.id, "no creator for kind '%s' found", idj.kind)
			continue
		}

		// Lock dependencies.
		q.lockDependencies(jc)

		go func(jc JobCreator, idj *idJob) {

			var retry bool

			defer func() {
				// Unlock the dependencies.
				q.unlockDependencies(jc)
				// Unlock waiting.
				q.cmdCh <- func(q *importQueue) {
					if w := q.waiting[idj.id]; w != nil {
						log.Infof("unlock waiting %d\n", idj.id)
						if retry {
							w <- struct{}{}
						} else {
							close(w)
						}
						delete(q.waiting, idj.id)
					}
				}
			}()

			job := jc.Create()
			if err := common.FromJSONString(idj.data, job); err != nil {
				errorAndFail(idj.id, "failed to create job for import #%d: %v",
					idj.id, err)
				return
			}

			var feedback Feedback
			if fc, ok := job.(FeedbackJob); ok {
				feedback = fc.CreateFeedback(idj.id)
			} else {
				feedback = logFeedback(idj.id)
			}

			ctx := context.Background()
			var summary interface{}

			errDo := survive(func() error {
				return auth.RunAs(ctx, idj.user,
					func(conn *sql.Conn) error {
						var err error
						summary, err = job.Do(ctx, idj.id, conn, feedback)
						return err
					})
			})()

			var unchanged bool
			if v, ok := errDo.(UnchangedError); ok {
				feedback.Info("unchanged: %s", v.Error())
				unchanged = true
			} else if errDo != nil {
				feedback.Error("error in import: %v",
					pgxutils.ReadableError{Err: errDo})
				retry = idj.nextRetry(feedback)
			}

			var errCleanup error
			if !retry { // cleanup debris
				if errCleanup = survive(job.CleanUp)(); errCleanup != nil {
					feedback.Error("error cleanup: %v", errCleanup)
				}
			}

			var remove bool
			if remover, ok := jc.(JobRemover); ok {
				remove = remover.RemoveJob()
			}

			var state string
			switch {
			case unchanged:
				state = "unchanged"
			case errDo != nil || errCleanup != nil:
				state = "failed"
			case jc.AutoAccept():
				state = "accepted"
			default:
				state = "pending"
			}
			if !remove {
				if err := updateStateSummary(ctx, idj.id, state, summary); err != nil {
					log.Errorf("setting state of job %d failed: %v\n", idj.id, err)
				}
				log.Infof("info: import #%d finished: %s\n", idj.id, state)
			}
			if idj.sendEmail {
				go sendNotificationMail(idj.user, jc.Description(), state, idj.id)
			}

			if retry {
				nid, _, err := q.addJob(
					idj.kind,
					idj.nextDue(),
					idj.triesLeftPointer(),
					idj.waitRetryPointer(),
					idj.user, idj.sendEmail,
					idj.data,
					false)
				if err != nil {
					log.Errorf("retry enqueue failed: %v\n", err)
				} else {
					log.Infof("re-enqueued job with id %d\n", nid)
				}
			}
			if remove {
				if err := deleteJob(ctx, idj.id); err != nil {
					log.Errorf("deleting job %d failed: %v\n", idj.id, err)
				}
			}
		}(jc, idj)
	}
}