view pkg/imports/queue.go @ 998:75e65599ea52 persistent-import-queue

Persist job queue in database. WIP.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Mon, 22 Oct 2018 16:54:34 +0200
parents a978b2b26a88
children 14425e35e3c2
line wrap: on
line source

package imports

import (
	"context"
	"database/sql"
	"fmt"
	"log"
	"runtime/debug"
	"sync"
	"time"

	"gemma.intevation.de/gemma/pkg/auth"
)

// Feedback receives progress messages from a running import job.
// Every method takes a printf-style format string plus its arguments.
type Feedback interface {
	Info(fmt string, args ...interface{})
	Warn(fmt string, args ...interface{})
	Error(fmt string, args ...interface{})
}

// Job is one unit of import work. Do performs the work on the supplied
// database connection and reports progress through the Feedback; CleanUp
// is meant to release whatever the job acquired once Do has finished.
type Job interface {
	Do(*sql.Conn, Feedback) error
	CleanUp() error
}

// JobKind names a category of import job; it selects the JobCreator
// used to build the job.
type JobKind string

// JobCreator builds a Job of the given kind from its serialized payload.
type JobCreator func(kind JobKind, data string) (Job, error)

// idJob is a queue entry as read back from the database: the import id,
// its kind, the user it runs as, and the opaque payload.
type idJob struct {
	id   int64
	kind JobKind
	user string
	data string
}

var (
	// queueCond is signalled when a job is enqueued. Its lock is held by
	// AddJob while inserting and by the import loop while fetching, so a
	// wake-up cannot slip in between "queue looked empty" and Wait.
	queueCond = sync.NewCond(&sync.Mutex{})

	// creators maps each job kind to its factory; creatorsMu guards it.
	creatorsMu sync.Mutex
	creators   = make(map[JobKind]JobCreator)
)

const (
	// queueUser is the identity passed to auth.RunAs for the queue's own
	// bookkeeping statements (enqueue, fetch, state updates, logging).
	queueUser = "sys_admin"

	// insertJobSQL enqueues an import ($1 kind, $2 username, $3 data) and
	// returns the generated id.
	insertJobSQL = `
INSERT INTO waterway.imports (
  kind,
  username,
  data
) VALUES (
  $1,
  $2,
  $3
) RETURNING id`

	// selectJobSQL returns one queued import whose enqueued value is the
	// minimum among all queued imports (i.e. the oldest queued one).
	// NOTE(review): no row lock (e.g. FOR UPDATE SKIP LOCKED) is taken, so
	// fetchers in separate processes could claim the same row — confirm
	// only one worker process runs.
	selectJobSQL = `
SELECT
  id,
  kind,
  username,
  data
FROM waterway.imports 
WHERE state = 'queued'::waterway.import_state AND enqueued IN (
  SELECT min(enqueued)
  FROM waterway.imports 
  WHERE state = 'queued'::waterway.import_state
)
LIMIT 1
`
	// updateStateSQL sets the state ($1, cast to waterway.import_state)
	// of the import with id $2.
	updateStateSQL = `
UPDATE waterway.imports SET state = $1::waterway.import_state
WHERE id = $2
`
	// logMessageSQL appends message $3 of type $2 (cast to
	// waterway.log_type) to the log of import $1.
	logMessageSQL = `
INSERT INTO waterway.import_logs (
  import_id,
  kind,
  msg
) VALUES (
  $1,
  $2::waterway.log_type,
  $3
)`
)

// init starts the background worker that processes queued import jobs.
// importLoop never returns, so the worker lives as long as the process.
func init() {
	go func() {
		importLoop()
	}()
}

// RegisterJobCreator makes jc the factory for jobs of the given kind.
// Registering a kind again replaces the previously registered factory.
func RegisterJobCreator(kind JobKind, jc JobCreator) {
	log.Printf("info: register import job creator for kind '%s'\n", kind)
	creatorsMu.Lock()
	creators[kind] = jc
	creatorsMu.Unlock()
}

// jobCreator returns the factory registered for kind, or nil if none is.
func jobCreator(kind JobKind) JobCreator {
	creatorsMu.Lock()
	jc := creators[kind]
	creatorsMu.Unlock()
	return jc
}

// AddJob stores a new queued import of the given kind, to be run as user
// with the opaque payload data, and returns its database id.
// The queue lock is held across the insert and the Signal so the import
// loop, which holds the same lock while fetching, cannot miss the wake-up.
func AddJob(kind JobKind, user, data string) (int64, error) {
	queueCond.L.Lock()
	defer queueCond.L.Unlock()

	ctx := context.Background()
	var id int64
	insert := func(conn *sql.Conn) error {
		row := conn.QueryRowContext(ctx, insertJobSQL, string(kind), user, data)
		return row.Scan(&id)
	}
	if err := auth.RunAs(queueUser, ctx, insert); err != nil {
		return id, err
	}
	queueCond.Signal()
	return id, nil
}

type logFeedback int64

func (lf logFeedback) log(kind, format string, args ...interface{}) {
	ctx := context.Background()
	err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error {
		_, err := conn.ExecContext(
			ctx, logMessageSQL, int64(lf), kind, fmt.Sprintf(format, args...))
		return err
	})
	if err != nil {
		log.Printf("logging failed: %v\n", err)
	}
}

func (lf logFeedback) Info(format string, args ...interface{}) {
	lf.log("info", format, args...)
}

func (lf logFeedback) Warn(format string, args ...interface{}) {
	lf.log("warn", format, args...)
}

func (lf logFeedback) Error(format string, args ...interface{}) {
	lf.log("error", format, args...)
}

// survive wraps fn so that calling the result never takes the caller down.
// fn runs in its own goroutine; a panic is turned into an error carrying
// the panic value and the stack trace. If fn terminates without returning
// (runtime.Goexit, or panic(nil) on Go versions before 1.21 where recover
// reports nil) an error is returned too, instead of blocking the caller
// forever on a result that will never arrive.
func survive(fn func() error) func() error {
	return func() error {
		// Buffered: the worker goroutine delivers exactly one result and
		// must never block doing so.
		errCh := make(chan error, 1)
		go func() {
			returned := false
			defer func() {
				if returned {
					return
				}
				if r := recover(); r != nil {
					errCh <- fmt.Errorf("%v: %s", r, string(debug.Stack()))
				} else {
					errCh <- fmt.Errorf("job terminated without returning")
				}
			}()
			err := fn()
			returned = true
			errCh <- err
		}()
		return <-errCh
	}
}

// fetchJob claims the oldest queued import: in a single transaction it
// reads one row matched by selectJobSQL, sets its state to "running" and
// commits. It returns (nil, nil) when no import is queued.
func fetchJob() (*idJob, error) {
	ctx := context.Background()
	job := new(idJob)
	claim := func(conn *sql.Conn) error {
		tx, err := conn.BeginTx(ctx, nil)
		if err != nil {
			return err
		}
		// Undo everything unless Commit below succeeds.
		defer tx.Rollback()

		row := tx.QueryRowContext(ctx, selectJobSQL)
		if err := row.Scan(&job.id, &job.kind, &job.user, &job.data); err != nil {
			return err
		}
		if _, err := tx.ExecContext(ctx, updateStateSQL, "running", job.id); err != nil {
			return err
		}
		return tx.Commit()
	}

	err := auth.RunAs(queueUser, ctx, claim)
	if err == sql.ErrNoRows {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}
	return job, nil
}

func updateState(id int64, state string) error {
	ctx := context.Background()
	return auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error {
		_, err := conn.ExecContext(ctx, updateStateSQL, state, id)
		return err
	})
}

// errorAndFail appends an "error" log entry (built from format and args)
// to import id and marks the import as "failed". Both writes go through
// one transaction, so either both take effect or neither does.
func errorAndFail(id int64, format string, args ...interface{}) error {
	ctx := context.Background()
	return auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error {
		tx, err := conn.BeginTx(ctx, nil)
		if err != nil {
			return err
		}
		// Has no effect once Commit has succeeded.
		defer tx.Rollback()

		// Execute via tx, not conn, so the statements belong to the
		// transaction started above.
		if _, err := tx.ExecContext(
			ctx, logMessageSQL, id, "error", fmt.Sprintf(format, args...)); err != nil {
			return err
		}
		if _, err := tx.ExecContext(ctx, updateStateSQL, "failed", id); err != nil {
			return err
		}
		return tx.Commit()
	})
}

// importLoop is the queue worker. It runs forever, handling one job at a
// time:
//
//  1. claim a queued job via fetchJob, waiting on queueCond while the
//     queue is empty and retrying after a delay on database errors,
//  2. build the Job with the creator registered for its kind,
//  3. run Do as the job's user, then CleanUp, both guarded by survive,
//  4. store the final state: "successful", or "failed" if either step
//     returned an error.
func importLoop() {
	// Pause before retrying a failed fetch. A database error produces no
	// Signal, so waiting on queueCond alone would leave jobs that are
	// already queued stuck until the next AddJob.
	const fetchRetryDelay = 10 * time.Second

	for {
		queueCond.L.Lock()

		var idj *idJob
		var err error

		for idj == nil {
			if idj, err = fetchJob(); err != nil {
				log.Printf("db error: %v\n", err)
				// Release the lock while sleeping so AddJob is not blocked.
				queueCond.L.Unlock()
				time.Sleep(fetchRetryDelay)
				queueCond.L.Lock()
			} else if idj == nil {
				queueCond.Wait()
			}
		}

		queueCond.L.Unlock()

		log.Printf("starting import job %d\n", idj.id)

		jc := jobCreator(idj.kind)
		if jc == nil {
			if ferr := errorAndFail(idj.id, "No creator for kind '%s' found", idj.kind); ferr != nil {
				log.Printf("setting state of job %d failed: %v\n", idj.id, ferr)
			}
			continue
		}

		job, err := jc(idj.kind, idj.data)
		if err != nil {
			if ferr := errorAndFail(idj.id, "Failed to create job: %v", err); ferr != nil {
				log.Printf("setting state of job %d failed: %v\n", idj.id, ferr)
			}
			continue
		}

		feedback := logFeedback(idj.id)

		errDo := survive(func() error {
			return auth.RunAs(idj.user, context.Background(),
				func(conn *sql.Conn) error { return job.Do(conn, feedback) })
		})()
		if errDo != nil {
			feedback.Error("error do: %v\n", errDo)
		}
		// survive only wraps job.CleanUp; the trailing () actually runs it.
		errCleanup := survive(job.CleanUp)()
		if errCleanup != nil {
			feedback.Error("error cleanup: %v\n", errCleanup)
		}

		state := "successful"
		if errDo != nil || errCleanup != nil {
			state = "failed"
		}
		if err := updateState(idj.id, state); err != nil {
			log.Printf("setting state of job %d failed: %v\n", idj.id, err)
		}
	}
}