diff pkg/imports/queue.go @ 5028:d727641911a5

Moved import desision logic to import queue (where it belongs). Major change: StageDone of the import job is executed by the original user who does the import.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Wed, 18 Mar 2020 17:52:00 +0100
parents 47922c1a088d
children 0fedd50dbf52
line wrap: on
line diff
--- a/pkg/imports/queue.go	Wed Mar 18 14:52:36 2020 +0100
+++ b/pkg/imports/queue.go	Wed Mar 18 17:52:00 2020 +0100
@@ -429,6 +429,135 @@
 		data)
 }
 
+const (
+	isPendingSQL = `
+SELECT
+	state = 'pending'::import_state,
+	kind,
+	user
+FROM import.imports
+WHERE id = $1`
+
+	reviewSQL = `
+UPDATE import.imports SET
+  state = $1::import_state,
+  changed = CURRENT_TIMESTAMP,
+  signer = $2
+WHERE id = $3`
+
+	deleteImportDataSQL = `SELECT import.del_import($1)`
+
+	deleteImportTrackSQL = `
+DELETE FROM import.track_imports WHERE import_id = $1`
+
+	logDecisionSQL = `
+INSERT INTO import.import_logs (import_id, msg) VALUES ($1, $2)`
+)
+
+func (q *importQueue) decideImportTx(
+	ctx context.Context,
+	tx *sql.Tx,
+	id int64,
+	accepted bool,
+	reviewer string,
+) error {
+	var (
+		pending bool
+		kind    string
+		user    string
+	)
+
+	switch err := tx.QueryRowContext(ctx, isPendingSQL, id).Scan(&pending, &kind, &user); {
+	case err == sql.ErrNoRows:
+		return fmt.Errorf("cannot find import #%d", id)
+	case err != nil:
+		return err
+	case !pending:
+		return fmt.Errorf("#%d is not pending", id)
+	}
+
+	jc := q.jobCreator(JobKind(kind))
+	if jc == nil {
+		return fmt.Errorf("no job creator for kind '%s'", kind)
+	}
+
+	if err := auth.RunAs(ctx, user, func(conn *sql.Conn) error {
+		txUser, err := conn.BeginTx(ctx, nil)
+		if err != nil {
+			return err
+		}
+		defer txUser.Rollback()
+
+		if accepted {
+			err = jc.StageDone(ctx, txUser, id)
+		} else {
+			_, err = txUser.ExecContext(ctx, deleteImportDataSQL, id)
+		}
+
+		if err == nil {
+			err = txUser.Commit()
+		}
+
+		return err
+	}); err != nil {
+		return err
+	}
+
+	// Remove the import track
+	if _, err := tx.ExecContext(ctx, deleteImportTrackSQL, id); err != nil {
+		return err
+	}
+
+	var state string
+	if accepted {
+		state = "accepted"
+	} else {
+		state = "declined"
+	}
+
+	logMsg := fmt.Sprintf("User '%s' %s import %d.", reviewer, state, id)
+
+	if _, err := tx.ExecContext(ctx, logDecisionSQL, id, logMsg); err != nil {
+		return err
+	}
+
+	_, err := tx.ExecContext(ctx, reviewSQL, state, reviewer, id)
+	return err
+}
+
+func (q *importQueue) decideImport(
+	ctx context.Context,
+	id int64,
+	accepted bool,
+	reviewer string,
+) error {
+	if ctx == nil {
+		ctx = context.Background()
+	}
+
+	return auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error {
+		tx, err := conn.BeginTx(ctx, nil)
+		if err != nil {
+			return err
+		}
+		defer tx.Rollback()
+		err = q.decideImportTx(ctx, tx, id, accepted, reviewer)
+		if err == nil {
+			err = tx.Commit()
+		}
+		return err
+	})
+}
+
+func DecideImport(
+	ctx context.Context,
+	id int64,
+	accepted bool,
+	reviewer string,
+) error {
+	return iqueue.decideImport(ctx, id, accepted, reviewer)
+}
+
 type logFeedback int64
 
 func (lf logFeedback) log(kind, format string, args ...interface{}) {