Mercurial > gemma
view pkg/imports/queue.go @ 988:7dfd3db94e6d
In preparation of persisting import jobs logging is done through an interface.
author | Sascha L. Teichmann <teichmann@intevation.de> |
---|---|
date | Sat, 20 Oct 2018 19:14:00 +0200 |
parents | 3841509f6e9e |
children | da19c68e36ba |
line wrap: on
line source
package imports import ( "container/list" "context" "database/sql" "log" "sync" "sync/atomic" "gemma.intevation.de/gemma/pkg/auth" ) type Feedback interface { Info(fmt string, args ...interface{}) Warn(fmt string, args ...interface{}) Error(fmt string, args ...interface{}) } type Job interface { User() string Do(*sql.Conn, Feedback) error CleanUp() error } type idJob struct { id int64 job Job } var ( queueCond = sync.NewCond(new(sync.Mutex)) queue = list.New() jobID int64 ) func init() { go importLoop() } func AddJob(job Job) int64 { id := atomic.AddInt64(&jobID, 1) queueCond.L.Lock() defer queueCond.L.Unlock() queue.PushBack(idJob{id, job}) queueCond.Signal() return id } type logFeedback struct{} func (logFeedback) Info(fmt string, args ...interface{}) { log.Printf("info: "+fmt, args...) } func (logFeedback) Warn(fmt string, args ...interface{}) { log.Printf("warn: "+fmt, args...) } func (logFeedback) Error(fmt string, args ...interface{}) { log.Printf("log: "+fmt, args...) } func importLoop() { for { var idj idJob queueCond.L.Lock() for queue.Len() == 0 { queueCond.Wait() } idj = queue.Remove(queue.Front()).(idJob) queueCond.L.Unlock() log.Printf("starting import job %d\n", idj.id) fn := func(conn *sql.Conn) error { return idj.job.Do(conn, logFeedback{}) } if err := auth.RunAs(idj.job.User(), context.Background(), fn); err != nil { log.Printf("import error (job %d): %v\n", idj.id, err) } if err := idj.job.CleanUp(); err != nil { log.Printf("cleanup error (job %d): %v\n", idj.id, err) } } }