comparison pkg/imports/queue.go @ 1985:8eeb0b5eb340

Imports: Made retries and the waiting between the attempts configurable.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Wed, 23 Jan 2019 17:58:57 +0100
parents d966f03ea819
children 59055c8301df
comparison
equal deleted inserted replaced
1983:f9f1babe52ae 1985:8eeb0b5eb340
38 Info(fmt string, args ...interface{}) 38 Info(fmt string, args ...interface{})
39 // Warn logs warnings. 39 // Warn logs warnings.
40 Warn(fmt string, args ...interface{}) 40 Warn(fmt string, args ...interface{})
41 // Error logs errors. 41 // Error logs errors.
42 Error(fmt string, args ...interface{}) 42 Error(fmt string, args ...interface{})
43 }
44
45 // RetryError is an error type to signal that
46 // the import should be tried again.
47 RetryError struct {
48 // Message is the error message.
49 Message string
50 // When is the new scheduled execution time.
51 When time.Time
52 } 43 }
53 44
54 // UnchangedError may be issued by Do of a Job to indicate 45 // UnchangedError may be issued by Do of a Job to indicate
55 // That the database has not changed. 46 // That the database has not changed.
56 UnchangedError string 47 UnchangedError string
102 93
103 idJob struct { 94 idJob struct {
104 id int64 95 id int64
105 kind JobKind 96 kind JobKind
106 user string 97 user string
98 waitRetry pgtype.Interval
107 trysLeft sql.NullInt64 99 trysLeft sql.NullInt64
108 sendEmail bool 100 sendEmail bool
109 data string 101 data string
110 } 102 }
111 ) 103 )
148 insertJobSQL = ` 140 insertJobSQL = `
149 INSERT INTO waterway.imports ( 141 INSERT INTO waterway.imports (
150 kind, 142 kind,
151 due, 143 due,
152 trys_left, 144 trys_left,
145 retry_wait,
153 username, 146 username,
154 send_email, 147 send_email,
155 data 148 data
156 ) VALUES ( 149 ) VALUES (
157 $1, 150 $1,
158 COALESCE($2, CURRENT_TIMESTAMP), 151 COALESCE($2, CURRENT_TIMESTAMP),
159 $3, 152 $3,
160 $4, 153 $4,
161 $5, 154 $5,
162 $6 155 $6,
156 $7
163 ) RETURNING id` 157 ) RETURNING id`
164 158
165 selectJobSQL = ` 159 selectJobSQL = `
166 SELECT 160 SELECT
167 id, 161 id,
168 kind, 162 kind,
169 trys_left, 163 trys_left,
164 retry_wait,
170 username, 165 username,
171 send_email, 166 send_email,
172 data 167 data
173 FROM waterway.imports 168 FROM waterway.imports
174 WHERE 169 WHERE
204 199
205 func init() { 200 func init() {
206 go iqueue.importLoop() 201 go iqueue.importLoop()
207 } 202 }
208 203
209 // Error makes RetryError an error.
210 func (re *RetryError) Error() string {
211 return re.Message
212 }
213
214 // Error makes UnchangedError an error. 204 // Error makes UnchangedError an error.
215 func (ue UnchangedError) Error() string { 205 func (ue UnchangedError) Error() string {
216 return string(ue) 206 return string(ue)
217 } 207 }
218 208
264 } 254 }
265 // XXX: Consider using sort.Strings to make output deterministic. 255 // XXX: Consider using sort.Strings to make output deterministic.
266 return names 256 return names
267 } 257 }
268 258
269 func (idj *idJob) trys() int { 259 func (idj *idJob) nextRetry(feedback Feedback) bool {
260 switch {
261 case idj.waitRetry.Status != pgtype.Present && !idj.trysLeft.Valid:
262 return false
263 case idj.waitRetry.Status == pgtype.Present && !idj.trysLeft.Valid:
264 return true
265 case idj.trysLeft.Valid:
266 if idj.trysLeft.Int64 < 1 {
267 feedback.Warn("import should be retried, but no retrys left")
268 } else {
269 idj.trysLeft.Int64--
270 feedback.Info("import failed but will be retried")
271 return true
272 }
273 }
274 return false
275 }
276
277 func (idj *idJob) nextDue() time.Time {
278 now := time.Now()
279 if idj.waitRetry.Status == pgtype.Present {
280 var d time.Duration
281 if err := idj.waitRetry.AssignTo(&d); err != nil {
282 log.Printf("error: converting waitRetry failed: %v\n", err)
283 } else {
284 now = now.Add(d)
285 }
286 }
287 return now
288 }
289
290 func (idj *idJob) trysLeftPointer() *int {
270 if !idj.trysLeft.Valid { 291 if !idj.trysLeft.Valid {
271 return -1 292 return nil
272 } 293 }
273 return int(idj.trysLeft.Int64) 294 t := int(idj.trysLeft.Int64)
295 return &t
296 }
297
298 func (idj *idJob) waitRetryPointer() *time.Duration {
299 if idj.waitRetry.Status != pgtype.Present {
300 return nil
301 }
302 d := new(time.Duration)
303 if err := idj.waitRetry.AssignTo(d); err != nil {
304 log.Printf("error: converting waitRetry failed: %v\n", err)
305 return nil
306 }
307 return d
274 } 308 }
275 309
276 func (q *importQueue) jobCreator(kind JobKind) JobCreator { 310 func (q *importQueue) jobCreator(kind JobKind) JobCreator {
277 q.creatorsMu.Lock() 311 q.creatorsMu.Lock()
278 defer q.creatorsMu.Unlock() 312 defer q.creatorsMu.Unlock()
280 } 314 }
281 315
282 func (q *importQueue) addJob( 316 func (q *importQueue) addJob(
283 kind JobKind, 317 kind JobKind,
284 due time.Time, 318 due time.Time,
285 trysLeft int, 319 trysLeft *int,
320 waitRetry *time.Duration,
286 user string, 321 user string,
287 sendEmail bool, 322 sendEmail bool,
288 data string, 323 data string,
289 ) (int64, error) { 324 ) (int64, error) {
290 ctx := context.Background() 325
291 var id int64 326 var id int64
292 if due.IsZero() { 327 if due.IsZero() {
293 due = time.Now() 328 due = time.Now()
294 } 329 }
330
295 var tl sql.NullInt64 331 var tl sql.NullInt64
296 if trysLeft >= 0 { 332 if trysLeft != nil {
297 tl = sql.NullInt64{Int64: int64(trysLeft), Valid: true} 333 tl = sql.NullInt64{Int64: int64(*trysLeft), Valid: true}
298 } 334 }
335
336 var wr pgtype.Interval
337 if waitRetry != nil {
338 if err := wr.Set(*waitRetry); err != nil {
339 return 0, err
340 }
341 } else {
342 wr = pgtype.Interval{Status: pgtype.Null}
343 }
344
345 ctx := context.Background()
299 err := auth.RunAs(ctx, user, func(conn *sql.Conn) error { 346 err := auth.RunAs(ctx, user, func(conn *sql.Conn) error {
300 return conn.QueryRowContext( 347 return conn.QueryRowContext(
301 ctx, 348 ctx,
302 insertJobSQL, 349 insertJobSQL,
303 string(kind), 350 string(kind),
304 due, 351 due,
305 tl, 352 tl,
353 &wr,
306 user, 354 user,
307 sendEmail, 355 sendEmail,
308 data).Scan(&id) 356 data).Scan(&id)
309 }) 357 })
310 if err == nil { 358 if err == nil {
321 // This is gone in a separate Go routine 369 // This is gone in a separate Go routine
322 // so this will not block. 370 // so this will not block.
323 func AddJob( 371 func AddJob(
324 kind JobKind, 372 kind JobKind,
325 due time.Time, 373 due time.Time,
326 trysLeft int, 374 trysLeft *int,
375 waitRetry *time.Duration,
327 user string, 376 user string,
328 sendEmail bool, 377 sendEmail bool,
329 data string, 378 data string,
330 ) (int64, error) { 379 ) (int64, error) {
331 return iqueue.addJob(kind, due, trysLeft, user, sendEmail, data) 380 return iqueue.addJob(
381 kind,
382 due,
383 trysLeft,
384 waitRetry,
385 user,
386 sendEmail,
387 data)
332 } 388 }
333 389
334 type logFeedback int64 390 type logFeedback int64
335 391
336 func (lf logFeedback) log(kind, format string, args ...interface{}) { 392 func (lf logFeedback) log(kind, format string, args ...interface{}) {
411 defer tx.Rollback() 467 defer tx.Rollback()
412 if err = tx.QueryRowContext(ctx, selectJobSQL, &kinds).Scan( 468 if err = tx.QueryRowContext(ctx, selectJobSQL, &kinds).Scan(
413 &ji.id, 469 &ji.id,
414 &ji.kind, 470 &ji.kind,
415 &ji.trysLeft, 471 &ji.trysLeft,
472 &ji.waitRetry,
416 &ji.user, 473 &ji.user,
417 &ji.sendEmail, 474 &ji.sendEmail,
418 &ji.data, 475 &ji.data,
419 ); err != nil { 476 ); err != nil {
420 return err 477 return err
553 summary, err = job.Do(ctx, idj.id, conn, feedback) 610 summary, err = job.Do(ctx, idj.id, conn, feedback)
554 return err 611 return err
555 }) 612 })
556 })() 613 })()
557 614
558 var retry *RetryError 615 var unchanged, retry bool
559 var unchanged bool 616 if v, ok := errDo.(UnchangedError); ok {
560
561 switch v := errDo.(type) {
562 case *RetryError:
563 // NULL -> limit less
564 if idj.trysLeft.Valid && idj.trysLeft.Int64 <= 1 {
565 feedback.Warn("import should be retried, but no retrys left")
566 } else {
567 if idj.trysLeft.Valid {
568 idj.trysLeft.Int64--
569 }
570 feedback.Info("import failed but will be retried")
571 retry = v
572 }
573 case UnchangedError:
574 feedback.Info("unchanged: %s", v.Error()) 617 feedback.Info("unchanged: %s", v.Error())
575 unchanged = true 618 unchanged = true
576 default: 619 } else if errDo != nil {
577 if errDo != nil { 620 feedback.Error("error in import: %v", errDo)
578 feedback.Error("error in import: %v", errDo) 621 retry = idj.nextRetry(feedback)
579 }
580 } 622 }
581 623
582 var errCleanup error 624 var errCleanup error
583 if retry == nil { // cleanup debris 625 if retry { // cleanup debris
584 if errCleanup = survive(job.CleanUp)(); errCleanup != nil { 626 if errCleanup = survive(job.CleanUp)(); errCleanup != nil {
585 feedback.Error("error cleanup: %v", errCleanup) 627 feedback.Error("error cleanup: %v", errCleanup)
586 } 628 }
587 } 629 }
588 630
603 log.Printf("info: import #%d finished: %s\n", idj.id, state) 645 log.Printf("info: import #%d finished: %s\n", idj.id, state)
604 if idj.sendEmail { 646 if idj.sendEmail {
605 go sendNotificationMail(idj.user, jc.Description(), state, idj.id) 647 go sendNotificationMail(idj.user, jc.Description(), state, idj.id)
606 } 648 }
607 649
608 if retry != nil { 650 if retry {
609 nid, err := q.addJob( 651 nid, err := q.addJob(
610 idj.kind, 652 idj.kind,
611 retry.When, idj.trys(), 653 idj.nextDue(),
654 idj.trysLeftPointer(),
655 idj.waitRetryPointer(),
612 idj.user, idj.sendEmail, 656 idj.user, idj.sendEmail,
613 idj.data) 657 idj.data)
614 if err != nil { 658 if err != nil {
615 log.Printf("error: retry enqueue failed: %v\n", err) 659 log.Printf("error: retry enqueue failed: %v\n", err)
616 } else { 660 } else {