comparison pkg/imports/queue.go @ 1975:d966f03ea819

Imports: Added the new state 'unchanged', which an import can issue to indicate that it did not modify the database.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Wed, 23 Jan 2019 11:20:14 +0100
parents 40cbfd268aa9
children 8eeb0b5eb340
comparison
equal deleted inserted replaced
1966:da6dc9b73f34 1975:d966f03ea819
48 // Message is the error message. 48 // Message is the error message.
49 Message string 49 Message string
50 // When is the new scheduled execution time. 50 // When is the new scheduled execution time.
51 When time.Time 51 When time.Time
52 } 52 }
53
54 // UnchangedError may be issued by Do of a Job to indicate
55 // that the database has not changed.
56 UnchangedError string
53 57
54 // Job is the central abstraction of an import job 58 // Job is the central abstraction of an import job
55 // run by the import queue. 59 // run by the import queue.
56 Job interface { 60 Job interface {
57 // Do is called to do the actual import. 61 // Do is called to do the actual import.
125 // ImportStateNames is a list of the states a job can be in. 129 // ImportStateNames is a list of the states a job can be in.
126 ImportStateNames = []string{ 130 ImportStateNames = []string{
127 "queued", 131 "queued",
128 "running", 132 "running",
129 "failed", 133 "failed",
134 "unchanged",
130 "pending", 135 "pending",
131 "accepted", 136 "accepted",
132 "declined", 137 "declined",
133 } 138 }
134 ) 139 )
204 // Error makes RetryError an error. 209 // Error makes RetryError an error.
205 func (re *RetryError) Error() string { 210 func (re *RetryError) Error() string {
206 return re.Message 211 return re.Message
207 } 212 }
208 213
214 // Error makes UnchangedError an error.
215 func (ue UnchangedError) Error() string {
216 return string(ue)
217 }
218
209 func (q *importQueue) registerJobCreator(kind JobKind, jc JobCreator) { 219 func (q *importQueue) registerJobCreator(kind JobKind, jc JobCreator) {
210 q.creatorsMu.Lock() 220 q.creatorsMu.Lock()
211 defer q.creatorsMu.Unlock() 221 defer q.creatorsMu.Unlock()
212 q.creators[kind] = jc 222 q.creators[kind] = jc
213 } 223 }
542 var err error 552 var err error
543 summary, err = job.Do(ctx, idj.id, conn, feedback) 553 summary, err = job.Do(ctx, idj.id, conn, feedback)
544 return err 554 return err
545 }) 555 })
546 })() 556 })()
547 if errDo != nil { 557
548 feedback.Error("error do: %v", errDo) 558 var retry *RetryError
549 } 559 var unchanged bool
550 // Should we try again? 560
551 retry, shouldRetry := errDo.(*RetryError) 561 switch v := errDo.(type) {
552 562 case *RetryError:
553 if shouldRetry && idj.trysLeft.Valid { // NULL -> limit less 563 // NULL -> limit less
554 if idj.trysLeft.Int64--; idj.trysLeft.Int64 <= 0 { 564 if idj.trysLeft.Valid && idj.trysLeft.Int64 <= 1 {
555 shouldRetry = false 565 feedback.Warn("import should be retried, but no retrys left")
566 } else {
567 if idj.trysLeft.Valid {
568 idj.trysLeft.Int64--
569 }
570 feedback.Info("import failed but will be retried")
571 retry = v
556 } 572 }
573 case UnchangedError:
574 feedback.Info("unchanged: %s", v.Error())
575 unchanged = true
576 default:
577 if errDo != nil {
578 feedback.Error("error in import: %v", errDo)
579 }
557 } 580 }
558 581
559 var errCleanup error 582 var errCleanup error
560 if !shouldRetry { // cleanup debris 583 if retry == nil { // cleanup debris
561 errCleanup = survive(job.CleanUp)() 584 if errCleanup = survive(job.CleanUp)(); errCleanup != nil {
562 if errCleanup != nil {
563 feedback.Error("error cleanup: %v", errCleanup) 585 feedback.Error("error cleanup: %v", errCleanup)
564 } 586 }
565 } 587 }
566 588
567 var state string 589 var state string
568 switch { 590 switch {
591 case unchanged:
592 state = "unchanged"
569 case errDo != nil || errCleanup != nil: 593 case errDo != nil || errCleanup != nil:
570 state = "failed" 594 state = "failed"
571 case jc.AutoAccept(): 595 case jc.AutoAccept():
572 state = "accepted" 596 state = "accepted"
573 default: 597 default:
579 log.Printf("info: import #%d finished: %s\n", idj.id, state) 603 log.Printf("info: import #%d finished: %s\n", idj.id, state)
580 if idj.sendEmail { 604 if idj.sendEmail {
581 go sendNotificationMail(idj.user, jc.Description(), state, idj.id) 605 go sendNotificationMail(idj.user, jc.Description(), state, idj.id)
582 } 606 }
583 607
584 if shouldRetry { 608 if retry != nil {
585 nid, err := q.addJob( 609 nid, err := q.addJob(
586 idj.kind, 610 idj.kind,
587 retry.When, idj.trys(), 611 retry.When, idj.trys(),
588 idj.user, idj.sendEmail, 612 idj.user, idj.sendEmail,
589 idj.data) 613 idj.data)