view pkg/imports/queue.go @ 1995:59055c8301df

Move import queue to its own database namespace. Authorisation of the import queue has to be handled differently from the waterway-related data in the waterway schema. This is easier to handle if both are in their own schema/namespace.
author Tom Gottfried <tom@intevation.de>
date Thu, 24 Jan 2019 12:56:31 +0100
parents 8eeb0b5eb340
children b868cb653c4d
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"log"
	"runtime/debug"
	"strings"
	"sync"
	"time"

	"github.com/jackc/pgx/pgtype"

	"gemma.intevation.de/gemma/pkg/auth"
	"gemma.intevation.de/gemma/pkg/config"
)

type (
	// Feedback is passed to the Do method of a Job to log
	// information, warnings or errors.
	Feedback interface {
		// Info logs an informational message.
		Info(fmt string, args ...interface{})
		// Warn logs a warning.
		Warn(fmt string, args ...interface{})
		// Error logs an error.
		Error(fmt string, args ...interface{})
	}

	// UnchangedError may be returned by the Do method of a Job to
	// indicate that the database has not changed.
	UnchangedError string

	// Job is the central abstraction of an import job
	// run by the import queue.
	Job interface {
		// Do is called to do the actual import.
		// Please bind transactions to ctx and conn.
		// id is the number of the import job.
		// feedback can be used to log the import process.
		// If no error is returned the import is assumed to
		// be successful. The non-error return value is
		// serialized as JSON into the database as a summary
		// of the import to be used by the review process.
		Do(ctx context.Context, id int64, conn *sql.Conn, feedback Feedback) (interface{}, error)
		// CleanUp is called to clean up resources held by the import.
		// The intended contract is that it is called whether the
		// import succeeded or not.
		// NOTE(review): confirm this against the retry handling
		// of the import loop.
		CleanUp() error
	}

	// JobKind is the type of an import.
	// Choose a unique name for every import.
	JobKind string

	// JobCreator is used to bring a job to life, as jobs are stored
	// in pure meta-data form in the database.
	JobCreator interface {
		// Description is the long name of the import.
		Description() string
		// Create builds the actual job.
		// kind is the name of the import type.
		// data is a free form string to pass arguments to the creation
		// process. This is useful to tell e.g. where to find data
		// in the file system to be used for importing.
		Create(kind JobKind, data string) (Job, error)
		// Depends returns a list of resources locked by this type of import.
		// Imports are run concurrently if they have disjoint sets
		// of dependencies.
		Depends() []string
		// StageDone is called if an import is positively reviewed
		// (state = accepted). This can be used to finalize the imported
		// data, e.g. to move it out of the staging area.
		StageDone(context.Context, *sql.Tx, int64) error
		// AutoAccept indicates that imports of this kind
		// don't need a review.
		AutoAccept() bool
	}

	// idJob is the in-memory form of one row of the import queue
	// table as it is read when a job is fetched for execution.
	// id, kind, user, sendEmail and data mirror the columns id, kind,
	// username, send_email and data. waitRetry (column retry_wait)
	// and trysLeft (column trys_left) are nullable and control
	// if and when a failed import is re-enqueued.
	idJob struct {
		id        int64
		kind      JobKind
		user      string
		waitRetry pgtype.Interval
		trysLeft  sql.NullInt64
		sendEmail bool
		data      string
	}
)

// pollDuration is the longest time the import loop waits for a
// wake-up signal before it looks into the database for jobs again.
const pollDuration = time.Second * 10

// importQueue holds the state of the import queue:
// signalChan is used to wake up the import loop when a job was added
// or a running job finished. creators maps the registered job kinds
// to their creators. usedDeps is the set of dependency names held by
// the currently running imports. creatorsMu is locked around the
// accesses to creators and usedDeps.
type importQueue struct {
	signalChan chan struct{}
	creatorsMu sync.Mutex
	creators   map[JobKind]JobCreator
	usedDeps   map[string]struct{}
}

// iqueue is the global import queue the exported functions
// of this package operate on.
var iqueue = importQueue{
	signalChan: make(chan struct{}),
	creators:   map[JobKind]JobCreator{},
	usedDeps:   map[string]struct{}{},
}

var (
	// ImportStateNames is the list of the states an import job can be in.
	// The import loop itself sets the states "running", "failed",
	// "unchanged", "pending" and "accepted"; new and re-enqueued
	// jobs are in state "queued".
	ImportStateNames = []string{
		"queued",
		"running",
		"failed",
		"unchanged",
		"pending",
		"accepted",
		"declined",
	}
)

const (
	// queueUser is the database user the queue itself uses for
	// its book-keeping statements (fetching jobs, updating states,
	// writing log messages).
	queueUser = "sys_admin"

	// reEnqueueRunningSQL puts all imports in state 'running'
	// back to state 'queued'.
	reEnqueueRunningSQL = `
UPDATE import.imports SET state = 'queued'::import_state
WHERE state = 'running'::import_state`

	// insertJobSQL stores a new import job and returns its id.
	// If no due time ($2) is given the current timestamp is used.
	insertJobSQL = `
INSERT INTO import.imports (
  kind,
  due,
  trys_left,
  retry_wait,
  username,
  send_email,
  data
) VALUES (
  $1,
  COALESCE($2, CURRENT_TIMESTAMP),
  $3,
  $4,
  $5,
  $6,
  $7
) RETURNING id`

	// selectJobSQL selects one queued import that is due within the
	// next five seconds and that has the earliest enqueued time of
	// all queued imports of the kinds given in $1.
	// NOTE(review): the kind filter is only applied in the sub-select.
	// If queued imports of different kinds share the same enqueued
	// time, the outer query may return an import of a kind not
	// listed in $1. Also the sub-select ignores due, so an early
	// enqueued but not yet due import seems to hold back later
	// ones - confirm that this is intended.
	selectJobSQL = `
SELECT
  id,
  kind,
  trys_left,
  retry_wait,
  username,
  send_email,
  data
FROM import.imports
WHERE
  due <= CURRENT_TIMESTAMP + interval '5 seconds' AND
  state = 'queued'::import_state AND enqueued IN (
    SELECT min(enqueued)
    FROM import.imports
    WHERE state = 'queued'::import_state AND
    kind = ANY($1))
LIMIT 1`

	// updateStateSQL sets the state ($1) of the import with id $2.
	updateStateSQL = `
UPDATE import.imports SET state = $1::import_state
WHERE id = $2`

	// updateStateSummarySQL sets the state ($1) and the summary ($2)
	// of the import with id $3.
	updateStateSummarySQL = `
UPDATE import.imports SET
   state = $1::import_state,
   summary = $2
WHERE id = $3`

	// logMessageSQL stores a log message ($3) of the given
	// log type ($2) for the import with id $1.
	logMessageSQL = `
INSERT INTO import.import_logs (
  import_id,
  kind,
  msg
) VALUES (
  $1,
  $2::log_type,
  $3
)`
)

// init starts the import loop of the global import queue
// in its own goroutine when the package is loaded.
func init() {
	go iqueue.importLoop()
}

// Error makes UnchangedError an error.
// The message is the string value of the error itself.
func (ue UnchangedError) Error() string {
	return string(ue)
}

// registerJobCreator stores jc as the creator for imports of
// the given kind. A creator registered before under the same
// kind is replaced.
func (q *importQueue) registerJobCreator(kind JobKind, jc JobCreator) {
	q.creatorsMu.Lock()
	defer q.creatorsMu.Unlock()
	q.creators[kind] = jc
}

// FindJobCreator looks up a JobCreator in the global import queue.
// It returns nil if no creator is registered for the given kind.
func FindJobCreator(kind JobKind) JobCreator {
	return iqueue.jobCreator(kind)
}

// ImportKindNames returns a list of the names of the imports
// the global import queue supports.
func ImportKindNames() []string {
	return iqueue.importKindNames()
}

// HasImportKindName checks if the global import queue
// supports a given kind.
func HasImportKindName(kind string) bool {
	return iqueue.hasImportKindName(kind)
}

// hasImportKindName reports whether a non-nil JobCreator is
// registered in this queue under the given kind name.
func (q *importQueue) hasImportKindName(kind string) bool {
	q.creatorsMu.Lock()
	creator := q.creators[JobKind(kind)]
	q.creatorsMu.Unlock()
	return creator != nil
}

// RegisterJobCreator adds a JobCreator to the global import queue.
// This is a good candidate to be called in an init function of
// a particular JobCreator.
func RegisterJobCreator(kind JobKind, jc JobCreator) {
	log.Printf("info: register import job creator for kind '%s'\n", kind)
	iqueue.registerJobCreator(kind, jc)
}

// importKindNames returns the names of all job kinds registered
// in this queue. The order follows the map iteration order and
// is therefore not stable between calls.
func (q *importQueue) importKindNames() []string {
	q.creatorsMu.Lock()
	defer q.creatorsMu.Unlock()
	kinds := make([]string, 0, len(q.creators))
	for k := range q.creators {
		kinds = append(kinds, string(k))
	}
	// XXX: Consider using sort.Strings to make output deterministic.
	return kinds
}

// nextRetry decides if a failed import should be enqueued again.
// Without a limit of tries the import is retried if and only if
// a wait interval is set. With a limit the import is retried as
// long as tries are left; each granted retry uses up one of them.
// The decision in the limited case is reported via feedback.
func (idj *idJob) nextRetry(feedback Feedback) bool {
	if !idj.trysLeft.Valid {
		// Unlimited tries: only the wait interval decides.
		return idj.waitRetry.Status == pgtype.Present
	}
	if idj.trysLeft.Int64 < 1 {
		feedback.Warn("import should be retried, but no retrys left")
		return false
	}
	idj.trysLeft.Int64--
	feedback.Info("import failed but will be retried")
	return true
}

// nextDue returns the time a retry of this import is due:
// the current time plus the wait interval, if one is set.
// If the interval cannot be converted the problem is logged
// and the current time is returned.
func (idj *idJob) nextDue() time.Time {
	due := time.Now()
	if idj.waitRetry.Status != pgtype.Present {
		return due
	}
	var wait time.Duration
	if err := idj.waitRetry.AssignTo(&wait); err != nil {
		log.Printf("error: converting waitRetry failed: %v\n", err)
		return due
	}
	return due.Add(wait)
}

// trysLeftPointer returns the number of tries left as a pointer
// to a fresh int, or nil if no limit of tries is set.
func (idj *idJob) trysLeftPointer() *int {
	if idj.trysLeft.Valid {
		left := int(idj.trysLeft.Int64)
		return &left
	}
	return nil
}

// waitRetryPointer returns the wait interval between retries as
// a pointer to a fresh time.Duration. It returns nil if no interval
// is set or if the interval cannot be converted; the latter
// case is logged.
func (idj *idJob) waitRetryPointer() *time.Duration {
	if idj.waitRetry.Status == pgtype.Present {
		var wait time.Duration
		err := idj.waitRetry.AssignTo(&wait)
		if err == nil {
			return &wait
		}
		log.Printf("error: converting waitRetry failed: %v\n", err)
	}
	return nil
}

// jobCreator returns the creator registered for the given kind
// or nil if there is none.
func (q *importQueue) jobCreator(kind JobKind) JobCreator {
	q.creatorsMu.Lock()
	defer q.creatorsMu.Unlock()
	return q.creators[kind]
}

// addJob stores a new import job in the database and wakes up
// the import loop.
// A zero due time means now. trysLeft and waitRetry may be nil,
// in which case NULL is stored for them. The insert is executed
// as the given user. It returns the id of the new job.
func (q *importQueue) addJob(
	kind JobKind,
	due time.Time,
	trysLeft *int,
	waitRetry *time.Duration,
	user string,
	sendEmail bool,
	data string,
) (int64, error) {

	if due.IsZero() {
		due = time.Now()
	}

	var remaining sql.NullInt64
	if trysLeft != nil {
		remaining.Int64 = int64(*trysLeft)
		remaining.Valid = true
	}

	var wait pgtype.Interval
	if waitRetry == nil {
		wait = pgtype.Interval{Status: pgtype.Null}
	} else if err := wait.Set(*waitRetry); err != nil {
		return 0, err
	}

	var id int64
	ctx := context.Background()
	insert := func(conn *sql.Conn) error {
		row := conn.QueryRowContext(
			ctx,
			insertJobSQL,
			string(kind),
			due,
			remaining,
			&wait,
			user,
			sendEmail,
			data)
		return row.Scan(&id)
	}
	if err := auth.RunAs(ctx, user, insert); err != nil {
		return id, err
	}

	// Wake up the import loop. Don't block if it is busy,
	// it polls the database on its own.
	select {
	case q.signalChan <- struct{}{}:
	default:
	}
	return id, nil
}

// AddJob adds a job to the global import queue to be executed
// as soon as possible after due.
// The job is stored in the database before this function returns.
// Only the notification of the import loop is non-blocking; the
// import itself is executed later by the import loop.
// It returns the id of the new job.
func AddJob(
	kind JobKind,
	due time.Time,
	trysLeft *int,
	waitRetry *time.Duration,
	user string,
	sendEmail bool,
	data string,
) (int64, error) {
	return iqueue.addJob(
		kind,
		due,
		trysLeft,
		waitRetry,
		user,
		sendEmail,
		data)
}

// logFeedback is a Feedback that writes its messages to the
// import log in the database. Its value is the id of the import
// the messages belong to.
type logFeedback int64

// log stores the formatted message with the given log kind
// in the database as the queue user. A failure to do so is
// only written to the server log.
func (lf logFeedback) log(kind, format string, args ...interface{}) {
	ctx := context.Background()
	err := auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error {
		_, err := conn.ExecContext(
			ctx, logMessageSQL, int64(lf), kind, fmt.Sprintf(format, args...))
		return err
	})
	if err != nil {
		log.Printf("error: logging failed: %v\n", err)
	}
}

// Info logs a message of kind "info".
func (lf logFeedback) Info(format string, args ...interface{}) {
	lf.log("info", format, args...)
}

// Warn logs a message of kind "warn".
func (lf logFeedback) Warn(format string, args ...interface{}) {
	lf.log("warn", format, args...)
}

// Error logs a message of kind "error".
func (lf logFeedback) Error(format string, args ...interface{}) {
	lf.log("error", format, args...)
}

// survive wraps fn in a function that does not panic.
// A panic raised by fn is recovered and returned as an error
// consisting of the panic value followed by the stack trace.
// Otherwise the result of fn is passed through.
func survive(fn func() error) func() error {
	guarded := func() (err error) {
		defer func() {
			p := recover()
			if p == nil {
				return
			}
			err = fmt.Errorf("%v: %s", p, string(debug.Stack()))
		}()
		err = fn()
		return err
	}
	return guarded
}

// reEnqueueRunning sets all imports in state 'running' back
// to state 'queued'. The statement is executed as the queue user.
func reEnqueueRunning() error {
	ctx := context.Background()
	return auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error {
		_, err := conn.ExecContext(ctx, reEnqueueRunningSQL)
		return err
	})
}

// fetchJob looks for the next job that can be run right now.
// Only kinds are considered whose dependencies are all unused.
// If there are no such kinds nil and sql.ErrNoRows are returned.
// If the database has no matching job nil and a nil error are
// returned. A job found is set to state 'running' in the same
// transaction it is selected in.
func (q *importQueue) fetchJob() (*idJob, error) {

	// Collect the kinds that are not blocked by a used dependency.
	var runnable []string

	q.creatorsMu.Lock()
	for kind, jc := range q.creators {
		blocked := false
		for _, dep := range jc.Depends() {
			if _, blocked = q.usedDeps[dep]; blocked {
				break
			}
		}
		if !blocked {
			runnable = append(runnable, string(kind))
		}
	}
	q.creatorsMu.Unlock()

	if len(runnable) == 0 {
		return nil, sql.ErrNoRows
	}

	var kinds pgtype.TextArray
	if err := kinds.Set(runnable); err != nil {
		return nil, err
	}

	var job idJob
	ctx := context.Background()

	// claim selects a job and marks it as running atomically.
	claim := func(conn *sql.Conn) error {
		tx, err := conn.BeginTx(ctx, nil)
		if err != nil {
			return err
		}
		defer tx.Rollback()
		row := tx.QueryRowContext(ctx, selectJobSQL, &kinds)
		if err := row.Scan(
			&job.id,
			&job.kind,
			&job.trysLeft,
			&job.waitRetry,
			&job.user,
			&job.sendEmail,
			&job.data,
		); err != nil {
			return err
		}
		if _, err := tx.ExecContext(
			ctx, updateStateSQL, "running", job.id); err != nil {
			return err
		}
		return tx.Commit()
	}

	err := auth.RunAs(ctx, queueUser, claim)
	if err == sql.ErrNoRows {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}
	return &job, nil
}

func updateStateSummary(
	ctx context.Context,
	id int64,
	state string,
	summary interface{},
) error {
	var s sql.NullString
	if summary != nil {
		var b strings.Builder
		if err := json.NewEncoder(&b).Encode(summary); err != nil {
			return err
		}
		s = sql.NullString{String: b.String(), Valid: true}
	}
	return auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error {
		_, err := conn.ExecContext(ctx, updateStateSummarySQL, state, s, id)
		return err
	})
}

// errorAndFail writes a formatted error message to the log of the
// import with the given id and sets the state of the import to
// 'failed'. Both statements are executed as the queue user inside
// one transaction, so either both take effect or none.
// It returns the first error that occurred.
func errorAndFail(id int64, format string, args ...interface{}) error {
	ctx := context.Background()
	return auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error {
		tx, err := conn.BeginTx(ctx, nil)
		if err != nil {
			return err
		}
		// Has no effect once the transaction is committed.
		defer tx.Rollback()
		// The statements have to be issued through tx and not
		// through conn to be bound to the transaction explicitly.
		if _, err = tx.ExecContext(
			ctx, logMessageSQL, id, "error", fmt.Sprintf(format, args...),
		); err != nil {
			return err
		}
		if _, err = tx.ExecContext(
			ctx, updateStateSQL, "failed", id,
		); err != nil {
			return err
		}
		return tx.Commit()
	})
}

// importLoop is the main loop of the import queue. It never returns.
// After the configuration is ready it re-enqueues the jobs left
// in state 'running' and then repeatedly
//   - waits for a job that is due and whose dependencies are free,
//   - locks the dependencies of the job and
//   - executes the job in its own goroutine.
// When a job is done its dependencies are unlocked, its final state
// and summary are stored, an optional notification mail is sent and,
// if the job failed and is allowed to be retried, a new job with
// the same parameters is enqueued.
func (q *importQueue) importLoop() {
	config.WaitReady()
	// Re-enqueue the jobs that are in state running.
	// They were in progress when the server went down.
	if err := reEnqueueRunning(); err != nil {
		log.Printf("error: re-enqueuing failed: %v", err)
	}

	for {
		var idj *idJob
		var err error

		// Wait for the next runnable job. The loop is woken up
		// by a signal or after pollDuration at the latest.
		for {
			if idj, err = q.fetchJob(); err != nil && err != sql.ErrNoRows {
				log.Printf("error: db: %v\n", err)
			}
			if idj != nil {
				break
			}
			select {
			case <-q.signalChan:
			case <-time.After(pollDuration):
			}
		}

		log.Printf("info: starting import #%d\n", idj.id)

		jc := q.jobCreator(idj.kind)
		if jc == nil {
			if err := errorAndFail(
				idj.id, "no creator for kind '%s' found", idj.kind,
			); err != nil {
				log.Printf("error: failing import #%d failed: %v\n",
					idj.id, err)
			}
			continue
		}

		// Lock dependencies.
		q.creatorsMu.Lock()
		for _, d := range jc.Depends() {
			q.usedDeps[d] = struct{}{}
		}
		q.creatorsMu.Unlock()

		go func(jc JobCreator, idj *idJob) {

			// Unlock the dependencies and wake up the loop
			// as other jobs may be runnable now.
			defer func() {
				q.creatorsMu.Lock()
				for _, d := range jc.Depends() {
					delete(q.usedDeps, d)
				}
				q.creatorsMu.Unlock()
				select {
				case q.signalChan <- struct{}{}:
				default:
				}
			}()

			job, err := jc.Create(idj.kind, idj.data)
			if err != nil {
				if err := errorAndFail(
					idj.id, "failed to create job for import #%d: %v",
					idj.id, err,
				); err != nil {
					log.Printf("error: failing import #%d failed: %v\n",
						idj.id, err)
				}
				return
			}

			feedback := logFeedback(idj.id)

			feedback.Info("import #%d started", idj.id)

			ctx := context.Background()
			var summary interface{}

			// Run the import as the user who enqueued it.
			// A panic inside the import is turned into an error.
			errDo := survive(func() error {
				return auth.RunAs(ctx, idj.user,
					func(conn *sql.Conn) error {
						var err error
						summary, err = job.Do(ctx, idj.id, conn, feedback)
						return err
					})
			})()

			var unchanged, retry bool
			if v, ok := errDo.(UnchangedError); ok {
				feedback.Info("unchanged: %s", v.Error())
				unchanged = true
			} else if errDo != nil {
				feedback.Error("error in import: %v", errDo)
				retry = idj.nextRetry(feedback)
			}

			// Clean up the debris only if the job is not retried.
			// A retry is enqueued with the same data and still
			// needs the resources of the job.
			var errCleanup error
			if !retry {
				if errCleanup = survive(job.CleanUp)(); errCleanup != nil {
					feedback.Error("error cleanup: %v", errCleanup)
				}
			}

			var state string
			switch {
			case unchanged:
				state = "unchanged"
			case errDo != nil || errCleanup != nil:
				state = "failed"
			case jc.AutoAccept():
				state = "accepted"
			default:
				state = "pending"
			}
			if err := updateStateSummary(ctx, idj.id, state, summary); err != nil {
				log.Printf("error: setting state of job %d failed: %v\n", idj.id, err)
			}
			log.Printf("info: import #%d finished: %s\n", idj.id, state)
			if idj.sendEmail {
				go sendNotificationMail(idj.user, jc.Description(), state, idj.id)
			}

			if retry {
				nid, err := q.addJob(
					idj.kind,
					idj.nextDue(),
					idj.trysLeftPointer(),
					idj.waitRetryPointer(),
					idj.user, idj.sendEmail,
					idj.data)
				if err != nil {
					log.Printf("error: retry enqueue failed: %v\n", err)
				} else {
					log.Printf("info: re-enqueued job with id %d\n", nid)
				}
			}
		}(jc, idj)
	}
}