changeset 998:75e65599ea52 persistent-import-queue

Persist job queue in database. WIP.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Mon, 22 Oct 2018 16:54:34 +0200
parents 839526f44f0f
children 60ca5bd3e326
files pkg/controllers/imports.go pkg/imports/queue.go schema/gemma.sql
diffstat 3 files changed, 174 insertions(+), 36 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/controllers/imports.go	Mon Oct 22 14:49:00 2018 +0200
+++ b/pkg/controllers/imports.go	Mon Oct 22 16:54:34 2018 +0200
@@ -66,7 +66,13 @@
 
 	session, _ := auth.GetSession(req)
 
-	jobID := imports.AddJob(imports.SRJobKind, session.User, dir)
+	jobID, err := imports.AddJob(imports.SRJobKind, session.User, dir)
+	if err != nil {
+		log.Printf("error: %v\n", err)
+		http.Error(rw, "error: "+err.Error(), http.StatusInternalServerError)
+		return
+	}
+
 	log.Printf("Added job %d\n", jobID)
 
 	result := struct {
--- a/pkg/imports/queue.go	Mon Oct 22 14:49:00 2018 +0200
+++ b/pkg/imports/queue.go	Mon Oct 22 16:54:34 2018 +0200
@@ -1,14 +1,12 @@
 package imports
 
 import (
-	"container/list"
 	"context"
 	"database/sql"
 	"fmt"
 	"log"
 	"runtime/debug"
 	"sync"
-	"sync/atomic"
 
 	"gemma.intevation.de/gemma/pkg/auth"
 )
@@ -30,8 +28,8 @@
 	JobCreator func(kind JobKind, data string) (Job, error)
 
 	idJob struct {
+		id   int64
 		kind JobKind
-		id   int64
 		user string
 		data string
 	}
@@ -39,14 +37,55 @@
 
 var (
 	queueCond = sync.NewCond(new(sync.Mutex))
-	queue     = list.New()
-
-	jobID int64
 
 	creatorsMu sync.Mutex
 	creators   = map[JobKind]JobCreator{}
 )
 
+const (
+	queueUser = "sys_admin"
+
+	insertJobSQL = `
+INSERT INTO waterway.imports (
+  kind,
+  username,
+  data
+) VALUES (
+  $1,
+  $2,
+  $3
+) RETURNING id`
+
+	selectJobSQL = `
+SELECT
+  id,
+  kind,
+  username,
+  data
+FROM waterway.imports 
+WHERE state = 'queued'::waterway.import_state AND enqueued IN (
+  SELECT min(enqueued)
+  FROM waterway.imports 
+  WHERE state = 'queued'::waterway.import_state
+)
+LIMIT 1
+`
+	updateStateSQL = `
+UPDATE waterway.imports SET state = $1::waterway.import_state
+WHERE id = $2
+`
+	logMessageSQL = `
+INSERT INTO waterway.import_logs (
+  import_id,
+  kind,
+  msg
+) VALUES (
+  $1,
+  $2::waterway.log_type,
+  $3
+)`
+)
+
 func init() {
 	go importLoop()
 }
@@ -64,32 +103,44 @@
 	return creators[kind]
 }
 
-func AddJob(kind JobKind, user, data string) int64 {
-	id := atomic.AddInt64(&jobID, 1)
+func AddJob(kind JobKind, user, data string) (int64, error) {
+	ctx := context.Background()
 	queueCond.L.Lock()
 	defer queueCond.L.Unlock()
-	queue.PushBack(idJob{
-		kind: kind,
-		id:   id,
-		user: user,
-		data: data,
+	var id int64
+	err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error {
+		return conn.QueryRowContext(ctx, insertJobSQL, string(kind), user, data).Scan(&id)
 	})
-	queueCond.Signal()
-	return id
+	if err == nil {
+		queueCond.Signal()
+	}
+	return id, err
 }
 
-type logFeedback struct{}
+type logFeedback int64
 
-func (logFeedback) Info(fmt string, args ...interface{}) {
-	log.Printf("info: "+fmt, args...)
+func (lf logFeedback) log(kind, format string, args ...interface{}) {
+	ctx := context.Background()
+	err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error {
+		_, err := conn.ExecContext(
+			ctx, logMessageSQL, int64(lf), kind, fmt.Sprintf(format, args...))
+		return err
+	})
+	if err != nil {
+		log.Printf("logging failed: %v\n", err)
+	}
 }
 
-func (logFeedback) Warn(fmt string, args ...interface{}) {
-	log.Printf("warn: "+fmt, args...)
+func (lf logFeedback) Info(format string, args ...interface{}) {
+	lf.log("info", format, args...)
 }
 
-func (logFeedback) Error(fmt string, args ...interface{}) {
-	log.Printf("error: "+fmt, args...)
+func (lf logFeedback) Warn(format string, args ...interface{}) {
+	lf.log("warn", format, args...)
+}
+
+func (lf logFeedback) Error(format string, args ...interface{}) {
+	lf.log("error", format, args...)
 }
 
 func survive(fn func() error) func() error {
@@ -108,39 +159,118 @@
 	}
 }
 
+func fetchJob() (*idJob, error) {
+	var ji idJob
+	ctx := context.Background()
+	err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error {
+		tx, err := conn.BeginTx(ctx, nil)
+		if err != nil {
+			return err
+		}
+		defer tx.Rollback()
+		if err = tx.QueryRowContext(ctx, selectJobSQL).Scan(
+			&ji.id, &ji.kind, &ji.user, &ji.data); err != nil {
+			return err
+		}
+		_, err = tx.ExecContext(ctx, updateStateSQL, "running", ji.id)
+		if err == nil {
+			err = tx.Commit()
+		}
+		return err
+	})
+	switch {
+	case err == sql.ErrNoRows:
+		return nil, nil
+	case err != nil:
+		return nil, err
+	}
+	return &ji, nil
+}
+
+func updateState(id int64, state string) error {
+	ctx := context.Background()
+	return auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error {
+		_, err := conn.ExecContext(ctx, updateStateSQL, state, id)
+		return err
+	})
+}
+
+func errorAndFail(id int64, format string, args ...interface{}) error {
+	ctx := context.Background()
+	err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error {
+		tx, err := conn.BeginTx(ctx, nil)
+		if err != nil {
+			return err
+		}
+		defer tx.Rollback()
+		_, err = conn.ExecContext(
+			ctx, logMessageSQL, id, "error", fmt.Sprintf(format, args...))
+		if err != nil {
+			return err
+		}
+		_, err = conn.ExecContext(
+			ctx, updateStateSQL, "failed", id)
+		if err == nil {
+			err = tx.Commit()
+		}
+		return err
+	})
+	return err
+}
+
 func importLoop() {
 	for {
-		var idj idJob
 		queueCond.L.Lock()
-		for queue.Len() == 0 {
-			queueCond.Wait()
+
+		var idj *idJob
+		var err error
+
+		for idj == nil {
+			if idj, err = fetchJob(); err != nil {
+				log.Printf("db error: %v\n", err)
+				queueCond.Wait()
+			} else if idj == nil {
+				queueCond.Wait()
+			}
 		}
-		idj = queue.Remove(queue.Front()).(idJob)
+
 		queueCond.L.Unlock()
 
 		log.Printf("starting import job %d\n", idj.id)
 
 		jc := jobCreator(idj.kind)
 		if jc == nil {
-			log.Printf("Cannot find creatir for job kind '%s'.\n", idj.kind)
+			errorAndFail(idj.id, "No creator for kind '%s' found", idj.kind)
 			continue
 		}
 
 		job, err := jc(idj.kind, idj.data)
 		if err != nil {
-			log.Printf("Failed to create job: %v\n", err)
+			errorAndFail(idj.id, "Faild to create job: %v", err)
 			continue
 		}
 
-		do := survive(func() error {
+		feedback := logFeedback(idj.id)
+
+		errDo := survive(func() error {
 			return auth.RunAs(idj.user, context.Background(),
-				func(conn *sql.Conn) error { return job.Do(conn, logFeedback{}) })
-		})
-		if err := do(); err != nil {
-			log.Printf("import error (job %d): %v\n", idj.id, err)
+				func(conn *sql.Conn) error { return job.Do(conn, feedback) })
+		})()
+		if errDo != nil {
+			feedback.Error("error do: %v\n", errDo)
+		}
+		errCleanup := survive(job.CleanUp)
+		if errCleanup != nil {
+			feedback.Error("error cleanup: %v\n", errCleanup)
 		}
-		if err := survive(job.CleanUp)(); err != nil {
-			log.Printf("cleanup error (job %d): %v\n", idj.id, err)
+
+		if errDo != nil || errCleanup != nil {
+			err = updateState(idj.id, "failed")
+		} else {
+			err = updateState(idj.id, "successful")
+		}
+		if err != nil {
+			log.Printf("setting state of job %d failed: %v\n", idj.id, err)
 		}
 	}
 }
--- a/schema/gemma.sql	Mon Oct 22 14:49:00 2018 +0200
+++ b/schema/gemma.sql	Mon Oct 22 16:54:34 2018 +0200
@@ -517,6 +517,8 @@
         data TEXT
     )
 
+    CREATE INDEX enqueued_idx ON imports(enqueued, state)
+
     CREATE TYPE log_type AS ENUM ('info', 'warn', 'error')
 
     CREATE TABLE import_logs (