comparison pkg/imports/queue.go @ 1754:807569b08513

Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Thu, 10 Jan 2019 16:19:26 +0100
parents c3a6aaf926c3
children 5b440fcae1a3
comparison
equal deleted inserted replaced
1753:68bd990dd8e5 1754:807569b08513
89 Depends() []string 89 Depends() []string
90 // StageDone is called if an import is positively reviewed 90 // StageDone is called if an import is positively reviewed
91 // (state = accepted). This can be used to finalize the imported 91 // (state = accepted). This can be used to finalize the imported
92 // data to move it e.g. from the staging area. 92 // data to move it e.g. from the staging area.
93 StageDone(context.Context, *sql.Tx, int64) error 93 StageDone(context.Context, *sql.Tx, int64) error
94 // AutoAccept indicates that imports of this kind
95 // don't need a review.
96 AutoAccept() bool
94 } 97 }
95 98
96 idJob struct { 99 idJob struct {
97 id int64 100 id int64
98 kind JobKind 101 kind JobKind
99 user string 102 user string
100 trysLeft sql.NullInt64 103 trysLeft sql.NullInt64
101 sendEmail bool 104 sendEmail bool
102 autoAccept bool 105 data string
103 data string
104 } 106 }
105 ) 107 )
106 108
107 const pollDuration = time.Second * 10 109 const pollDuration = time.Second * 10
108 110
143 kind, 145 kind,
144 due, 146 due,
145 trys_left, 147 trys_left,
146 username, 148 username,
147 send_email, 149 send_email,
148 auto_accept,
149 data 150 data
150 ) VALUES ( 151 ) VALUES (
151 $1, 152 $1,
152 COALESCE($2, CURRENT_TIMESTAMP), 153 COALESCE($2, CURRENT_TIMESTAMP),
153 $3, 154 $3,
154 $4, 155 $4,
155 $5, 156 $5,
156 $6, 157 $6,
157 $7
158 ) RETURNING id` 158 ) RETURNING id`
159 159
160 selectJobSQL = ` 160 selectJobSQL = `
161 SELECT 161 SELECT
162 id, 162 id,
163 kind, 163 kind,
164 trys_left, 164 trys_left,
165 username, 165 username,
166 send_email, 166 send_email,
167 auto_accept,
168 data 167 data
169 FROM waterway.imports 168 FROM waterway.imports
170 WHERE 169 WHERE
171 due <= CURRENT_TIMESTAMP + interval '5 seconds' AND 170 due <= CURRENT_TIMESTAMP + interval '5 seconds' AND
172 state = 'queued'::waterway.import_state AND enqueued IN ( 171 state = 'queued'::waterway.import_state AND enqueued IN (
273 func (q *importQueue) addJob( 272 func (q *importQueue) addJob(
274 kind JobKind, 273 kind JobKind,
275 due time.Time, 274 due time.Time,
276 trysLeft int, 275 trysLeft int,
277 user string, 276 user string,
278 sendEmail, autoAccept bool, 277 sendEmail bool,
279 data string, 278 data string,
280 ) (int64, error) { 279 ) (int64, error) {
281 ctx := context.Background() 280 ctx := context.Background()
282 var id int64 281 var id int64
283 if due.IsZero() { 282 if due.IsZero() {
294 string(kind), 293 string(kind),
295 due, 294 due,
296 tl, 295 tl,
297 user, 296 user,
298 sendEmail, 297 sendEmail,
299 autoAccept,
300 data).Scan(&id) 298 data).Scan(&id)
301 }) 299 })
302 if err == nil { 300 if err == nil {
303 select { 301 select {
304 case q.signalChan <- struct{}{}: 302 case q.signalChan <- struct{}{}:
315 func AddJob( 313 func AddJob(
316 kind JobKind, 314 kind JobKind,
317 due time.Time, 315 due time.Time,
318 trysLeft int, 316 trysLeft int,
319 user string, 317 user string,
320 sendEmail, autoAccept bool, 318 sendEmail bool,
321 data string, 319 data string,
322 ) (int64, error) { 320 ) (int64, error) {
323 return iqueue.addJob(kind, due, trysLeft, user, sendEmail, autoAccept, data) 321 return iqueue.addJob(kind, due, trysLeft, user, sendEmail, data)
324 } 322 }
325 323
326 type logFeedback int64 324 type logFeedback int64
327 325
328 func (lf logFeedback) log(kind, format string, args ...interface{}) { 326 func (lf logFeedback) log(kind, format string, args ...interface{}) {
405 &ji.id, 403 &ji.id,
406 &ji.kind, 404 &ji.kind,
407 &ji.trysLeft, 405 &ji.trysLeft,
408 &ji.user, 406 &ji.user,
409 &ji.sendEmail, 407 &ji.sendEmail,
410 &ji.autoAccept,
411 &ji.data, 408 &ji.data,
412 ); err != nil { 409 ); err != nil {
413 return err 410 return err
414 } 411 }
415 _, err = tx.ExecContext(ctx, updateStateSQL, "running", ji.id) 412 _, err = tx.ExecContext(ctx, updateStateSQL, "running", ji.id)
569 566
570 var state string 567 var state string
571 switch { 568 switch {
572 case errDo != nil || errCleanup != nil: 569 case errDo != nil || errCleanup != nil:
573 state = "failed" 570 state = "failed"
574 case idj.autoAccept: 571 case jc.AutoAccept():
575 state = "accepted" 572 state = "accepted"
576 default: 573 default:
577 state = "pending" 574 state = "pending"
578 } 575 }
579 if err := updateStateSummary(ctx, idj.id, state, summary); err != nil { 576 if err := updateStateSummary(ctx, idj.id, state, summary); err != nil {
586 583
587 if shouldRetry { 584 if shouldRetry {
588 nid, err := q.addJob( 585 nid, err := q.addJob(
589 idj.kind, 586 idj.kind,
590 retry.When, idj.trys(), 587 retry.When, idj.trys(),
591 idj.user, idj.sendEmail, idj.autoAccept, 588 idj.user, idj.sendEmail,
592 idj.data) 589 idj.data)
593 if err != nil { 590 if err != nil {
594 log.Printf("error: retry enqueue failed: %v\n", err) 591 log.Printf("error: retry enqueue failed: %v\n", err)
595 } else { 592 } else {
596 log.Printf("info: re-enqueued job with id %d\n", nid) 593 log.Printf("info: re-enqueued job with id %d\n", nid)