# HG changeset patch # User Sascha L. Teichmann # Date 1585061197 -3600 # Node ID 3cd736acbad3f1b67e5e4a1cb625906bcbcc1e67 # Parent 52aac557cbd736c81b137925ea284e0d32b29f1c First version of a reviewed job. I bet it does not work. diff -r 52aac557cbd7 -r 3cd736acbad3 pkg/imports/queue.go --- a/pkg/imports/queue.go Tue Mar 24 13:07:24 2020 +0100 +++ b/pkg/imports/queue.go Tue Mar 24 15:46:37 2020 +0100 @@ -70,19 +70,23 @@ // Choose a unique name for every import. JobKind string - // JobCreator is used to bring a job to life as it is stored - // in pure meta-data form to the database. - JobCreator interface { - // Description is the long name of the import. - Description() string - // Create build the actual job. - Create() Job + Dependencies interface { // Depends returns two lists of ressources locked by this type of import. // Imports are run concurrently if they have disjoint sets // of dependencies. // The first list are locked exclusively. // The second allows multiple read users but only one writing one. Depends() [2][]string + } + + // JobCreator is used to bring a job to life as it is stored + // in pure meta-data form to the database. + JobCreator interface { + Dependencies + // Description is the long name of the import. + Description() string + // Create build the actual job. + Create() Job // StageDone is called if an import is positively reviewed // (state = accepted). This can be used to finalize the imported // data to move it e.g from the staging area. @@ -131,6 +135,7 @@ "pending", "accepted", "declined", + "reviewed", } ) @@ -324,8 +329,8 @@ return d } -func (q *importQueue) lockDependencies(jc JobCreator) { - deps := jc.Depends() +func (q *importQueue) lockDependencies(d Dependencies) { + deps := d.Depends() q.creatorsMu.Lock() defer q.creatorsMu.Unlock() for _, d := range deps[0] { @@ -336,8 +341,8 @@ } } -func (q *importQueue) unlockDependencies(jc JobCreator) { - deps := jc.Depends() +func (q *importQueue) unlockDependencies(d Dependencies) { + deps := d.Depends() q.creatorsMu.Lock() defer q.creatorsMu.Unlock() for _, d := range deps[0] { @@ -433,25 +438,9 @@ isPendingSQL = ` SELECT state = 'pending'::import_state, - kind, - username + kind FROM import.imports WHERE id = $1` - - reviewSQL = ` -UPDATE import.imports SET - state = $1::import_state, - changed = CURRENT_TIMESTAMP, - signer = $2 -WHERE id = $3` - - deleteImportDataSQL = `SELECT import.del_import($1)` - - deleteImportTrackSQL = ` -DELETE FROM import.track_imports WHERE import_id = $1` - - logDecisionSQL = ` -INSERT INTO import.import_logs (import_id, msg) VALUES ($1, $2)` ) func (q *importQueue) decideImportTx( @@ -464,10 +453,9 @@ var ( pending bool kind string - user string ) - switch err := tx.QueryRowContext(ctx, isPendingSQL, id).Scan(&pending, &kind, &user); { + switch err := tx.QueryRowContext(ctx, isPendingSQL, id).Scan(&pending, &kind); { case err == sql.ErrNoRows: return fmt.Errorf("cannot find import #%d", id) case err != nil: @@ -481,48 +469,27 @@ return fmt.Errorf("no job creator for kind '%s'", kind) } - if err := auth.RunAs(ctx, user, func(conn *sql.Conn) error { - txUser, err := conn.BeginTx(ctx, nil) - if err != nil { - return err - } - defer txUser.Rollback() - - if accepted { - feedback := logFeedback(id) - err = jc.StageDone(ctx, txUser, id, feedback) - } else { - _, err = txUser.ExecContext(ctx, deleteImportDataSQL, id) - } - - if err == nil { - err = txUser.Commit() - } - - return err - }); err != nil { + r := &reviewed{ + ID: id, + Accepted: accepted, + } + serialized, err := common.ToJSONString(r) + if err != nil { return err } - - // Remove the import track - if _, err := tx.ExecContext(ctx, deleteImportTrackSQL, id); err != nil { + rID, err := q.addJob( + reviewedJobKind, + time.Now(), + nil, + nil, + reviewer, + false, + serialized) + log.Printf("info: add review job %d\n", rID) + if err != nil { return err } - - var state string - if accepted { - state = "accepted" - } else { - state = "declined" - } - - logMsg := fmt.Sprintf("User '%s' %s import %d.", reviewer, state, id) - - if _, err := tx.ExecContext(ctx, logDecisionSQL, id, logMsg); err != nil { - return err - } - - _, err := tx.ExecContext(ctx, reviewSQL, state, reviewer, id) + _, err = tx.ExecContext(ctx, updateStateSQL, "reviewed", id) return err } @@ -769,27 +736,34 @@ continue } + job := jc.Create() + if err := common.FromJSONString(idj.data, job); err != nil { + errorAndFail(idj.id, "failed to create job for import #%d: %v", + idj.id, err) + continue + } + + var dependencies Dependencies + if deps, ok := job.(Dependencies); ok { + dependencies = deps + } else { + dependencies = jc + } + // Lock dependencies. - q.lockDependencies(jc) + q.lockDependencies(dependencies) go func(jc JobCreator, idj *idJob) { // Unlock the dependencies. defer func() { - q.unlockDependencies(jc) + q.unlockDependencies(dependencies) select { case q.signalChan <- struct{}{}: default: } }() - job := jc.Create() - if err := common.FromJSONString(idj.data, job); err != nil { - errorAndFail(idj.id, "failed to create job for import #%d: %v", - idj.id, err) - return - } - feedback := logFeedback(idj.id) feedback.Info("import #%d started", idj.id) diff -r 52aac557cbd7 -r 3cd736acbad3 pkg/imports/reviewed.go --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pkg/imports/reviewed.go Tue Mar 24 15:46:37 2020 +0100 @@ -0,0 +1,165 @@ +// This is Free Software under GNU Affero General Public License v >= 3.0 +// without warranty, see README.md and license for details. +// +// SPDX-License-Identifier: AGPL-3.0-or-later +// License-Filename: LICENSES/AGPL-3.0.txt +// +// Copyright (C) 2018, 2019, 2020 by via donau +// – Österreichische Wasserstraßen-Gesellschaft mbH +// Software engineering by Intevation GmbH +// +// Author(s): +// * Sascha L. Teichmann + +package imports + +import ( + "context" + "database/sql" + "fmt" + "log" + + "gemma.intevation.de/gemma/pkg/auth" +) + +type reviewed struct { + ID int64 `json:"id"` + Accepted bool `json:"bool"` +} + +const ( + selectKindSQL = ` +SELECT kind from import.imports WHERE ID = $1` + + selectUserSQL = ` +SELECT username from import.imports WHERE ID = $1` + + selectUserKindSQL = ` +SELECT username, kind from import.imports WHERE ID = $1` + + reviewSQL = ` +UPDATE import.imports SET + state = $1::import_state, + changed = CURRENT_TIMESTAMP, + signer = $2 +WHERE id = $3` + + deleteImportDataSQL = `SELECT import.del_import($1)` + + deleteImportTrackSQL = ` +DELETE FROM import.track_imports WHERE import_id = $1` +) + +type reviewedCreator struct{} + +const reviewedJobKind JobKind = "reviewed" + +func init() { RegisterJobCreator(reviewedJobKind, reviewedCreator{}) } + +func (reviewedCreator) Description() string { return "reviewed imports" } + +func (reviewedCreator) Depends() [2][]string { + return [2][]string{{"reviewed"}} +} + +func (reviewedCreator) AutoAccept() bool { return true } + +func (reviewedCreator) Create() Job { return new(reviewed) } + +func (r *reviewed) Depends() (deps [2][]string) { + ctx := context.Background() + var kind string + if err := auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error { + return conn.QueryRowContext(ctx, selectKindSQL, r.ID).Scan(&kind) + }); err != nil { + log.Printf("error: %v\n", err) + return + } + jc := FindJobCreator(JobKind(kind)) + if jc == nil { + log.Printf("error: no job creator found for '%s'", kind) + } + deps = jc.Depends() + n := make([]string, len(deps[0])+1) + n[0] = "reviewed" + copy(n[1:], deps[0]) + deps[0] = n + return +} + +func (*reviewed) CleanUp() error { return nil } + +func (reviewedCreator) StageDone(context.Context, *sql.Tx, int64, Feedback) error { + return nil +} + +func (r *reviewed) Do( + ctx context.Context, + importID int64, + conn *sql.Conn, + feedback Feedback, +) (interface{}, error) { + + tx, err := conn.BeginTx(ctx, nil) + if err != nil { + return nil, err + } + defer tx.Rollback() + + var signer string + if err := tx.QueryRowContext(ctx, selectUserSQL, importID).Scan(&signer); err != nil { + return nil, err + } + + var user, kind string + if err := tx.QueryRowContext(ctx, selectUserKindSQL, r.ID).Scan(&user, &kind); err != nil { + return nil, err + } + + jc := FindJobCreator(JobKind(kind)) + if jc == nil { + return nil, fmt.Errorf("no job creator found for '%s'", kind) + } + + importFeedback := logFeedback(r.ID) + + if err := auth.RunAs(ctx, user, func(conn *sql.Conn) error { + userTx, err := conn.BeginTx(ctx, nil) + if err != nil { + return err + } + defer userTx.Rollback() + + if r.Accepted { + err = jc.StageDone(ctx, userTx, r.ID, importFeedback) + } else { + _, err = userTx.ExecContext(ctx, deleteImportDataSQL, r.ID) + } + if err == nil { + err = userTx.Commit() + } + return err + }); err != nil { + return nil, err + } + + // Remove the import track + if _, err := tx.ExecContext(ctx, deleteImportTrackSQL, r.ID); err != nil { + return nil, err + } + + var state string + if r.Accepted { + state = "accepted" + } else { + state = "declined" + } + + if _, err := tx.ExecContext(ctx, reviewSQL, state, signer, r.ID); err != nil { + return nil, err + } + + importFeedback.Info("User '%s' %s import %d.", signer, state, r.ID) + + return nil, tx.Commit() +}