view pkg/imports/queue.go @ 3163:d9903cb34842

Handle failing INSERTs gracefully during gauges import. Using the special table EXCLUDED in INSERT statements makes no functional difference, but makes editing the statements easier. Since reference water levels are no longer deleted all at once before (re-)importing, take the chance to report those that were deleted.
author Tom Gottfried <tom@intevation.de>
date Mon, 06 May 2019 13:25:49 +0200
parents bfea3f80ca9a
children 4acbee65275d
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"log"
	"runtime/debug"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/jackc/pgx/pgtype"

	"gemma.intevation.de/gemma/pkg/auth"
	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/config"
)

// Feedback is handed to the Do method of a Job so that the job can
// log informational messages, warnings and errors.
type Feedback interface {
	// Info logs an informational message.
	Info(fmt string, args ...interface{})
	// Warn logs a warning.
	Warn(fmt string, args ...interface{})
	// Error logs an error.
	Error(fmt string, args ...interface{})
}

// UnchangedError may be returned by the Do method of a Job to signal
// that the import left the database unchanged.
type UnchangedError string

// Job is the central abstraction of an import job run by the
// import queue.
type Job interface {
	// Do performs the actual import. Transactions should be bound
	// to ctx and conn. id is the number of the import job and
	// feedback can be used to log the progress of the import.
	// If no error is returned the import is considered successful.
	// The non-error return value is serialized as JSON into the
	// database as a summary of the import for the review process.
	Do(ctx context.Context, id int64, conn *sql.Conn, feedback Feedback) (interface{}, error)
	// CleanUp releases the resources held by the import.
	// The import queue calls it after Do unless the failed
	// import is going to be retried.
	CleanUp() error
}

// JobKind is the type of an import.
// Every import has to use a unique name.
type JobKind string

// JobCreator brings a job to life from the meta-data form in which
// it is stored in the database.
type JobCreator interface {
	// Description returns the long name of the import.
	Description() string
	// Create builds an empty job; the import queue fills it from
	// the JSON data stored with the import.
	Create() Job
	// Depends returns the list of resources locked by this type
	// of import. Imports are run concurrently if their sets of
	// dependencies are disjoint.
	Depends() []string
	// StageDone is called if an import is positively reviewed
	// (state = accepted). It can be used to finalize the imported
	// data, e.g. to move it out of the staging area.
	StageDone(context.Context, *sql.Tx, int64) error
	// AutoAccept reports whether imports of this kind skip the
	// review.
	AutoAccept() bool
}

// idJob is the in-memory form of a queued import as fetched from
// import.imports: its id and kind, the user it runs as, the retry
// settings (retry wait and remaining tries, both nullable), whether
// a notification mail should be sent, and the JSON-serialized job.
type idJob struct {
	id        int64
	kind      JobKind
	user      string
	waitRetry pgtype.Interval
	trysLeft  sql.NullInt64
	sendEmail bool
	data      string
}

// pollDuration is how long the import loop waits for a wake-up
// signal before polling the database for runnable jobs again.
const pollDuration = 10 * time.Second

// importQueue dispatches queued import jobs.
// signalChan wakes up the import loop. creatorsMu guards creators
// (the registered JobCreators by kind) and usedDeps (the set of
// dependencies currently locked by running jobs).
type importQueue struct {
	signalChan chan struct{}
	creatorsMu sync.Mutex
	creators   map[JobKind]JobCreator
	usedDeps   map[string]struct{}
}

// iqueue is the global import queue.
var iqueue = importQueue{
	// Buffered with room for one pending wake-up: all senders use a
	// non-blocking send, so with an unbuffered channel a signal sent
	// while the import loop is busy in fetchJob would be dropped and
	// the loop would only react after pollDuration.
	signalChan: make(chan struct{}, 1),
	creators:   map[JobKind]JobCreator{},
	usedDeps:   map[string]struct{}{},
}

// ImportStateNames lists every state an import job can be in.
var ImportStateNames = []string{
	"queued", "running", "failed", "unchanged",
	"pending", "accepted", "declined",
}

const (
	// queueUser is the database user the queue runs its own
	// bookkeeping statements as.
	queueUser = "sys_admin"

	// reEnqueueRunningSQL puts all running jobs back into the queue.
	reEnqueueRunningSQL = `
UPDATE import.imports SET state = 'queued'::import_state
WHERE state = 'running'::import_state`

	// insertJobSQL stores a new job and returns its id.
	// A NULL due date ($2) defaults to the current time.
	insertJobSQL = `
INSERT INTO import.imports (
  kind,
  due,
  trys_left,
  retry_wait,
  username,
  send_email,
  data
) VALUES (
  $1,
  COALESCE($2, CURRENT_TIMESTAMP),
  $3,
  $4,
  $5,
  $6,
  $7
) RETURNING id`

	// selectJobSQL fetches the earliest enqueued queued job of one of
	// the given kinds ($1) that is due within the next five seconds.
	// The due and kind conditions have to be applied in both the
	// query and the sub-select. Otherwise an earlier enqueued job
	// scheduled for the future would block all later jobs of the same
	// kinds until it becomes due. In addition, a job of a kind that is
	// not in $1 could be picked if it shares the minimal timestamp.
	selectJobSQL = `
SELECT
  id,
  kind,
  trys_left,
  retry_wait,
  username,
  send_email,
  data
FROM import.imports
WHERE
  due <= CURRENT_TIMESTAMP + interval '5 seconds' AND
  state = 'queued'::import_state AND
  kind = ANY($1) AND
  enqueued IN (
    SELECT min(enqueued)
    FROM import.imports
    WHERE state = 'queued'::import_state AND
    due <= CURRENT_TIMESTAMP + interval '5 seconds' AND
    kind = ANY($1))
LIMIT 1`

	// updateStateSQL sets the state ($1) of import $2.
	updateStateSQL = `
UPDATE import.imports SET state = $1::import_state
WHERE id = $2`

	// updateStateSummarySQL sets the state ($1) and the summary ($2)
	// of import $3.
	updateStateSummarySQL = `
UPDATE import.imports SET
   state = $1::import_state,
   summary = $2
WHERE id = $3`

	// logMessageSQL adds a message ($3) of log type $2 to the logs
	// of import $1.
	logMessageSQL = `
INSERT INTO import.import_logs (
  import_id,
  kind,
  msg
) VALUES (
  $1,
  $2::log_type,
  $3
)`
)

// init starts the import loop of the global import queue in its
// own goroutine during package initialization.
func init() {
	q := &iqueue
	go q.importLoop()
}

// Error implements the error interface for UnchangedError by
// returning the message it carries.
func (ue UnchangedError) Error() string {
	msg := string(ue)
	return msg
}

// registerJobCreator stores jc as the creator for imports of the
// given kind, replacing any previous registration.
func (q *importQueue) registerJobCreator(kind JobKind, jc JobCreator) {
	mu := &q.creatorsMu
	mu.Lock()
	defer mu.Unlock()
	q.creators[kind] = jc
}

// FindJobCreator returns the JobCreator registered in the global
// import queue for kind, or nil if there is none.
func FindJobCreator(kind JobKind) JobCreator {
	q := &iqueue
	return q.jobCreator(kind)
}

// ImportKindNames returns the names of all import kinds for which
// a JobCreator is registered in the global import queue.
func ImportKindNames() []string {
	q := &iqueue
	return q.importKindNames()
}

// HasImportKindName reports whether the global import queue has a
// JobCreator registered for kind.
func HasImportKindName(kind string) bool {
	q := &iqueue
	return q.hasImportKindName(kind)
}

// hasImportKindName reports whether a non-nil JobCreator is
// registered for the given kind.
func (q *importQueue) hasImportKindName(kind string) bool {
	return q.jobCreator(JobKind(kind)) != nil
}

// RegisterJobCreator makes jc the creator for imports of the given
// kind in the global import queue. It is a good candidate to be
// called from the init function of the package providing jc.
func RegisterJobCreator(kind JobKind, jc JobCreator) {
	log.Printf("info: register import job creator for kind '%s'\n", kind)
	q := &iqueue
	q.registerJobCreator(kind, jc)
}

// importKindNames returns the names of all registered import kinds
// in ascending order, so callers get a deterministic result
// regardless of map iteration order.
func (q *importQueue) importKindNames() []string {
	q.creatorsMu.Lock()
	names := make([]string, 0, len(q.creators))
	for kind := range q.creators {
		names = append(names, string(kind))
	}
	q.creatorsMu.Unlock()
	// Sort outside the critical section; names is a private copy.
	sort.Strings(names)
	return names
}

// nextRetry decides whether a failed job should be re-enqueued.
//   - No retry counter and no retry wait: never retried.
//   - No retry counter but a retry wait: always retried.
//   - A retry counter: retried while at least one try is left.
//     The counter is decremented, and the decision is reported
//     via feedback.
func (idj *idJob) nextRetry(feedback Feedback) bool {
	if !idj.trysLeft.Valid {
		return idj.waitRetry.Status == pgtype.Present
	}
	if idj.trysLeft.Int64 < 1 {
		feedback.Warn("import should be retried, but no retrys left")
		return false
	}
	idj.trysLeft.Int64--
	feedback.Info("import failed but will be retried")
	return true
}

// nextDue returns when a retried job should run: now plus the job's
// retry wait. It returns just now if no wait is set or the wait
// cannot be converted into a time.Duration.
func (idj *idJob) nextDue() time.Time {
	due := time.Now()
	if idj.waitRetry.Status != pgtype.Present {
		return due
	}
	var wait time.Duration
	if err := idj.waitRetry.AssignTo(&wait); err != nil {
		log.Printf("error: converting waitRetry failed: %v\n", err)
		return due
	}
	return due.Add(wait)
}

// trysLeftPointer returns the number of remaining tries as a
// pointer suitable for addJob, or nil if trysLeft is NULL.
func (idj *idJob) trysLeftPointer() *int {
	if idj.trysLeft.Valid {
		n := int(idj.trysLeft.Int64)
		return &n
	}
	return nil
}

// waitRetryPointer returns the retry wait as a pointer suitable for
// addJob. It returns nil if no wait is set or the conversion into a
// time.Duration fails; the failure is logged.
func (idj *idJob) waitRetryPointer() *time.Duration {
	if idj.waitRetry.Status != pgtype.Present {
		return nil
	}
	var wait time.Duration
	if err := idj.waitRetry.AssignTo(&wait); err != nil {
		log.Printf("error: converting waitRetry failed: %v\n", err)
		return nil
	}
	return &wait
}

// jobCreator returns the JobCreator registered for kind,
// or nil if there is none.
func (q *importQueue) jobCreator(kind JobKind) JobCreator {
	q.creatorsMu.Lock()
	jc := q.creators[kind]
	q.creatorsMu.Unlock()
	return jc
}

// addJob inserts a new job into import.imports as user and returns
// its id. A zero due means now. A nil trysLeft or waitRetry is
// stored as NULL. After a successful insert the import loop is
// signalled without blocking.
func (q *importQueue) addJob(
	kind JobKind,
	due time.Time,
	trysLeft *int,
	waitRetry *time.Duration,
	user string,
	sendEmail bool,
	data string,
) (int64, error) {

	if due.IsZero() {
		due = time.Now()
	}

	tl := sql.NullInt64{}
	if trysLeft != nil {
		tl.Int64, tl.Valid = int64(*trysLeft), true
	}

	var wr pgtype.Interval
	if waitRetry == nil {
		wr.Status = pgtype.Null
	} else if err := wr.Set(*waitRetry); err != nil {
		return 0, err
	}

	var id int64
	ctx := context.Background()
	insert := func(conn *sql.Conn) error {
		row := conn.QueryRowContext(ctx, insertJobSQL,
			string(kind), due, tl, &wr, user, sendEmail, data)
		return row.Scan(&id)
	}
	if err := auth.RunAs(ctx, user, insert); err != nil {
		return id, err
	}

	// Non-blocking wake-up of the import loop.
	select {
	case q.signalChan <- struct{}{}:
	default:
	}
	return id, nil
}

// AddJob adds a job to the global import queue to be executed
// as soon as possible after due; a zero due means now.
// trysLeft and waitRetry may be nil, in which case they are
// stored as NULL.
// The job is inserted into the database synchronously and its id
// is returned. The import itself is run later by the queue's
// background import loop, so AddJob does not wait for the import
// to finish.
func AddJob(
	kind JobKind,
	due time.Time,
	trysLeft *int,
	waitRetry *time.Duration,
	user string,
	sendEmail bool,
	data string,
) (int64, error) {
	return iqueue.addJob(
		kind,
		due,
		trysLeft,
		waitRetry,
		user,
		sendEmail,
		data)
}

type logFeedback int64

func (lf logFeedback) log(kind, format string, args ...interface{}) {
	ctx := context.Background()
	err := auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error {
		_, err := conn.ExecContext(
			ctx, logMessageSQL, int64(lf), kind, fmt.Sprintf(format, args...))
		return err
	})
	if err != nil {
		log.Printf("error: logging failed: %v\n", err)
	}
}

func (lf logFeedback) Info(format string, args ...interface{}) {
	lf.log("info", format, args...)
}

func (lf logFeedback) Warn(format string, args ...interface{}) {
	lf.log("warn", format, args...)
}

func (lf logFeedback) Error(format string, args ...interface{}) {
	lf.log("error", format, args...)
}

// survive wraps fn so that a panic inside it is recovered and
// returned as an error carrying the panic value followed by the
// stack trace of the panicking goroutine. Without a panic, the
// error returned by fn is passed through unchanged.
func survive(fn func() error) func() error {
	guarded := func() (err error) {
		defer func() {
			if r := recover(); r != nil {
				trace := debug.Stack()
				err = fmt.Errorf("%v: %s", r, trace)
			}
		}()
		return fn()
	}
	return guarded
}

func reEnqueueRunning() error {
	ctx := context.Background()
	return auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error {
		_, err := conn.ExecContext(ctx, reEnqueueRunningSQL)
		return err
	})
}

// fetchJob looks for the next due job among the kinds whose
// dependencies are not locked by a running job. In the same
// transaction it marks the job as 'running'.
// It returns (nil, sql.ErrNoRows) if no kind is currently
// available, (nil, nil) if no matching job was found, and the job
// otherwise.
func (q *importQueue) fetchJob() (*idJob, error) {

	// Collect the kinds whose dependencies are all free.
	q.creatorsMu.Lock()
	var which []string
	for kind, jc := range q.creators {
		free := true
		for _, d := range jc.Depends() {
			if _, used := q.usedDeps[d]; used {
				free = false
				break
			}
		}
		if free {
			which = append(which, string(kind))
		}
	}
	q.creatorsMu.Unlock()

	if len(which) == 0 {
		return nil, sql.ErrNoRows
	}

	var kinds pgtype.TextArray
	if err := kinds.Set(which); err != nil {
		return nil, err
	}

	ctx := context.Background()
	var ji idJob
	claim := func(conn *sql.Conn) error {
		tx, err := conn.BeginTx(ctx, nil)
		if err != nil {
			return err
		}
		defer tx.Rollback()

		row := tx.QueryRowContext(ctx, selectJobSQL, &kinds)
		if err := row.Scan(
			&ji.id,
			&ji.kind,
			&ji.trysLeft,
			&ji.waitRetry,
			&ji.user,
			&ji.sendEmail,
			&ji.data,
		); err != nil {
			return err
		}
		if _, err := tx.ExecContext(ctx, updateStateSQL, "running", ji.id); err != nil {
			return err
		}
		return tx.Commit()
	}

	err := auth.RunAs(ctx, queueUser, claim)
	if err == sql.ErrNoRows {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}
	return &ji, nil
}

// tryHardToStoreState runs fn as the queue user. If fn fails, it
// retries up to maxTries attempts in total with exponential back-off,
// starting at one second and capped at one minute.
// It returns nil on the first success, or the error of the last
// attempt. If ctx is cancelled while waiting between attempts, it
// stops retrying and returns the error of the last attempt.
func tryHardToStoreState(ctx context.Context, fn func(*sql.Conn) error) error {
	// As it is important to keep the persistent model
	// in sync with the in-memory model try harder to store
	// the state.
	const maxTries = 10
	sleep := time.Second

	for try := 1; ; try++ {
		err := auth.RunAs(ctx, queueUser, fn)
		if err == nil || try == maxTries {
			return err
		}
		log.Printf("warn: [try %d/%d] Storing state failed: %v (try again in %s).\n",
			try, maxTries, err, sleep)

		// Wait for the back-off, but do not outlive ctx.
		timer := time.NewTimer(sleep)
		select {
		case <-ctx.Done():
			timer.Stop()
			return err
		case <-timer.C:
		}

		if sleep *= 2; sleep > time.Minute {
			sleep = time.Minute
		}
	}
}

func updateStateSummary(
	ctx context.Context,
	id int64,
	state string,
	summary interface{},
) error {
	var s sql.NullString
	if summary != nil {
		var b strings.Builder
		if err := json.NewEncoder(&b).Encode(summary); err != nil {
			return err
		}
		s = sql.NullString{String: b.String(), Valid: true}
	}

	return tryHardToStoreState(ctx, func(conn *sql.Conn) error {
		_, err := conn.ExecContext(ctx, updateStateSummarySQL, state, s, id)
		return err
	})
}

// errorAndFail adds an error message (formatted from format and
// args) to the logs of import id and sets its state to 'failed'.
// Both statements run in one transaction, so the log entry and the
// state change are stored together or not at all. Storing is
// retried via tryHardToStoreState.
func errorAndFail(id int64, format string, args ...interface{}) error {
	msg := fmt.Sprintf(format, args...)
	ctx := context.Background()
	return tryHardToStoreState(ctx, func(conn *sql.Conn) error {
		tx, err := conn.BeginTx(ctx, nil)
		if err != nil {
			return err
		}
		defer tx.Rollback()
		// Execute through tx (not conn) so both statements belong
		// to the transaction committed below.
		if _, err = tx.ExecContext(
			ctx, logMessageSQL, id, "error", msg); err != nil {
			return err
		}
		if _, err = tx.ExecContext(
			ctx, updateStateSQL, "failed", id); err != nil {
			return err
		}
		return tx.Commit()
	})
}

// importLoop is the main loop of the import queue. It first waits
// for the configuration and re-enqueues jobs left in state
// 'running'. It then repeatedly fetches the next runnable job,
// locks the dependencies of its kind and runs it in its own
// goroutine. That goroutine stores the resulting state and summary,
// optionally sends a notification mail and re-enqueues the job if
// it should be retried. Finally it releases the dependencies and
// wakes up the loop.
func (q *importQueue) importLoop() {
	config.WaitReady()
	// Re-enqueue the jobs that are in state running.
	// They were in progress when the server went down.
	if err := reEnqueueRunning(); err != nil {
		log.Printf("error: re-enqueuing failed: %v", err)
	}

	for {
		var idj *idJob
		var err error

		// Block until a job can be run. Every signal on signalChan
		// and every elapsed pollDuration triggers another fetch.
		for {
			if idj, err = q.fetchJob(); err != nil && err != sql.ErrNoRows {
				log.Printf("error: db: %v\n", err)
			}
			if idj != nil {
				break
			}
			select {
			case <-q.signalChan:
			case <-time.After(pollDuration):
			}
		}

		log.Printf("info: starting import #%d\n", idj.id)

		jc := q.jobCreator(idj.kind)
		if jc == nil {
			if errFail := errorAndFail(
				idj.id, "no creator for kind '%s' found", idj.kind,
			); errFail != nil {
				log.Printf("error: marking import #%d as failed: %v\n",
					idj.id, errFail)
			}
			continue
		}

		// Lock dependencies.
		q.creatorsMu.Lock()
		for _, d := range jc.Depends() {
			q.usedDeps[d] = struct{}{}
		}
		q.creatorsMu.Unlock()

		go func(jc JobCreator, idj *idJob) {

			// Unlock the dependencies when the job is done and
			// wake up the loop, as jobs blocked by them may be
			// runnable now.
			defer func() {
				q.creatorsMu.Lock()
				for _, d := range jc.Depends() {
					delete(q.usedDeps, d)
				}
				q.creatorsMu.Unlock()
				select {
				case q.signalChan <- struct{}{}:
				default:
				}
			}()

			job := jc.Create()
			if err := common.FromJSONString(idj.data, job); err != nil {
				if errFail := errorAndFail(
					idj.id, "failed to create job for import #%d: %v",
					idj.id, err,
				); errFail != nil {
					log.Printf("error: marking import #%d as failed: %v\n",
						idj.id, errFail)
				}
				return
			}

			feedback := logFeedback(idj.id)

			feedback.Info("import #%d started", idj.id)

			ctx := context.Background()
			var summary interface{}

			// Run the job as its user, turning a panic into an error.
			errDo := survive(func() error {
				return auth.RunAs(ctx, idj.user,
					func(conn *sql.Conn) error {
						var err error
						summary, err = job.Do(ctx, idj.id, conn, feedback)
						return err
					})
			})()

			var unchanged, retry bool
			if v, ok := errDo.(UnchangedError); ok {
				feedback.Info("unchanged: %s", v.Error())
				unchanged = true
			} else if errDo != nil {
				feedback.Error("error in import: %v", errDo)
				retry = idj.nextRetry(feedback)
			}

			// CleanUp is skipped if the job is going to be retried.
			var errCleanup error
			if !retry { // cleanup debris
				if errCleanup = survive(job.CleanUp)(); errCleanup != nil {
					feedback.Error("error cleanup: %v", errCleanup)
				}
			}

			var state string
			switch {
			case unchanged:
				state = "unchanged"
			case errDo != nil || errCleanup != nil:
				state = "failed"
			case jc.AutoAccept():
				state = "accepted"
			default:
				state = "pending"
			}
			if err := updateStateSummary(ctx, idj.id, state, summary); err != nil {
				log.Printf("error: setting state of job %d failed: %v\n", idj.id, err)
			}
			log.Printf("info: import #%d finished: %s\n", idj.id, state)
			if idj.sendEmail {
				go sendNotificationMail(idj.user, jc.Description(), state, idj.id)
			}

			if retry {
				nid, err := q.addJob(
					idj.kind,
					idj.nextDue(),
					idj.trysLeftPointer(),
					idj.waitRetryPointer(),
					idj.user, idj.sendEmail,
					idj.data)
				if err != nil {
					log.Printf("error: retry enqueue failed: %v\n", err)
				} else {
					log.Printf("info: re-enqueued job with id %d\n", nid)
				}
			}
		}(jc, idj)
	}
}