Mercurial > gemma
changeset 5028:d727641911a5
Moved import desision logic to import queue (where it belongs).
Major change: StageDone of the import job is executed by the original user
who does the import.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Wed, 18 Mar 2020 17:52:00 +0100 |
parents | fa662af56a3d |
children | 0fedd50dbf52 |
files | pkg/controllers/importqueue.go pkg/controllers/routes.go pkg/imports/queue.go |
diffstat | 3 files changed, 140 insertions(+), 82 deletions(-) [+] |
line wrap: on
line diff
--- a/pkg/controllers/importqueue.go Wed Mar 18 14:52:36 2020 +0100 +++ b/pkg/controllers/importqueue.go Wed Mar 18 17:52:00 2020 +0100 @@ -583,28 +583,6 @@ return } -const ( - isPendingSQL = ` -SELECT state = 'pending'::import_state, kind -FROM import.imports -WHERE id = $1` - - reviewSQL = ` -UPDATE import.imports SET - state = $1::import_state, - changed = CURRENT_TIMESTAMP, - signer = $2 -WHERE id = $3` - - deleteImportDataSQL = `SELECT import.del_import($1)` - - deleteImportTrackSQL = ` -DELETE FROM import.track_imports WHERE import_id = $1` - - logDecisionSQL = ` -INSERT INTO import.import_logs (import_id, msg) VALUES ($1, $2)` -) - func reviewImports(req *http.Request) (mw.JSONResult, error) { rs := *mw.JSONInput(req).(*[]models.Review) @@ -617,11 +595,9 @@ results := make([]reviewResult, len(rs)) - conn := mw.JSONConn(req) - for i := range rs { rev := &rs[i] - msg, err := decideImport(req, conn, rev.ID, string(rev.State)) + msg, err := decideImport(req, rev.ID, string(rev.State)) var errString string if err != nil { errString = err.Error() @@ -643,7 +619,7 @@ state := vars["state"] var msg string - if msg, err = decideImport(req, mw.JSONConn(req), id, state); err != nil { + if msg, err = decideImport(req, id, state); err != nil { return } @@ -659,70 +635,21 @@ func decideImport( req *http.Request, - conn *sql.Conn, id int64, state string, ) (message string, err error) { ctx := req.Context() - var tx *sql.Tx - if tx, err = conn.BeginTx(ctx, nil); err != nil { - return - } - defer tx.Rollback() - var pending bool - var kind string + accepted := state == "accepted" - err = tx.QueryRowContext(ctx, isPendingSQL, id).Scan(&pending, &kind) - switch { - case err == sql.ErrNoRows: - err = mw.JSONError{ - Code: http.StatusNotFound, - Message: fmt.Sprintf("cannot find import #%d", id), - } - return - case err != nil: - return - case !pending: - err = mw.JSONError{ - Code: http.StatusConflict, - Message: fmt.Sprintf("import #%d is not pending", id), - } - return - } + session, _ := auth.GetSession(req) + reviewer := session.User - if state == "accepted" { - if jc := imports.FindJobCreator(imports.JobKind(kind)); jc != nil { - if err = jc.StageDone(ctx, tx, id); err != nil { - return - } - } - - } else { - if _, err = tx.ExecContext(ctx, deleteImportDataSQL, id); err != nil { - return + if err = imports.DecideImport(ctx, id, accepted, reviewer); err != nil { + err = mw.JSONError{ + Code: http.StatusBadRequest, + Message: err.Error(), } - } - - // Remove the import track - if _, err = tx.ExecContext(ctx, deleteImportTrackSQL, id); err != nil { - return - } - - // Log the decision and set the final state. - session, _ := auth.GetSession(req) - who := session.User - - if _, err = tx.ExecContext(ctx, logDecisionSQL, id, - fmt.Sprintf("User '%s' %s import %d.", who, state, id)); err != nil { - return - } - - if _, err = tx.ExecContext(ctx, reviewSQL, state, who, id); err != nil { - return - } - - if err = tx.Commit(); err != nil { return }
--- a/pkg/controllers/routes.go Wed Mar 18 14:52:36 2020 +0100 +++ b/pkg/controllers/routes.go Wed Mar 18 17:52:00 2020 +0100 @@ -313,6 +313,7 @@ api.Handle("/imports", waterwayAdmin(&mw.JSONHandler{ Input: func(*http.Request) interface{} { return &[]models.Review{} }, Handle: reviewImports, + NoConn: true, })).Methods(http.MethodPatch) api.Handle("/imports/{id:[0-9]+}", waterwayAdmin(&mw.JSONHandler{ @@ -323,6 +324,7 @@ api.Handle("/imports/{id:[0-9]+}/{state:(?:accepted|declined)}", waterwayAdmin(&mw.JSONHandler{ Handle: reviewImport, + NoConn: true, })).Methods(http.MethodPut) // Handler to serve data to the client.
--- a/pkg/imports/queue.go Wed Mar 18 14:52:36 2020 +0100 +++ b/pkg/imports/queue.go Wed Mar 18 17:52:00 2020 +0100 @@ -429,6 +429,135 @@ data) } +const ( + isPendingSQL = ` +SELECT + state = 'pending'::import_state, + kind, + user +FROM import.imports +WHERE id = $1` + + reviewSQL = ` +UPDATE import.imports SET + state = $1::import_state, + changed = CURRENT_TIMESTAMP, + signer = $2 +WHERE id = $3` + + deleteImportDataSQL = `SELECT import.del_import($1)` + + deleteImportTrackSQL = ` +DELETE FROM import.track_imports WHERE import_id = $1` + + logDecisionSQL = ` +INSERT INTO import.import_logs (import_id, msg) VALUES ($1, $2)` +) + +func (q *importQueue) decideImportTx( + ctx context.Context, + tx *sql.Tx, + id int64, + accepted bool, + reviewer string, +) error { + var ( + pending bool + kind string + user string + ) + + switch err := tx.QueryRowContext(ctx, isPendingSQL, id).Scan(&pending, &kind, &user); { + case err == sql.ErrNoRows: + return fmt.Errorf("cannot find import #%d", id) + case err != nil: + return err + case !pending: + return fmt.Errorf("#%d is not pending", id) + } + + jc := q.jobCreator(JobKind(kind)) + if jc == nil { + return fmt.Errorf("no job creator for kind '%s'", kind) + } + + if err := auth.RunAs(ctx, user, func(conn *sql.Conn) error { + txUser, err := conn.BeginTx(ctx, nil) + if err != nil { + return err + } + defer txUser.Rollback() + + if accepted { + err = jc.StageDone(ctx, txUser, id) + } else { + _, err = txUser.ExecContext(ctx, deleteImportDataSQL, id) + } + + if err == nil { + err = txUser.Commit() + } + + return err + }); err != nil { + return err + } + + // Remove the import track + if _, err := tx.ExecContext(ctx, deleteImportTrackSQL, id); err != nil { + return err + } + + var state string + if accepted { + state = "accepted" + } else { + state = "declined" + } + + logMsg := fmt.Sprintf("User '%s' %s import %d.", reviewer, state, id) + + if _, err := tx.ExecContext(ctx, logDecisionSQL, id, logMsg); err != nil { + return err + } + + _, err := tx.ExecContext(ctx, reviewSQL, state, reviewer, id) + return err +} + +func (q *importQueue) decideImport( + ctx context.Context, + id int64, + accepted bool, + reviewer string, +) error { + if ctx == nil { + ctx = context.Background() + } + + return auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error { + tx, err := conn.BeginTx(ctx, nil) + if err != nil { + return err + } + defer tx.Rollback() + err = q.decideImportTx(ctx, tx, id, accepted, reviewer) + if err == nil { + err = tx.Commit() + } + return err + }) +} + +func DecideImport( + ctx context.Context, + id int64, + accepted bool, + reviewer string, +) error { + return iqueue.decideImport(ctx, id, accepted, reviewer) +} + type logFeedback int64 func (lf logFeedback) log(kind, format string, args ...interface{}) {