changeset 991:a301d240905f

Decoupled import job creation and job execution with a factory function. This is needed for persistence purposes.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Mon, 22 Oct 2018 10:45:17 +0200
parents 3907a7b98067
children a978b2b26a88
files pkg/controllers/imports.go pkg/imports/queue.go pkg/imports/sr.go
diffstat 3 files changed, 74 insertions(+), 38 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/controllers/imports.go	Mon Oct 22 10:21:22 2018 +0200
+++ b/pkg/controllers/imports.go	Mon Oct 22 10:45:17 2018 +0200
@@ -66,12 +66,7 @@
 
 	session, _ := auth.GetSession(req)
 
-	sr := &imports.SoundingResult{
-		Who: session.User,
-		Dir: dir,
-	}
-
-	jobID := imports.AddJob(sr)
+	jobID := imports.AddJob(imports.SRJobKind, session.User, dir)
 	log.Printf("Added job %d\n", jobID)
 
 	result := struct {
--- a/pkg/imports/queue.go	Mon Oct 22 10:21:22 2018 +0200
+++ b/pkg/imports/queue.go	Mon Oct 22 10:45:17 2018 +0200
@@ -11,39 +11,67 @@
 	"gemma.intevation.de/gemma/pkg/auth"
 )
 
-type Feedback interface {
-	Info(fmt string, args ...interface{})
-	Warn(fmt string, args ...interface{})
-	Error(fmt string, args ...interface{})
-}
+type (
+	Feedback interface {
+		Info(fmt string, args ...interface{})
+		Warn(fmt string, args ...interface{})
+		Error(fmt string, args ...interface{})
+	}
+
+	Job interface {
+		Do(*sql.Conn, Feedback) error
+		CleanUp() error
+	}
 
-type Job interface {
-	User() string
-	Do(*sql.Conn, Feedback) error
-	CleanUp() error
-}
+	JobKind string
+
+	JobCreator func(kind JobKind, data string) (Job, error)
 
-type idJob struct {
-	id  int64
-	job Job
-}
+	idJob struct {
+		kind JobKind
+		id   int64
+		user string
+		data string
+	}
+)
 
 var (
 	queueCond = sync.NewCond(new(sync.Mutex))
 	queue     = list.New()
 
 	jobID int64
+
+	creatorsMu sync.Mutex
+	creators   = map[JobKind]JobCreator{}
 )
 
 func init() {
 	go importLoop()
 }
 
-func AddJob(job Job) int64 {
+func RegisterJobCreator(kind JobKind, jc JobCreator) {
+	log.Printf("info: register import job creator for kind '%s'\n", kind)
+	creatorsMu.Lock()
+	defer creatorsMu.Unlock()
+	creators[kind] = jc
+}
+
+func jobCreator(kind JobKind) JobCreator {
+	creatorsMu.Lock()
+	defer creatorsMu.Unlock()
+	return creators[kind]
+}
+
+func AddJob(kind JobKind, user, data string) int64 {
 	id := atomic.AddInt64(&jobID, 1)
 	queueCond.L.Lock()
 	defer queueCond.L.Unlock()
-	queue.PushBack(idJob{id, job})
+	queue.PushBack(idJob{
+		kind: kind,
+		id:   id,
+		user: user,
+		data: data,
+	})
 	queueCond.Signal()
 	return id
 }
@@ -74,14 +102,26 @@
 
 		log.Printf("starting import job %d\n", idj.id)
 
-		fn := func(conn *sql.Conn) error {
-			return idj.job.Do(conn, logFeedback{})
+		jc := jobCreator(idj.kind)
+		if jc == nil {
+			log.Printf("Cannot find creatir for job kind '%s'.\n", idj.kind)
+			continue
 		}
 
-		if err := auth.RunAs(idj.job.User(), context.Background(), fn); err != nil {
+		job, err := jc(idj.kind, idj.data)
+		if err != nil {
+			log.Printf("Failed to create job: %v\n", err)
+			continue
+		}
+
+		fn := func(conn *sql.Conn) error {
+			return job.Do(conn, logFeedback{})
+		}
+
+		if err := auth.RunAs(idj.user, context.Background(), fn); err != nil {
 			log.Printf("import error (job %d): %v\n", idj.id, err)
 		}
-		if err := idj.job.CleanUp(); err != nil {
+		if err := job.CleanUp(); err != nil {
 			log.Printf("cleanup error (job %d): %v\n", idj.id, err)
 		}
 	}
--- a/pkg/imports/sr.go	Mon Oct 22 10:21:22 2018 +0200
+++ b/pkg/imports/sr.go	Mon Oct 22 10:45:17 2018 +0200
@@ -24,10 +24,7 @@
 	"gemma.intevation.de/gemma/pkg/octree"
 )
 
-type SoundingResult struct {
-	Who string
-	Dir string
-}
+type SoundingResult string
 
 const SoundingResultDateFormat = "2006-01-02"
 
@@ -49,6 +46,14 @@
 	contourTolerance = 0.1
 )
 
+const SRJobKind JobKind = "sr"
+
+func init() {
+	RegisterJobCreator(SRJobKind, func(_ JobKind, data string) (Job, error) {
+		return SoundingResult(data), nil
+	})
+}
+
 const (
 	checkDepthReferenceSQL = `
 SELECT true FROM depth_references WHERE depth_reference = $1`
@@ -124,9 +129,9 @@
 `
 )
 
-func (sr *SoundingResult) Do(conn *sql.Conn, feedback Feedback) error {
+func (sr SoundingResult) Do(conn *sql.Conn, feedback Feedback) error {
 
-	z, err := zip.OpenReader(filepath.Join(sr.Dir, "sr.zip"))
+	z, err := zip.OpenReader(filepath.Join(string(sr), "sr.zip"))
 	if err != nil {
 		return err
 	}
@@ -257,12 +262,8 @@
 	return err
 }
 
-func (sr *SoundingResult) User() string {
-	return sr.Who
-}
-
-func (sr *SoundingResult) CleanUp() error {
-	return os.RemoveAll(sr.Dir)
+func (sr SoundingResult) CleanUp() error {
+	return os.RemoveAll(string(sr))
 }
 
 func find(needle string, haystack []*zip.File) *zip.File {