Mercurial > gemma
changeset 5100:d3a24152b0be queued-stage-done
Register a kind#rev job factory for any job factory.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Tue, 24 Mar 2020 16:42:10 +0100 |
parents | 3cd736acbad3 |
children | 1b0b13e70bc1 |
files | pkg/imports/queue.go pkg/imports/reviewed.go |
diffstat | 2 files changed, 143 insertions(+), 194 deletions(-) [+] |
line wrap: on
line diff
--- a/pkg/imports/queue.go Tue Mar 24 15:46:37 2020 +0100 +++ b/pkg/imports/queue.go Tue Mar 24 16:42:10 2020 +0100 @@ -70,19 +70,15 @@ // Choose a unique name for every import. JobKind string - Dependencies interface { + // JobCreator is used to bring a job to life as it is stored + // in pure meta-data form to the database. + JobCreator interface { // Depends returns two lists of ressources locked by this type of import. // Imports are run concurrently if they have disjoint sets // of dependencies. // The first list are locked exclusively. // The second allows multiple read users but only one writing one. Depends() [2][]string - } - - // JobCreator is used to bring a job to life as it is stored - // in pure meta-data form to the database. - JobCreator interface { - Dependencies // Description is the long name of the import. Description() string // Create build the actual job. @@ -220,10 +216,114 @@ return string(ue) } +type reviewedJobCreator struct { + dependencies [2][]string +} + +func (*reviewedJobCreator) AutoAccept() bool { + return true +} + +func (r *reviewedJobCreator) Depends() [2][]string { + return r.dependencies +} + +func (*reviewedJobCreator) Description() string { + return "review job creator" +} + +func (*reviewedJobCreator) StageDone(context.Context, *sql.Tx, int64, Feedback) error { + return nil +} + +type reviewedJob struct { + ID int64 `json:"id"` + Accepted bool `json:"bool"` +} + +func (*reviewedJobCreator) Create() Job { + return new(reviewedJob) +} + +func (*reviewedJob) CleanUp() error { return nil } + +func (rj *reviewedJob) Do( + ctx context.Context, + importID int64, + conn *sql.Conn, + feedback Feedback, +) (interface{}, error) { + + tx, err := conn.BeginTx(ctx, nil) + if err != nil { + return nil, err + } + defer tx.Rollback() + + var signer string + if err := tx.QueryRowContext(ctx, selectUserSQL, importID).Scan(&signer); err != nil { + return nil, err + } + + var user, kind string + if err := tx.QueryRowContext(ctx, selectUserKindSQL, rj.ID).Scan(&user, &kind); err != nil { + return nil, err + } + + jc := FindJobCreator(JobKind(kind)) + if jc == nil { + return nil, fmt.Errorf("no job creator found for '%s'", kind) + } + + importFeedback := logFeedback(rj.ID) + + if err := auth.RunAs(ctx, user, func(conn *sql.Conn) error { + userTx, err := conn.BeginTx(ctx, nil) + if err != nil { + return err + } + defer userTx.Rollback() + + if rj.Accepted { + err = jc.StageDone(ctx, userTx, rj.ID, importFeedback) + } else { + _, err = userTx.ExecContext(ctx, deleteImportDataSQL, rj.ID) + } + if err == nil { + err = userTx.Commit() + } + return err + }); err != nil { + return nil, err + } + + // Remove the import track + if _, err := tx.ExecContext(ctx, deleteImportTrackSQL, rj.ID); err != nil { + return nil, err + } + + var state string + if rj.Accepted { + state = "accepted" + } else { + state = "declined" + } + + if _, err := tx.ExecContext(ctx, reviewSQL, state, signer, rj.ID); err != nil { + return nil, err + } + + importFeedback.Info("User '%s' %s import %d.", signer, state, rj.ID) + + return nil, tx.Commit() +} + func (q *importQueue) registerJobCreator(kind JobKind, jc JobCreator) { q.creatorsMu.Lock() defer q.creatorsMu.Unlock() q.creators[kind] = jc + q.creators[kind+"#review"] = &reviewedJobCreator{dependencies: jc.Depends()} + } // FindJobCreator looks up a JobCreator in the global import queue. @@ -329,8 +429,8 @@ return d } -func (q *importQueue) lockDependencies(d Dependencies) { - deps := d.Depends() +func (q *importQueue) lockDependencies(jc JobCreator) { + deps := jc.Depends() q.creatorsMu.Lock() defer q.creatorsMu.Unlock() for _, d := range deps[0] { @@ -341,8 +441,8 @@ } } -func (q *importQueue) unlockDependencies(d Dependencies) { - deps := d.Depends() +func (q *importQueue) unlockDependencies(jc JobCreator) { + deps := jc.Depends() q.creatorsMu.Lock() defer q.creatorsMu.Unlock() for _, d := range deps[0] { @@ -441,6 +541,27 @@ kind FROM import.imports WHERE id = $1` + + selectKindSQL = ` +SELECT kind from import.imports WHERE ID = $1` + + selectUserSQL = ` +SELECT username from import.imports WHERE ID = $1` + + selectUserKindSQL = ` +SELECT username, kind from import.imports WHERE ID = $1` + + reviewSQL = ` +UPDATE import.imports SET + state = $1::import_state, + changed = CURRENT_TIMESTAMP, + signer = $2 +WHERE id = $3` + + deleteImportDataSQL = `SELECT import.del_import($1)` + + deleteImportTrackSQL = ` +DELETE FROM import.track_imports WHERE import_id = $1` ) func (q *importQueue) decideImportTx( @@ -469,7 +590,7 @@ return fmt.Errorf("no job creator for kind '%s'", kind) } - r := &reviewed{ + r := &reviewedJob{ ID: id, Accepted: accepted, } @@ -478,7 +599,7 @@ return err } rID, err := q.addJob( - reviewedJobKind, + JobKind(kind+"#review"), time.Now(), nil, nil, @@ -736,34 +857,27 @@ continue } - job := jc.Create() - if err := common.FromJSONString(idj.data, job); err != nil { - errorAndFail(idj.id, "failed to create job for import #%d: %v", - idj.id, err) - continue - } - - var dependencies Dependencies - if deps, ok := job.(Dependencies); ok { - dependencies = deps - } else { - dependencies = jc - } - // Lock dependencies. - q.lockDependencies(dependencies) + q.lockDependencies(jc) go func(jc JobCreator, idj *idJob) { // Unlock the dependencies. defer func() { - q.unlockDependencies(dependencies) + q.unlockDependencies(jc) select { case q.signalChan <- struct{}{}: default: } }() + job := jc.Create() + if err := common.FromJSONString(idj.data, job); err != nil { + errorAndFail(idj.id, "failed to create job for import #%d: %v", + idj.id, err) + return + } + feedback := logFeedback(idj.id) feedback.Info("import #%d started", idj.id)
--- a/pkg/imports/reviewed.go Tue Mar 24 15:46:37 2020 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,165 +0,0 @@ -// This is Free Software under GNU Affero General Public License v >= 3.0 -// without warranty, see README.md and license for details. -// -// SPDX-License-Identifier: AGPL-3.0-or-later -// License-Filename: LICENSES/AGPL-3.0.txt -// -// Copyright (C) 2018, 2019, 2020 by via donau -// – Österreichische Wasserstraßen-Gesellschaft mbH -// Software engineering by Intevation GmbH -// -// Author(s): -// * Sascha L. Teichmann <sascha.teichmann@intevation.de> - -package imports - -import ( - "context" - "database/sql" - "fmt" - "log" - - "gemma.intevation.de/gemma/pkg/auth" -) - -type reviewed struct { - ID int64 `json:"id"` - Accepted bool `json:"bool"` -} - -const ( - selectKindSQL = ` -SELECT kind from import.imports WHERE ID = $1` - - selectUserSQL = ` -SELECT username from import.imports WHERE ID = $1` - - selectUserKindSQL = ` -SELECT username, kind from import.imports WHERE ID = $1` - - reviewSQL = ` -UPDATE import.imports SET - state = $1::import_state, - changed = CURRENT_TIMESTAMP, - signer = $2 -WHERE id = $3` - - deleteImportDataSQL = `SELECT import.del_import($1)` - - deleteImportTrackSQL = ` -DELETE FROM import.track_imports WHERE import_id = $1` -) - -type reviewedCreator struct{} - -const reviewedJobKind JobKind = "reviewed" - -func init() { RegisterJobCreator(reviewedJobKind, reviewedCreator{}) } - -func (reviewedCreator) Description() string { return "reviewed imports" } - -func (reviewedCreator) Depends() [2][]string { - return [2][]string{{"reviewed"}} -} - -func (reviewedCreator) AutoAccept() bool { return true } - -func (reviewedCreator) Create() Job { return new(reviewed) } - -func (r *reviewed) Depends() (deps [2][]string) { - ctx := context.Background() - var kind string - if err := auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error { - return conn.QueryRowContext(ctx, selectKindSQL, r.ID).Scan(&kind) - }); err != nil { - log.Printf("error: %v\n", err) - return - } - jc := FindJobCreator(JobKind(kind)) - if jc == nil { - log.Printf("error: no job creator found for '%s'", kind) - } - deps = jc.Depends() - n := make([]string, len(deps[0])+1) - n[0] = "reviewed" - copy(n[1:], deps[0]) - deps[0] = n - return -} - -func (*reviewed) CleanUp() error { return nil } - -func (reviewedCreator) StageDone(context.Context, *sql.Tx, int64, Feedback) error { - return nil -} - -func (r *reviewed) Do( - ctx context.Context, - importID int64, - conn *sql.Conn, - feedback Feedback, -) (interface{}, error) { - - tx, err := conn.BeginTx(ctx, nil) - if err != nil { - return nil, err - } - defer tx.Rollback() - - var signer string - if err := tx.QueryRowContext(ctx, selectUserSQL, importID).Scan(&signer); err != nil { - return nil, err - } - - var user, kind string - if err := tx.QueryRowContext(ctx, selectUserKindSQL, r.ID).Scan(&user, &kind); err != nil { - return nil, err - } - - jc := FindJobCreator(JobKind(kind)) - if jc == nil { - return nil, fmt.Errorf("no job creator found for '%s'", kind) - } - - importFeedback := logFeedback(r.ID) - - if err := auth.RunAs(ctx, user, func(conn *sql.Conn) error { - userTx, err := conn.BeginTx(ctx, nil) - if err != nil { - return err - } - defer userTx.Rollback() - - if r.Accepted { - err = jc.StageDone(ctx, userTx, r.ID, importFeedback) - } else { - _, err = userTx.ExecContext(ctx, deleteImportDataSQL, r.ID) - } - if err == nil { - err = userTx.Commit() - } - return err - }); err != nil { - return nil, err - } - - // Remove the import track - if _, err := tx.ExecContext(ctx, deleteImportTrackSQL, r.ID); err != nil { - return nil, err - } - - var state string - if r.Accepted { - state = "accepted" - } else { - state = "declined" - } - - if _, err := tx.ExecContext(ctx, reviewSQL, state, signer, r.ID); err != nil { - return nil, err - } - - importFeedback.Info("User '%s' %s import %d.", signer, state, r.ID) - - return nil, tx.Commit() -}