# HG changeset patch # User Sascha L. Teichmann # Date 1540220074 -7200 # Node ID 75e65599ea5215841ba2acbc53332ceda10077a6 # Parent 839526f44f0fbaced9d6f475a3f0146337c569d5 Persist job queue in database. WIP. diff -r 839526f44f0f -r 75e65599ea52 pkg/controllers/imports.go --- a/pkg/controllers/imports.go Mon Oct 22 14:49:00 2018 +0200 +++ b/pkg/controllers/imports.go Mon Oct 22 16:54:34 2018 +0200 @@ -66,7 +66,13 @@ session, _ := auth.GetSession(req) - jobID := imports.AddJob(imports.SRJobKind, session.User, dir) + jobID, err := imports.AddJob(imports.SRJobKind, session.User, dir) + if err != nil { + log.Printf("error: %v\n", err) + http.Error(rw, "error: "+err.Error(), http.StatusInternalServerError) + return + } + log.Printf("Added job %d\n", jobID) result := struct { diff -r 839526f44f0f -r 75e65599ea52 pkg/imports/queue.go --- a/pkg/imports/queue.go Mon Oct 22 14:49:00 2018 +0200 +++ b/pkg/imports/queue.go Mon Oct 22 16:54:34 2018 +0200 @@ -1,14 +1,12 @@ package imports import ( - "container/list" "context" "database/sql" "fmt" "log" "runtime/debug" "sync" - "sync/atomic" "gemma.intevation.de/gemma/pkg/auth" ) @@ -30,8 +28,8 @@ JobCreator func(kind JobKind, data string) (Job, error) idJob struct { + id int64 kind JobKind - id int64 user string data string } @@ -39,14 +37,55 @@ var ( queueCond = sync.NewCond(new(sync.Mutex)) - queue = list.New() - - jobID int64 creatorsMu sync.Mutex creators = map[JobKind]JobCreator{} ) +const ( + queueUser = "sys_admin" + + insertJobSQL = ` +INSERT INTO waterway.imports ( + kind, + username, + data +) VALUES ( + $1, + $2, + $3 +) RETURNING id` + + selectJobSQL = ` +SELECT + id, + kind, + username, + data +FROM waterway.imports +WHERE state = 'queued'::waterway.import_state AND enqueued IN ( + SELECT min(enqueued) + FROM waterway.imports + WHERE state = 'queued'::waterway.import_state +) +LIMIT 1 +` + updateStateSQL = ` +UPDATE waterway.imports SET state = $1::waterway.import_state +WHERE id = $2 +` + logMessageSQL = ` +INSERT INTO waterway.import_logs ( + import_id, + kind, + msg +) VALUES ( + $1, + $2::waterway.log_type, + $3 +)` +) + func init() { go importLoop() } @@ -64,32 +103,44 @@ return creators[kind] } -func AddJob(kind JobKind, user, data string) int64 { - id := atomic.AddInt64(&jobID, 1) +func AddJob(kind JobKind, user, data string) (int64, error) { + ctx := context.Background() queueCond.L.Lock() defer queueCond.L.Unlock() - queue.PushBack(idJob{ - kind: kind, - id: id, - user: user, - data: data, + var id int64 + err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error { + return conn.QueryRowContext(ctx, insertJobSQL, string(kind), user, data).Scan(&id) }) - queueCond.Signal() - return id + if err == nil { + queueCond.Signal() + } + return id, err } -type logFeedback struct{} +type logFeedback int64 -func (logFeedback) Info(fmt string, args ...interface{}) { - log.Printf("info: "+fmt, args...) +func (lf logFeedback) log(kind, format string, args ...interface{}) { + ctx := context.Background() + err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error { + _, err := conn.ExecContext( + ctx, logMessageSQL, int64(lf), kind, fmt.Sprintf(format, args...)) + return err + }) + if err != nil { + log.Printf("logging failed: %v\n", err) + } } -func (logFeedback) Warn(fmt string, args ...interface{}) { - log.Printf("warn: "+fmt, args...) +func (lf logFeedback) Info(format string, args ...interface{}) { + lf.log("info", format, args...) } -func (logFeedback) Error(fmt string, args ...interface{}) { - log.Printf("error: "+fmt, args...) +func (lf logFeedback) Warn(format string, args ...interface{}) { + lf.log("warn", format, args...) +} + +func (lf logFeedback) Error(format string, args ...interface{}) { + lf.log("error", format, args...) } func survive(fn func() error) func() error { @@ -108,39 +159,118 @@ } } +func fetchJob() (*idJob, error) { + var ji idJob + ctx := context.Background() + err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error { + tx, err := conn.BeginTx(ctx, nil) + if err != nil { + return err + } + defer tx.Rollback() + if err = tx.QueryRowContext(ctx, selectJobSQL).Scan( + &ji.id, &ji.kind, &ji.user, &ji.data); err != nil { + return err + } + _, err = tx.ExecContext(ctx, updateStateSQL, "running", ji.id) + if err == nil { + err = tx.Commit() + } + return err + }) + switch { + case err == sql.ErrNoRows: + return nil, nil + case err != nil: + return nil, err + } + return &ji, nil +} + +func updateState(id int64, state string) error { + ctx := context.Background() + return auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error { + _, err := conn.ExecContext(ctx, updateStateSQL, state, id) + return err + }) +} + +func errorAndFail(id int64, format string, args ...interface{}) error { + ctx := context.Background() + err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error { + tx, err := conn.BeginTx(ctx, nil) + if err != nil { + return err + } + defer tx.Rollback() + _, err = conn.ExecContext( + ctx, logMessageSQL, id, "error", fmt.Sprintf(format, args...)) + if err != nil { + return err + } + _, err = conn.ExecContext( + ctx, updateStateSQL, "failed", id) + if err == nil { + err = tx.Commit() + } + return err + }) + return err +} + func importLoop() { for { - var idj idJob queueCond.L.Lock() - for queue.Len() == 0 { - queueCond.Wait() + + var idj *idJob + var err error + + for idj == nil { + if idj, err = fetchJob(); err != nil { + log.Printf("db error: %v\n", err) + queueCond.Wait() + } else if idj == nil { + queueCond.Wait() + } } - idj = queue.Remove(queue.Front()).(idJob) + queueCond.L.Unlock() log.Printf("starting import job %d\n", idj.id) jc := jobCreator(idj.kind) if jc == nil { - log.Printf("Cannot find creatir for job kind '%s'.\n", idj.kind) + errorAndFail(idj.id, "No creator for kind '%s' found", idj.kind) continue } job, err := jc(idj.kind, idj.data) if err != nil { - log.Printf("Failed to create job: %v\n", err) + errorAndFail(idj.id, "Faild to create job: %v", err) continue } - do := survive(func() error { + feedback := logFeedback(idj.id) + + errDo := survive(func() error { return auth.RunAs(idj.user, context.Background(), - func(conn *sql.Conn) error { return job.Do(conn, logFeedback{}) }) - }) - if err := do(); err != nil { - log.Printf("import error (job %d): %v\n", idj.id, err) + func(conn *sql.Conn) error { return job.Do(conn, feedback) }) + })() + if errDo != nil { + feedback.Error("error do: %v\n", errDo) + } + errCleanup := survive(job.CleanUp) + if errCleanup != nil { + feedback.Error("error cleanup: %v\n", errCleanup) } - if err := survive(job.CleanUp)(); err != nil { - log.Printf("cleanup error (job %d): %v\n", idj.id, err) + + if errDo != nil || errCleanup != nil { + err = updateState(idj.id, "failed") + } else { + err = updateState(idj.id, "successful") + } + if err != nil { + log.Printf("setting state of job %d failed: %v\n", idj.id, err) } } } diff -r 839526f44f0f -r 75e65599ea52 schema/gemma.sql --- a/schema/gemma.sql Mon Oct 22 14:49:00 2018 +0200 +++ b/schema/gemma.sql Mon Oct 22 16:54:34 2018 +0200 @@ -517,6 +517,8 @@ data TEXT ) + CREATE INDEX enqueued_idx ON imports(enqueued, state) + CREATE TYPE log_type AS ENUM ('info', 'warn', 'error') CREATE TABLE import_logs (