Mercurial > gemma
view pkg/imports/queue.go @ 992:a978b2b26a88
Run Do and CleanUp of import jobs in their own goroutines with a crash handler.
This way an import job may die badly without killing the job queue, too.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Mon, 22 Oct 2018 11:24:25 +0200 |
parents | a301d240905f |
children | 75e65599ea52 |
line wrap: on
line source
package imports

import (
	"container/list"
	"context"
	"database/sql"
	"fmt"
	"log"
	"runtime/debug"
	"sync"
	"sync/atomic"

	"gemma.intevation.de/gemma/pkg/auth"
)

type (
	// Feedback is handed to a Job while it runs so that the job can
	// report messages at different severities.
	Feedback interface {
		// Info reports an informational message built from format and args.
		Info(format string, args ...interface{})
		// Warn reports a warning built from format and args.
		Warn(format string, args ...interface{})
		// Error reports an error message built from format and args.
		Error(format string, args ...interface{})
	}

	// Job is a unit of work executed by the import queue.
	Job interface {
		// Do performs the job using the given database connection and
		// reports through the given Feedback.
		Do(*sql.Conn, Feedback) error
		// CleanUp is called after Do has finished, regardless of
		// whether Do succeeded.
		CleanUp() error
	}

	// JobKind names a kind of job. It is the key under which a
	// JobCreator is registered.
	JobKind string

	// JobCreator builds a Job of the given kind from the data string
	// that was passed to AddJob.
	JobCreator func(kind JobKind, data string) (Job, error)

	// idJob is one entry of the queue: the parameters needed to create
	// and run a job, plus the id assigned by AddJob.
	idJob struct {
		kind JobKind
		id   int64
		user string
		data string
	}
)

var (
	// queueCond guards queue and is signalled whenever a job is added.
	queueCond = sync.NewCond(new(sync.Mutex))
	// queue holds the pending idJob values; jobs are appended at the
	// back and taken from the front.
	queue = list.New()
	// jobID is the most recently assigned job id. It is only modified
	// atomically.
	jobID int64

	// creatorsMu guards creators.
	creatorsMu sync.Mutex
	// creators maps each registered job kind to its creator.
	creators = map[JobKind]JobCreator{}
)

// init starts the background goroutine that processes the queue.
func init() {
	go importLoop()
}

// RegisterJobCreator registers jc as the creator for jobs of the given
// kind. A creator previously registered for the same kind is replaced.
func RegisterJobCreator(kind JobKind, jc JobCreator) {
	log.Printf("info: register import job creator for kind '%s'\n", kind)
	creatorsMu.Lock()
	defer creatorsMu.Unlock()
	creators[kind] = jc
}

// jobCreator returns the creator registered for kind, or nil if there
// is none.
func jobCreator(kind JobKind) JobCreator {
	creatorsMu.Lock()
	defer creatorsMu.Unlock()
	return creators[kind]
}

// AddJob appends a job of the given kind to the queue and returns the
// id assigned to it. The job is created from data and run as user
// later by the import loop; AddJob itself does not wait for it.
func AddJob(kind JobKind, user, data string) int64 {
	id := atomic.AddInt64(&jobID, 1)
	queueCond.L.Lock()
	defer queueCond.L.Unlock()
	queue.PushBack(idJob{
		kind: kind,
		id:   id,
		user: user,
		data: data,
	})
	queueCond.Signal()
	return id
}

// logFeedback is a Feedback that writes every message to the standard
// logger, prefixed with its severity.
type logFeedback struct{}

// Info logs the formatted message with an "info: " prefix.
func (logFeedback) Info(format string, args ...interface{}) {
	log.Printf("info: "+format, args...)
}

// Warn logs the formatted message with a "warn: " prefix.
func (logFeedback) Warn(format string, args ...interface{}) {
	log.Printf("warn: "+format, args...)
}

// Error logs the formatted message with an "error: " prefix.
func (logFeedback) Error(format string, args ...interface{}) {
	log.Printf("error: "+format, args...)
}

// survive wraps fn so that it runs in its own goroutine and a panic
// inside fn is turned into an error (including a stack trace) instead
// of crashing the process. The returned function blocks until fn has
// ended and always returns, even if fn never returns normally.
func survive(fn func() error) func() error {
	return func() error {
		// Buffered so the worker can always deliver its result and exit.
		errCh := make(chan error, 1)
		go func() {
			// finished stays false when fn unwinds without returning.
			finished := false
			defer func() {
				if finished {
					return
				}
				if p := recover(); p != nil {
					errCh <- fmt.Errorf("%v: %s", p, string(debug.Stack()))
					return
				}
				// No panic value although fn did not return: fn ended
				// via runtime.Goexit or a nil panic. Report it so the
				// caller is not blocked forever.
				errCh <- fmt.Errorf(
					"function terminated without returning: %s",
					string(debug.Stack()))
			}()
			err := fn()
			finished = true
			errCh <- err
		}()
		return <-errCh
	}
}

// importLoop processes the queue forever: it waits for a job, creates
// it with the creator registered for its kind, runs it as the job's
// user and finally cleans it up. Failures of a single job are logged
// and do not stop the loop.
func importLoop() {
	for {
		queueCond.L.Lock()
		for queue.Len() == 0 {
			queueCond.Wait()
		}
		idj := queue.Remove(queue.Front()).(idJob)
		queueCond.L.Unlock()

		log.Printf("starting import job %d\n", idj.id)

		jc := jobCreator(idj.kind)
		if jc == nil {
			log.Printf("Cannot find creator for job kind '%s'.\n", idj.kind)
			continue
		}

		// Creating the job runs foreign code as well, so guard it the
		// same way as Do and CleanUp.
		var job Job
		create := survive(func() error {
			var err error
			job, err = jc(idj.kind, idj.data)
			return err
		})
		if err := create(); err != nil {
			log.Printf("Failed to create job: %v\n", err)
			continue
		}
		if job == nil {
			// Taking the method value job.CleanUp below would panic in
			// this goroutine, outside of any crash handler.
			log.Printf("Failed to create job: creator for kind '%s' returned no job\n", idj.kind)
			continue
		}

		do := survive(func() error {
			return auth.RunAs(idj.user, context.Background(),
				func(conn *sql.Conn) error {
					return job.Do(conn, logFeedback{})
				})
		})
		if err := do(); err != nil {
			log.Printf("import error (job %d): %v\n", idj.id, err)
		}
		if err := survive(job.CleanUp)(); err != nil {
			log.Printf("cleanup error (job %d): %v\n", idj.id, err)
		}
	}
}