view pkg/imports/queue.go @ 991:a301d240905f

Decoupled import job creation and job execution with a factory function. This is needed for persistence purposes.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Mon, 22 Oct 2018 10:45:17 +0200
parents da19c68e36ba
children a978b2b26a88
line wrap: on
line source

package imports

import (
	"container/list"
	"context"
	"database/sql"
	"log"
	"sync"
	"sync/atomic"

	"gemma.intevation.de/gemma/pkg/auth"
)

type (
	Feedback interface {
		Info(fmt string, args ...interface{})
		Warn(fmt string, args ...interface{})
		Error(fmt string, args ...interface{})
	}

	Job interface {
		Do(*sql.Conn, Feedback) error
		CleanUp() error
	}

	JobKind string

	JobCreator func(kind JobKind, data string) (Job, error)

	idJob struct {
		kind JobKind
		id   int64
		user string
		data string
	}
)

var (
	queueCond = sync.NewCond(new(sync.Mutex))
	queue     = list.New()

	jobID int64

	creatorsMu sync.Mutex
	creators   = map[JobKind]JobCreator{}
)

func init() {
	go importLoop()
}

func RegisterJobCreator(kind JobKind, jc JobCreator) {
	log.Printf("info: register import job creator for kind '%s'\n", kind)
	creatorsMu.Lock()
	defer creatorsMu.Unlock()
	creators[kind] = jc
}

func jobCreator(kind JobKind) JobCreator {
	creatorsMu.Lock()
	defer creatorsMu.Unlock()
	return creators[kind]
}

func AddJob(kind JobKind, user, data string) int64 {
	id := atomic.AddInt64(&jobID, 1)
	queueCond.L.Lock()
	defer queueCond.L.Unlock()
	queue.PushBack(idJob{
		kind: kind,
		id:   id,
		user: user,
		data: data,
	})
	queueCond.Signal()
	return id
}

type logFeedback struct{}

func (logFeedback) Info(fmt string, args ...interface{}) {
	log.Printf("info: "+fmt, args...)
}

func (logFeedback) Warn(fmt string, args ...interface{}) {
	log.Printf("warn: "+fmt, args...)
}

func (logFeedback) Error(fmt string, args ...interface{}) {
	log.Printf("error: "+fmt, args...)
}

func importLoop() {
	for {
		var idj idJob
		queueCond.L.Lock()
		for queue.Len() == 0 {
			queueCond.Wait()
		}
		idj = queue.Remove(queue.Front()).(idJob)
		queueCond.L.Unlock()

		log.Printf("starting import job %d\n", idj.id)

		jc := jobCreator(idj.kind)
		if jc == nil {
			log.Printf("Cannot find creatir for job kind '%s'.\n", idj.kind)
			continue
		}

		job, err := jc(idj.kind, idj.data)
		if err != nil {
			log.Printf("Failed to create job: %v\n", err)
			continue
		}

		fn := func(conn *sql.Conn) error {
			return job.Do(conn, logFeedback{})
		}

		if err := auth.RunAs(idj.user, context.Background(), fn); err != nil {
			log.Printf("import error (job %d): %v\n", idj.id, err)
		}
		if err := job.CleanUp(); err != nil {
			log.Printf("cleanup error (job %d): %v\n", idj.id, err)
		}
	}
}