Mercurial > gemma
changeset 5135:53618d18e387
Merge queued-stage-done into default
author | Tom Gottfried <tom@intevation.de> |
---|---|
date | Fri, 27 Mar 2020 15:42:11 +0100 |
parents | 8d5e3ce27d20 (current diff) 590ccab9ab70 (diff) |
children | 03f28c5d2a88 3c748b2b4de6 |
files | schema/updates/1430/01.bottlenecks_geoserver_add_time.sql |
diffstat | 8 files changed, 373 insertions(+), 148 deletions(-) [+] |
line wrap: on
line diff
--- a/pkg/controllers/importqueue.go Fri Mar 27 15:30:15 2020 +0100 +++ b/pkg/controllers/importqueue.go Fri Mar 27 15:42:11 2020 +0100 @@ -23,6 +23,7 @@ "net/http" "strconv" "strings" + "sync" "time" "github.com/gorilla/mux" @@ -120,6 +121,9 @@ b = append(b, term) } + // Always filter review jobs. They are only for internal use. + cond(` NOT kind LIKE '%%` + imports.ReviewJobSuffix + `'`) + if query := req.FormValue("query"); query != "" { query = "%" + query + "%" cond(` (kind ILIKE $%d OR username ILIKE $%d OR signer ILIKE $%d OR `+ @@ -595,21 +599,46 @@ results := make([]reviewResult, len(rs)) - for i := range rs { - rev := &rs[i] - msg, err := decideImport(req, rev.ID, string(rev.State)) - var errString string - if err != nil { - errString = err.Error() - } - results[i] = reviewResult{ - ID: rev.ID, - Message: msg, - Error: errString, - } + for i := range results { + results[i].ID = rs[i].ID + results[i].Message = fmt.Sprintf("Finalizing import #%d in progress.", rs[i].ID) } - return mw.JSONResult{Result: results}, nil + var wg sync.WaitGroup + var mu sync.Mutex + + for i := range rs { + wg.Add(1) + go func(idx int) { + defer wg.Done() + rev := &rs[idx] + msg, err := decideImport(req, rev.ID, string(rev.State)) + mu.Lock() + if err != nil { + results[idx].Error = err.Error() + } + results[idx].Message = msg + mu.Unlock() + }(i) + } + + done := make(chan struct{}) + go func() { + defer close(done) + wg.Wait() + }() + + select { + case <-time.After(5 * time.Second): + case <-done: + } + + out := make([]reviewResult, len(rs)) + mu.Lock() + copy(out, results) + mu.Unlock() + + return mw.JSONResult{Result: out}, nil } func reviewImport(req *http.Request) (jr mw.JSONResult, err error) { @@ -638,23 +667,16 @@ id int64, state string, ) (message string, err error) { - ctx := req.Context() - - accepted := state == "accepted" session, _ := auth.GetSession(req) reviewer := session.User + ctx := req.Context() + accepted := state == "accepted" + if err = imports.DecideImport(ctx, id, accepted, reviewer); err != nil { - err = mw.JSONError{ - Code: http.StatusBadRequest, - Message: err.Error(), - } - return + return "", err } - message = fmt.Sprintf( - "Requested import #%d to be %s.", id, state) - - return + return fmt.Sprintf("Import #%d is %s.", id, state), nil }
--- a/pkg/imports/queue.go Fri Mar 27 15:30:15 2020 +0100 +++ b/pkg/imports/queue.go Fri Mar 27 15:42:11 2020 +0100 @@ -66,6 +66,11 @@ CleanUp() error } + FeedbackJob interface { + Job + CreateFeedback(int64) Feedback + } + // JobKind is the type of an import. // Choose a unique name for every import. JobKind string @@ -92,12 +97,17 @@ AutoAccept() bool } + JobRemover interface { + JobCreator + RemoveJob() bool + } + idJob struct { id int64 kind JobKind user string waitRetry pgtype.Interval - trysLeft sql.NullInt64 + triesLeft sql.NullInt64 sendEmail bool data string } @@ -108,17 +118,28 @@ runExclusive = -66666 ) +const ( + ReviewJobSuffix = "#review" + reviewJobRetries = 10 + reviewJobWait = time.Minute +) + type importQueue struct { - signalChan chan struct{} + cmdCh chan func(*importQueue) + creatorsMu sync.Mutex creators map[JobKind]JobCreator usedDeps map[string]int + + waiting map[int64]chan struct{} } var iqueue = importQueue{ - signalChan: make(chan struct{}), - creators: map[JobKind]JobCreator{}, - usedDeps: map[string]int{}, + cmdCh: make(chan func(*importQueue)), + + creators: map[JobKind]JobCreator{}, + usedDeps: map[string]int{}, + waiting: make(map[int64]chan struct{}), } var ( @@ -131,6 +152,7 @@ "pending", "accepted", "declined", + "reviewed", } ) @@ -162,8 +184,9 @@ $7 ) RETURNING id` + // Select oldest queued job but prioritize review jobs selectJobSQL = ` -SELECT +SELECT DISTINCT ON (kind LIKE '%` + ReviewJobSuffix + `') id, kind, trys_left, @@ -174,11 +197,9 @@ FROM import.imports WHERE due <= CURRENT_TIMESTAMP + interval '5 seconds' AND - state = 'queued'::import_state AND enqueued IN ( - SELECT min(enqueued) - FROM import.imports - WHERE state = 'queued'::import_state AND - kind = ANY($1)) + state = 'queued'::import_state AND + kind = ANY($1) +ORDER BY kind LIKE '%` + ReviewJobSuffix + `' DESC, enqueued LIMIT 1` updateStateSQL = ` @@ -194,6 +215,9 @@ summary = $2 WHERE id = $3` + deleteJobSQL = ` +DELETE FROM import.imports WHERE id = $1` + logMessageSQL = ` INSERT INTO import.import_logs ( import_id, @@ -215,10 +239,122 @@ return string(ue) } +type reviewedJobCreator struct { + jobCreator JobCreator +} + +func (*reviewedJobCreator) AutoAccept() bool { + return true +} + +func (*reviewedJobCreator) RemoveJob() bool { + return true +} + +func (rjc *reviewedJobCreator) Depends() [2][]string { + return rjc.jobCreator.Depends() +} + +func (rjc *reviewedJobCreator) Description() string { + return rjc.jobCreator.Description() + ReviewJobSuffix +} + +func (*reviewedJobCreator) StageDone(context.Context, *sql.Tx, int64, Feedback) error { + return nil +} + +type reviewedJob struct { + ID int64 `json:"id"` + Accepted bool `json:"accepted"` +} + +func (*reviewedJobCreator) Create() Job { + return new(reviewedJob) +} + +func (*reviewedJob) CleanUp() error { return nil } + +func (r *reviewedJob) CreateFeedback(int64) Feedback { + return logFeedback(r.ID) +} + +func (rj *reviewedJob) Do( + ctx context.Context, + importID int64, + conn *sql.Conn, + feedback Feedback, +) (interface{}, error) { + + tx, err := conn.BeginTx(ctx, nil) + if err != nil { + return nil, err + } + defer tx.Rollback() + + var signer string + if err := tx.QueryRowContext(ctx, selectUserSQL, importID).Scan(&signer); err != nil { + return nil, err + } + + var user, kind string + if err := tx.QueryRowContext(ctx, selectUserKindSQL, rj.ID).Scan(&user, &kind); err != nil { + return nil, err + } + + jc := FindJobCreator(JobKind(kind)) + if jc == nil { + return nil, fmt.Errorf("no job creator found for '%s'", kind) + } + + importFeedback := logFeedback(rj.ID) + + if err := auth.RunAs(ctx, user, func(conn *sql.Conn) error { + userTx, err := conn.BeginTx(ctx, nil) + if err != nil { + return err + } + defer userTx.Rollback() + + if rj.Accepted { + err = jc.StageDone(ctx, userTx, rj.ID, importFeedback) + } else { + _, err = userTx.ExecContext(ctx, deleteImportDataSQL, rj.ID) + } + if err == nil { + err = userTx.Commit() + } + return err + }); err != nil { + return nil, err + } + + // Remove the import track + if _, err := tx.ExecContext(ctx, deleteImportTrackSQL, rj.ID); err != nil { + return nil, err + } + + var state string + if rj.Accepted { + state = "accepted" + } else { + state = "declined" + } + + if _, err := tx.ExecContext(ctx, reviewSQL, state, signer, rj.ID); err != nil { + return nil, err + } + + importFeedback.Info("User '%s' %s import %d.", signer, state, rj.ID) + + return nil, tx.Commit() +} + func (q *importQueue) registerJobCreator(kind JobKind, jc JobCreator) { q.creatorsMu.Lock() defer q.creatorsMu.Unlock() q.creators[kind] = jc + q.creators[kind+ReviewJobSuffix] = &reviewedJobCreator{jobCreator: jc} + } // FindJobCreator looks up a JobCreator in the global import queue. @@ -275,16 +411,16 @@ func (idj *idJob) nextRetry(feedback Feedback) bool { switch { - case idj.waitRetry.Status != pgtype.Present && !idj.trysLeft.Valid: + case idj.waitRetry.Status != pgtype.Present && !idj.triesLeft.Valid: return false - case idj.waitRetry.Status == pgtype.Present && !idj.trysLeft.Valid: + case idj.waitRetry.Status == pgtype.Present && !idj.triesLeft.Valid: return true - case idj.trysLeft.Valid: - if idj.trysLeft.Int64 < 1 { - feedback.Warn("import should be retried, but no retrys left") + case idj.triesLeft.Valid: + if idj.triesLeft.Int64 < 1 { + feedback.Warn("no retries left") } else { - idj.trysLeft.Int64-- - feedback.Info("import failed but will be retried") + idj.triesLeft.Int64-- + feedback.Info("failed but will retry") return true } } @@ -304,11 +440,11 @@ return now } -func (idj *idJob) trysLeftPointer() *int { - if !idj.trysLeft.Valid { +func (idj *idJob) triesLeftPointer() *int { + if !idj.triesLeft.Valid { return nil } - t := int(idj.trysLeft.Int64) + t := int(idj.triesLeft.Int64) return &t } @@ -357,12 +493,13 @@ func (q *importQueue) addJob( kind JobKind, due time.Time, - trysLeft *int, + triesLeft *int, waitRetry *time.Duration, user string, sendEmail bool, data string, -) (int64, error) { + sync bool, +) (int64, chan struct{}, error) { var id int64 if due.IsZero() { @@ -371,39 +508,47 @@ due = due.UTC() var tl sql.NullInt64 - if trysLeft != nil { - tl = sql.NullInt64{Int64: int64(*trysLeft), Valid: true} + if triesLeft != nil { + tl = sql.NullInt64{Int64: int64(*triesLeft), Valid: true} } var wr pgtype.Interval if waitRetry != nil { if err := wr.Set(*waitRetry); err != nil { - return 0, err + return 0, nil, err } } else { wr = pgtype.Interval{Status: pgtype.Null} } - ctx := context.Background() - err := auth.RunAs(ctx, user, func(conn *sql.Conn) error { - return conn.QueryRowContext( - ctx, - insertJobSQL, - string(kind), - due, - tl, - &wr, - user, - sendEmail, - data).Scan(&id) - }) - if err == nil { - select { - case q.signalChan <- struct{}{}: - default: - } + errCh := make(chan error) + var done chan struct{} + + q.cmdCh <- func(q *importQueue) { + ctx := context.Background() + errCh <- auth.RunAs(ctx, user, func(conn *sql.Conn) error { + err := conn.QueryRowContext( + ctx, + insertJobSQL, + string(kind), + due, + tl, + &wr, + user, + sendEmail, + data).Scan(&id) + + if err == nil && sync { + log.Printf("info: register wait for %d\n", id) + done = make(chan struct{}) + q.waiting[id] = done + } + + return err + }) } - return id, err + + return id, done, <-errCh } // AddJob adds a job to the global import queue to be executed @@ -413,31 +558,38 @@ func AddJob( kind JobKind, due time.Time, - trysLeft *int, + triesLeft *int, waitRetry *time.Duration, user string, sendEmail bool, data string, ) (int64, error) { - return iqueue.addJob( + id, _, err := iqueue.addJob( kind, due, - trysLeft, + triesLeft, waitRetry, user, sendEmail, - data) + data, + false) + return id, err } const ( isPendingSQL = ` SELECT state = 'pending'::import_state, - kind, - username + kind FROM import.imports WHERE id = $1` + selectUserSQL = ` +SELECT username from import.imports WHERE ID = $1` + + selectUserKindSQL = ` +SELECT username, kind from import.imports WHERE ID = $1` + reviewSQL = ` UPDATE import.imports SET state = $1::import_state, @@ -449,9 +601,6 @@ deleteImportTrackSQL = ` DELETE FROM import.track_imports WHERE import_id = $1` - - logDecisionSQL = ` -INSERT INTO import.import_logs (import_id, msg) VALUES ($1, $2)` ) func (q *importQueue) decideImportTx( @@ -460,70 +609,62 @@ id int64, accepted bool, reviewer string, -) error { +) (chan struct{}, error) { var ( pending bool kind string - user string ) - switch err := tx.QueryRowContext(ctx, isPendingSQL, id).Scan(&pending, &kind, &user); { + switch err := tx.QueryRowContext(ctx, isPendingSQL, id).Scan(&pending, &kind); { case err == sql.ErrNoRows: - return fmt.Errorf("cannot find import #%d", id) + return nil, fmt.Errorf("cannot find import #%d", id) case err != nil: - return err + return nil, err case !pending: - return fmt.Errorf("#%d is not pending", id) + return nil, fmt.Errorf("#%d is not pending", id) } jc := q.jobCreator(JobKind(kind)) if jc == nil { - return fmt.Errorf("no job creator for kind '%s'", kind) + return nil, fmt.Errorf("no job creator for kind '%s'", kind) + } + + r := &reviewedJob{ + ID: id, + Accepted: accepted, + } + serialized, err := common.ToJSONString(r) + if err != nil { + return nil, err } - if err := auth.RunAs(ctx, user, func(conn *sql.Conn) error { - txUser, err := conn.BeginTx(ctx, nil) - if err != nil { - return err - } - defer txUser.Rollback() + // Try a little harder to persist the decision. + tries := reviewJobRetries + wait := reviewJobWait - if accepted { - feedback := logFeedback(id) - err = jc.StageDone(ctx, txUser, id, feedback) - } else { - _, err = txUser.ExecContext(ctx, deleteImportDataSQL, id) - } - - if err == nil { - err = txUser.Commit() - } - - return err - }); err != nil { - return err + rID, done, err := q.addJob( + JobKind(kind+ReviewJobSuffix), + time.Now(), + &tries, + &wait, + reviewer, + false, + serialized, + true) + if err != nil { + return nil, err } - - // Remove the import track - if _, err := tx.ExecContext(ctx, deleteImportTrackSQL, id); err != nil { - return err + log.Printf("info: add review job %d\n", rID) + _, err = tx.ExecContext(ctx, updateStateSQL, "reviewed", id) + if err != nil && done != nil { + go func() { + q.cmdCh <- func(q *importQueue) { + delete(q.waiting, rID) + } + }() + done = nil } - - var state string - if accepted { - state = "accepted" - } else { - state = "declined" - } - - logMsg := fmt.Sprintf("User '%s' %s import %d.", reviewer, state, id) - - if _, err := tx.ExecContext(ctx, logDecisionSQL, id, logMsg); err != nil { - return err - } - - _, err := tx.ExecContext(ctx, reviewSQL, state, reviewer, id) - return err + return done, err } func (q *importQueue) decideImport( @@ -536,18 +677,25 @@ ctx = context.Background() } - return auth.RunAs(ctx, reviewer, func(conn *sql.Conn) error { + var done chan struct{} + + if err := auth.RunAs(ctx, reviewer, func(conn *sql.Conn) error { tx, err := conn.BeginTx(ctx, nil) if err != nil { return err } defer tx.Rollback() - err = q.decideImportTx(ctx, tx, id, accepted, reviewer) + done, err = q.decideImportTx(ctx, tx, id, accepted, reviewer) if err == nil { err = tx.Commit() } return err - }) + }); err != nil { + return err + } + + <-done + return nil } func DecideImport( @@ -646,7 +794,7 @@ if err = tx.QueryRowContext(ctx, selectJobSQL, &kinds).Scan( &ji.id, &ji.kind, - &ji.trysLeft, + &ji.triesLeft, &ji.waitRetry, &ji.user, &ji.sendEmail, @@ -714,6 +862,13 @@ }) } +func deleteJob(ctx context.Context, id int64) error { + return tryHardToStoreState(ctx, func(conn *sql.Conn) error { + _, err := conn.ExecContext(ctx, deleteJobSQL, id) + return err + }) +} + func errorAndFail(id int64, format string, args ...interface{}) error { ctx := context.Background() return tryHardToStoreState(ctx, func(conn *sql.Conn) error { @@ -756,7 +911,9 @@ break } select { - case <-q.signalChan: + case cmd := <-q.cmdCh: + cmd(q) + case <-time.After(pollDuration): } } @@ -774,12 +931,16 @@ go func(jc JobCreator, idj *idJob) { - // Unlock the dependencies. defer func() { + // Unlock the dependencies. q.unlockDependencies(jc) - select { - case q.signalChan <- struct{}{}: - default: + // Unlock waiting. + q.cmdCh <- func(q *importQueue) { + if w := q.waiting[idj.id]; w != nil { + log.Printf("info: unlock waiting %d\n", idj.id) + close(w) + delete(q.waiting, idj.id) + } } }() @@ -790,9 +951,12 @@ return } - feedback := logFeedback(idj.id) - - feedback.Info("import #%d started", idj.id) + var feedback Feedback + if fc, ok := job.(FeedbackJob); ok { + feedback = fc.CreateFeedback(idj.id) + } else { + feedback = logFeedback(idj.id) + } ctx := context.Background() var summary interface{} @@ -823,6 +987,11 @@ } } + var remove bool + if remover, ok := jc.(JobRemover); ok { + remove = remover.RemoveJob() + } + var state string switch { case unchanged: @@ -834,28 +1003,36 @@ default: state = "pending" } - if err := updateStateSummary(ctx, idj.id, state, summary); err != nil { - log.Printf("error: setting state of job %d failed: %v\n", idj.id, err) + if !remove { + if err := updateStateSummary(ctx, idj.id, state, summary); err != nil { + log.Printf("error: setting state of job %d failed: %v\n", idj.id, err) + } + log.Printf("info: import #%d finished: %s\n", idj.id, state) } - log.Printf("info: import #%d finished: %s\n", idj.id, state) if idj.sendEmail { go sendNotificationMail(idj.user, jc.Description(), state, idj.id) } if retry { - nid, err := q.addJob( + nid, _, err := q.addJob( idj.kind, idj.nextDue(), - idj.trysLeftPointer(), + idj.triesLeftPointer(), idj.waitRetryPointer(), idj.user, idj.sendEmail, - idj.data) + idj.data, + false) if err != nil { log.Printf("error: retry enqueue failed: %v\n", err) } else { log.Printf("info: re-enqueued job with id %d\n", nid) } } + if remove { + if err := deleteJob(ctx, idj.id); err != nil { + log.Printf("error: deleting job %d failed: %v\n", idj.id, err) + } + } }(jc, idj) } }
--- a/schema/auth.sql Fri Mar 27 15:30:15 2020 +0100 +++ b/schema/auth.sql Fri Mar 27 15:42:11 2020 +0100 @@ -61,6 +61,7 @@ GRANT INSERT, UPDATE ON sys_admin.system_config TO sys_admin; GRANT UPDATE ON sys_admin.published_services TO sys_admin; GRANT INSERT, DELETE, UPDATE ON sys_admin.password_reset_requests TO sys_admin; +GRANT DELETE ON import.imports, import.import_logs TO sys_admin; -- -- Privileges assigned directly to metamorph
--- a/schema/gemma.sql Fri Mar 27 15:30:15 2020 +0100 +++ b/schema/gemma.sql Fri Mar 27 15:42:11 2020 +0100 @@ -1220,7 +1220,7 @@ 'queued', 'running', 'failed', 'unchanged', 'pending', - 'accepted', 'declined' + 'accepted', 'declined', 'reviewed' ); CREATE TYPE log_type AS ENUM ('info', 'warn', 'error'); @@ -1270,8 +1270,10 @@ data TEXT, summary TEXT ) - - CREATE INDEX enqueued_idx ON imports(enqueued, state) + -- Mainly for listing imports in clients: + CREATE INDEX enqueued_idx ON imports(enqueued) + -- For fast retrieval of queued imports by the import queue in backend: + CREATE INDEX state_idx ON imports(state) CREATE TABLE import_logs ( import_id int NOT NULL REFERENCES imports(id)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/schema/updates/1433/01.add_state.sql Fri Mar 27 15:42:11 2020 +0100 @@ -0,0 +1,19 @@ +-- DROP and re-CREATE type because adding a value isn't possible in transaction +-- https://www.postgresql.org/docs/11/sql-altertype.html#id-1.9.3.42.7 + +ALTER TABLE import.imports + ALTER COLUMN state DROP DEFAULT, + ALTER COLUMN state TYPE varchar; + +DROP TYPE import_state; + +CREATE TYPE import_state AS ENUM ( + 'queued', + 'running', + 'failed', 'unchanged', 'pending', + 'accepted', 'declined', 'reviewed' +); + +ALTER TABLE import.imports + ALTER COLUMN state TYPE import_state USING CAST(state AS import_state), + ALTER COLUMN state SET DEFAULT 'queued';
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/schema/updates/1434/01.allow_job_delete_sys_admin.sql Fri Mar 27 15:42:11 2020 +0100 @@ -0,0 +1,1 @@ +GRANT DELETE ON import.imports, import.import_logs TO sys_admin;