comparison pkg/imports/queue.go @ 1708:49e047c2106e

Imports: Made imports re-runnable if they fail.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Tue, 08 Jan 2019 13:35:44 +0100
parents 4a2fad8f57de
children 8ddbedf296d7
comparison
equal deleted inserted replaced
1707:74b66527ae81 1708:49e047c2106e
38 Info(fmt string, args ...interface{}) 38 Info(fmt string, args ...interface{})
39 // Warn logs warnings. 39 // Warn logs warnings.
40 Warn(fmt string, args ...interface{}) 40 Warn(fmt string, args ...interface{})
41 // Error logs errors. 41 // Error logs errors.
42 Error(fmt string, args ...interface{}) 42 Error(fmt string, args ...interface{})
43 }
44
45 // RetryError is an error type to signal that
46 // the import should be tried again.
47 RetryError struct {
48 // Message is the error message.
49 Message string
50 // When is the new scheduled execution time.
51 When time.Time
43 } 52 }
44 53
45 // Job is the central abstraction of an import job 54 // Job is the central abstraction of an import job
46 // run by the import queue. 55 // run by the import queue.
47 Job interface { 56 Job interface {
86 95
87 idJob struct { 96 idJob struct {
88 id int64 97 id int64
89 kind JobKind 98 kind JobKind
90 user string 99 user string
100 trysLeft sql.NullInt64
91 sendEmail bool 101 sendEmail bool
92 autoAccept bool 102 autoAccept bool
93 data string 103 data string
94 } 104 }
95 ) 105 )
129 WHERE state = 'running'::waterway.import_state` 139 WHERE state = 'running'::waterway.import_state`
130 140
131 insertJobSQL = ` 141 insertJobSQL = `
132 INSERT INTO waterway.imports ( 142 INSERT INTO waterway.imports (
133 kind, 143 kind,
144 due,
145 trys_left,
134 username, 146 username,
135 send_email, 147 send_email,
136 auto_accept, 148 auto_accept,
137 data 149 data
138 ) VALUES ( 150 ) VALUES (
139 $1, 151 $1,
140 $2, 152 COALESCE($2, CURRENT_TIMESTAMP),
141 $3, 153 $3,
142 $4, 154 $4,
143 $5 155 $5,
156 $6,
157 $7
144 ) RETURNING id` 158 ) RETURNING id`
145 159
146 selectJobSQL = ` 160 selectJobSQL = `
147 SELECT 161 SELECT
148 id, 162 id,
149 kind, 163 kind,
164 trys_left,
150 username, 165 username,
151 send_email, 166 send_email,
152 auto_accept, 167 auto_accept,
153 data 168 data
154 FROM waterway.imports 169 FROM waterway.imports
155 WHERE state = 'queued'::waterway.import_state AND enqueued IN ( 170 WHERE
156 SELECT min(enqueued) 171 due <= CURRENT_TIMESTAMP AND
157 FROM waterway.imports 172 state = 'queued'::waterway.import_state AND enqueued IN (
158 WHERE state = 'queued'::waterway.import_state AND 173 SELECT min(enqueued)
159 kind = ANY($1) 174 FROM waterway.imports
160 ) 175 WHERE state = 'queued'::waterway.import_state AND
176 kind = ANY($1))
161 LIMIT 1` 177 LIMIT 1`
162 178
163 updateStateSQL = ` 179 updateStateSQL = `
164 UPDATE waterway.imports SET state = $1::waterway.import_state 180 UPDATE waterway.imports SET state = $1::waterway.import_state
165 WHERE id = $2` 181 WHERE id = $2`
184 200
185 func init() { 201 func init() {
186 go iqueue.importLoop() 202 go iqueue.importLoop()
187 } 203 }
188 204
205 // Error makes RetryError an error.
206 func (re *RetryError) Error() string {
207 return re.Message
208 }
209
189 func (q *importQueue) registerJobCreator(kind JobKind, jc JobCreator) { 210 func (q *importQueue) registerJobCreator(kind JobKind, jc JobCreator) {
190 q.creatorsMu.Lock() 211 q.creatorsMu.Lock()
191 defer q.creatorsMu.Unlock() 212 defer q.creatorsMu.Unlock()
192 q.creators[kind] = jc 213 q.creators[kind] = jc
193 } 214 }
234 } 255 }
235 // XXX: Consider using sort.Strings to make output deterministic. 256 // XXX: Consider using sort.Strings to make output deterministic.
236 return names 257 return names
237 } 258 }
238 259
260 func (idj *idJob) trys() int {
261 if !idj.trysLeft.Valid {
262 return -1
263 }
264 return int(idj.trysLeft.Int64)
265 }
266
239 func (q *importQueue) jobCreator(kind JobKind) JobCreator { 267 func (q *importQueue) jobCreator(kind JobKind) JobCreator {
240 q.creatorsMu.Lock() 268 q.creatorsMu.Lock()
241 defer q.creatorsMu.Unlock() 269 defer q.creatorsMu.Unlock()
242 return q.creators[kind] 270 return q.creators[kind]
243 } 271 }
244 272
245 func (q *importQueue) addJob( 273 func (q *importQueue) addJob(
246 kind JobKind, 274 kind JobKind,
275 due time.Time,
276 trysLeft int,
247 user string, 277 user string,
248 sendEmail, autoAccept bool, 278 sendEmail, autoAccept bool,
249 data string, 279 data string,
250 ) (int64, error) { 280 ) (int64, error) {
251 ctx := context.Background() 281 ctx := context.Background()
252 var id int64 282 var id int64
283 if due.IsZero() {
284 due = time.Now()
285 }
286 var tl sql.NullInt64
287 if trysLeft >= 0 {
288 tl = sql.NullInt64{Int64: int64(trysLeft), Valid: true}
289 }
253 err := auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error { 290 err := auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error {
254 return conn.QueryRowContext( 291 return conn.QueryRowContext(
255 ctx, 292 ctx,
256 insertJobSQL, 293 insertJobSQL,
257 string(kind), 294 string(kind),
295 due,
296 tl,
258 user, 297 user,
259 sendEmail, 298 sendEmail,
260 autoAccept, 299 autoAccept,
261 data).Scan(&id) 300 data).Scan(&id)
262 }) 301 })
268 } 307 }
269 return id, err 308 return id, err
270 } 309 }
271 310
272 // AddJob adds a job to the global import queue to be executed 311 // AddJob adds a job to the global import queue to be executed
273 // as soon as possible. This is done in a separate goroutine 312 // as soon as possible after due.
313 // This is done in a separate goroutine
274 // so this will not block. 314 // so this will not block.
275 func AddJob(kind JobKind, user string, sendEmail, autoAccept bool, data string) (int64, error) { 315 func AddJob(
276 return iqueue.addJob(kind, user, sendEmail, autoAccept, data) 316 kind JobKind,
317 due time.Time,
318 trysLeft int,
319 user string,
320 sendEmail, autoAccept bool,
321 data string,
322 ) (int64, error) {
323 return iqueue.addJob(kind, due, trysLeft, user, sendEmail, autoAccept, data)
277 } 324 }
278 325
279 type logFeedback int64 326 type logFeedback int64
280 327
281 func (lf logFeedback) log(kind, format string, args ...interface{}) { 328 func (lf logFeedback) log(kind, format string, args ...interface{}) {
355 } 402 }
356 defer tx.Rollback() 403 defer tx.Rollback()
357 if err = tx.QueryRowContext(ctx, selectJobSQL, &kinds).Scan( 404 if err = tx.QueryRowContext(ctx, selectJobSQL, &kinds).Scan(
358 &ji.id, 405 &ji.id,
359 &ji.kind, 406 &ji.kind,
407 &ji.trysLeft,
360 &ji.user, 408 &ji.user,
361 &ji.sendEmail, 409 &ji.sendEmail,
362 &ji.autoAccept, 410 &ji.autoAccept,
363 &ji.data, 411 &ji.data,
364 ); err != nil { 412 ); err != nil {
500 }) 548 })
501 })() 549 })()
502 if errDo != nil { 550 if errDo != nil {
503 feedback.Error("error do: %v", errDo) 551 feedback.Error("error do: %v", errDo)
504 } 552 }
505 errCleanup := survive(job.CleanUp)() 553 // Should we try again?
506 if errCleanup != nil { 554 retry, shouldRetry := errDo.(*RetryError)
507 feedback.Error("error cleanup: %v", errCleanup) 555
556 if shouldRetry && idj.trysLeft.Valid { // NULL -> unlimited retries
557 if idj.trysLeft.Int64--; idj.trysLeft.Int64 <= 0 {
558 shouldRetry = false
559 }
560 }
561
562 var errCleanup error
563 if !shouldRetry { // cleanup debris
564 errCleanup = survive(job.CleanUp)()
565 if errCleanup != nil {
566 feedback.Error("error cleanup: %v", errCleanup)
567 }
508 } 568 }
509 569
510 var state string 570 var state string
511 switch { 571 switch {
512 case errDo != nil || errCleanup != nil: 572 case errDo != nil || errCleanup != nil:
522 // TODO: Send email if sendEmail is set. 582 // TODO: Send email if sendEmail is set.
523 log.Printf("import #%d finished: %s\n", idj.id, state) 583 log.Printf("import #%d finished: %s\n", idj.id, state)
524 if idj.sendEmail { 584 if idj.sendEmail {
525 go sendNotificationMail(idj.user, jc.Description(), state, idj.id) 585 go sendNotificationMail(idj.user, jc.Description(), state, idj.id)
526 } 586 }
587
588 if shouldRetry {
589 nid, err := q.addJob(
590 idj.kind,
591 retry.When, idj.trys(),
592 idj.user, idj.sendEmail, idj.autoAccept,
593 idj.data)
594 if err != nil {
595 log.Printf("error: retry enqueue failed: %v\n", err)
596 } else {
597 log.Printf("info: re-enqueued job with id %d\n", nid)
598 }
599 }
527 }(jc, idj) 600 }(jc, idj)
528 } 601 }
529 } 602 }