Mercurial > gemma
diff pkg/imports/queue.go @ 5028:d727641911a5
Moved import desision logic to import queue (where it belongs).
Major change: StageDone of the import job is executed by the original user
who does the import.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Wed, 18 Mar 2020 17:52:00 +0100 |
parents | 47922c1a088d |
children | 0fedd50dbf52 |
line wrap: on
line diff
--- a/pkg/imports/queue.go Wed Mar 18 14:52:36 2020 +0100 +++ b/pkg/imports/queue.go Wed Mar 18 17:52:00 2020 +0100 @@ -429,6 +429,135 @@ data) } +const ( + isPendingSQL = ` +SELECT + state = 'pending'::import_state, + kind, + user +FROM import.imports +WHERE id = $1` + + reviewSQL = ` +UPDATE import.imports SET + state = $1::import_state, + changed = CURRENT_TIMESTAMP, + signer = $2 +WHERE id = $3` + + deleteImportDataSQL = `SELECT import.del_import($1)` + + deleteImportTrackSQL = ` +DELETE FROM import.track_imports WHERE import_id = $1` + + logDecisionSQL = ` +INSERT INTO import.import_logs (import_id, msg) VALUES ($1, $2)` +) + +func (q *importQueue) decideImportTx( + ctx context.Context, + tx *sql.Tx, + id int64, + accepted bool, + reviewer string, +) error { + var ( + pending bool + kind string + user string + ) + + switch err := tx.QueryRowContext(ctx, isPendingSQL, id).Scan(&pending, &kind, &user); { + case err == sql.ErrNoRows: + return fmt.Errorf("cannot find import #%d", id) + case err != nil: + return err + case !pending: + return fmt.Errorf("#%d is not pending", id) + } + + jc := q.jobCreator(JobKind(kind)) + if jc == nil { + return fmt.Errorf("no job creator for kind '%s'", kind) + } + + if err := auth.RunAs(ctx, user, func(conn *sql.Conn) error { + txUser, err := conn.BeginTx(ctx, nil) + if err != nil { + return err + } + defer txUser.Rollback() + + if accepted { + err = jc.StageDone(ctx, txUser, id) + } else { + _, err = txUser.ExecContext(ctx, deleteImportDataSQL, id) + } + + if err == nil { + err = txUser.Commit() + } + + return err + }); err != nil { + return err + } + + // Remove the import track + if _, err := tx.ExecContext(ctx, deleteImportTrackSQL, id); err != nil { + return err + } + + var state string + if accepted { + state = "accepted" + } else { + state = "declined" + } + + logMsg := fmt.Sprintf("User '%s' %s import %d.", reviewer, state, id) + + if _, err := tx.ExecContext(ctx, logDecisionSQL, id, logMsg); err != nil { + return err + } + + _, err := tx.ExecContext(ctx, reviewSQL, state, reviewer, id) + return err +} + +func (q *importQueue) decideImport( + ctx context.Context, + id int64, + accepted bool, + reviewer string, +) error { + if ctx == nil { + ctx = context.Background() + } + + return auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error { + tx, err := conn.BeginTx(ctx, nil) + if err != nil { + return err + } + defer tx.Rollback() + err = q.decideImportTx(ctx, tx, id, accepted, reviewer) + if err == nil { + err = tx.Commit() + } + return err + }) +} + +func DecideImport( + ctx context.Context, + id int64, + accepted bool, + reviewer string, +) error { + return iqueue.decideImport(ctx, id, accepted, reviewer) +} + type logFeedback int64 func (lf logFeedback) log(kind, format string, args ...interface{}) {