changeset 5111:90b0a14dd58b queued-stage-done

Enable jobs to be removed by the import queue.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Wed, 25 Mar 2020 16:51:45 +0100
parents 4dc2e6dc6c7d
children 4c113aa9a521
files pkg/imports/queue.go schema/auth.sql schema/updates/1432/01.allow_job_delete_sys_admin.sql schema/version.sql
diffstat 4 files changed, 42 insertions(+), 8 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/imports/queue.go	Wed Mar 25 15:33:11 2020 +0100
+++ b/pkg/imports/queue.go	Wed Mar 25 16:51:45 2020 +0100
@@ -66,6 +66,11 @@
 		CleanUp() error
 	}
 
+	FeedbackJob interface {
+		Job
+		CreateFeedback(int64) Feedback
+	}
+
 	// JobKind is the type of an import.
 	// Choose a unique name for every import.
 	JobKind string
@@ -92,8 +97,9 @@
 		AutoAccept() bool
 	}
 
-	FeedbackCreator interface {
-		CreateFeedback(int64) Feedback
+	JobRemover interface {
+		JobCreator
+		RemoveJob() bool
 	}
 
 	idJob struct {
@@ -205,6 +211,9 @@
    summary = $2
 WHERE id = $3`
 
+	deleteJobSQL = `
+DELETE FROM import.imports WHERE id = $1`
+
 	logMessageSQL = `
 INSERT INTO import.import_logs (
   import_id,
@@ -234,6 +243,10 @@
 	return true
 }
 
+func (*reviewedJobCreator) RemoveJob() bool {
+	return true
+}
+
 func (rjc *reviewedJobCreator) Depends() [2][]string {
 	return rjc.jobCreator.Depends()
 }
@@ -821,6 +834,13 @@
 	})
 }
 
+func deleteJob(ctx context.Context, id int64) error {
+	return tryHardToStoreState(ctx, func(conn *sql.Conn) error {
+		_, err := conn.ExecContext(ctx, deleteJobSQL, id)
+		return err
+	})
+}
+
 func errorAndFail(id int64, format string, args ...interface{}) error {
 	ctx := context.Background()
 	return tryHardToStoreState(ctx, func(conn *sql.Conn) error {
@@ -898,7 +918,7 @@
 			}
 
 			var feedback Feedback
-			if fc, ok := job.(FeedbackCreator); ok {
+			if fc, ok := job.(FeedbackJob); ok {
 				feedback = fc.CreateFeedback(idj.id)
 			} else {
 				feedback = logFeedback(idj.id)
@@ -935,6 +955,11 @@
 				}
 			}
 
+			var remove bool
+			if remover, ok := jc.(JobRemover); ok {
+				remove = remover.RemoveJob()
+			}
+
 			var state string
 			switch {
 			case unchanged:
@@ -946,15 +971,18 @@
 			default:
 				state = "pending"
 			}
-			if err := updateStateSummary(ctx, idj.id, state, summary); err != nil {
-				log.Printf("error: setting state of job %d failed: %v\n", idj.id, err)
+			if !remove {
+				if err := updateStateSummary(ctx, idj.id, state, summary); err != nil {
+					log.Printf("error: setting state of job %d failed: %v\n", idj.id, err)
+				}
+				log.Printf("info: import #%d finished: %s\n", idj.id, state)
 			}
-			log.Printf("info: import #%d finished: %s\n", idj.id, state)
 			if idj.sendEmail {
 				go sendNotificationMail(idj.user, jc.Description(), state, idj.id)
 			}
 
-			if retry {
+			switch {
+			case retry:
 				nid, err := q.addJob(
 					idj.kind,
 					idj.nextDue(),
@@ -967,6 +995,10 @@
 				} else {
 					log.Printf("info: re-enqueued job with id %d\n", nid)
 				}
+			case remove:
+				if err := deleteJob(ctx, idj.id); err != nil {
+					log.Printf("error: deleting job %d failed: %v\n", idj.id, err)
+				}
 			}
 		}(jc, idj)
 	}
--- a/schema/auth.sql	Wed Mar 25 15:33:11 2020 +0100
+++ b/schema/auth.sql	Wed Mar 25 16:51:45 2020 +0100
@@ -61,6 +61,7 @@
 GRANT INSERT, UPDATE ON sys_admin.system_config TO sys_admin;
 GRANT UPDATE ON sys_admin.published_services TO sys_admin;
 GRANT INSERT, DELETE, UPDATE ON sys_admin.password_reset_requests TO sys_admin;
+GRANT DELETE ON import.imports, import.import_logs TO sys_admin;
 
 --
 -- Privileges assigned directly to metamorph
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/schema/updates/1432/01.allow_job_delete_sys_admin.sql	Wed Mar 25 16:51:45 2020 +0100
@@ -0,0 +1,1 @@
+GRANT DELETE ON import.imports, import.import_logs TO sys_admin;
--- a/schema/version.sql	Wed Mar 25 15:33:11 2020 +0100
+++ b/schema/version.sql	Wed Mar 25 16:51:45 2020 +0100
@@ -1,1 +1,1 @@
-INSERT INTO gemma_schema_version(version) VALUES (1431);
+INSERT INTO gemma_schema_version(version) VALUES (1432);