Mercurial > gemma
comparison pkg/imports/queue.go @ 1975:d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Wed, 23 Jan 2019 11:20:14 +0100 |
parents | 40cbfd268aa9 |
children | 8eeb0b5eb340 |
comparison
equal
deleted
inserted
replaced
1966:da6dc9b73f34 | 1975:d966f03ea819 |
---|---|
48 // Message is the error message. | 48 // Message is the error message. |
49 Message string | 49 Message string |
50 // When is the new scheduled execution time. | 50 // When is the new scheduled execution time. |
51 When time.Time | 51 When time.Time |
52 } | 52 } |
53 | |
54 // UnchangedError may be issued by Do of a Job to indicate | |
55 // that the database has not changed. | |
56 UnchangedError string | |
53 | 57 |
54 // Job is the central abstraction of an import job | 58 // Job is the central abstraction of an import job |
55 // run by the import queue. | 59 // run by the import queue. |
56 Job interface { | 60 Job interface { |
57 // Do is called to do the actual import. | 61 // Do is called to do the actual import. |
125 // ImportStateNames is a list of the states a job can be in. | 129 // ImportStateNames is a list of the states a job can be in. |
126 ImportStateNames = []string{ | 130 ImportStateNames = []string{ |
127 "queued", | 131 "queued", |
128 "running", | 132 "running", |
129 "failed", | 133 "failed", |
134 "unchanged", | |
130 "pending", | 135 "pending", |
131 "accepted", | 136 "accepted", |
132 "declined", | 137 "declined", |
133 } | 138 } |
134 ) | 139 ) |
204 // Error makes RetryError an error. | 209 // Error makes RetryError an error. |
205 func (re *RetryError) Error() string { | 210 func (re *RetryError) Error() string { |
206 return re.Message | 211 return re.Message |
207 } | 212 } |
208 | 213 |
214 // Error makes UnchangedError an error. | |
215 func (ue UnchangedError) Error() string { | |
216 return string(ue) | |
217 } | |
218 | |
209 func (q *importQueue) registerJobCreator(kind JobKind, jc JobCreator) { | 219 func (q *importQueue) registerJobCreator(kind JobKind, jc JobCreator) { |
210 q.creatorsMu.Lock() | 220 q.creatorsMu.Lock() |
211 defer q.creatorsMu.Unlock() | 221 defer q.creatorsMu.Unlock() |
212 q.creators[kind] = jc | 222 q.creators[kind] = jc |
213 } | 223 } |
542 var err error | 552 var err error |
543 summary, err = job.Do(ctx, idj.id, conn, feedback) | 553 summary, err = job.Do(ctx, idj.id, conn, feedback) |
544 return err | 554 return err |
545 }) | 555 }) |
546 })() | 556 })() |
547 if errDo != nil { | 557 |
548 feedback.Error("error do: %v", errDo) | 558 var retry *RetryError |
549 } | 559 var unchanged bool |
550 // Should we try again? | 560 |
551 retry, shouldRetry := errDo.(*RetryError) | 561 switch v := errDo.(type) { |
552 | 562 case *RetryError: |
553 if shouldRetry && idj.trysLeft.Valid { // NULL -> limitless | 563 // NULL -> limitless |
554 if idj.trysLeft.Int64--; idj.trysLeft.Int64 <= 0 { | 564 if idj.trysLeft.Valid && idj.trysLeft.Int64 <= 1 { |
555 shouldRetry = false | 565 feedback.Warn("import should be retried, but no retrys left") |
566 } else { | |
567 if idj.trysLeft.Valid { | |
568 idj.trysLeft.Int64-- | |
569 } | |
570 feedback.Info("import failed but will be retried") | |
571 retry = v | |
556 } | 572 } |
573 case UnchangedError: | |
574 feedback.Info("unchanged: %s", v.Error()) | |
575 unchanged = true | |
576 default: | |
577 if errDo != nil { | |
578 feedback.Error("error in import: %v", errDo) | |
579 } | |
557 } | 580 } |
558 | 581 |
559 var errCleanup error | 582 var errCleanup error |
560 if !shouldRetry { // cleanup debris | 583 if retry == nil { // cleanup debris |
561 errCleanup = survive(job.CleanUp)() | 584 if errCleanup = survive(job.CleanUp)(); errCleanup != nil { |
562 if errCleanup != nil { | |
563 feedback.Error("error cleanup: %v", errCleanup) | 585 feedback.Error("error cleanup: %v", errCleanup) |
564 } | 586 } |
565 } | 587 } |
566 | 588 |
567 var state string | 589 var state string |
568 switch { | 590 switch { |
591 case unchanged: | |
592 state = "unchanged" | |
569 case errDo != nil || errCleanup != nil: | 593 case errDo != nil || errCleanup != nil: |
570 state = "failed" | 594 state = "failed" |
571 case jc.AutoAccept(): | 595 case jc.AutoAccept(): |
572 state = "accepted" | 596 state = "accepted" |
573 default: | 597 default: |
579 log.Printf("info: import #%d finished: %s\n", idj.id, state) | 603 log.Printf("info: import #%d finished: %s\n", idj.id, state) |
580 if idj.sendEmail { | 604 if idj.sendEmail { |
581 go sendNotificationMail(idj.user, jc.Description(), state, idj.id) | 605 go sendNotificationMail(idj.user, jc.Description(), state, idj.id) |
582 } | 606 } |
583 | 607 |
584 if shouldRetry { | 608 if retry != nil { |
585 nid, err := q.addJob( | 609 nid, err := q.addJob( |
586 idj.kind, | 610 idj.kind, |
587 retry.When, idj.trys(), | 611 retry.When, idj.trys(), |
588 idj.user, idj.sendEmail, | 612 idj.user, idj.sendEmail, |
589 idj.data) | 613 idj.data) |