Mercurial > gemma
view pkg/imports/queue.go @ 1000:14425e35e3c2 persistent-import-queue
Wait with start of import queue until configuration is fully loaded.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Mon, 22 Oct 2018 17:16:02 +0200 |
parents | 75e65599ea52 |
children | d789f19877f4 |
line wrap: on
line source
package imports import ( "context" "database/sql" "fmt" "log" "runtime/debug" "sync" "gemma.intevation.de/gemma/pkg/auth" "gemma.intevation.de/gemma/pkg/config" ) type ( Feedback interface { Info(fmt string, args ...interface{}) Warn(fmt string, args ...interface{}) Error(fmt string, args ...interface{}) } Job interface { Do(*sql.Conn, Feedback) error CleanUp() error } JobKind string JobCreator func(kind JobKind, data string) (Job, error) idJob struct { id int64 kind JobKind user string data string } ) var ( queueCond = sync.NewCond(new(sync.Mutex)) creatorsMu sync.Mutex creators = map[JobKind]JobCreator{} ) const ( queueUser = "sys_admin" insertJobSQL = ` INSERT INTO waterway.imports ( kind, username, data ) VALUES ( $1, $2, $3 ) RETURNING id` selectJobSQL = ` SELECT id, kind, username, data FROM waterway.imports WHERE state = 'queued'::waterway.import_state AND enqueued IN ( SELECT min(enqueued) FROM waterway.imports WHERE state = 'queued'::waterway.import_state ) LIMIT 1 ` updateStateSQL = ` UPDATE waterway.imports SET state = $1::waterway.import_state WHERE id = $2 ` logMessageSQL = ` INSERT INTO waterway.import_logs ( import_id, kind, msg ) VALUES ( $1, $2::waterway.log_type, $3 )` ) func init() { go importLoop() } func RegisterJobCreator(kind JobKind, jc JobCreator) { log.Printf("info: register import job creator for kind '%s'\n", kind) creatorsMu.Lock() defer creatorsMu.Unlock() creators[kind] = jc } func jobCreator(kind JobKind) JobCreator { creatorsMu.Lock() defer creatorsMu.Unlock() return creators[kind] } func AddJob(kind JobKind, user, data string) (int64, error) { ctx := context.Background() queueCond.L.Lock() defer queueCond.L.Unlock() var id int64 err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error { return conn.QueryRowContext(ctx, insertJobSQL, string(kind), user, data).Scan(&id) }) if err == nil { queueCond.Signal() } return id, err } type logFeedback int64 func (lf logFeedback) log(kind, format string, args ...interface{}) { ctx := context.Background() err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error { _, err := conn.ExecContext( ctx, logMessageSQL, int64(lf), kind, fmt.Sprintf(format, args...)) return err }) if err != nil { log.Printf("logging failed: %v\n", err) } } func (lf logFeedback) Info(format string, args ...interface{}) { lf.log("info", format, args...) } func (lf logFeedback) Warn(format string, args ...interface{}) { lf.log("warn", format, args...) } func (lf logFeedback) Error(format string, args ...interface{}) { lf.log("error", format, args...) } func survive(fn func() error) func() error { return func() error { errCh := make(chan error) go func() { defer func() { if err := recover(); err != nil { errCh <- fmt.Errorf("%v: %s", err, string(debug.Stack())) } }() errCh <- fn() }() return <-errCh } } func fetchJob() (*idJob, error) { var ji idJob ctx := context.Background() err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error { tx, err := conn.BeginTx(ctx, nil) if err != nil { return err } defer tx.Rollback() if err = tx.QueryRowContext(ctx, selectJobSQL).Scan( &ji.id, &ji.kind, &ji.user, &ji.data); err != nil { return err } _, err = tx.ExecContext(ctx, updateStateSQL, "running", ji.id) if err == nil { err = tx.Commit() } return err }) switch { case err == sql.ErrNoRows: return nil, nil case err != nil: return nil, err } return &ji, nil } func updateState(id int64, state string) error { ctx := context.Background() return auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error { _, err := conn.ExecContext(ctx, updateStateSQL, state, id) return err }) } func errorAndFail(id int64, format string, args ...interface{}) error { ctx := context.Background() err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error { tx, err := conn.BeginTx(ctx, nil) if err != nil { return err } defer tx.Rollback() _, err = conn.ExecContext( ctx, logMessageSQL, id, "error", fmt.Sprintf(format, args...)) if err != nil { return err } _, err = conn.ExecContext( ctx, updateStateSQL, "failed", id) if err == nil { err = tx.Commit() } return err }) return err } func importLoop() { config.WaitReady() for { queueCond.L.Lock() var idj *idJob var err error for idj == nil { if idj, err = fetchJob(); err != nil { log.Printf("db error: %v\n", err) queueCond.Wait() } else if idj == nil { queueCond.Wait() } } queueCond.L.Unlock() log.Printf("starting import job %d\n", idj.id) jc := jobCreator(idj.kind) if jc == nil { errorAndFail(idj.id, "No creator for kind '%s' found", idj.kind) continue } job, err := jc(idj.kind, idj.data) if err != nil { errorAndFail(idj.id, "Faild to create job: %v", err) continue } feedback := logFeedback(idj.id) errDo := survive(func() error { return auth.RunAs(idj.user, context.Background(), func(conn *sql.Conn) error { return job.Do(conn, feedback) }) })() if errDo != nil { feedback.Error("error do: %v\n", errDo) } errCleanup := survive(job.CleanUp)() if errCleanup != nil { feedback.Error("error cleanup: %v\n", errCleanup) } if errDo != nil || errCleanup != nil { err = updateState(idj.id, "failed") } else { err = updateState(idj.id, "successful") } if err != nil { log.Printf("setting state of job %d failed: %v\n", idj.id, err) } } }