changeset 1642:49c04bb64e0a

Import queue: Implemented auto-accept and email sending. TODO: Do actual email sending.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Thu, 20 Dec 2018 13:45:32 +0100
parents 334d13e63342
children 1fde2f48977b
files pkg/controllers/bnimports.go pkg/controllers/gmimports.go pkg/controllers/srimports.go pkg/imports/queue.go schema/gemma.sql
diffstat 5 files changed, 61 insertions(+), 15 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/controllers/bnimports.go	Thu Dec 20 13:19:54 2018 +0100
+++ b/pkg/controllers/bnimports.go	Thu Dec 20 13:45:32 2018 +0100
@@ -45,7 +45,11 @@
 
 	session, _ := auth.GetSession(req)
 
-	jobID, err := imports.AddJob(imports.BNJobKind, session.User, serialized)
+	jobID, err := imports.AddJob(
+		imports.BNJobKind, session.User,
+		false, false,
+		serialized)
+
 	if err != nil {
 		return
 	}
--- a/pkg/controllers/gmimports.go	Thu Dec 20 13:19:54 2018 +0100
+++ b/pkg/controllers/gmimports.go	Thu Dec 20 13:45:32 2018 +0100
@@ -45,7 +45,12 @@
 
 	session, _ := auth.GetSession(req)
 
-	jobID, err := imports.AddJob(imports.GMJobKind, session.User, serialized)
+	jobID, err := imports.AddJob(
+		imports.GMJobKind,
+		session.User,
+		false, true,
+		serialized)
+
 	if err != nil {
 		return
 	}
--- a/pkg/controllers/srimports.go	Thu Dec 20 13:19:54 2018 +0100
+++ b/pkg/controllers/srimports.go	Thu Dec 20 13:45:32 2018 +0100
@@ -161,7 +161,12 @@
 
 	session, _ := auth.GetSession(req)
 
-	jobID, err := imports.AddJob(imports.SRJobKind, session.User, serialized)
+	jobID, err := imports.AddJob(
+		imports.SRJobKind,
+		session.User,
+		false, false,
+		serialized)
+
 	if err != nil {
 		log.Printf("error: %v\n", err)
 		http.Error(rw, "error: "+err.Error(), http.StatusInternalServerError)
--- a/pkg/imports/queue.go	Thu Dec 20 13:19:54 2018 +0100
+++ b/pkg/imports/queue.go	Thu Dec 20 13:45:32 2018 +0100
@@ -83,10 +83,12 @@
 	}
 
 	idJob struct {
-		id   int64
-		kind JobKind
-		user string
-		data string
+		id         int64
+		kind       JobKind
+		user       string
+		sendEmail  bool
+		autoAccept bool
+		data       string
 	}
 )
 
@@ -128,11 +130,15 @@
 INSERT INTO waterway.imports (
   kind,
   username,
+  send_email,
+  auto_accept,
   data
 ) VALUES (
   $1,
   $2,
-  $3
+  $3,
+  $4,
+  $5
 ) RETURNING id`
 
 	selectJobSQL = `
@@ -140,6 +146,8 @@
   id,
   kind,
   username,
+  send_email,
+  auto_accept,
   data
 FROM waterway.imports
 WHERE state = 'queued'::waterway.import_state AND enqueued IN (
@@ -232,11 +240,23 @@
 	return q.creators[kind]
 }
 
-func (q *importQueue) addJob(kind JobKind, user, data string) (int64, error) {
+func (q *importQueue) addJob(
+	kind JobKind,
+	user string,
+	sendEmail, autoAccept bool,
+	data string,
+) (int64, error) {
 	ctx := context.Background()
 	var id int64
 	err := auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error {
-		return conn.QueryRowContext(ctx, insertJobSQL, string(kind), user, data).Scan(&id)
+		return conn.QueryRowContext(
+			ctx,
+			insertJobSQL,
+			string(kind),
+			user,
+			sendEmail,
+			autoAccept,
+			data).Scan(&id)
 	})
 	if err == nil {
 		select {
@@ -250,8 +270,8 @@
 // AddJob adds a job to the global import queue to be executed
 // as soon as possible. This is gone in a separate Go routine
 // so this will not block.
-func AddJob(kind JobKind, user, data string) (int64, error) {
-	return iqueue.addJob(kind, user, data)
+func AddJob(kind JobKind, user string, sendEmail, autoAccept bool, data string) (int64, error) {
+	return iqueue.addJob(kind, user, sendEmail, autoAccept, data)
 }
 
 type logFeedback int64
@@ -333,7 +353,13 @@
 		}
 		defer tx.Rollback()
 		if err = tx.QueryRowContext(ctx, selectJobSQL, &kinds).Scan(
-			&ji.id, &ji.kind, &ji.user, &ji.data); err != nil {
+			&ji.id,
+			&ji.kind,
+			&ji.user,
+			&ji.sendEmail,
+			&ji.autoAccept,
+			&ji.data,
+		); err != nil {
 			return err
 		}
 		_, err = tx.ExecContext(ctx, updateStateSQL, "running", ji.id)
@@ -480,14 +506,18 @@
 			}
 
 			var state string
-			if errDo != nil || errCleanup != nil {
+			switch {
+			case errDo != nil || errCleanup != nil:
 				state = "failed"
-			} else {
+			case idj.autoAccept:
+				state = "accepted"
+			default:
 				state = "pending"
 			}
 			if err := updateStateSummary(ctx, idj.id, state, summary); err != nil {
 				log.Printf("setting state of job %d failed: %v\n", idj.id, err)
 			}
+			// TODO: Send email if sendEmail is set.
 			log.Printf("import #%d finished: %s\n", idj.id, state)
 		}(jc, idj)
 	}
--- a/schema/gemma.sql	Thu Dec 20 13:19:54 2018 +0100
+++ b/schema/gemma.sql	Thu Dec 20 13:45:32 2018 +0100
@@ -587,6 +587,8 @@
         REFERENCES internal.user_profiles(username)
             ON DELETE SET NULL
             ON UPDATE CASCADE,
+    send_email boolean NOT NULL DEFAULT false,
+    auto_accept boolean NOT NULL DEFAULT false,
     data TEXT,
     summary TEXT
 );