view pkg/imports/queue.go @ 984:3c9ea8ab856a

Fixed small glitches in sounding result importer.
author Sascha L. Teichmann <teichmann@intevation.de>
date Fri, 19 Oct 2018 16:05:46 +0200
parents 7934b5c1a910
children 3841509f6e9e
line wrap: on
line source

package imports

import (
	"container/list"
	"context"
	"database/sql"
	"log"
	"sync"
	"sync/atomic"

	"gemma.intevation.de/gemma/pkg/auth"
)

type Job interface {
	User() string
	Do(conn *sql.Conn) error
	CleanUp() error
}

var (
	queueCond = sync.NewCond(new(sync.Mutex))
	queue     = list.New()

	jobID int64
)

func init() {
	go importLoop()
}

func AddJob(job Job) int64 {
	id := atomic.AddInt64(&jobID, 1)
	queueCond.L.Lock()
	defer queueCond.L.Unlock()
	queue.PushBack(job)
	queueCond.Signal()
	return id
}

func importLoop() {
	for {
		var job Job
		queueCond.L.Lock()
		for queue.Len() == 0 {
			queueCond.Wait()
		}
		job = queue.Remove(queue.Front()).(Job)
		queueCond.L.Unlock()

		if err := auth.RunAs(job.User(), context.Background(), job.Do); err != nil {
			log.Printf("import error: %v\n", err)
		}
		if err := job.CleanUp(); err != nil {
			log.Printf("cleanup error: %v\n", err)
		}
	}
}