Mercurial > gemma
comparison pkg/imports/queue.go @ 1754:807569b08513
Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Thu, 10 Jan 2019 16:19:26 +0100 |
parents | c3a6aaf926c3 |
children | 5b440fcae1a3 |
comparison
equal
deleted
inserted
replaced
1753:68bd990dd8e5 | 1754:807569b08513 |
---|---|
89 Depends() []string | 89 Depends() []string |
90 // StageDone is called if an import is positively reviewed | 90 // StageDone is called if an import is positively reviewed |
91 // (state = accepted). This can be used to finalize the imported | 91 // (state = accepted). This can be used to finalize the imported |
92 // data to move it e.g. from the staging area. | 92 // data to move it e.g. from the staging area. |
93 StageDone(context.Context, *sql.Tx, int64) error | 93 StageDone(context.Context, *sql.Tx, int64) error |
94 // AutoAccept indicates that imports of this kind | |
95 // don't need a review. | |
96 AutoAccept() bool | |
94 } | 97 } |
95 | 98 |
96 idJob struct { | 99 idJob struct { |
97 id int64 | 100 id int64 |
98 kind JobKind | 101 kind JobKind |
99 user string | 102 user string |
100 trysLeft sql.NullInt64 | 103 trysLeft sql.NullInt64 |
101 sendEmail bool | 104 sendEmail bool |
102 autoAccept bool | 105 data string |
103 data string | |
104 } | 106 } |
105 ) | 107 ) |
106 | 108 |
107 const pollDuration = time.Second * 10 | 109 const pollDuration = time.Second * 10 |
108 | 110 |
143 kind, | 145 kind, |
144 due, | 146 due, |
145 trys_left, | 147 trys_left, |
146 username, | 148 username, |
147 send_email, | 149 send_email, |
148 auto_accept, | |
149 data | 150 data |
150 ) VALUES ( | 151 ) VALUES ( |
151 $1, | 152 $1, |
152 COALESCE($2, CURRENT_TIMESTAMP), | 153 COALESCE($2, CURRENT_TIMESTAMP), |
153 $3, | 154 $3, |
154 $4, | 155 $4, |
155 $5, | 156 $5, |
156 $6, | 157 $6, |
157 $7 | |
158 ) RETURNING id` | 158 ) RETURNING id` |
159 | 159 |
160 selectJobSQL = ` | 160 selectJobSQL = ` |
161 SELECT | 161 SELECT |
162 id, | 162 id, |
163 kind, | 163 kind, |
164 trys_left, | 164 trys_left, |
165 username, | 165 username, |
166 send_email, | 166 send_email, |
167 auto_accept, | |
168 data | 167 data |
169 FROM waterway.imports | 168 FROM waterway.imports |
170 WHERE | 169 WHERE |
171 due <= CURRENT_TIMESTAMP + interval '5 seconds' AND | 170 due <= CURRENT_TIMESTAMP + interval '5 seconds' AND |
172 state = 'queued'::waterway.import_state AND enqueued IN ( | 171 state = 'queued'::waterway.import_state AND enqueued IN ( |
273 func (q *importQueue) addJob( | 272 func (q *importQueue) addJob( |
274 kind JobKind, | 273 kind JobKind, |
275 due time.Time, | 274 due time.Time, |
276 trysLeft int, | 275 trysLeft int, |
277 user string, | 276 user string, |
278 sendEmail, autoAccept bool, | 277 sendEmail bool, |
279 data string, | 278 data string, |
280 ) (int64, error) { | 279 ) (int64, error) { |
281 ctx := context.Background() | 280 ctx := context.Background() |
282 var id int64 | 281 var id int64 |
283 if due.IsZero() { | 282 if due.IsZero() { |
294 string(kind), | 293 string(kind), |
295 due, | 294 due, |
296 tl, | 295 tl, |
297 user, | 296 user, |
298 sendEmail, | 297 sendEmail, |
299 autoAccept, | |
300 data).Scan(&id) | 298 data).Scan(&id) |
301 }) | 299 }) |
302 if err == nil { | 300 if err == nil { |
303 select { | 301 select { |
304 case q.signalChan <- struct{}{}: | 302 case q.signalChan <- struct{}{}: |
315 func AddJob( | 313 func AddJob( |
316 kind JobKind, | 314 kind JobKind, |
317 due time.Time, | 315 due time.Time, |
318 trysLeft int, | 316 trysLeft int, |
319 user string, | 317 user string, |
320 sendEmail, autoAccept bool, | 318 sendEmail bool, |
321 data string, | 319 data string, |
322 ) (int64, error) { | 320 ) (int64, error) { |
323 return iqueue.addJob(kind, due, trysLeft, user, sendEmail, autoAccept, data) | 321 return iqueue.addJob(kind, due, trysLeft, user, sendEmail, data) |
324 } | 322 } |
325 | 323 |
326 type logFeedback int64 | 324 type logFeedback int64 |
327 | 325 |
328 func (lf logFeedback) log(kind, format string, args ...interface{}) { | 326 func (lf logFeedback) log(kind, format string, args ...interface{}) { |
405 &ji.id, | 403 &ji.id, |
406 &ji.kind, | 404 &ji.kind, |
407 &ji.trysLeft, | 405 &ji.trysLeft, |
408 &ji.user, | 406 &ji.user, |
409 &ji.sendEmail, | 407 &ji.sendEmail, |
410 &ji.autoAccept, | |
411 &ji.data, | 408 &ji.data, |
412 ); err != nil { | 409 ); err != nil { |
413 return err | 410 return err |
414 } | 411 } |
415 _, err = tx.ExecContext(ctx, updateStateSQL, "running", ji.id) | 412 _, err = tx.ExecContext(ctx, updateStateSQL, "running", ji.id) |
569 | 566 |
570 var state string | 567 var state string |
571 switch { | 568 switch { |
572 case errDo != nil || errCleanup != nil: | 569 case errDo != nil || errCleanup != nil: |
573 state = "failed" | 570 state = "failed" |
574 case idj.autoAccept: | 571 case jc.AutoAccept(): |
575 state = "accepted" | 572 state = "accepted" |
576 default: | 573 default: |
577 state = "pending" | 574 state = "pending" |
578 } | 575 } |
579 if err := updateStateSummary(ctx, idj.id, state, summary); err != nil { | 576 if err := updateStateSummary(ctx, idj.id, state, summary); err != nil { |
586 | 583 |
587 if shouldRetry { | 584 if shouldRetry { |
588 nid, err := q.addJob( | 585 nid, err := q.addJob( |
589 idj.kind, | 586 idj.kind, |
590 retry.When, idj.trys(), | 587 retry.When, idj.trys(), |
591 idj.user, idj.sendEmail, idj.autoAccept, | 588 idj.user, idj.sendEmail, |
592 idj.data) | 589 idj.data) |
593 if err != nil { | 590 if err != nil { |
594 log.Printf("error: retry enqueue failed: %v\n", err) | 591 log.Printf("error: retry enqueue failed: %v\n", err) |
595 } else { | 592 } else { |
596 log.Printf("info: re-enqueued job with id %d\n", nid) | 593 log.Printf("info: re-enqueued job with id %d\n", nid) |