view pkg/imports/queue.go @ 988:7dfd3db94e6d

In preparation of persisting import jobs logging is done through an interface.
author Sascha L. Teichmann <teichmann@intevation.de>
date Sat, 20 Oct 2018 19:14:00 +0200
parents 3841509f6e9e
children da19c68e36ba
line wrap: on
line source

package imports

import (
	"container/list"
	"context"
	"database/sql"
	"log"
	"sync"
	"sync/atomic"

	"gemma.intevation.de/gemma/pkg/auth"
)

type Feedback interface {
	Info(fmt string, args ...interface{})
	Warn(fmt string, args ...interface{})
	Error(fmt string, args ...interface{})
}

type Job interface {
	User() string
	Do(*sql.Conn, Feedback) error
	CleanUp() error
}

type idJob struct {
	id  int64
	job Job
}

var (
	queueCond = sync.NewCond(new(sync.Mutex))
	queue     = list.New()

	jobID int64
)

func init() {
	go importLoop()
}

func AddJob(job Job) int64 {
	id := atomic.AddInt64(&jobID, 1)
	queueCond.L.Lock()
	defer queueCond.L.Unlock()
	queue.PushBack(idJob{id, job})
	queueCond.Signal()
	return id
}

type logFeedback struct{}

func (logFeedback) Info(fmt string, args ...interface{}) {
	log.Printf("info: "+fmt, args...)
}

func (logFeedback) Warn(fmt string, args ...interface{}) {
	log.Printf("warn: "+fmt, args...)
}

func (logFeedback) Error(fmt string, args ...interface{}) {
	log.Printf("log: "+fmt, args...)
}

func importLoop() {
	for {
		var idj idJob
		queueCond.L.Lock()
		for queue.Len() == 0 {
			queueCond.Wait()
		}
		idj = queue.Remove(queue.Front()).(idJob)
		queueCond.L.Unlock()

		log.Printf("starting import job %d\n", idj.id)

		fn := func(conn *sql.Conn) error {
			return idj.job.Do(conn, logFeedback{})
		}

		if err := auth.RunAs(idj.job.User(), context.Background(), fn); err != nil {
			log.Printf("import error (job %d): %v\n", idj.id, err)
		}
		if err := idj.job.CleanUp(); err != nil {
			log.Printf("cleanup error (job %d): %v\n", idj.id, err)
		}
	}
}