Mercurial > gemma
changeset 1003:d789f19877f4 persistent-import-queue
Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Mon, 22 Oct 2018 17:38:07 +0200 |
parents | e2860eff5d03 |
children | 7a4d233be077 |
files | pkg/imports/queue.go |
diffstat | 1 files changed, 21 insertions(+), 15 deletions(-) [+] |
line wrap: on
line diff
--- a/pkg/imports/queue.go Mon Oct 22 17:17:43 2018 +0200 +++ b/pkg/imports/queue.go Mon Oct 22 17:38:07 2018 +0200 @@ -7,6 +7,7 @@ "log" "runtime/debug" "sync" + "time" "gemma.intevation.de/gemma/pkg/auth" "gemma.intevation.de/gemma/pkg/config" @@ -36,8 +37,10 @@ } ) +const pollDuration = time.Second * 10 + var ( - queueCond = sync.NewCond(new(sync.Mutex)) + signalChan = make(chan struct{}) creatorsMu sync.Mutex creators = map[JobKind]JobCreator{} @@ -106,14 +109,15 @@ func AddJob(kind JobKind, user, data string) (int64, error) { ctx := context.Background() - queueCond.L.Lock() - defer queueCond.L.Unlock() var id int64 err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error { return conn.QueryRowContext(ctx, insertJobSQL, string(kind), user, data).Scan(&id) }) if err == nil { - queueCond.Signal() + select { + case signalChan <- struct{}{}: + default: + } } return id, err } @@ -222,22 +226,22 @@ func importLoop() { config.WaitReady() for { - queueCond.L.Lock() - var idj *idJob var err error - for idj == nil { + for { if idj, err = fetchJob(); err != nil { log.Printf("db error: %v\n", err) - queueCond.Wait() - } else if idj == nil { - queueCond.Wait() + } + if idj != nil { + break + } + select { + case <-signalChan: + case <-time.After(pollDuration): } } - queueCond.L.Unlock() - log.Printf("starting import job %d\n", idj.id) jc := jobCreator(idj.kind) @@ -266,13 +270,15 @@ feedback.Error("error cleanup: %v\n", errCleanup) } + var state string if errDo != nil || errCleanup != nil { - err = updateState(idj.id, "failed") + state = "failed" } else { - err = updateState(idj.id, "successful") + state = "successful" } - if err != nil { + if err := updateState(idj.id, state); err != nil { log.Printf("setting state of job %d failed: %v\n", idj.id, err) } + log.Printf("job %d finished: %s\n", idj.id, state) } }