comparison pkg/controllers/importqueue.go @ 5028:d727641911a5

Moved import decision logic to import queue (where it belongs). Major change: StageDone of the import job is executed by the original user who does the import.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Wed, 18 Mar 2020 17:52:00 +0100
parents 1fef4679b07a
children 66270586031a
comparison
equal deleted inserted replaced
5027:fa662af56a3d 5028:d727641911a5
581 jr = mw.JSONResult{Code: http.StatusNoContent} 581 jr = mw.JSONResult{Code: http.StatusNoContent}
582 582
583 return 583 return
584 } 584 }
585 585
586 const (
587 isPendingSQL = `
588 SELECT state = 'pending'::import_state, kind
589 FROM import.imports
590 WHERE id = $1`
591
592 reviewSQL = `
593 UPDATE import.imports SET
594 state = $1::import_state,
595 changed = CURRENT_TIMESTAMP,
596 signer = $2
597 WHERE id = $3`
598
599 deleteImportDataSQL = `SELECT import.del_import($1)`
600
601 deleteImportTrackSQL = `
602 DELETE FROM import.track_imports WHERE import_id = $1`
603
604 logDecisionSQL = `
605 INSERT INTO import.import_logs (import_id, msg) VALUES ($1, $2)`
606 )
607
608 func reviewImports(req *http.Request) (mw.JSONResult, error) { 586 func reviewImports(req *http.Request) (mw.JSONResult, error) {
609 587
610 rs := *mw.JSONInput(req).(*[]models.Review) 588 rs := *mw.JSONInput(req).(*[]models.Review)
611 589
612 type reviewResult struct { 590 type reviewResult struct {
615 Error string `json:"error,omitempty"` 593 Error string `json:"error,omitempty"`
616 } 594 }
617 595
618 results := make([]reviewResult, len(rs)) 596 results := make([]reviewResult, len(rs))
619 597
620 conn := mw.JSONConn(req)
621
622 for i := range rs { 598 for i := range rs {
623 rev := &rs[i] 599 rev := &rs[i]
624 msg, err := decideImport(req, conn, rev.ID, string(rev.State)) 600 msg, err := decideImport(req, rev.ID, string(rev.State))
625 var errString string 601 var errString string
626 if err != nil { 602 if err != nil {
627 errString = err.Error() 603 errString = err.Error()
628 } 604 }
629 results[i] = reviewResult{ 605 results[i] = reviewResult{
641 vars := mux.Vars(req) 617 vars := mux.Vars(req)
642 id, _ := strconv.ParseInt(vars["id"], 10, 64) 618 id, _ := strconv.ParseInt(vars["id"], 10, 64)
643 state := vars["state"] 619 state := vars["state"]
644 620
645 var msg string 621 var msg string
646 if msg, err = decideImport(req, mw.JSONConn(req), id, state); err != nil { 622 if msg, err = decideImport(req, id, state); err != nil {
647 return 623 return
648 } 624 }
649 625
650 result := struct { 626 result := struct {
651 Message string `json:"message"` 627 Message string `json:"message"`
657 return 633 return
658 } 634 }
659 635
660 func decideImport( 636 func decideImport(
661 req *http.Request, 637 req *http.Request,
662 conn *sql.Conn,
663 id int64, 638 id int64,
664 state string, 639 state string,
665 ) (message string, err error) { 640 ) (message string, err error) {
666 ctx := req.Context() 641 ctx := req.Context()
667 var tx *sql.Tx 642
668 if tx, err = conn.BeginTx(ctx, nil); err != nil { 643 accepted := state == "accepted"
669 return 644
670 } 645 session, _ := auth.GetSession(req)
671 defer tx.Rollback() 646 reviewer := session.User
672 647
673 var pending bool 648 if err = imports.DecideImport(ctx, id, accepted, reviewer); err != nil {
674 var kind string
675
676 err = tx.QueryRowContext(ctx, isPendingSQL, id).Scan(&pending, &kind)
677 switch {
678 case err == sql.ErrNoRows:
679 err = mw.JSONError{ 649 err = mw.JSONError{
680 Code: http.StatusNotFound, 650 Code: http.StatusBadRequest,
681 Message: fmt.Sprintf("cannot find import #%d", id), 651 Message: err.Error(),
682 } 652 }
683 return
684 case err != nil:
685 return
686 case !pending:
687 err = mw.JSONError{
688 Code: http.StatusConflict,
689 Message: fmt.Sprintf("import #%d is not pending", id),
690 }
691 return
692 }
693
694 if state == "accepted" {
695 if jc := imports.FindJobCreator(imports.JobKind(kind)); jc != nil {
696 if err = jc.StageDone(ctx, tx, id); err != nil {
697 return
698 }
699 }
700
701 } else {
702 if _, err = tx.ExecContext(ctx, deleteImportDataSQL, id); err != nil {
703 return
704 }
705 }
706
707 // Remove the import track
708 if _, err = tx.ExecContext(ctx, deleteImportTrackSQL, id); err != nil {
709 return
710 }
711
712 // Log the decision and set the final state.
713 session, _ := auth.GetSession(req)
714 who := session.User
715
716 if _, err = tx.ExecContext(ctx, logDecisionSQL, id,
717 fmt.Sprintf("User '%s' %s import %d.", who, state, id)); err != nil {
718 return
719 }
720
721 if _, err = tx.ExecContext(ctx, reviewSQL, state, who, id); err != nil {
722 return
723 }
724
725 if err = tx.Commit(); err != nil {
726 return 653 return
727 } 654 }
728 655
729 message = fmt.Sprintf( 656 message = fmt.Sprintf(
730 "Import #%d successfully changed to state '%s'.", id, state) 657 "Import #%d successfully changed to state '%s'.", id, state)