changeset 1005:fcf016ebdef4 persistent-import-queue

Re-enqueue import jobs in state running if the the gemma server starts. These are jobs that where started but did not finish before the server died before.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Mon, 22 Oct 2018 18:14:16 +0200
parents 7a4d233be077
children 5981ff466cf4
files pkg/imports/queue.go
diffstat 1 files changed, 19 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/imports/queue.go	Mon Oct 22 17:39:25 2018 +0200
+++ b/pkg/imports/queue.go	Mon Oct 22 18:14:16 2018 +0200
@@ -49,6 +49,11 @@
 const (
 	queueUser = "sys_admin"
 
+	reEnqueueRunningSQL = `
+UPDATE waterway.imports SET state = 'queued'::waterway.import_state
+WHERE state = 'running'::waterway.import_state
+`
+
 	insertJobSQL = `
 INSERT INTO waterway.imports (
   kind,
@@ -164,6 +169,14 @@
 	}
 }
 
+func reEnqueueRunning() error {
+	ctx := context.Background()
+	return auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error {
+		_, err := conn.ExecContext(ctx, reEnqueueRunningSQL)
+		return err
+	})
+}
+
 func fetchJob() (*idJob, error) {
 	var ji idJob
 	ctx := context.Background()
@@ -225,6 +238,12 @@
 
 func importLoop() {
 	config.WaitReady()
+	// re-enqueue the jobs that are in state running.
+	// They where in progess when the server went down.
+	if err := reEnqueueRunning(); err != nil {
+		log.Printf("re-enquing failed: %v", err)
+	}
+
 	for {
 		var idj *idJob
 		var err error