Mercurial > gemma
changeset 1005:fcf016ebdef4 persistent-import-queue
Re-enqueue import jobs in state running if the the gemma server starts.
These are jobs that where started but did not finish before the server
died before.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Mon, 22 Oct 2018 18:14:16 +0200 |
parents | 7a4d233be077 |
children | 5981ff466cf4 |
files | pkg/imports/queue.go |
diffstat | 1 files changed, 19 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- a/pkg/imports/queue.go Mon Oct 22 17:39:25 2018 +0200 +++ b/pkg/imports/queue.go Mon Oct 22 18:14:16 2018 +0200 @@ -49,6 +49,11 @@ const ( queueUser = "sys_admin" + reEnqueueRunningSQL = ` +UPDATE waterway.imports SET state = 'queued'::waterway.import_state +WHERE state = 'running'::waterway.import_state +` + insertJobSQL = ` INSERT INTO waterway.imports ( kind, @@ -164,6 +169,14 @@ } } +func reEnqueueRunning() error { + ctx := context.Background() + return auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error { + _, err := conn.ExecContext(ctx, reEnqueueRunningSQL) + return err + }) +} + func fetchJob() (*idJob, error) { var ji idJob ctx := context.Background() @@ -225,6 +238,12 @@ func importLoop() { config.WaitReady() + // re-enqueue the jobs that are in state running. + // They where in progess when the server went down. + if err := reEnqueueRunning(); err != nil { + log.Printf("re-enquing failed: %v", err) + } + for { var idj *idJob var err error