comparison pkg/imports/queue.go @ 998:75e65599ea52 persistent-import-queue

Persist job queue in database. WIP.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Mon, 22 Oct 2018 16:54:34 +0200
parents a978b2b26a88
children 14425e35e3c2
comparison
equal deleted inserted replaced
996:839526f44f0f 998:75e65599ea52
1 package imports 1 package imports
2 2
3 import ( 3 import (
4 "container/list"
5 "context" 4 "context"
6 "database/sql" 5 "database/sql"
7 "fmt" 6 "fmt"
8 "log" 7 "log"
9 "runtime/debug" 8 "runtime/debug"
10 "sync" 9 "sync"
11 "sync/atomic"
12 10
13 "gemma.intevation.de/gemma/pkg/auth" 11 "gemma.intevation.de/gemma/pkg/auth"
14 ) 12 )
15 13
16 type ( 14 type (
28 JobKind string 26 JobKind string
29 27
30 JobCreator func(kind JobKind, data string) (Job, error) 28 JobCreator func(kind JobKind, data string) (Job, error)
31 29
32 idJob struct { 30 idJob struct {
31 id int64
33 kind JobKind 32 kind JobKind
34 id int64
35 user string 33 user string
36 data string 34 data string
37 } 35 }
38 ) 36 )
39 37
40 var ( 38 var (
41 queueCond = sync.NewCond(new(sync.Mutex)) 39 queueCond = sync.NewCond(new(sync.Mutex))
42 queue = list.New()
43
44 jobID int64
45 40
46 creatorsMu sync.Mutex 41 creatorsMu sync.Mutex
47 creators = map[JobKind]JobCreator{} 42 creators = map[JobKind]JobCreator{}
43 )
44
45 const (
46 queueUser = "sys_admin"
47
48 insertJobSQL = `
49 INSERT INTO waterway.imports (
50 kind,
51 username,
52 data
53 ) VALUES (
54 $1,
55 $2,
56 $3
57 ) RETURNING id`
58
59 selectJobSQL = `
60 SELECT
61 id,
62 kind,
63 username,
64 data
65 FROM waterway.imports
66 WHERE state = 'queued'::waterway.import_state AND enqueued IN (
67 SELECT min(enqueued)
68 FROM waterway.imports
69 WHERE state = 'queued'::waterway.import_state
70 )
71 LIMIT 1
72 `
73 updateStateSQL = `
74 UPDATE waterway.imports SET state = $1::waterway.import_state
75 WHERE id = $2
76 `
77 logMessageSQL = `
78 INSERT INTO waterway.import_logs (
79 import_id,
80 kind,
81 msg
82 ) VALUES (
83 $1,
84 $2::waterway.log_type,
85 $3
86 )`
48 ) 87 )
49 88
50 func init() { 89 func init() {
51 go importLoop() 90 go importLoop()
52 } 91 }
62 creatorsMu.Lock() 101 creatorsMu.Lock()
63 defer creatorsMu.Unlock() 102 defer creatorsMu.Unlock()
64 return creators[kind] 103 return creators[kind]
65 } 104 }
66 105
67 func AddJob(kind JobKind, user, data string) int64 { 106 func AddJob(kind JobKind, user, data string) (int64, error) {
68 id := atomic.AddInt64(&jobID, 1) 107 ctx := context.Background()
69 queueCond.L.Lock() 108 queueCond.L.Lock()
70 defer queueCond.L.Unlock() 109 defer queueCond.L.Unlock()
71 queue.PushBack(idJob{ 110 var id int64
72 kind: kind, 111 err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error {
73 id: id, 112 return conn.QueryRowContext(ctx, insertJobSQL, string(kind), user, data).Scan(&id)
74 user: user, 113 })
75 data: data, 114 if err == nil {
76 }) 115 queueCond.Signal()
77 queueCond.Signal() 116 }
78 return id 117 return id, err
79 } 118 }
80 119
81 type logFeedback struct{} 120 type logFeedback int64
82 121
83 func (logFeedback) Info(fmt string, args ...interface{}) { 122 func (lf logFeedback) log(kind, format string, args ...interface{}) {
84 log.Printf("info: "+fmt, args...) 123 ctx := context.Background()
85 } 124 err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error {
86 125 _, err := conn.ExecContext(
87 func (logFeedback) Warn(fmt string, args ...interface{}) { 126 ctx, logMessageSQL, int64(lf), kind, fmt.Sprintf(format, args...))
88 log.Printf("warn: "+fmt, args...) 127 return err
89 } 128 })
90 129 if err != nil {
91 func (logFeedback) Error(fmt string, args ...interface{}) { 130 log.Printf("logging failed: %v\n", err)
92 log.Printf("error: "+fmt, args...) 131 }
132 }
133
134 func (lf logFeedback) Info(format string, args ...interface{}) {
135 lf.log("info", format, args...)
136 }
137
138 func (lf logFeedback) Warn(format string, args ...interface{}) {
139 lf.log("warn", format, args...)
140 }
141
142 func (lf logFeedback) Error(format string, args ...interface{}) {
143 lf.log("error", format, args...)
93 } 144 }
94 145
95 func survive(fn func() error) func() error { 146 func survive(fn func() error) func() error {
96 return func() error { 147 return func() error {
97 errCh := make(chan error) 148 errCh := make(chan error)
106 }() 157 }()
107 return <-errCh 158 return <-errCh
108 } 159 }
109 } 160 }
110 161
162 func fetchJob() (*idJob, error) {
163 var ji idJob
164 ctx := context.Background()
165 err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error {
166 tx, err := conn.BeginTx(ctx, nil)
167 if err != nil {
168 return err
169 }
170 defer tx.Rollback()
171 if err = tx.QueryRowContext(ctx, selectJobSQL).Scan(
172 &ji.id, &ji.kind, &ji.user, &ji.data); err != nil {
173 return err
174 }
175 _, err = tx.ExecContext(ctx, updateStateSQL, "running", ji.id)
176 if err == nil {
177 err = tx.Commit()
178 }
179 return err
180 })
181 switch {
182 case err == sql.ErrNoRows:
183 return nil, nil
184 case err != nil:
185 return nil, err
186 }
187 return &ji, nil
188 }
189
190 func updateState(id int64, state string) error {
191 ctx := context.Background()
192 return auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error {
193 _, err := conn.ExecContext(ctx, updateStateSQL, state, id)
194 return err
195 })
196 }
197
198 func errorAndFail(id int64, format string, args ...interface{}) error {
199 ctx := context.Background()
200 err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error {
201 tx, err := conn.BeginTx(ctx, nil)
202 if err != nil {
203 return err
204 }
205 defer tx.Rollback()
206 _, err = conn.ExecContext(
207 ctx, logMessageSQL, id, "error", fmt.Sprintf(format, args...))
208 if err != nil {
209 return err
210 }
211 _, err = conn.ExecContext(
212 ctx, updateStateSQL, "failed", id)
213 if err == nil {
214 err = tx.Commit()
215 }
216 return err
217 })
218 return err
219 }
220
111 func importLoop() { 221 func importLoop() {
112 for { 222 for {
113 var idj idJob
114 queueCond.L.Lock() 223 queueCond.L.Lock()
115 for queue.Len() == 0 { 224
116 queueCond.Wait() 225 var idj *idJob
117 } 226 var err error
118 idj = queue.Remove(queue.Front()).(idJob) 227
228 for idj == nil {
229 if idj, err = fetchJob(); err != nil {
230 log.Printf("db error: %v\n", err)
231 queueCond.Wait()
232 } else if idj == nil {
233 queueCond.Wait()
234 }
235 }
236
119 queueCond.L.Unlock() 237 queueCond.L.Unlock()
120 238
121 log.Printf("starting import job %d\n", idj.id) 239 log.Printf("starting import job %d\n", idj.id)
122 240
123 jc := jobCreator(idj.kind) 241 jc := jobCreator(idj.kind)
124 if jc == nil { 242 if jc == nil {
125 log.Printf("Cannot find creatir for job kind '%s'.\n", idj.kind) 243 errorAndFail(idj.id, "No creator for kind '%s' found", idj.kind)
126 continue 244 continue
127 } 245 }
128 246
129 job, err := jc(idj.kind, idj.data) 247 job, err := jc(idj.kind, idj.data)
130 if err != nil { 248 if err != nil {
131 log.Printf("Failed to create job: %v\n", err) 249 errorAndFail(idj.id, "Faild to create job: %v", err)
132 continue 250 continue
133 } 251 }
134 252
135 do := survive(func() error { 253 feedback := logFeedback(idj.id)
254
255 errDo := survive(func() error {
136 return auth.RunAs(idj.user, context.Background(), 256 return auth.RunAs(idj.user, context.Background(),
137 func(conn *sql.Conn) error { return job.Do(conn, logFeedback{}) }) 257 func(conn *sql.Conn) error { return job.Do(conn, feedback) })
138 }) 258 })()
139 if err := do(); err != nil { 259 if errDo != nil {
140 log.Printf("import error (job %d): %v\n", idj.id, err) 260 feedback.Error("error do: %v\n", errDo)
141 } 261 }
142 if err := survive(job.CleanUp)(); err != nil { 262 errCleanup := survive(job.CleanUp)
143 log.Printf("cleanup error (job %d): %v\n", idj.id, err) 263 if errCleanup != nil {
144 } 264 feedback.Error("error cleanup: %v\n", errCleanup)
145 } 265 }
146 } 266
267 if errDo != nil || errCleanup != nil {
268 err = updateState(idj.id, "failed")
269 } else {
270 err = updateState(idj.id, "successful")
271 }
272 if err != nil {
273 log.Printf("setting state of job %d failed: %v\n", idj.id, err)
274 }
275 }
276 }