Mercurial > gemma
changeset 5111:90b0a14dd58b queued-stage-done
Enable jobs to be removed by the import queue.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Wed, 25 Mar 2020 16:51:45 +0100 |
parents | 4dc2e6dc6c7d |
children | 4c113aa9a521 |
files | pkg/imports/queue.go schema/auth.sql schema/updates/1432/01.allow_job_delete_sys_admin.sql schema/version.sql |
diffstat | 4 files changed, 42 insertions(+), 8 deletions(-) [+] |
line wrap: on
line diff
--- a/pkg/imports/queue.go Wed Mar 25 15:33:11 2020 +0100 +++ b/pkg/imports/queue.go Wed Mar 25 16:51:45 2020 +0100 @@ -66,6 +66,11 @@ CleanUp() error } + FeedbackJob interface { + Job + CreateFeedback(int64) Feedback + } + // JobKind is the type of an import. // Choose a unique name for every import. JobKind string @@ -92,8 +97,9 @@ AutoAccept() bool } - FeedbackCreator interface { - CreateFeedback(int64) Feedback + JobRemover interface { + JobCreator + RemoveJob() bool } idJob struct { @@ -205,6 +211,9 @@ summary = $2 WHERE id = $3` + deleteJobSQL = ` +DELETE FROM import.imports WHERE id = $1` + logMessageSQL = ` INSERT INTO import.import_logs ( import_id, @@ -234,6 +243,10 @@ return true } +func (*reviewedJobCreator) RemoveJob() bool { + return true +} + func (rjc *reviewedJobCreator) Depends() [2][]string { return rjc.jobCreator.Depends() } @@ -821,6 +834,13 @@ }) } +func deleteJob(ctx context.Context, id int64) error { + return tryHardToStoreState(ctx, func(conn *sql.Conn) error { + _, err := conn.ExecContext(ctx, deleteJobSQL, id) + return err + }) +} + func errorAndFail(id int64, format string, args ...interface{}) error { ctx := context.Background() return tryHardToStoreState(ctx, func(conn *sql.Conn) error { @@ -898,7 +918,7 @@ } var feedback Feedback - if fc, ok := job.(FeedbackCreator); ok { + if fc, ok := job.(FeedbackJob); ok { feedback = fc.CreateFeedback(idj.id) } else { feedback = logFeedback(idj.id) @@ -935,6 +955,11 @@ } } + var remove bool + if remover, ok := jc.(JobRemover); ok { + remove = remover.RemoveJob() + } + var state string switch { case unchanged: @@ -946,15 +971,18 @@ default: state = "pending" } - if err := updateStateSummary(ctx, idj.id, state, summary); err != nil { - log.Printf("error: setting state of job %d failed: %v\n", idj.id, err) + if !remove { + if err := updateStateSummary(ctx, idj.id, state, summary); err != nil { + log.Printf("error: setting state of job %d failed: %v\n", idj.id, err) + } + log.Printf("info: import #%d finished: %s\n", idj.id, state) } - log.Printf("info: import #%d finished: %s\n", idj.id, state) if idj.sendEmail { go sendNotificationMail(idj.user, jc.Description(), state, idj.id) } - if retry { + switch { + case retry: nid, err := q.addJob( idj.kind, idj.nextDue(), @@ -967,6 +995,10 @@ } else { log.Printf("info: re-enqueued job with id %d\n", nid) } + case remove: + if err := deleteJob(ctx, idj.id); err != nil { + log.Printf("error: deleting job %d failed: %v\n", idj.id, err) + } } }(jc, idj) }
--- a/schema/auth.sql Wed Mar 25 15:33:11 2020 +0100 +++ b/schema/auth.sql Wed Mar 25 16:51:45 2020 +0100 @@ -61,6 +61,7 @@ GRANT INSERT, UPDATE ON sys_admin.system_config TO sys_admin; GRANT UPDATE ON sys_admin.published_services TO sys_admin; GRANT INSERT, DELETE, UPDATE ON sys_admin.password_reset_requests TO sys_admin; +GRANT DELETE ON import.imports, import.import_logs TO sys_admin; -- -- Privileges assigned directly to metamorph