changeset 1006:5981ff466cf4

Merged branch persistent-import-queue back into default.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Mon, 22 Oct 2018 18:16:20 +0200
parents 26ca04caa330 (current diff) fcf016ebdef4 (diff)
children ea8db3ec863c
files
diffstat 7 files changed, 255 insertions(+), 42 deletions(-) [+]
line wrap: on
line diff
--- a/client/src/morphtool/Morphtool.vue	Mon Oct 22 14:53:20 2018 +0200
+++ b/client/src/morphtool/Morphtool.vue	Mon Oct 22 18:16:20 2018 +0200
@@ -97,7 +97,7 @@
     ...mapState("identifystore", ["identifiedFeatures"]),
     ...mapState("morphstore", ["selectedMorph"]),
     selectedBottleneck: function() {
-      if (this.identifiedFeatures) {
+      if (this.identifiedFeatures && !this.drawMode) {
         for (let feature of this.identifiedFeatures) {
           let id = feature.getId();
           // RegExp.prototype.test() works with number, str and undefined
--- a/cmd/gemma/main.go	Mon Oct 22 14:53:20 2018 +0200
+++ b/cmd/gemma/main.go	Mon Oct 22 18:16:20 2018 +0200
@@ -31,6 +31,8 @@
 
 func start(cmd *cobra.Command, args []string) {
 
+	config.Ready()
+
 	web, err := filepath.Abs(config.Web())
 	if err != nil {
 		log.Fatalf("error: %v\n", err)
--- a/pkg/config/config.go	Mon Oct 22 14:53:20 2018 +0200
+++ b/pkg/config/config.go	Mon Oct 22 18:16:20 2018 +0200
@@ -156,7 +156,26 @@
 	str("proxy-prefix", "", `URL prefix of proxy. Defaults to "http://${web-host}:${web-port}"`)
 
 	str("tmp-dir", "", "Temp directory of gemma server. Defaults to system temp directory.")
+}
 
+var (
+	configCond  = sync.NewCond(new(sync.Mutex))
+	configReady bool
+)
+
+func Ready() {
+	configCond.L.Lock()
+	defer configCond.L.Unlock()
+	configReady = true
+	configCond.Broadcast()
+}
+
+func WaitReady() {
+	configCond.L.Lock()
+	defer configCond.L.Unlock()
+	for !configReady {
+		configCond.Wait()
+	}
 }
 
 func initConfig() {
--- a/pkg/controllers/imports.go	Mon Oct 22 14:53:20 2018 +0200
+++ b/pkg/controllers/imports.go	Mon Oct 22 18:16:20 2018 +0200
@@ -66,7 +66,13 @@
 
 	session, _ := auth.GetSession(req)
 
-	jobID := imports.AddJob(imports.SRJobKind, session.User, dir)
+	jobID, err := imports.AddJob(imports.SRJobKind, session.User, dir)
+	if err != nil {
+		log.Printf("error: %v\n", err)
+		http.Error(rw, "error: "+err.Error(), http.StatusInternalServerError)
+		return
+	}
+
 	log.Printf("Added job %d\n", jobID)
 
 	result := struct {
--- a/pkg/controllers/pwreset.go	Mon Oct 22 14:53:20 2018 +0200
+++ b/pkg/controllers/pwreset.go	Mon Oct 22 18:16:20 2018 +0200
@@ -18,6 +18,7 @@
 
 	"gemma.intevation.de/gemma/pkg/auth"
 	"gemma.intevation.de/gemma/pkg/common"
+	"gemma.intevation.de/gemma/pkg/config"
 	"gemma.intevation.de/gemma/pkg/misc"
 	"gemma.intevation.de/gemma/pkg/models"
 )
@@ -103,6 +104,7 @@
 }
 
 func removeOutdated() {
+	config.WaitReady()
 	for {
 		time.Sleep(cleanupPause)
 		err := auth.RunAs(
--- a/pkg/imports/queue.go	Mon Oct 22 14:53:20 2018 +0200
+++ b/pkg/imports/queue.go	Mon Oct 22 18:16:20 2018 +0200
@@ -1,16 +1,16 @@
 package imports
 
 import (
-	"container/list"
 	"context"
 	"database/sql"
 	"fmt"
 	"log"
 	"runtime/debug"
 	"sync"
-	"sync/atomic"
+	"time"
 
 	"gemma.intevation.de/gemma/pkg/auth"
+	"gemma.intevation.de/gemma/pkg/config"
 )
 
 type (
@@ -30,23 +30,71 @@
 	JobCreator func(kind JobKind, data string) (Job, error)
 
 	idJob struct {
+		id   int64
 		kind JobKind
-		id   int64
 		user string
 		data string
 	}
 )
 
+const pollDuration = time.Second * 10
+
 var (
-	queueCond = sync.NewCond(new(sync.Mutex))
-	queue     = list.New()
-
-	jobID int64
+	signalChan = make(chan struct{})
 
 	creatorsMu sync.Mutex
 	creators   = map[JobKind]JobCreator{}
 )
 
+const (
+	queueUser = "sys_admin"
+
+	reEnqueueRunningSQL = `
+UPDATE waterway.imports SET state = 'queued'::waterway.import_state
+WHERE state = 'running'::waterway.import_state
+`
+
+	insertJobSQL = `
+INSERT INTO waterway.imports (
+  kind,
+  username,
+  data
+) VALUES (
+  $1,
+  $2,
+  $3
+) RETURNING id`
+
+	selectJobSQL = `
+SELECT
+  id,
+  kind,
+  username,
+  data
+FROM waterway.imports 
+WHERE state = 'queued'::waterway.import_state AND enqueued IN (
+  SELECT min(enqueued)
+  FROM waterway.imports 
+  WHERE state = 'queued'::waterway.import_state
+)
+LIMIT 1
+`
+	updateStateSQL = `
+UPDATE waterway.imports SET state = $1::waterway.import_state
+WHERE id = $2
+`
+	logMessageSQL = `
+INSERT INTO waterway.import_logs (
+  import_id,
+  kind,
+  msg
+) VALUES (
+  $1,
+  $2::waterway.log_type,
+  $3
+)`
+)
+
 func init() {
 	go importLoop()
 }
@@ -64,32 +112,45 @@
 	return creators[kind]
 }
 
-func AddJob(kind JobKind, user, data string) int64 {
-	id := atomic.AddInt64(&jobID, 1)
-	queueCond.L.Lock()
-	defer queueCond.L.Unlock()
-	queue.PushBack(idJob{
-		kind: kind,
-		id:   id,
-		user: user,
-		data: data,
+func AddJob(kind JobKind, user, data string) (int64, error) {
+	ctx := context.Background()
+	var id int64
+	err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error {
+		return conn.QueryRowContext(ctx, insertJobSQL, string(kind), user, data).Scan(&id)
 	})
-	queueCond.Signal()
-	return id
+	if err == nil {
+		select {
+		case signalChan <- struct{}{}:
+		default:
+		}
+	}
+	return id, err
 }
 
-type logFeedback struct{}
+type logFeedback int64
 
-func (logFeedback) Info(fmt string, args ...interface{}) {
-	log.Printf("info: "+fmt, args...)
+func (lf logFeedback) log(kind, format string, args ...interface{}) {
+	ctx := context.Background()
+	err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error {
+		_, err := conn.ExecContext(
+			ctx, logMessageSQL, int64(lf), kind, fmt.Sprintf(format, args...))
+		return err
+	})
+	if err != nil {
+		log.Printf("logging failed: %v\n", err)
+	}
 }
 
-func (logFeedback) Warn(fmt string, args ...interface{}) {
-	log.Printf("warn: "+fmt, args...)
+func (lf logFeedback) Info(format string, args ...interface{}) {
+	lf.log("info", format, args...)
 }
 
-func (logFeedback) Error(fmt string, args ...interface{}) {
-	log.Printf("error: "+fmt, args...)
+func (lf logFeedback) Warn(format string, args ...interface{}) {
+	lf.log("warn", format, args...)
+}
+
+func (lf logFeedback) Error(format string, args ...interface{}) {
+	lf.log("error", format, args...)
 }
 
 func survive(fn func() error) func() error {
@@ -108,39 +169,135 @@
 	}
 }
 
+func reEnqueueRunning() error {
+	ctx := context.Background()
+	return auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error {
+		_, err := conn.ExecContext(ctx, reEnqueueRunningSQL)
+		return err
+	})
+}
+
+func fetchJob() (*idJob, error) {
+	var ji idJob
+	ctx := context.Background()
+	err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error {
+		tx, err := conn.BeginTx(ctx, nil)
+		if err != nil {
+			return err
+		}
+		defer tx.Rollback()
+		if err = tx.QueryRowContext(ctx, selectJobSQL).Scan(
+			&ji.id, &ji.kind, &ji.user, &ji.data); err != nil {
+			return err
+		}
+		_, err = tx.ExecContext(ctx, updateStateSQL, "running", ji.id)
+		if err == nil {
+			err = tx.Commit()
+		}
+		return err
+	})
+	switch {
+	case err == sql.ErrNoRows:
+		return nil, nil
+	case err != nil:
+		return nil, err
+	}
+	return &ji, nil
+}
+
+func updateState(id int64, state string) error {
+	ctx := context.Background()
+	return auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error {
+		_, err := conn.ExecContext(ctx, updateStateSQL, state, id)
+		return err
+	})
+}
+
+func errorAndFail(id int64, format string, args ...interface{}) error {
+	ctx := context.Background()
+	err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error {
+		tx, err := conn.BeginTx(ctx, nil)
+		if err != nil {
+			return err
+		}
+		defer tx.Rollback()
+		_, err = conn.ExecContext(
+			ctx, logMessageSQL, id, "error", fmt.Sprintf(format, args...))
+		if err != nil {
+			return err
+		}
+		_, err = conn.ExecContext(
+			ctx, updateStateSQL, "failed", id)
+		if err == nil {
+			err = tx.Commit()
+		}
+		return err
+	})
+	return err
+}
+
 func importLoop() {
+	config.WaitReady()
+	// re-enqueue the jobs that are in state running.
+	// They where in progess when the server went down.
+	if err := reEnqueueRunning(); err != nil {
+		log.Printf("re-enquing failed: %v", err)
+	}
+
 	for {
-		var idj idJob
-		queueCond.L.Lock()
-		for queue.Len() == 0 {
-			queueCond.Wait()
+		var idj *idJob
+		var err error
+
+		for {
+			if idj, err = fetchJob(); err != nil {
+				log.Printf("db error: %v\n", err)
+			}
+			if idj != nil {
+				break
+			}
+			select {
+			case <-signalChan:
+			case <-time.After(pollDuration):
+			}
 		}
-		idj = queue.Remove(queue.Front()).(idJob)
-		queueCond.L.Unlock()
 
 		log.Printf("starting import job %d\n", idj.id)
 
 		jc := jobCreator(idj.kind)
 		if jc == nil {
-			log.Printf("Cannot find creatir for job kind '%s'.\n", idj.kind)
+			errorAndFail(idj.id, "No creator for kind '%s' found", idj.kind)
 			continue
 		}
 
 		job, err := jc(idj.kind, idj.data)
 		if err != nil {
-			log.Printf("Failed to create job: %v\n", err)
+			errorAndFail(idj.id, "Faild to create job: %v", err)
 			continue
 		}
 
-		do := survive(func() error {
+		feedback := logFeedback(idj.id)
+
+		errDo := survive(func() error {
 			return auth.RunAs(idj.user, context.Background(),
-				func(conn *sql.Conn) error { return job.Do(conn, logFeedback{}) })
-		})
-		if err := do(); err != nil {
-			log.Printf("import error (job %d): %v\n", idj.id, err)
+				func(conn *sql.Conn) error { return job.Do(conn, feedback) })
+		})()
+		if errDo != nil {
+			feedback.Error("error do: %v\n", errDo)
+		}
+		errCleanup := survive(job.CleanUp)()
+		if errCleanup != nil {
+			feedback.Error("error cleanup: %v\n", errCleanup)
 		}
-		if err := survive(job.CleanUp)(); err != nil {
-			log.Printf("cleanup error (job %d): %v\n", idj.id, err)
+
+		var state string
+		if errDo != nil || errCleanup != nil {
+			state = "failed"
+		} else {
+			state = "successful"
 		}
+		if err := updateState(idj.id, state); err != nil {
+			log.Printf("setting state of job %d failed: %v\n", idj.id, err)
+		}
+		log.Printf("job %d finished: %s\n", idj.id, state)
 	}
 }
--- a/schema/gemma.sql	Mon Oct 22 14:53:20 2018 +0200
+++ b/schema/gemma.sql	Mon Oct 22 18:16:20 2018 +0200
@@ -500,6 +500,33 @@
         CHECK(measure_type = 'minimum guaranteed'
             OR value_lifetime IS NOT NULL)
     )
+
+    --
+    -- Import queue and respective logging
+    --
+    CREATE TYPE import_state AS ENUM ('queued', 'running', 'successful', 'failed')
+
+    CREATE TABLE imports (
+        id int PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
+        state import_state NOT NULL DEFAULT 'queued',
+        enqueued timestamp NOT NULL DEFAULT now(),
+        kind  varchar NOT NULL,
+        username varchar NOT NULL
+            REFERENCES internal.user_profiles(username)
+                ON DELETE CASCADE ON UPDATE CASCADE,
+        data TEXT
+    )
+
+    CREATE INDEX enqueued_idx ON imports(enqueued, state)
+
+    CREATE TYPE log_type AS ENUM ('info', 'warn', 'error')
+
+    CREATE TABLE import_logs (
+        import_id int NOT NULL REFERENCES imports(id),
+        time timestamp NOT NULL DEFAULT now(),
+        kind log_type NOT NULL DEFAULT 'info',
+        msg TEXT NOT NULL
+    )
 ;
 
 COMMIT;