Mercurial > gemma
changeset 991:a301d240905f
Decoupled import job creation and job execution with a factory function.
This is needed for persistence purposes.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Mon, 22 Oct 2018 10:45:17 +0200 |
parents | 3907a7b98067 |
children | a978b2b26a88 |
files | pkg/controllers/imports.go pkg/imports/queue.go pkg/imports/sr.go |
diffstat | 3 files changed, 74 insertions(+), 38 deletions(-) [+] |
line wrap: on
line diff
--- a/pkg/controllers/imports.go Mon Oct 22 10:21:22 2018 +0200 +++ b/pkg/controllers/imports.go Mon Oct 22 10:45:17 2018 +0200 @@ -66,12 +66,7 @@ session, _ := auth.GetSession(req) - sr := &imports.SoundingResult{ - Who: session.User, - Dir: dir, - } - - jobID := imports.AddJob(sr) + jobID := imports.AddJob(imports.SRJobKind, session.User, dir) log.Printf("Added job %d\n", jobID) result := struct {
--- a/pkg/imports/queue.go Mon Oct 22 10:21:22 2018 +0200 +++ b/pkg/imports/queue.go Mon Oct 22 10:45:17 2018 +0200 @@ -11,39 +11,67 @@ "gemma.intevation.de/gemma/pkg/auth" ) -type Feedback interface { - Info(fmt string, args ...interface{}) - Warn(fmt string, args ...interface{}) - Error(fmt string, args ...interface{}) -} +type ( + Feedback interface { + Info(fmt string, args ...interface{}) + Warn(fmt string, args ...interface{}) + Error(fmt string, args ...interface{}) + } + + Job interface { + Do(*sql.Conn, Feedback) error + CleanUp() error + } -type Job interface { - User() string - Do(*sql.Conn, Feedback) error - CleanUp() error -} + JobKind string + + JobCreator func(kind JobKind, data string) (Job, error) -type idJob struct { - id int64 - job Job -} + idJob struct { + kind JobKind + id int64 + user string + data string + } +) var ( queueCond = sync.NewCond(new(sync.Mutex)) queue = list.New() jobID int64 + + creatorsMu sync.Mutex + creators = map[JobKind]JobCreator{} ) func init() { go importLoop() } -func AddJob(job Job) int64 { +func RegisterJobCreator(kind JobKind, jc JobCreator) { + log.Printf("info: register import job creator for kind '%s'\n", kind) + creatorsMu.Lock() + defer creatorsMu.Unlock() + creators[kind] = jc +} + +func jobCreator(kind JobKind) JobCreator { + creatorsMu.Lock() + defer creatorsMu.Unlock() + return creators[kind] +} + +func AddJob(kind JobKind, user, data string) int64 { id := atomic.AddInt64(&jobID, 1) queueCond.L.Lock() defer queueCond.L.Unlock() - queue.PushBack(idJob{id, job}) + queue.PushBack(idJob{ + kind: kind, + id: id, + user: user, + data: data, + }) queueCond.Signal() return id } @@ -74,14 +102,26 @@ log.Printf("starting import job %d\n", idj.id) - fn := func(conn *sql.Conn) error { - return idj.job.Do(conn, logFeedback{}) + jc := jobCreator(idj.kind) + if jc == nil { + log.Printf("Cannot find creatir for job kind '%s'.\n", idj.kind) + continue } - if err := auth.RunAs(idj.job.User(), context.Background(), fn); err != nil { + job, err := jc(idj.kind, idj.data) + if err != nil { + log.Printf("Failed to create job: %v\n", err) + continue + } + + fn := func(conn *sql.Conn) error { + return job.Do(conn, logFeedback{}) + } + + if err := auth.RunAs(idj.user, context.Background(), fn); err != nil { log.Printf("import error (job %d): %v\n", idj.id, err) } - if err := idj.job.CleanUp(); err != nil { + if err := job.CleanUp(); err != nil { log.Printf("cleanup error (job %d): %v\n", idj.id, err) } }
--- a/pkg/imports/sr.go Mon Oct 22 10:21:22 2018 +0200 +++ b/pkg/imports/sr.go Mon Oct 22 10:45:17 2018 +0200 @@ -24,10 +24,7 @@ "gemma.intevation.de/gemma/pkg/octree" ) -type SoundingResult struct { - Who string - Dir string -} +type SoundingResult string const SoundingResultDateFormat = "2006-01-02" @@ -49,6 +46,14 @@ contourTolerance = 0.1 ) +const SRJobKind JobKind = "sr" + +func init() { + RegisterJobCreator(SRJobKind, func(_ JobKind, data string) (Job, error) { + return SoundingResult(data), nil + }) +} + const ( checkDepthReferenceSQL = ` SELECT true FROM depth_references WHERE depth_reference = $1` @@ -124,9 +129,9 @@ ` ) -func (sr *SoundingResult) Do(conn *sql.Conn, feedback Feedback) error { +func (sr SoundingResult) Do(conn *sql.Conn, feedback Feedback) error { - z, err := zip.OpenReader(filepath.Join(sr.Dir, "sr.zip")) + z, err := zip.OpenReader(filepath.Join(string(sr), "sr.zip")) if err != nil { return err } @@ -257,12 +262,8 @@ return err } -func (sr *SoundingResult) User() string { - return sr.Who -} - -func (sr *SoundingResult) CleanUp() error { - return os.RemoveAll(sr.Dir) +func (sr SoundingResult) CleanUp() error { + return os.RemoveAll(string(sr)) } func find(needle string, haystack []*zip.File) *zip.File {