Mercurial > gemma
comparison pkg/imports/queue.go @ 5099:3cd736acbad3 queued-stage-done
First version of a reviewed job. I bet it does not work.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Tue, 24 Mar 2020 15:46:37 +0100 |
parents | 59a99655f34d |
children | d3a24152b0be |
comparison
equal
deleted
inserted
replaced
5098:52aac557cbd7 | 5099:3cd736acbad3 |
---|---|
68 | 68 |
69 // JobKind is the type of an import. | 69 // JobKind is the type of an import. |
70 // Choose a unique name for every import. | 70 // Choose a unique name for every import. |
71 JobKind string | 71 JobKind string |
72 | 72 |
73 // JobCreator is used to bring a job to life as it is stored | 73 Dependencies interface { |
74 // in pure meta-data form to the database. | |
75 JobCreator interface { | |
76 // Description is the long name of the import. | |
77 Description() string | |
78 // Create build the actual job. | |
79 Create() Job | |
80 // Depends returns two lists of ressources locked by this type of import. | 74 // Depends returns two lists of ressources locked by this type of import. |
81 // Imports are run concurrently if they have disjoint sets | 75 // Imports are run concurrently if they have disjoint sets |
82 // of dependencies. | 76 // of dependencies. |
83 // The first list are locked exclusively. | 77 // The first list are locked exclusively. |
84 // The second allows multiple read users but only one writing one. | 78 // The second allows multiple read users but only one writing one. |
85 Depends() [2][]string | 79 Depends() [2][]string |
80 } | |
81 | |
82 // JobCreator is used to bring a job to life as it is stored | |
83 // in pure meta-data form to the database. | |
84 JobCreator interface { | |
85 Dependencies | |
86 // Description is the long name of the import. | |
87 Description() string | |
88 // Create build the actual job. | |
89 Create() Job | |
86 // StageDone is called if an import is positively reviewed | 90 // StageDone is called if an import is positively reviewed |
87 // (state = accepted). This can be used to finalize the imported | 91 // (state = accepted). This can be used to finalize the imported |
88 // data to move it e.g from the staging area. | 92 // data to move it e.g from the staging area. |
89 StageDone(context.Context, *sql.Tx, int64, Feedback) error | 93 StageDone(context.Context, *sql.Tx, int64, Feedback) error |
90 // AutoAccept indicates that imports of this kind | 94 // AutoAccept indicates that imports of this kind |
129 "failed", | 133 "failed", |
130 "unchanged", | 134 "unchanged", |
131 "pending", | 135 "pending", |
132 "accepted", | 136 "accepted", |
133 "declined", | 137 "declined", |
138 "reviewed", | |
134 } | 139 } |
135 ) | 140 ) |
136 | 141 |
137 const ( | 142 const ( |
138 queueUser = "sys_admin" | 143 queueUser = "sys_admin" |
322 return nil | 327 return nil |
323 } | 328 } |
324 return d | 329 return d |
325 } | 330 } |
326 | 331 |
327 func (q *importQueue) lockDependencies(jc JobCreator) { | 332 func (q *importQueue) lockDependencies(d Dependencies) { |
328 deps := jc.Depends() | 333 deps := d.Depends() |
329 q.creatorsMu.Lock() | 334 q.creatorsMu.Lock() |
330 defer q.creatorsMu.Unlock() | 335 defer q.creatorsMu.Unlock() |
331 for _, d := range deps[0] { | 336 for _, d := range deps[0] { |
332 q.usedDeps[d] = runExclusive | 337 q.usedDeps[d] = runExclusive |
333 } | 338 } |
334 for _, d := range deps[1] { | 339 for _, d := range deps[1] { |
335 q.usedDeps[d]++ | 340 q.usedDeps[d]++ |
336 } | 341 } |
337 } | 342 } |
338 | 343 |
339 func (q *importQueue) unlockDependencies(jc JobCreator) { | 344 func (q *importQueue) unlockDependencies(d Dependencies) { |
340 deps := jc.Depends() | 345 deps := d.Depends() |
341 q.creatorsMu.Lock() | 346 q.creatorsMu.Lock() |
342 defer q.creatorsMu.Unlock() | 347 defer q.creatorsMu.Unlock() |
343 for _, d := range deps[0] { | 348 for _, d := range deps[0] { |
344 q.usedDeps[d] = 0 | 349 q.usedDeps[d] = 0 |
345 } | 350 } |
431 | 436 |
432 const ( | 437 const ( |
433 isPendingSQL = ` | 438 isPendingSQL = ` |
434 SELECT | 439 SELECT |
435 state = 'pending'::import_state, | 440 state = 'pending'::import_state, |
436 kind, | 441 kind |
437 username | |
438 FROM import.imports | 442 FROM import.imports |
439 WHERE id = $1` | 443 WHERE id = $1` |
440 | |
441 reviewSQL = ` | |
442 UPDATE import.imports SET | |
443 state = $1::import_state, | |
444 changed = CURRENT_TIMESTAMP, | |
445 signer = $2 | |
446 WHERE id = $3` | |
447 | |
448 deleteImportDataSQL = `SELECT import.del_import($1)` | |
449 | |
450 deleteImportTrackSQL = ` | |
451 DELETE FROM import.track_imports WHERE import_id = $1` | |
452 | |
453 logDecisionSQL = ` | |
454 INSERT INTO import.import_logs (import_id, msg) VALUES ($1, $2)` | |
455 ) | 444 ) |
456 | 445 |
457 func (q *importQueue) decideImportTx( | 446 func (q *importQueue) decideImportTx( |
458 ctx context.Context, | 447 ctx context.Context, |
459 tx *sql.Tx, | 448 tx *sql.Tx, |
462 reviewer string, | 451 reviewer string, |
463 ) error { | 452 ) error { |
464 var ( | 453 var ( |
465 pending bool | 454 pending bool |
466 kind string | 455 kind string |
467 user string | |
468 ) | 456 ) |
469 | 457 |
470 switch err := tx.QueryRowContext(ctx, isPendingSQL, id).Scan(&pending, &kind, &user); { | 458 switch err := tx.QueryRowContext(ctx, isPendingSQL, id).Scan(&pending, &kind); { |
471 case err == sql.ErrNoRows: | 459 case err == sql.ErrNoRows: |
472 return fmt.Errorf("cannot find import #%d", id) | 460 return fmt.Errorf("cannot find import #%d", id) |
473 case err != nil: | 461 case err != nil: |
474 return err | 462 return err |
475 case !pending: | 463 case !pending: |
479 jc := q.jobCreator(JobKind(kind)) | 467 jc := q.jobCreator(JobKind(kind)) |
480 if jc == nil { | 468 if jc == nil { |
481 return fmt.Errorf("no job creator for kind '%s'", kind) | 469 return fmt.Errorf("no job creator for kind '%s'", kind) |
482 } | 470 } |
483 | 471 |
484 if err := auth.RunAs(ctx, user, func(conn *sql.Conn) error { | 472 r := &reviewed{ |
485 txUser, err := conn.BeginTx(ctx, nil) | 473 ID: id, |
486 if err != nil { | 474 Accepted: accepted, |
487 return err | 475 } |
488 } | 476 serialized, err := common.ToJSONString(r) |
489 defer txUser.Rollback() | 477 if err != nil { |
490 | |
491 if accepted { | |
492 feedback := logFeedback(id) | |
493 err = jc.StageDone(ctx, txUser, id, feedback) | |
494 } else { | |
495 _, err = txUser.ExecContext(ctx, deleteImportDataSQL, id) | |
496 } | |
497 | |
498 if err == nil { | |
499 err = txUser.Commit() | |
500 } | |
501 | |
502 return err | 478 return err |
503 }); err != nil { | 479 } |
480 rID, err := q.addJob( | |
481 reviewedJobKind, | |
482 time.Now(), | |
483 nil, | |
484 nil, | |
485 reviewer, | |
486 false, | |
487 serialized) | |
488 log.Printf("info: add review job %d\n", rID) | |
489 if err != nil { | |
504 return err | 490 return err |
505 } | 491 } |
506 | 492 _, err = tx.ExecContext(ctx, updateStateSQL, "reviewed", id) |
507 // Remove the import track | |
508 if _, err := tx.ExecContext(ctx, deleteImportTrackSQL, id); err != nil { | |
509 return err | |
510 } | |
511 | |
512 var state string | |
513 if accepted { | |
514 state = "accepted" | |
515 } else { | |
516 state = "declined" | |
517 } | |
518 | |
519 logMsg := fmt.Sprintf("User '%s' %s import %d.", reviewer, state, id) | |
520 | |
521 if _, err := tx.ExecContext(ctx, logDecisionSQL, id, logMsg); err != nil { | |
522 return err | |
523 } | |
524 | |
525 _, err := tx.ExecContext(ctx, reviewSQL, state, reviewer, id) | |
526 return err | 493 return err |
527 } | 494 } |
528 | 495 |
529 func (q *importQueue) decideImport( | 496 func (q *importQueue) decideImport( |
530 ctx context.Context, | 497 ctx context.Context, |
767 if jc == nil { | 734 if jc == nil { |
768 errorAndFail(idj.id, "no creator for kind '%s' found", idj.kind) | 735 errorAndFail(idj.id, "no creator for kind '%s' found", idj.kind) |
769 continue | 736 continue |
770 } | 737 } |
771 | 738 |
739 job := jc.Create() | |
740 if err := common.FromJSONString(idj.data, job); err != nil { | |
741 errorAndFail(idj.id, "failed to create job for import #%d: %v", | |
742 idj.id, err) | |
743 continue | |
744 } | |
745 | |
746 var dependencies Dependencies | |
747 if deps, ok := job.(Dependencies); ok { | |
748 dependencies = deps | |
749 } else { | |
750 dependencies = jc | |
751 } | |
752 | |
772 // Lock dependencies. | 753 // Lock dependencies. |
773 q.lockDependencies(jc) | 754 q.lockDependencies(dependencies) |
774 | 755 |
775 go func(jc JobCreator, idj *idJob) { | 756 go func(jc JobCreator, idj *idJob) { |
776 | 757 |
777 // Unlock the dependencies. | 758 // Unlock the dependencies. |
778 defer func() { | 759 defer func() { |
779 q.unlockDependencies(jc) | 760 q.unlockDependencies(dependencies) |
780 select { | 761 select { |
781 case q.signalChan <- struct{}{}: | 762 case q.signalChan <- struct{}{}: |
782 default: | 763 default: |
783 } | 764 } |
784 }() | 765 }() |
785 | |
786 job := jc.Create() | |
787 if err := common.FromJSONString(idj.data, job); err != nil { | |
788 errorAndFail(idj.id, "failed to create job for import #%d: %v", | |
789 idj.id, err) | |
790 return | |
791 } | |
792 | 766 |
793 feedback := logFeedback(idj.id) | 767 feedback := logFeedback(idj.id) |
794 | 768 |
795 feedback.Info("import #%d started", idj.id) | 769 feedback.Info("import #%d started", idj.id) |
796 | 770 |