changeset 958:2818ad6c7d32

Started with import queue.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Tue, 16 Oct 2018 14:59:32 +0200
parents 93364f153da4
children 6ab012d0f0c2
files cmd/gemma/main.go pkg/config/config.go pkg/imports/queue.go
diffstat 3 files changed, 58 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- a/cmd/gemma/main.go	Tue Oct 16 14:38:46 2018 +0200
+++ b/cmd/gemma/main.go	Tue Oct 16 14:59:32 2018 +0200
@@ -18,6 +18,8 @@
 	"gemma.intevation.de/gemma/pkg/config"
 	"gemma.intevation.de/gemma/pkg/controllers"
 	"gemma.intevation.de/gemma/pkg/geoserver"
+
+	_ "gemma.intevation.de/gemma/pkg/imports"
 )
 
 func prepareSessionStore() {
--- a/pkg/config/config.go	Tue Oct 16 14:38:46 2018 +0200
+++ b/pkg/config/config.go	Tue Oct 16 14:59:32 2018 +0200
@@ -47,6 +47,8 @@
 func GeoServerPassword() string { return viper.GetString("geoserver-password") }
 func GeoServerClean() bool      { return viper.GetBool("geoserver-clean") }
 
+func TmpDir() string { return viper.GetString("tmp-dir") }
+
 var (
 	proxyKeyOnce sync.Once
 	proxyKey     []byte
@@ -153,6 +155,8 @@
 	str("proxy-key", "", `signing key for proxy URLs. Defaults to random key.`)
 	str("proxy-prefix", "", `URL prefix of proxy. Defaults to "http://${web-host}:${web-port}"`)
 
+	str("tmp-dir", "", "Temp directory of gemma server. Defaults to system temp directory.")
+
 }
 
 func initConfig() {
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pkg/imports/queue.go	Tue Oct 16 14:59:32 2018 +0200
@@ -0,0 +1,52 @@
+package imports
+
+import (
+	"container/list"
+	"context"
+	"database/sql"
+	"log"
+	"sync"
+
+	"gemma.intevation.de/gemma/pkg/auth"
+)
+
+type Job interface {
+	Who() string
+	Do(conn *sql.Conn) error
+	CleanUp() error
+}
+
+var (
+	queueCond = sync.NewCond(new(sync.Mutex))
+	queue     = list.New()
+)
+
+func init() {
+	go importLoop()
+}
+
+func AddJob(job Job) {
+	queueCond.L.Lock()
+	defer queueCond.L.Unlock()
+	queue.PushBack(job)
+	queueCond.Signal()
+}
+
+func importLoop() {
+	for {
+		var job Job
+		queueCond.L.Lock()
+		for queue.Len() == 0 {
+			queueCond.Wait()
+		}
+		job = queue.Remove(queue.Front()).(Job)
+		queueCond.L.Unlock()
+
+		if err := auth.RunAs(job.Who(), context.Background(), job.Do); err != nil {
+			log.Printf("import error: %v\n", err)
+		}
+		if err := job.CleanUp(); err != nil {
+			log.Printf("cleanup error: %v\n", err)
+		}
+	}
+}