changeset 5028:d727641911a5

Moved import desision logic to import queue (where it belongs). Major change: StageDone of the import job is executed by the original user who does the import.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Wed, 18 Mar 2020 17:52:00 +0100
parents fa662af56a3d
children 0fedd50dbf52
files pkg/controllers/importqueue.go pkg/controllers/routes.go pkg/imports/queue.go
diffstat 3 files changed, 140 insertions(+), 82 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/controllers/importqueue.go	Wed Mar 18 14:52:36 2020 +0100
+++ b/pkg/controllers/importqueue.go	Wed Mar 18 17:52:00 2020 +0100
@@ -583,28 +583,6 @@
 	return
 }
 
-const (
-	isPendingSQL = `
-SELECT state = 'pending'::import_state, kind
-FROM import.imports
-WHERE id = $1`
-
-	reviewSQL = `
-UPDATE import.imports SET
-  state = $1::import_state,
-  changed = CURRENT_TIMESTAMP,
-  signer = $2
-WHERE id = $3`
-
-	deleteImportDataSQL = `SELECT import.del_import($1)`
-
-	deleteImportTrackSQL = `
-DELETE FROM import.track_imports WHERE import_id = $1`
-
-	logDecisionSQL = `
-INSERT INTO import.import_logs (import_id, msg) VALUES ($1, $2)`
-)
-
 func reviewImports(req *http.Request) (mw.JSONResult, error) {
 
 	rs := *mw.JSONInput(req).(*[]models.Review)
@@ -617,11 +595,9 @@
 
 	results := make([]reviewResult, len(rs))
 
-	conn := mw.JSONConn(req)
-
 	for i := range rs {
 		rev := &rs[i]
-		msg, err := decideImport(req, conn, rev.ID, string(rev.State))
+		msg, err := decideImport(req, rev.ID, string(rev.State))
 		var errString string
 		if err != nil {
 			errString = err.Error()
@@ -643,7 +619,7 @@
 	state := vars["state"]
 
 	var msg string
-	if msg, err = decideImport(req, mw.JSONConn(req), id, state); err != nil {
+	if msg, err = decideImport(req, id, state); err != nil {
 		return
 	}
 
@@ -659,70 +635,21 @@
 
 func decideImport(
 	req *http.Request,
-	conn *sql.Conn,
 	id int64,
 	state string,
 ) (message string, err error) {
 	ctx := req.Context()
-	var tx *sql.Tx
-	if tx, err = conn.BeginTx(ctx, nil); err != nil {
-		return
-	}
-	defer tx.Rollback()
 
-	var pending bool
-	var kind string
+	accepted := state == "accepted"
 
-	err = tx.QueryRowContext(ctx, isPendingSQL, id).Scan(&pending, &kind)
-	switch {
-	case err == sql.ErrNoRows:
-		err = mw.JSONError{
-			Code:    http.StatusNotFound,
-			Message: fmt.Sprintf("cannot find import #%d", id),
-		}
-		return
-	case err != nil:
-		return
-	case !pending:
-		err = mw.JSONError{
-			Code:    http.StatusConflict,
-			Message: fmt.Sprintf("import #%d is not pending", id),
-		}
-		return
-	}
+	session, _ := auth.GetSession(req)
+	reviewer := session.User
 
-	if state == "accepted" {
-		if jc := imports.FindJobCreator(imports.JobKind(kind)); jc != nil {
-			if err = jc.StageDone(ctx, tx, id); err != nil {
-				return
-			}
-		}
-
-	} else {
-		if _, err = tx.ExecContext(ctx, deleteImportDataSQL, id); err != nil {
-			return
+	if err = imports.DecideImport(ctx, id, accepted, reviewer); err != nil {
+		err = mw.JSONError{
+			Code:    http.StatusBadRequest,
+			Message: err.Error(),
 		}
-	}
-
-	// Remove the import track
-	if _, err = tx.ExecContext(ctx, deleteImportTrackSQL, id); err != nil {
-		return
-	}
-
-	// Log the decision and set the final state.
-	session, _ := auth.GetSession(req)
-	who := session.User
-
-	if _, err = tx.ExecContext(ctx, logDecisionSQL, id,
-		fmt.Sprintf("User '%s' %s import %d.", who, state, id)); err != nil {
-		return
-	}
-
-	if _, err = tx.ExecContext(ctx, reviewSQL, state, who, id); err != nil {
-		return
-	}
-
-	if err = tx.Commit(); err != nil {
 		return
 	}
 
--- a/pkg/controllers/routes.go	Wed Mar 18 14:52:36 2020 +0100
+++ b/pkg/controllers/routes.go	Wed Mar 18 17:52:00 2020 +0100
@@ -313,6 +313,7 @@
 	api.Handle("/imports", waterwayAdmin(&mw.JSONHandler{
 		Input:  func(*http.Request) interface{} { return &[]models.Review{} },
 		Handle: reviewImports,
+		NoConn: true,
 	})).Methods(http.MethodPatch)
 
 	api.Handle("/imports/{id:[0-9]+}", waterwayAdmin(&mw.JSONHandler{
@@ -323,6 +324,7 @@
 	api.Handle("/imports/{id:[0-9]+}/{state:(?:accepted|declined)}",
 		waterwayAdmin(&mw.JSONHandler{
 			Handle: reviewImport,
+			NoConn: true,
 		})).Methods(http.MethodPut)
 
 	// Handler to serve data to the client.
--- a/pkg/imports/queue.go	Wed Mar 18 14:52:36 2020 +0100
+++ b/pkg/imports/queue.go	Wed Mar 18 17:52:00 2020 +0100
@@ -429,6 +429,135 @@
 		data)
 }
 
+const (
+	isPendingSQL = `
+SELECT
+	state = 'pending'::import_state,
+	kind,
+	user
+FROM import.imports
+WHERE id = $1`
+
+	reviewSQL = `
+UPDATE import.imports SET
+  state = $1::import_state,
+  changed = CURRENT_TIMESTAMP,
+  signer = $2
+WHERE id = $3`
+
+	deleteImportDataSQL = `SELECT import.del_import($1)`
+
+	deleteImportTrackSQL = `
+DELETE FROM import.track_imports WHERE import_id = $1`
+
+	logDecisionSQL = `
+INSERT INTO import.import_logs (import_id, msg) VALUES ($1, $2)`
+)
+
+func (q *importQueue) decideImportTx(
+	ctx context.Context,
+	tx *sql.Tx,
+	id int64,
+	accepted bool,
+	reviewer string,
+) error {
+	var (
+		pending bool
+		kind    string
+		user    string
+	)
+
+	switch err := tx.QueryRowContext(ctx, isPendingSQL, id).Scan(&pending, &kind, &user); {
+	case err == sql.ErrNoRows:
+		return fmt.Errorf("cannot find import #%d", id)
+	case err != nil:
+		return err
+	case !pending:
+		return fmt.Errorf("#%d is not pending", id)
+	}
+
+	jc := q.jobCreator(JobKind(kind))
+	if jc == nil {
+		return fmt.Errorf("no job creator for kind '%s'", kind)
+	}
+
+	if err := auth.RunAs(ctx, user, func(conn *sql.Conn) error {
+		txUser, err := conn.BeginTx(ctx, nil)
+		if err != nil {
+			return err
+		}
+		defer txUser.Rollback()
+
+		if accepted {
+			err = jc.StageDone(ctx, txUser, id)
+		} else {
+			_, err = txUser.ExecContext(ctx, deleteImportDataSQL, id)
+		}
+
+		if err == nil {
+			err = txUser.Commit()
+		}
+
+		return err
+	}); err != nil {
+		return err
+	}
+
+	// Remove the import track
+	if _, err := tx.ExecContext(ctx, deleteImportTrackSQL, id); err != nil {
+		return err
+	}
+
+	var state string
+	if accepted {
+		state = "accepted"
+	} else {
+		state = "declined"
+	}
+
+	logMsg := fmt.Sprintf("User '%s' %s import %d.", reviewer, state, id)
+
+	if _, err := tx.ExecContext(ctx, logDecisionSQL, id, logMsg); err != nil {
+		return err
+	}
+
+	_, err := tx.ExecContext(ctx, reviewSQL, state, reviewer, id)
+	return err
+}
+
+func (q *importQueue) decideImport(
+	ctx context.Context,
+	id int64,
+	accepted bool,
+	reviewer string,
+) error {
+	if ctx == nil {
+		ctx = context.Background()
+	}
+
+	return auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error {
+		tx, err := conn.BeginTx(ctx, nil)
+		if err != nil {
+			return err
+		}
+		defer tx.Rollback()
+		err = q.decideImportTx(ctx, tx, id, accepted, reviewer)
+		if err == nil {
+			err = tx.Commit()
+		}
+		return err
+	})
+}
+
+func DecideImport(
+	ctx context.Context,
+	id int64,
+	accepted bool,
+	reviewer string,
+) error {
+	return iqueue.decideImport(ctx, id, accepted, reviewer)
+}
+
 type logFeedback int64
 
 func (lf logFeedback) log(kind, format string, args ...interface{}) {