view pkg/imports/queue.go @ 987:3841509f6e9e

Store the job ID alongside the job in the job queue.
author Sascha L. Teichmann <teichmann@intevation.de>
date Fri, 19 Oct 2018 18:09:26 +0200
parents 7934b5c1a910
children 7dfd3db94e6d
line wrap: on
line source

package imports

import (
	"container/list"
	"context"
	"database/sql"
	"log"
	"sync"
	"sync/atomic"

	"gemma.intevation.de/gemma/pkg/auth"
)

// Job is the unit of work processed by the import queue. Jobs are
// enqueued with AddJob and executed one after another by the import loop.
type Job interface {
	// User returns the name that the import loop passes as the first
	// argument to auth.RunAs when running the job (presumably the user
	// on whose behalf the job is executed; confirm against auth.RunAs).
	User() string
	// Do performs the actual work. The import loop hands it to
	// auth.RunAs, which supplies the database connection.
	Do(conn *sql.Conn) error
	// CleanUp is called by the import loop after the job has been run,
	// regardless of whether running it reported an error.
	CleanUp() error
}

// idJob is the element type stored in the queue: a Job together with
// the ID that AddJob assigned to it.
type idJob struct {
	// id is the ID returned by AddJob; the import loop uses it in its
	// log messages.
	id  int64
	// job is the job to be executed.
	job Job
}

var (
	// queueCond's lock guards queue. The condition is signalled by AddJob
	// and waited on by the import loop while the queue is empty.
	queueCond = sync.NewCond(new(sync.Mutex))
	// queue holds the pending idJob values. AddJob appends at the back,
	// the import loop removes from the front.
	queue     = list.New()

	// jobID is the most recently assigned job ID. In the code visible
	// here it is only modified via atomic.AddInt64 in AddJob.
	jobID int64
)

// init starts the import loop in its own goroutine when the package is
// initialized. importLoop never returns, so the goroutine is not stopped.
func init() {
	go importLoop()
}

// AddJob appends job to the end of the import queue and wakes up one
// goroutine waiting for work. It returns the ID assigned to the job;
// IDs are taken from a counter that is incremented atomically, so the
// first job added receives ID 1.
func AddJob(job Job) int64 {
	// The ID is drawn before the queue lock is taken.
	nextID := atomic.AddInt64(&jobID, 1)
	entry := idJob{id: nextID, job: job}

	queueCond.L.Lock()
	queue.PushBack(entry)
	queueCond.Signal()
	queueCond.L.Unlock()

	return nextID
}

func importLoop() {
	for {
		var idj idJob
		queueCond.L.Lock()
		for queue.Len() == 0 {
			queueCond.Wait()
		}
		idj = queue.Remove(queue.Front()).(idJob)
		queueCond.L.Unlock()

		log.Printf("starting import job %d\n", idj.id)

		if err := auth.RunAs(idj.job.User(), context.Background(), idj.job.Do); err != nil {
			log.Printf("import error (job %d): %v\n", idj.id, err)
		}
		if err := idj.job.CleanUp(); err != nil {
			log.Printf("cleanup error (job %d): %v\n", idj.id, err)
		}
	}
}