changeset 5100:d3a24152b0be queued-stage-done

Register a kind#rev job factory for any job factory.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Tue, 24 Mar 2020 16:42:10 +0100
parents 3cd736acbad3
children 1b0b13e70bc1
files pkg/imports/queue.go pkg/imports/reviewed.go
diffstat 2 files changed, 143 insertions(+), 194 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/imports/queue.go	Tue Mar 24 15:46:37 2020 +0100
+++ b/pkg/imports/queue.go	Tue Mar 24 16:42:10 2020 +0100
@@ -70,19 +70,15 @@
 	// Choose a unique name for every import.
 	JobKind string
 
-	Dependencies interface {
+	// JobCreator is used to bring a job to life as it is stored
+	// in pure meta-data form to the database.
+	JobCreator interface {
 		// Depends returns two lists of ressources locked by this type of import.
 		// Imports are run concurrently if they have disjoint sets
 		// of dependencies.
 		// The first list are locked exclusively.
 		// The second allows multiple read users but only one writing one.
 		Depends() [2][]string
-	}
-
-	// JobCreator is used to bring a job to life as it is stored
-	// in pure meta-data form to the database.
-	JobCreator interface {
-		Dependencies
 		// Description is the long name of the import.
 		Description() string
 		// Create build the actual job.
@@ -220,10 +216,114 @@
 	return string(ue)
 }
 
+type reviewedJobCreator struct {
+	dependencies [2][]string
+}
+
+func (*reviewedJobCreator) AutoAccept() bool {
+	return true
+}
+
+func (r *reviewedJobCreator) Depends() [2][]string {
+	return r.dependencies
+}
+
+func (*reviewedJobCreator) Description() string {
+	return "review job creator"
+}
+
+func (*reviewedJobCreator) StageDone(context.Context, *sql.Tx, int64, Feedback) error {
+	return nil
+}
+
+type reviewedJob struct {
+	ID       int64 `json:"id"`
+	Accepted bool  `json:"bool"`
+}
+
+func (*reviewedJobCreator) Create() Job {
+	return new(reviewedJob)
+}
+
+func (*reviewedJob) CleanUp() error { return nil }
+
+func (rj *reviewedJob) Do(
+	ctx context.Context,
+	importID int64,
+	conn *sql.Conn,
+	feedback Feedback,
+) (interface{}, error) {
+
+	tx, err := conn.BeginTx(ctx, nil)
+	if err != nil {
+		return nil, err
+	}
+	defer tx.Rollback()
+
+	var signer string
+	if err := tx.QueryRowContext(ctx, selectUserSQL, importID).Scan(&signer); err != nil {
+		return nil, err
+	}
+
+	var user, kind string
+	if err := tx.QueryRowContext(ctx, selectUserKindSQL, rj.ID).Scan(&user, &kind); err != nil {
+		return nil, err
+	}
+
+	jc := FindJobCreator(JobKind(kind))
+	if jc == nil {
+		return nil, fmt.Errorf("no job creator found for '%s'", kind)
+	}
+
+	importFeedback := logFeedback(rj.ID)
+
+	if err := auth.RunAs(ctx, user, func(conn *sql.Conn) error {
+		userTx, err := conn.BeginTx(ctx, nil)
+		if err != nil {
+			return err
+		}
+		defer userTx.Rollback()
+
+		if rj.Accepted {
+			err = jc.StageDone(ctx, userTx, rj.ID, importFeedback)
+		} else {
+			_, err = userTx.ExecContext(ctx, deleteImportDataSQL, rj.ID)
+		}
+		if err == nil {
+			err = userTx.Commit()
+		}
+		return err
+	}); err != nil {
+		return nil, err
+	}
+
+	// Remove the import track
+	if _, err := tx.ExecContext(ctx, deleteImportTrackSQL, rj.ID); err != nil {
+		return nil, err
+	}
+
+	var state string
+	if rj.Accepted {
+		state = "accepted"
+	} else {
+		state = "declined"
+	}
+
+	if _, err := tx.ExecContext(ctx, reviewSQL, state, signer, rj.ID); err != nil {
+		return nil, err
+	}
+
+	importFeedback.Info("User '%s' %s import %d.", signer, state, rj.ID)
+
+	return nil, tx.Commit()
+}
+
 func (q *importQueue) registerJobCreator(kind JobKind, jc JobCreator) {
 	q.creatorsMu.Lock()
 	defer q.creatorsMu.Unlock()
 	q.creators[kind] = jc
+	q.creators[kind+"#review"] = &reviewedJobCreator{dependencies: jc.Depends()}
+
 }
 
 // FindJobCreator looks up a JobCreator in the global import queue.
@@ -329,8 +429,8 @@
 	return d
 }
 
-func (q *importQueue) lockDependencies(d Dependencies) {
-	deps := d.Depends()
+func (q *importQueue) lockDependencies(jc JobCreator) {
+	deps := jc.Depends()
 	q.creatorsMu.Lock()
 	defer q.creatorsMu.Unlock()
 	for _, d := range deps[0] {
@@ -341,8 +441,8 @@
 	}
 }
 
-func (q *importQueue) unlockDependencies(d Dependencies) {
-	deps := d.Depends()
+func (q *importQueue) unlockDependencies(jc JobCreator) {
+	deps := jc.Depends()
 	q.creatorsMu.Lock()
 	defer q.creatorsMu.Unlock()
 	for _, d := range deps[0] {
@@ -441,6 +541,27 @@
 	kind
 FROM import.imports
 WHERE id = $1`
+
+	selectKindSQL = `
+SELECT kind from import.imports WHERE ID = $1`
+
+	selectUserSQL = `
+SELECT username from import.imports WHERE ID = $1`
+
+	selectUserKindSQL = `
+SELECT username, kind from import.imports WHERE ID = $1`
+
+	reviewSQL = `
+UPDATE import.imports SET
+  state = $1::import_state,
+  changed = CURRENT_TIMESTAMP,
+  signer = $2
+WHERE id = $3`
+
+	deleteImportDataSQL = `SELECT import.del_import($1)`
+
+	deleteImportTrackSQL = `
+DELETE FROM import.track_imports WHERE import_id = $1`
 )
 
 func (q *importQueue) decideImportTx(
@@ -469,7 +590,7 @@
 		return fmt.Errorf("no job creator for kind '%s'", kind)
 	}
 
-	r := &reviewed{
+	r := &reviewedJob{
 		ID:       id,
 		Accepted: accepted,
 	}
@@ -478,7 +599,7 @@
 		return err
 	}
 	rID, err := q.addJob(
-		reviewedJobKind,
+		JobKind(kind+"#review"),
 		time.Now(),
 		nil,
 		nil,
@@ -736,34 +857,27 @@
 			continue
 		}
 
-		job := jc.Create()
-		if err := common.FromJSONString(idj.data, job); err != nil {
-			errorAndFail(idj.id, "failed to create job for import #%d: %v",
-				idj.id, err)
-			continue
-		}
-
-		var dependencies Dependencies
-		if deps, ok := job.(Dependencies); ok {
-			dependencies = deps
-		} else {
-			dependencies = jc
-		}
-
 		// Lock dependencies.
-		q.lockDependencies(dependencies)
+		q.lockDependencies(jc)
 
 		go func(jc JobCreator, idj *idJob) {
 
 			// Unlock the dependencies.
 			defer func() {
-				q.unlockDependencies(dependencies)
+				q.unlockDependencies(jc)
 				select {
 				case q.signalChan <- struct{}{}:
 				default:
 				}
 			}()
 
+			job := jc.Create()
+			if err := common.FromJSONString(idj.data, job); err != nil {
+				errorAndFail(idj.id, "failed to create job for import #%d: %v",
+					idj.id, err)
+				return
+			}
+
 			feedback := logFeedback(idj.id)
 
 			feedback.Info("import #%d started", idj.id)
--- a/pkg/imports/reviewed.go	Tue Mar 24 15:46:37 2020 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,165 +0,0 @@
-// This is Free Software under GNU Affero General Public License v >= 3.0
-// without warranty, see README.md and license for details.
-//
-// SPDX-License-Identifier: AGPL-3.0-or-later
-// License-Filename: LICENSES/AGPL-3.0.txt
-//
-// Copyright (C) 2018, 2019, 2020 by via donau
-//   – Österreichische Wasserstraßen-Gesellschaft mbH
-// Software engineering by Intevation GmbH
-//
-// Author(s):
-//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
-
-package imports
-
-import (
-	"context"
-	"database/sql"
-	"fmt"
-	"log"
-
-	"gemma.intevation.de/gemma/pkg/auth"
-)
-
-type reviewed struct {
-	ID       int64 `json:"id"`
-	Accepted bool  `json:"bool"`
-}
-
-const (
-	selectKindSQL = `
-SELECT kind from import.imports WHERE ID = $1`
-
-	selectUserSQL = `
-SELECT username from import.imports WHERE ID = $1`
-
-	selectUserKindSQL = `
-SELECT username, kind from import.imports WHERE ID = $1`
-
-	reviewSQL = `
-UPDATE import.imports SET
-  state = $1::import_state,
-  changed = CURRENT_TIMESTAMP,
-  signer = $2
-WHERE id = $3`
-
-	deleteImportDataSQL = `SELECT import.del_import($1)`
-
-	deleteImportTrackSQL = `
-DELETE FROM import.track_imports WHERE import_id = $1`
-)
-
-type reviewedCreator struct{}
-
-const reviewedJobKind JobKind = "reviewed"
-
-func init() { RegisterJobCreator(reviewedJobKind, reviewedCreator{}) }
-
-func (reviewedCreator) Description() string { return "reviewed imports" }
-
-func (reviewedCreator) Depends() [2][]string {
-	return [2][]string{{"reviewed"}}
-}
-
-func (reviewedCreator) AutoAccept() bool { return true }
-
-func (reviewedCreator) Create() Job { return new(reviewed) }
-
-func (r *reviewed) Depends() (deps [2][]string) {
-	ctx := context.Background()
-	var kind string
-	if err := auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error {
-		return conn.QueryRowContext(ctx, selectKindSQL, r.ID).Scan(&kind)
-	}); err != nil {
-		log.Printf("error: %v\n", err)
-		return
-	}
-	jc := FindJobCreator(JobKind(kind))
-	if jc == nil {
-		log.Printf("error: no job creator found for '%s'", kind)
-	}
-	deps = jc.Depends()
-	n := make([]string, len(deps[0])+1)
-	n[0] = "reviewed"
-	copy(n[1:], deps[0])
-	deps[0] = n
-	return
-}
-
-func (*reviewed) CleanUp() error { return nil }
-
-func (reviewedCreator) StageDone(context.Context, *sql.Tx, int64, Feedback) error {
-	return nil
-}
-
-func (r *reviewed) Do(
-	ctx context.Context,
-	importID int64,
-	conn *sql.Conn,
-	feedback Feedback,
-) (interface{}, error) {
-
-	tx, err := conn.BeginTx(ctx, nil)
-	if err != nil {
-		return nil, err
-	}
-	defer tx.Rollback()
-
-	var signer string
-	if err := tx.QueryRowContext(ctx, selectUserSQL, importID).Scan(&signer); err != nil {
-		return nil, err
-	}
-
-	var user, kind string
-	if err := tx.QueryRowContext(ctx, selectUserKindSQL, r.ID).Scan(&user, &kind); err != nil {
-		return nil, err
-	}
-
-	jc := FindJobCreator(JobKind(kind))
-	if jc == nil {
-		return nil, fmt.Errorf("no job creator found for '%s'", kind)
-	}
-
-	importFeedback := logFeedback(r.ID)
-
-	if err := auth.RunAs(ctx, user, func(conn *sql.Conn) error {
-		userTx, err := conn.BeginTx(ctx, nil)
-		if err != nil {
-			return err
-		}
-		defer userTx.Rollback()
-
-		if r.Accepted {
-			err = jc.StageDone(ctx, userTx, r.ID, importFeedback)
-		} else {
-			_, err = userTx.ExecContext(ctx, deleteImportDataSQL, r.ID)
-		}
-		if err == nil {
-			err = userTx.Commit()
-		}
-		return err
-	}); err != nil {
-		return nil, err
-	}
-
-	// Remove the import track
-	if _, err := tx.ExecContext(ctx, deleteImportTrackSQL, r.ID); err != nil {
-		return nil, err
-	}
-
-	var state string
-	if r.Accepted {
-		state = "accepted"
-	} else {
-		state = "declined"
-	}
-
-	if _, err := tx.ExecContext(ctx, reviewSQL, state, signer, r.ID); err != nil {
-		return nil, err
-	}
-
-	importFeedback.Info("User '%s' %s import %d.", signer, state, r.ID)
-
-	return nil, tx.Commit()
-}