comparison pkg/imports/queue.go @ 5099:3cd736acbad3 queued-stage-done

First version of a reviewed job. I bet it does not work.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Tue, 24 Mar 2020 15:46:37 +0100
parents 59a99655f34d
children d3a24152b0be
comparison
equal deleted inserted replaced
5098:52aac557cbd7 5099:3cd736acbad3
68 68
69 // JobKind is the type of an import. 69 // JobKind is the type of an import.
70 // Choose a unique name for every import. 70 // Choose a unique name for every import.
71 JobKind string 71 JobKind string
72 72
73 // JobCreator is used to bring a job to life as it is stored 73 Dependencies interface {
74 // in pure meta-data form to the database.
75 JobCreator interface {
76 // Description is the long name of the import.
77 Description() string
78 // Create build the actual job.
79 Create() Job
80 // Depends returns two lists of ressources locked by this type of import. 74 // Depends returns two lists of ressources locked by this type of import.
81 // Imports are run concurrently if they have disjoint sets 75 // Imports are run concurrently if they have disjoint sets
82 // of dependencies. 76 // of dependencies.
83 // The first list are locked exclusively. 77 // The first list are locked exclusively.
84 // The second allows multiple read users but only one writing one. 78 // The second allows multiple read users but only one writing one.
85 Depends() [2][]string 79 Depends() [2][]string
80 }
81
82 // JobCreator is used to bring a job to life as it is stored
83 // in pure meta-data form to the database.
84 JobCreator interface {
85 Dependencies
86 // Description is the long name of the import.
87 Description() string
88 // Create build the actual job.
89 Create() Job
86 // StageDone is called if an import is positively reviewed 90 // StageDone is called if an import is positively reviewed
87 // (state = accepted). This can be used to finalize the imported 91 // (state = accepted). This can be used to finalize the imported
88 // data to move it e.g from the staging area. 92 // data to move it e.g from the staging area.
89 StageDone(context.Context, *sql.Tx, int64, Feedback) error 93 StageDone(context.Context, *sql.Tx, int64, Feedback) error
90 // AutoAccept indicates that imports of this kind 94 // AutoAccept indicates that imports of this kind
129 "failed", 133 "failed",
130 "unchanged", 134 "unchanged",
131 "pending", 135 "pending",
132 "accepted", 136 "accepted",
133 "declined", 137 "declined",
138 "reviewed",
134 } 139 }
135 ) 140 )
136 141
137 const ( 142 const (
138 queueUser = "sys_admin" 143 queueUser = "sys_admin"
322 return nil 327 return nil
323 } 328 }
324 return d 329 return d
325 } 330 }
326 331
327 func (q *importQueue) lockDependencies(jc JobCreator) { 332 func (q *importQueue) lockDependencies(d Dependencies) {
328 deps := jc.Depends() 333 deps := d.Depends()
329 q.creatorsMu.Lock() 334 q.creatorsMu.Lock()
330 defer q.creatorsMu.Unlock() 335 defer q.creatorsMu.Unlock()
331 for _, d := range deps[0] { 336 for _, d := range deps[0] {
332 q.usedDeps[d] = runExclusive 337 q.usedDeps[d] = runExclusive
333 } 338 }
334 for _, d := range deps[1] { 339 for _, d := range deps[1] {
335 q.usedDeps[d]++ 340 q.usedDeps[d]++
336 } 341 }
337 } 342 }
338 343
339 func (q *importQueue) unlockDependencies(jc JobCreator) { 344 func (q *importQueue) unlockDependencies(d Dependencies) {
340 deps := jc.Depends() 345 deps := d.Depends()
341 q.creatorsMu.Lock() 346 q.creatorsMu.Lock()
342 defer q.creatorsMu.Unlock() 347 defer q.creatorsMu.Unlock()
343 for _, d := range deps[0] { 348 for _, d := range deps[0] {
344 q.usedDeps[d] = 0 349 q.usedDeps[d] = 0
345 } 350 }
431 436
432 const ( 437 const (
433 isPendingSQL = ` 438 isPendingSQL = `
434 SELECT 439 SELECT
435 state = 'pending'::import_state, 440 state = 'pending'::import_state,
436 kind, 441 kind
437 username
438 FROM import.imports 442 FROM import.imports
439 WHERE id = $1` 443 WHERE id = $1`
440
441 reviewSQL = `
442 UPDATE import.imports SET
443 state = $1::import_state,
444 changed = CURRENT_TIMESTAMP,
445 signer = $2
446 WHERE id = $3`
447
448 deleteImportDataSQL = `SELECT import.del_import($1)`
449
450 deleteImportTrackSQL = `
451 DELETE FROM import.track_imports WHERE import_id = $1`
452
453 logDecisionSQL = `
454 INSERT INTO import.import_logs (import_id, msg) VALUES ($1, $2)`
455 ) 444 )
456 445
457 func (q *importQueue) decideImportTx( 446 func (q *importQueue) decideImportTx(
458 ctx context.Context, 447 ctx context.Context,
459 tx *sql.Tx, 448 tx *sql.Tx,
462 reviewer string, 451 reviewer string,
463 ) error { 452 ) error {
464 var ( 453 var (
465 pending bool 454 pending bool
466 kind string 455 kind string
467 user string
468 ) 456 )
469 457
470 switch err := tx.QueryRowContext(ctx, isPendingSQL, id).Scan(&pending, &kind, &user); { 458 switch err := tx.QueryRowContext(ctx, isPendingSQL, id).Scan(&pending, &kind); {
471 case err == sql.ErrNoRows: 459 case err == sql.ErrNoRows:
472 return fmt.Errorf("cannot find import #%d", id) 460 return fmt.Errorf("cannot find import #%d", id)
473 case err != nil: 461 case err != nil:
474 return err 462 return err
475 case !pending: 463 case !pending:
479 jc := q.jobCreator(JobKind(kind)) 467 jc := q.jobCreator(JobKind(kind))
480 if jc == nil { 468 if jc == nil {
481 return fmt.Errorf("no job creator for kind '%s'", kind) 469 return fmt.Errorf("no job creator for kind '%s'", kind)
482 } 470 }
483 471
484 if err := auth.RunAs(ctx, user, func(conn *sql.Conn) error { 472 r := &reviewed{
485 txUser, err := conn.BeginTx(ctx, nil) 473 ID: id,
486 if err != nil { 474 Accepted: accepted,
487 return err 475 }
488 } 476 serialized, err := common.ToJSONString(r)
489 defer txUser.Rollback() 477 if err != nil {
490
491 if accepted {
492 feedback := logFeedback(id)
493 err = jc.StageDone(ctx, txUser, id, feedback)
494 } else {
495 _, err = txUser.ExecContext(ctx, deleteImportDataSQL, id)
496 }
497
498 if err == nil {
499 err = txUser.Commit()
500 }
501
502 return err 478 return err
503 }); err != nil { 479 }
480 rID, err := q.addJob(
481 reviewedJobKind,
482 time.Now(),
483 nil,
484 nil,
485 reviewer,
486 false,
487 serialized)
488 log.Printf("info: add review job %d\n", rID)
489 if err != nil {
504 return err 490 return err
505 } 491 }
506 492 _, err = tx.ExecContext(ctx, updateStateSQL, "reviewed", id)
507 // Remove the import track
508 if _, err := tx.ExecContext(ctx, deleteImportTrackSQL, id); err != nil {
509 return err
510 }
511
512 var state string
513 if accepted {
514 state = "accepted"
515 } else {
516 state = "declined"
517 }
518
519 logMsg := fmt.Sprintf("User '%s' %s import %d.", reviewer, state, id)
520
521 if _, err := tx.ExecContext(ctx, logDecisionSQL, id, logMsg); err != nil {
522 return err
523 }
524
525 _, err := tx.ExecContext(ctx, reviewSQL, state, reviewer, id)
526 return err 493 return err
527 } 494 }
528 495
529 func (q *importQueue) decideImport( 496 func (q *importQueue) decideImport(
530 ctx context.Context, 497 ctx context.Context,
767 if jc == nil { 734 if jc == nil {
768 errorAndFail(idj.id, "no creator for kind '%s' found", idj.kind) 735 errorAndFail(idj.id, "no creator for kind '%s' found", idj.kind)
769 continue 736 continue
770 } 737 }
771 738
739 job := jc.Create()
740 if err := common.FromJSONString(idj.data, job); err != nil {
741 errorAndFail(idj.id, "failed to create job for import #%d: %v",
742 idj.id, err)
743 continue
744 }
745
746 var dependencies Dependencies
747 if deps, ok := job.(Dependencies); ok {
748 dependencies = deps
749 } else {
750 dependencies = jc
751 }
752
772 // Lock dependencies. 753 // Lock dependencies.
773 q.lockDependencies(jc) 754 q.lockDependencies(dependencies)
774 755
775 go func(jc JobCreator, idj *idJob) { 756 go func(jc JobCreator, idj *idJob) {
776 757
777 // Unlock the dependencies. 758 // Unlock the dependencies.
778 defer func() { 759 defer func() {
779 q.unlockDependencies(jc) 760 q.unlockDependencies(dependencies)
780 select { 761 select {
781 case q.signalChan <- struct{}{}: 762 case q.signalChan <- struct{}{}:
782 default: 763 default:
783 } 764 }
784 }() 765 }()
785
786 job := jc.Create()
787 if err := common.FromJSONString(idj.data, job); err != nil {
788 errorAndFail(idj.id, "failed to create job for import #%d: %v",
789 idj.id, err)
790 return
791 }
792 766
793 feedback := logFeedback(idj.id) 767 feedback := logFeedback(idj.id)
794 768
795 feedback.Info("import #%d started", idj.id) 769 feedback.Info("import #%d started", idj.id)
796 770