diff pkg/imports/queue.go @ 5099:3cd736acbad3 queued-stage-done

First version of a reviewed job. I bet it does not work.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Tue, 24 Mar 2020 15:46:37 +0100
parents 59a99655f34d
children d3a24152b0be
line wrap: on
line diff
--- a/pkg/imports/queue.go	Tue Mar 24 13:07:24 2020 +0100
+++ b/pkg/imports/queue.go	Tue Mar 24 15:46:37 2020 +0100
@@ -70,19 +70,23 @@
 	// Choose a unique name for every import.
 	JobKind string
 
-	// JobCreator is used to bring a job to life as it is stored
-	// in pure meta-data form to the database.
-	JobCreator interface {
-		// Description is the long name of the import.
-		Description() string
-		// Create build the actual job.
-		Create() Job
+	Dependencies interface {
 		// Depends returns two lists of ressources locked by this type of import.
 		// Imports are run concurrently if they have disjoint sets
 		// of dependencies.
 		// The first list are locked exclusively.
 		// The second allows multiple read users but only one writing one.
 		Depends() [2][]string
+	}
+
+	// JobCreator is used to bring a job to life as it is stored
+	// in pure meta-data form to the database.
+	JobCreator interface {
+		Dependencies
+		// Description is the long name of the import.
+		Description() string
+		// Create build the actual job.
+		Create() Job
 		// StageDone is called if an import is positively reviewed
 		// (state = accepted). This can be used to finalize the imported
 		// data to move it e.g from the staging area.
@@ -131,6 +135,7 @@
 		"pending",
 		"accepted",
 		"declined",
+		"reviewed",
 	}
 )
 
@@ -324,8 +329,8 @@
 	return d
 }
 
-func (q *importQueue) lockDependencies(jc JobCreator) {
-	deps := jc.Depends()
+func (q *importQueue) lockDependencies(d Dependencies) {
+	deps := d.Depends()
 	q.creatorsMu.Lock()
 	defer q.creatorsMu.Unlock()
 	for _, d := range deps[0] {
@@ -336,8 +341,8 @@
 	}
 }
 
-func (q *importQueue) unlockDependencies(jc JobCreator) {
-	deps := jc.Depends()
+func (q *importQueue) unlockDependencies(d Dependencies) {
+	deps := d.Depends()
 	q.creatorsMu.Lock()
 	defer q.creatorsMu.Unlock()
 	for _, d := range deps[0] {
@@ -433,25 +438,9 @@
 	isPendingSQL = `
 SELECT
 	state = 'pending'::import_state,
-	kind,
-	username
+	kind
 FROM import.imports
 WHERE id = $1`
-
-	reviewSQL = `
-UPDATE import.imports SET
-  state = $1::import_state,
-  changed = CURRENT_TIMESTAMP,
-  signer = $2
-WHERE id = $3`
-
-	deleteImportDataSQL = `SELECT import.del_import($1)`
-
-	deleteImportTrackSQL = `
-DELETE FROM import.track_imports WHERE import_id = $1`
-
-	logDecisionSQL = `
-INSERT INTO import.import_logs (import_id, msg) VALUES ($1, $2)`
 )
 
 func (q *importQueue) decideImportTx(
@@ -464,10 +453,9 @@
 	var (
 		pending bool
 		kind    string
-		user    string
 	)
 
-	switch err := tx.QueryRowContext(ctx, isPendingSQL, id).Scan(&pending, &kind, &user); {
+	switch err := tx.QueryRowContext(ctx, isPendingSQL, id).Scan(&pending, &kind); {
 	case err == sql.ErrNoRows:
 		return fmt.Errorf("cannot find import #%d", id)
 	case err != nil:
@@ -481,48 +469,27 @@
 		return fmt.Errorf("no job creator for kind '%s'", kind)
 	}
 
-	if err := auth.RunAs(ctx, user, func(conn *sql.Conn) error {
-		txUser, err := conn.BeginTx(ctx, nil)
-		if err != nil {
-			return err
-		}
-		defer txUser.Rollback()
-
-		if accepted {
-			feedback := logFeedback(id)
-			err = jc.StageDone(ctx, txUser, id, feedback)
-		} else {
-			_, err = txUser.ExecContext(ctx, deleteImportDataSQL, id)
-		}
-
-		if err == nil {
-			err = txUser.Commit()
-		}
-
-		return err
-	}); err != nil {
+	r := &reviewed{
+		ID:       id,
+		Accepted: accepted,
+	}
+	serialized, err := common.ToJSONString(r)
+	if err != nil {
 		return err
 	}
-
-	// Remove the import track
-	if _, err := tx.ExecContext(ctx, deleteImportTrackSQL, id); err != nil {
+	rID, err := q.addJob(
+		reviewedJobKind,
+		time.Now(),
+		nil,
+		nil,
+		reviewer,
+		false,
+		serialized)
+	log.Printf("info: add review job %d\n", rID)
+	if err != nil {
 		return err
 	}
-
-	var state string
-	if accepted {
-		state = "accepted"
-	} else {
-		state = "declined"
-	}
-
-	logMsg := fmt.Sprintf("User '%s' %s import %d.", reviewer, state, id)
-
-	if _, err := tx.ExecContext(ctx, logDecisionSQL, id, logMsg); err != nil {
-		return err
-	}
-
-	_, err := tx.ExecContext(ctx, reviewSQL, state, reviewer, id)
+	_, err = tx.ExecContext(ctx, updateStateSQL, "reviewed", id)
 	return err
 }
 
@@ -769,27 +736,34 @@
 			continue
 		}
 
+		job := jc.Create()
+		if err := common.FromJSONString(idj.data, job); err != nil {
+			errorAndFail(idj.id, "failed to create job for import #%d: %v",
+				idj.id, err)
+			continue
+		}
+
+		var dependencies Dependencies
+		if deps, ok := job.(Dependencies); ok {
+			dependencies = deps
+		} else {
+			dependencies = jc
+		}
+
 		// Lock dependencies.
-		q.lockDependencies(jc)
+		q.lockDependencies(dependencies)
 
 		go func(jc JobCreator, idj *idJob) {
 
 			// Unlock the dependencies.
 			defer func() {
-				q.unlockDependencies(jc)
+				q.unlockDependencies(dependencies)
 				select {
 				case q.signalChan <- struct{}{}:
 				default:
 				}
 			}()
 
-			job := jc.Create()
-			if err := common.FromJSONString(idj.data, job); err != nil {
-				errorAndFail(idj.id, "failed to create job for import #%d: %v",
-					idj.id, err)
-				return
-			}
-
 			feedback := logFeedback(idj.id)
 
 			feedback.Info("import #%d started", idj.id)