Mercurial > gemma
comparison pkg/imports/queue.go @ 998:75e65599ea52 persistent-import-queue
Persist job queue in database. WIP.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Mon, 22 Oct 2018 16:54:34 +0200 |
parents | a978b2b26a88 |
children | 14425e35e3c2 |
comparison
legend: equal, deleted, inserted, replaced
996:839526f44f0f | 998:75e65599ea52 |
---|---|
1 package imports | 1 package imports |
2 | 2 |
3 import ( | 3 import ( |
4 "container/list" | |
5 "context" | 4 "context" |
6 "database/sql" | 5 "database/sql" |
7 "fmt" | 6 "fmt" |
8 "log" | 7 "log" |
9 "runtime/debug" | 8 "runtime/debug" |
10 "sync" | 9 "sync" |
11 "sync/atomic" | |
12 | 10 |
13 "gemma.intevation.de/gemma/pkg/auth" | 11 "gemma.intevation.de/gemma/pkg/auth" |
14 ) | 12 ) |
15 | 13 |
16 type ( | 14 type ( |
28 JobKind string | 26 JobKind string |
29 | 27 |
30 JobCreator func(kind JobKind, data string) (Job, error) | 28 JobCreator func(kind JobKind, data string) (Job, error) |
31 | 29 |
32 idJob struct { | 30 idJob struct { |
31 id int64 | |
33 kind JobKind | 32 kind JobKind |
34 id int64 | |
35 user string | 33 user string |
36 data string | 34 data string |
37 } | 35 } |
38 ) | 36 ) |
39 | 37 |
40 var ( | 38 var ( |
41 queueCond = sync.NewCond(new(sync.Mutex)) | 39 queueCond = sync.NewCond(new(sync.Mutex)) |
42 queue = list.New() | |
43 | |
44 jobID int64 | |
45 | 40 |
46 creatorsMu sync.Mutex | 41 creatorsMu sync.Mutex |
47 creators = map[JobKind]JobCreator{} | 42 creators = map[JobKind]JobCreator{} |
43 ) | |
44 | |
45 const ( | |
46 queueUser = "sys_admin" | |
47 | |
48 insertJobSQL = ` | |
49 INSERT INTO waterway.imports ( | |
50 kind, | |
51 username, | |
52 data | |
53 ) VALUES ( | |
54 $1, | |
55 $2, | |
56 $3 | |
57 ) RETURNING id` | |
58 | |
59 selectJobSQL = ` | |
60 SELECT | |
61 id, | |
62 kind, | |
63 username, | |
64 data | |
65 FROM waterway.imports | |
66 WHERE state = 'queued'::waterway.import_state AND enqueued IN ( | |
67 SELECT min(enqueued) | |
68 FROM waterway.imports | |
69 WHERE state = 'queued'::waterway.import_state | |
70 ) | |
71 LIMIT 1 | |
72 ` | |
73 updateStateSQL = ` | |
74 UPDATE waterway.imports SET state = $1::waterway.import_state | |
75 WHERE id = $2 | |
76 ` | |
77 logMessageSQL = ` | |
78 INSERT INTO waterway.import_logs ( | |
79 import_id, | |
80 kind, | |
81 msg | |
82 ) VALUES ( | |
83 $1, | |
84 $2::waterway.log_type, | |
85 $3 | |
86 )` | |
48 ) | 87 ) |
49 | 88 |
50 func init() { | 89 func init() { |
51 go importLoop() | 90 go importLoop() |
52 } | 91 } |
62 creatorsMu.Lock() | 101 creatorsMu.Lock() |
63 defer creatorsMu.Unlock() | 102 defer creatorsMu.Unlock() |
64 return creators[kind] | 103 return creators[kind] |
65 } | 104 } |
66 | 105 |
67 func AddJob(kind JobKind, user, data string) int64 { | 106 func AddJob(kind JobKind, user, data string) (int64, error) { |
68 id := atomic.AddInt64(&jobID, 1) | 107 ctx := context.Background() |
69 queueCond.L.Lock() | 108 queueCond.L.Lock() |
70 defer queueCond.L.Unlock() | 109 defer queueCond.L.Unlock() |
71 queue.PushBack(idJob{ | 110 var id int64 |
72 kind: kind, | 111 err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error { |
73 id: id, | 112 return conn.QueryRowContext(ctx, insertJobSQL, string(kind), user, data).Scan(&id) |
74 user: user, | 113 }) |
75 data: data, | 114 if err == nil { |
76 }) | 115 queueCond.Signal() |
77 queueCond.Signal() | 116 } |
78 return id | 117 return id, err |
79 } | 118 } |
80 | 119 |
81 type logFeedback struct{} | 120 type logFeedback int64 |
82 | 121 |
83 func (logFeedback) Info(fmt string, args ...interface{}) { | 122 func (lf logFeedback) log(kind, format string, args ...interface{}) { |
84 log.Printf("info: "+fmt, args...) | 123 ctx := context.Background() |
85 } | 124 err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error { |
86 | 125 _, err := conn.ExecContext( |
87 func (logFeedback) Warn(fmt string, args ...interface{}) { | 126 ctx, logMessageSQL, int64(lf), kind, fmt.Sprintf(format, args...)) |
88 log.Printf("warn: "+fmt, args...) | 127 return err |
89 } | 128 }) |
90 | 129 if err != nil { |
91 func (logFeedback) Error(fmt string, args ...interface{}) { | 130 log.Printf("logging failed: %v\n", err) |
92 log.Printf("error: "+fmt, args...) | 131 } |
132 } | |
133 | |
134 func (lf logFeedback) Info(format string, args ...interface{}) { | |
135 lf.log("info", format, args...) | |
136 } | |
137 | |
138 func (lf logFeedback) Warn(format string, args ...interface{}) { | |
139 lf.log("warn", format, args...) | |
140 } | |
141 | |
142 func (lf logFeedback) Error(format string, args ...interface{}) { | |
143 lf.log("error", format, args...) | |
93 } | 144 } |
94 | 145 |
95 func survive(fn func() error) func() error { | 146 func survive(fn func() error) func() error { |
96 return func() error { | 147 return func() error { |
97 errCh := make(chan error) | 148 errCh := make(chan error) |
106 }() | 157 }() |
107 return <-errCh | 158 return <-errCh |
108 } | 159 } |
109 } | 160 } |
110 | 161 |
162 func fetchJob() (*idJob, error) { | |
163 var ji idJob | |
164 ctx := context.Background() | |
165 err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error { | |
166 tx, err := conn.BeginTx(ctx, nil) | |
167 if err != nil { | |
168 return err | |
169 } | |
170 defer tx.Rollback() | |
171 if err = tx.QueryRowContext(ctx, selectJobSQL).Scan( | |
172 &ji.id, &ji.kind, &ji.user, &ji.data); err != nil { | |
173 return err | |
174 } | |
175 _, err = tx.ExecContext(ctx, updateStateSQL, "running", ji.id) | |
176 if err == nil { | |
177 err = tx.Commit() | |
178 } | |
179 return err | |
180 }) | |
181 switch { | |
182 case err == sql.ErrNoRows: | |
183 return nil, nil | |
184 case err != nil: | |
185 return nil, err | |
186 } | |
187 return &ji, nil | |
188 } | |
189 | |
190 func updateState(id int64, state string) error { | |
191 ctx := context.Background() | |
192 return auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error { | |
193 _, err := conn.ExecContext(ctx, updateStateSQL, state, id) | |
194 return err | |
195 }) | |
196 } | |
197 | |
198 func errorAndFail(id int64, format string, args ...interface{}) error { | |
199 ctx := context.Background() | |
200 err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error { | |
201 tx, err := conn.BeginTx(ctx, nil) | |
202 if err != nil { | |
203 return err | |
204 } | |
205 defer tx.Rollback() | |
206 _, err = conn.ExecContext( | |
207 ctx, logMessageSQL, id, "error", fmt.Sprintf(format, args...)) | |
208 if err != nil { | |
209 return err | |
210 } | |
211 _, err = conn.ExecContext( | |
212 ctx, updateStateSQL, "failed", id) | |
213 if err == nil { | |
214 err = tx.Commit() | |
215 } | |
216 return err | |
217 }) | |
218 return err | |
219 } | |
220 | |
111 func importLoop() { | 221 func importLoop() { |
112 for { | 222 for { |
113 var idj idJob | |
114 queueCond.L.Lock() | 223 queueCond.L.Lock() |
115 for queue.Len() == 0 { | 224 |
116 queueCond.Wait() | 225 var idj *idJob |
117 } | 226 var err error |
118 idj = queue.Remove(queue.Front()).(idJob) | 227 |
228 for idj == nil { | |
229 if idj, err = fetchJob(); err != nil { | |
230 log.Printf("db error: %v\n", err) | |
231 queueCond.Wait() | |
232 } else if idj == nil { | |
233 queueCond.Wait() | |
234 } | |
235 } | |
236 | |
119 queueCond.L.Unlock() | 237 queueCond.L.Unlock() |
120 | 238 |
121 log.Printf("starting import job %d\n", idj.id) | 239 log.Printf("starting import job %d\n", idj.id) |
122 | 240 |
123 jc := jobCreator(idj.kind) | 241 jc := jobCreator(idj.kind) |
124 if jc == nil { | 242 if jc == nil { |
125 log.Printf("Cannot find creatir for job kind '%s'.\n", idj.kind) | 243 errorAndFail(idj.id, "No creator for kind '%s' found", idj.kind) |
126 continue | 244 continue |
127 } | 245 } |
128 | 246 |
129 job, err := jc(idj.kind, idj.data) | 247 job, err := jc(idj.kind, idj.data) |
130 if err != nil { | 248 if err != nil { |
131 log.Printf("Failed to create job: %v\n", err) | 249 errorAndFail(idj.id, "Faild to create job: %v", err) |
132 continue | 250 continue |
133 } | 251 } |
134 | 252 |
135 do := survive(func() error { | 253 feedback := logFeedback(idj.id) |
254 | |
255 errDo := survive(func() error { | |
136 return auth.RunAs(idj.user, context.Background(), | 256 return auth.RunAs(idj.user, context.Background(), |
137 func(conn *sql.Conn) error { return job.Do(conn, logFeedback{}) }) | 257 func(conn *sql.Conn) error { return job.Do(conn, feedback) }) |
138 }) | 258 })() |
139 if err := do(); err != nil { | 259 if errDo != nil { |
140 log.Printf("import error (job %d): %v\n", idj.id, err) | 260 feedback.Error("error do: %v\n", errDo) |
141 } | 261 } |
142 if err := survive(job.CleanUp)(); err != nil { | 262 errCleanup := survive(job.CleanUp) |
143 log.Printf("cleanup error (job %d): %v\n", idj.id, err) | 263 if errCleanup != nil { |
144 } | 264 feedback.Error("error cleanup: %v\n", errCleanup) |
145 } | 265 } |
146 } | 266 |
267 if errDo != nil || errCleanup != nil { | |
268 err = updateState(idj.id, "failed") | |
269 } else { | |
270 err = updateState(idj.id, "successful") | |
271 } | |
272 if err != nil { | |
273 log.Printf("setting state of job %d failed: %v\n", idj.id, err) | |
274 } | |
275 } | |
276 } |