annotate pkg/imports/queue.go @ 987:3841509f6e9e

Store job id alongside to job in job queue.
author Sascha L. Teichmann <teichmann@intevation.de>
date Fri, 19 Oct 2018 18:09:26 +0200
parents 7934b5c1a910
children 7dfd3db94e6d
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
1 package imports
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
2
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
3 import (
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
4 "container/list"
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
5 "context"
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
6 "database/sql"
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
7 "log"
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
8 "sync"
978
544a5cfe07cd Started with endpoint for uploading sounding result and trigger respective import job.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 958
diff changeset
9 "sync/atomic"
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
10
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
11 "gemma.intevation.de/gemma/pkg/auth"
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
12 )
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
13
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
14 type Job interface {
979
7934b5c1a910 Finally enqueue sounding result import job to import jobs.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 978
diff changeset
15 User() string
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
16 Do(conn *sql.Conn) error
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
17 CleanUp() error
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
18 }
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
19
987
3841509f6e9e Store job id alongside to job in job queue.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 979
diff changeset
20 type idJob struct {
3841509f6e9e Store job id alongside to job in job queue.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 979
diff changeset
21 id int64
3841509f6e9e Store job id alongside to job in job queue.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 979
diff changeset
22 job Job
3841509f6e9e Store job id alongside to job in job queue.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 979
diff changeset
23 }
3841509f6e9e Store job id alongside to job in job queue.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 979
diff changeset
24
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
25 var (
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
26 queueCond = sync.NewCond(new(sync.Mutex))
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
27 queue = list.New()
978
544a5cfe07cd Started with endpoint for uploading sounding result and trigger respective import job.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 958
diff changeset
28
544a5cfe07cd Started with endpoint for uploading sounding result and trigger respective import job.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 958
diff changeset
29 jobID int64
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
30 )
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
31
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
32 func init() {
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
33 go importLoop()
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
34 }
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
35
978
544a5cfe07cd Started with endpoint for uploading sounding result and trigger respective import job.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 958
diff changeset
36 func AddJob(job Job) int64 {
544a5cfe07cd Started with endpoint for uploading sounding result and trigger respective import job.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 958
diff changeset
37 id := atomic.AddInt64(&jobID, 1)
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
38 queueCond.L.Lock()
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
39 defer queueCond.L.Unlock()
987
3841509f6e9e Store job id alongside to job in job queue.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 979
diff changeset
40 queue.PushBack(idJob{id, job})
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
41 queueCond.Signal()
978
544a5cfe07cd Started with endpoint for uploading sounding result and trigger respective import job.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 958
diff changeset
42 return id
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
43 }
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
44
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
45 func importLoop() {
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
46 for {
987
3841509f6e9e Store job id alongside to job in job queue.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 979
diff changeset
47 var idj idJob
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
48 queueCond.L.Lock()
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
49 for queue.Len() == 0 {
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
50 queueCond.Wait()
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
51 }
987
3841509f6e9e Store job id alongside to job in job queue.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 979
diff changeset
52 idj = queue.Remove(queue.Front()).(idJob)
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
53 queueCond.L.Unlock()
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
54
987
3841509f6e9e Store job id alongside to job in job queue.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 979
diff changeset
55 log.Printf("starting import job %d\n", idj.id)
3841509f6e9e Store job id alongside to job in job queue.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 979
diff changeset
56
3841509f6e9e Store job id alongside to job in job queue.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 979
diff changeset
57 if err := auth.RunAs(idj.job.User(), context.Background(), idj.job.Do); err != nil {
3841509f6e9e Store job id alongside to job in job queue.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 979
diff changeset
58 log.Printf("import error (job %d): %v\n", idj.id, err)
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
59 }
987
3841509f6e9e Store job id alongside to job in job queue.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 979
diff changeset
60 if err := idj.job.CleanUp(); err != nil {
3841509f6e9e Store job id alongside to job in job queue.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 979
diff changeset
61 log.Printf("cleanup error (job %d): %v\n", idj.id, err)
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
62 }
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
63 }
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
64 }