changeset 1003:d789f19877f4 persistent-import-queue

Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Mon, 22 Oct 2018 17:38:07 +0200
parents e2860eff5d03
children 7a4d233be077
files pkg/imports/queue.go
diffstat 1 files changed, 21 insertions(+), 15 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/imports/queue.go	Mon Oct 22 17:17:43 2018 +0200
+++ b/pkg/imports/queue.go	Mon Oct 22 17:38:07 2018 +0200
@@ -7,6 +7,7 @@
 	"log"
 	"runtime/debug"
 	"sync"
+	"time"
 
 	"gemma.intevation.de/gemma/pkg/auth"
 	"gemma.intevation.de/gemma/pkg/config"
@@ -36,8 +37,10 @@
 	}
 )
 
+const pollDuration = time.Second * 10
+
 var (
-	queueCond = sync.NewCond(new(sync.Mutex))
+	signalChan = make(chan struct{})
 
 	creatorsMu sync.Mutex
 	creators   = map[JobKind]JobCreator{}
@@ -106,14 +109,15 @@
 
 func AddJob(kind JobKind, user, data string) (int64, error) {
 	ctx := context.Background()
-	queueCond.L.Lock()
-	defer queueCond.L.Unlock()
 	var id int64
 	err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error {
 		return conn.QueryRowContext(ctx, insertJobSQL, string(kind), user, data).Scan(&id)
 	})
 	if err == nil {
-		queueCond.Signal()
+		select {
+		case signalChan <- struct{}{}:
+		default:
+		}
 	}
 	return id, err
 }
@@ -222,22 +226,22 @@
 func importLoop() {
 	config.WaitReady()
 	for {
-		queueCond.L.Lock()
-
 		var idj *idJob
 		var err error
 
-		for idj == nil {
+		for {
 			if idj, err = fetchJob(); err != nil {
 				log.Printf("db error: %v\n", err)
-				queueCond.Wait()
-			} else if idj == nil {
-				queueCond.Wait()
+			}
+			if idj != nil {
+				break
+			}
+			select {
+			case <-signalChan:
+			case <-time.After(pollDuration):
 			}
 		}
 
-		queueCond.L.Unlock()
-
 		log.Printf("starting import job %d\n", idj.id)
 
 		jc := jobCreator(idj.kind)
@@ -266,13 +270,15 @@
 			feedback.Error("error cleanup: %v\n", errCleanup)
 		}
 
+		var state string
 		if errDo != nil || errCleanup != nil {
-			err = updateState(idj.id, "failed")
+			state = "failed"
 		} else {
-			err = updateState(idj.id, "successful")
+			state = "successful"
 		}
-		if err != nil {
+		if err := updateState(idj.id, state); err != nil {
 			log.Printf("setting state of job %d failed: %v\n", idj.id, err)
 		}
+		log.Printf("job %d finished: %s\n", idj.id, state)
 	}
 }