Mercurial > gemma
comparison pkg/controllers/importqueue.go @ 5028:d727641911a5
Moved import decision logic to import queue (where it belongs).
Major change: StageDone of the import job is executed by the original user
who does the import.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Wed, 18 Mar 2020 17:52:00 +0100 |
parents | 1fef4679b07a |
children | 66270586031a |
comparison
equal
deleted
inserted
replaced
5027:fa662af56a3d | 5028:d727641911a5 |
---|---|
581 jr = mw.JSONResult{Code: http.StatusNoContent} | 581 jr = mw.JSONResult{Code: http.StatusNoContent} |
582 | 582 |
583 return | 583 return |
584 } | 584 } |
585 | 585 |
586 const ( | |
587 isPendingSQL = ` | |
588 SELECT state = 'pending'::import_state, kind | |
589 FROM import.imports | |
590 WHERE id = $1` | |
591 | |
592 reviewSQL = ` | |
593 UPDATE import.imports SET | |
594 state = $1::import_state, | |
595 changed = CURRENT_TIMESTAMP, | |
596 signer = $2 | |
597 WHERE id = $3` | |
598 | |
599 deleteImportDataSQL = `SELECT import.del_import($1)` | |
600 | |
601 deleteImportTrackSQL = ` | |
602 DELETE FROM import.track_imports WHERE import_id = $1` | |
603 | |
604 logDecisionSQL = ` | |
605 INSERT INTO import.import_logs (import_id, msg) VALUES ($1, $2)` | |
606 ) | |
607 | |
608 func reviewImports(req *http.Request) (mw.JSONResult, error) { | 586 func reviewImports(req *http.Request) (mw.JSONResult, error) { |
609 | 587 |
610 rs := *mw.JSONInput(req).(*[]models.Review) | 588 rs := *mw.JSONInput(req).(*[]models.Review) |
611 | 589 |
612 type reviewResult struct { | 590 type reviewResult struct { |
615 Error string `json:"error,omitempty"` | 593 Error string `json:"error,omitempty"` |
616 } | 594 } |
617 | 595 |
618 results := make([]reviewResult, len(rs)) | 596 results := make([]reviewResult, len(rs)) |
619 | 597 |
620 conn := mw.JSONConn(req) | |
621 | |
622 for i := range rs { | 598 for i := range rs { |
623 rev := &rs[i] | 599 rev := &rs[i] |
624 msg, err := decideImport(req, conn, rev.ID, string(rev.State)) | 600 msg, err := decideImport(req, rev.ID, string(rev.State)) |
625 var errString string | 601 var errString string |
626 if err != nil { | 602 if err != nil { |
627 errString = err.Error() | 603 errString = err.Error() |
628 } | 604 } |
629 results[i] = reviewResult{ | 605 results[i] = reviewResult{ |
641 vars := mux.Vars(req) | 617 vars := mux.Vars(req) |
642 id, _ := strconv.ParseInt(vars["id"], 10, 64) | 618 id, _ := strconv.ParseInt(vars["id"], 10, 64) |
643 state := vars["state"] | 619 state := vars["state"] |
644 | 620 |
645 var msg string | 621 var msg string |
646 if msg, err = decideImport(req, mw.JSONConn(req), id, state); err != nil { | 622 if msg, err = decideImport(req, id, state); err != nil { |
647 return | 623 return |
648 } | 624 } |
649 | 625 |
650 result := struct { | 626 result := struct { |
651 Message string `json:"message"` | 627 Message string `json:"message"` |
657 return | 633 return |
658 } | 634 } |
659 | 635 |
660 func decideImport( | 636 func decideImport( |
661 req *http.Request, | 637 req *http.Request, |
662 conn *sql.Conn, | |
663 id int64, | 638 id int64, |
664 state string, | 639 state string, |
665 ) (message string, err error) { | 640 ) (message string, err error) { |
666 ctx := req.Context() | 641 ctx := req.Context() |
667 var tx *sql.Tx | 642 |
668 if tx, err = conn.BeginTx(ctx, nil); err != nil { | 643 accepted := state == "accepted" |
669 return | 644 |
670 } | 645 session, _ := auth.GetSession(req) |
671 defer tx.Rollback() | 646 reviewer := session.User |
672 | 647 |
673 var pending bool | 648 if err = imports.DecideImport(ctx, id, accepted, reviewer); err != nil { |
674 var kind string | |
675 | |
676 err = tx.QueryRowContext(ctx, isPendingSQL, id).Scan(&pending, &kind) | |
677 switch { | |
678 case err == sql.ErrNoRows: | |
679 err = mw.JSONError{ | 649 err = mw.JSONError{ |
680 Code: http.StatusNotFound, | 650 Code: http.StatusBadRequest, |
681 Message: fmt.Sprintf("cannot find import #%d", id), | 651 Message: err.Error(), |
682 } | 652 } |
683 return | |
684 case err != nil: | |
685 return | |
686 case !pending: | |
687 err = mw.JSONError{ | |
688 Code: http.StatusConflict, | |
689 Message: fmt.Sprintf("import #%d is not pending", id), | |
690 } | |
691 return | |
692 } | |
693 | |
694 if state == "accepted" { | |
695 if jc := imports.FindJobCreator(imports.JobKind(kind)); jc != nil { | |
696 if err = jc.StageDone(ctx, tx, id); err != nil { | |
697 return | |
698 } | |
699 } | |
700 | |
701 } else { | |
702 if _, err = tx.ExecContext(ctx, deleteImportDataSQL, id); err != nil { | |
703 return | |
704 } | |
705 } | |
706 | |
707 // Remove the import track | |
708 if _, err = tx.ExecContext(ctx, deleteImportTrackSQL, id); err != nil { | |
709 return | |
710 } | |
711 | |
712 // Log the decision and set the final state. | |
713 session, _ := auth.GetSession(req) | |
714 who := session.User | |
715 | |
716 if _, err = tx.ExecContext(ctx, logDecisionSQL, id, | |
717 fmt.Sprintf("User '%s' %s import %d.", who, state, id)); err != nil { | |
718 return | |
719 } | |
720 | |
721 if _, err = tx.ExecContext(ctx, reviewSQL, state, who, id); err != nil { | |
722 return | |
723 } | |
724 | |
725 if err = tx.Commit(); err != nil { | |
726 return | 653 return |
727 } | 654 } |
728 | 655 |
729 message = fmt.Sprintf( | 656 message = fmt.Sprintf( |
730 "Import #%d successfully changed to state '%s'.", id, state) | 657 "Import #%d successfully changed to state '%s'.", id, state) |