Mercurial > gemma
changeset 1006:5981ff466cf4
Merged branch persistent-import-queue back into default.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Mon, 22 Oct 2018 18:16:20 +0200 |
parents | 26ca04caa330 (current diff) fcf016ebdef4 (diff) |
children | ea8db3ec863c |
files | |
diffstat | 7 files changed, 255 insertions(+), 42 deletions(-) [+] |
line wrap: on
line diff
--- a/client/src/morphtool/Morphtool.vue Mon Oct 22 14:53:20 2018 +0200 +++ b/client/src/morphtool/Morphtool.vue Mon Oct 22 18:16:20 2018 +0200 @@ -97,7 +97,7 @@ ...mapState("identifystore", ["identifiedFeatures"]), ...mapState("morphstore", ["selectedMorph"]), selectedBottleneck: function() { - if (this.identifiedFeatures) { + if (this.identifiedFeatures && !this.drawMode) { for (let feature of this.identifiedFeatures) { let id = feature.getId(); // RegExp.prototype.test() works with number, str and undefined
--- a/cmd/gemma/main.go Mon Oct 22 14:53:20 2018 +0200 +++ b/cmd/gemma/main.go Mon Oct 22 18:16:20 2018 +0200 @@ -31,6 +31,8 @@ func start(cmd *cobra.Command, args []string) { + config.Ready() + web, err := filepath.Abs(config.Web()) if err != nil { log.Fatalf("error: %v\n", err)
--- a/pkg/config/config.go Mon Oct 22 14:53:20 2018 +0200 +++ b/pkg/config/config.go Mon Oct 22 18:16:20 2018 +0200 @@ -156,7 +156,26 @@ str("proxy-prefix", "", `URL prefix of proxy. Defaults to "http://${web-host}:${web-port}"`) str("tmp-dir", "", "Temp directory of gemma server. Defaults to system temp directory.") +} +var ( + configCond = sync.NewCond(new(sync.Mutex)) + configReady bool +) + +func Ready() { + configCond.L.Lock() + defer configCond.L.Unlock() + configReady = true + configCond.Broadcast() +} + +func WaitReady() { + configCond.L.Lock() + defer configCond.L.Unlock() + for !configReady { + configCond.Wait() + } } func initConfig() {
--- a/pkg/controllers/imports.go Mon Oct 22 14:53:20 2018 +0200 +++ b/pkg/controllers/imports.go Mon Oct 22 18:16:20 2018 +0200 @@ -66,7 +66,13 @@ session, _ := auth.GetSession(req) - jobID := imports.AddJob(imports.SRJobKind, session.User, dir) + jobID, err := imports.AddJob(imports.SRJobKind, session.User, dir) + if err != nil { + log.Printf("error: %v\n", err) + http.Error(rw, "error: "+err.Error(), http.StatusInternalServerError) + return + } + log.Printf("Added job %d\n", jobID) result := struct {
--- a/pkg/controllers/pwreset.go Mon Oct 22 14:53:20 2018 +0200 +++ b/pkg/controllers/pwreset.go Mon Oct 22 18:16:20 2018 +0200 @@ -18,6 +18,7 @@ "gemma.intevation.de/gemma/pkg/auth" "gemma.intevation.de/gemma/pkg/common" + "gemma.intevation.de/gemma/pkg/config" "gemma.intevation.de/gemma/pkg/misc" "gemma.intevation.de/gemma/pkg/models" ) @@ -103,6 +104,7 @@ } func removeOutdated() { + config.WaitReady() for { time.Sleep(cleanupPause) err := auth.RunAs(
--- a/pkg/imports/queue.go Mon Oct 22 14:53:20 2018 +0200 +++ b/pkg/imports/queue.go Mon Oct 22 18:16:20 2018 +0200 @@ -1,16 +1,16 @@ package imports import ( - "container/list" "context" "database/sql" "fmt" "log" "runtime/debug" "sync" - "sync/atomic" + "time" "gemma.intevation.de/gemma/pkg/auth" + "gemma.intevation.de/gemma/pkg/config" ) type ( @@ -30,23 +30,71 @@ JobCreator func(kind JobKind, data string) (Job, error) idJob struct { + id int64 kind JobKind - id int64 user string data string } ) +const pollDuration = time.Second * 10 + var ( - queueCond = sync.NewCond(new(sync.Mutex)) - queue = list.New() - - jobID int64 + signalChan = make(chan struct{}) creatorsMu sync.Mutex creators = map[JobKind]JobCreator{} ) +const ( + queueUser = "sys_admin" + + reEnqueueRunningSQL = ` +UPDATE waterway.imports SET state = 'queued'::waterway.import_state +WHERE state = 'running'::waterway.import_state +` + + insertJobSQL = ` +INSERT INTO waterway.imports ( + kind, + username, + data +) VALUES ( + $1, + $2, + $3 +) RETURNING id` + + selectJobSQL = ` +SELECT + id, + kind, + username, + data +FROM waterway.imports +WHERE state = 'queued'::waterway.import_state AND enqueued IN ( + SELECT min(enqueued) + FROM waterway.imports + WHERE state = 'queued'::waterway.import_state +) +LIMIT 1 +` + updateStateSQL = ` +UPDATE waterway.imports SET state = $1::waterway.import_state +WHERE id = $2 +` + logMessageSQL = ` +INSERT INTO waterway.import_logs ( + import_id, + kind, + msg +) VALUES ( + $1, + $2::waterway.log_type, + $3 +)` +) + func init() { go importLoop() } @@ -64,32 +112,45 @@ return creators[kind] } -func AddJob(kind JobKind, user, data string) int64 { - id := atomic.AddInt64(&jobID, 1) - queueCond.L.Lock() - defer queueCond.L.Unlock() - queue.PushBack(idJob{ - kind: kind, - id: id, - user: user, - data: data, +func AddJob(kind JobKind, user, data string) (int64, error) { + ctx := context.Background() + var id int64 + err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) 
error { + return conn.QueryRowContext(ctx, insertJobSQL, string(kind), user, data).Scan(&id) }) - queueCond.Signal() - return id + if err == nil { + select { + case signalChan <- struct{}{}: + default: + } + } + return id, err } -type logFeedback struct{} +type logFeedback int64 -func (logFeedback) Info(fmt string, args ...interface{}) { - log.Printf("info: "+fmt, args...) +func (lf logFeedback) log(kind, format string, args ...interface{}) { + ctx := context.Background() + err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error { + _, err := conn.ExecContext( + ctx, logMessageSQL, int64(lf), kind, fmt.Sprintf(format, args...)) + return err + }) + if err != nil { + log.Printf("logging failed: %v\n", err) + } } -func (logFeedback) Warn(fmt string, args ...interface{}) { - log.Printf("warn: "+fmt, args...) +func (lf logFeedback) Info(format string, args ...interface{}) { + lf.log("info", format, args...) } -func (logFeedback) Error(fmt string, args ...interface{}) { - log.Printf("error: "+fmt, args...) +func (lf logFeedback) Warn(format string, args ...interface{}) { + lf.log("warn", format, args...) +} + +func (lf logFeedback) Error(format string, args ...interface{}) { + lf.log("error", format, args...) 
} func survive(fn func() error) func() error { @@ -108,39 +169,135 @@ } } +func reEnqueueRunning() error { + ctx := context.Background() + return auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error { + _, err := conn.ExecContext(ctx, reEnqueueRunningSQL) + return err + }) +} + +func fetchJob() (*idJob, error) { + var ji idJob + ctx := context.Background() + err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error { + tx, err := conn.BeginTx(ctx, nil) + if err != nil { + return err + } + defer tx.Rollback() + if err = tx.QueryRowContext(ctx, selectJobSQL).Scan( + &ji.id, &ji.kind, &ji.user, &ji.data); err != nil { + return err + } + _, err = tx.ExecContext(ctx, updateStateSQL, "running", ji.id) + if err == nil { + err = tx.Commit() + } + return err + }) + switch { + case err == sql.ErrNoRows: + return nil, nil + case err != nil: + return nil, err + } + return &ji, nil +} + +func updateState(id int64, state string) error { + ctx := context.Background() + return auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error { + _, err := conn.ExecContext(ctx, updateStateSQL, state, id) + return err + }) +} + +func errorAndFail(id int64, format string, args ...interface{}) error { + ctx := context.Background() + err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error { + tx, err := conn.BeginTx(ctx, nil) + if err != nil { + return err + } + defer tx.Rollback() + _, err = conn.ExecContext( + ctx, logMessageSQL, id, "error", fmt.Sprintf(format, args...)) + if err != nil { + return err + } + _, err = conn.ExecContext( + ctx, updateStateSQL, "failed", id) + if err == nil { + err = tx.Commit() + } + return err + }) + return err +} + func importLoop() { + config.WaitReady() + // re-enqueue the jobs that are in state running. + // They were in progress when the server went down. 
+ if err := reEnqueueRunning(); err != nil { + log.Printf("re-enqueuing failed: %v", err) + } + for { - var idj idJob - queueCond.L.Lock() - for queue.Len() == 0 { - queueCond.Wait() + var idj *idJob + var err error + + for { + if idj, err = fetchJob(); err != nil { + log.Printf("db error: %v\n", err) + } + if idj != nil { + break + } + select { + case <-signalChan: + case <-time.After(pollDuration): + } } - idj = queue.Remove(queue.Front()).(idJob) - queueCond.L.Unlock() log.Printf("starting import job %d\n", idj.id) jc := jobCreator(idj.kind) if jc == nil { - log.Printf("Cannot find creatir for job kind '%s'.\n", idj.kind) + errorAndFail(idj.id, "No creator for kind '%s' found", idj.kind) continue } job, err := jc(idj.kind, idj.data) if err != nil { - log.Printf("Failed to create job: %v\n", err) + errorAndFail(idj.id, "Failed to create job: %v", err) continue } - do := survive(func() error { + feedback := logFeedback(idj.id) + + errDo := survive(func() error { return auth.RunAs(idj.user, context.Background(), - func(conn *sql.Conn) error { return job.Do(conn, logFeedback{}) }) - }) - if err := do(); err != nil { - log.Printf("import error (job %d): %v\n", idj.id, err) + func(conn *sql.Conn) error { return job.Do(conn, feedback) }) + })() + if errDo != nil { + feedback.Error("error do: %v\n", errDo) + } + errCleanup := survive(job.CleanUp)() + if errCleanup != nil { + feedback.Error("error cleanup: %v\n", errCleanup) } - if err := survive(job.CleanUp)(); err != nil { - log.Printf("cleanup error (job %d): %v\n", idj.id, err) + + var state string + if errDo != nil || errCleanup != nil { + state = "failed" + } else { + state = "successful" + } + if err := updateState(idj.id, state); err != nil { + log.Printf("setting state of job %d failed: %v\n", idj.id, err) + } + log.Printf("job %d finished: %s\n", idj.id, state) } }
--- a/schema/gemma.sql Mon Oct 22 14:53:20 2018 +0200 +++ b/schema/gemma.sql Mon Oct 22 18:16:20 2018 +0200 @@ -500,6 +500,33 @@ CHECK(measure_type = 'minimum guaranteed' OR value_lifetime IS NOT NULL) ) + + -- + -- Import queue and respective logging + -- + CREATE TYPE import_state AS ENUM ('queued', 'running', 'successful', 'failed') + + CREATE TABLE imports ( + id int PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY, + state import_state NOT NULL DEFAULT 'queued', + enqueued timestamp NOT NULL DEFAULT now(), + kind varchar NOT NULL, + username varchar NOT NULL + REFERENCES internal.user_profiles(username) + ON DELETE CASCADE ON UPDATE CASCADE, + data TEXT + ) + + CREATE INDEX enqueued_idx ON imports(enqueued, state) + + CREATE TYPE log_type AS ENUM ('info', 'warn', 'error') + + CREATE TABLE import_logs ( + import_id int NOT NULL REFERENCES imports(id), + time timestamp NOT NULL DEFAULT now(), + kind log_type NOT NULL DEFAULT 'info', + msg TEXT NOT NULL + ) ; COMMIT;