changeset 1654:85386ad17d34

Scheduled imports: Don't track the user in the running scheduler. The user is stored in the database already.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Thu, 20 Dec 2018 18:39:19 +0100
parents 06f08458d666
children 34315277f2d6
files pkg/controllers/importconfig.go pkg/controllers/user.go pkg/imports/gmsched.go pkg/scheduler/boot.go pkg/scheduler/scheduler.go
diffstat 5 files changed, 60 insertions(+), 31 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/controllers/importconfig.go	Thu Dec 20 18:14:28 2018 +0100
+++ b/pkg/controllers/importconfig.go	Thu Dec 20 18:39:19 2018 +0100
@@ -149,7 +149,6 @@
 		if err = scheduler.BindAction(
 			string(importConfig.Kind),
 			cron.String,
-			session.User,
 			id,
 		); err != nil {
 			return
@@ -321,7 +320,6 @@
 		if err = scheduler.BindAction(
 			string(importConfig.Kind),
 			string(*importConfig.Cron),
-			session.User,
 			id,
 		); err != nil {
 			return
--- a/pkg/controllers/user.go	Thu Dec 20 18:14:28 2018 +0100
+++ b/pkg/controllers/user.go	Thu Dec 20 18:39:19 2018 +0100
@@ -17,6 +17,7 @@
 
 import (
 	"bytes"
+	"context"
 	"database/sql"
 	"fmt"
 	"log"
@@ -71,6 +72,10 @@
   ST_XMax(map_extent), ST_YMax(map_extent)
 FROM users.list_users
 WHERE username = $1`
+
+	scheduledIDsSQL = `
+SELECT id from waterway.import_configuration
+WHERE username = $1`
 )
 
 var (
@@ -96,6 +101,30 @@
 (inkluding import errors) and details on the concerned import.`))
 )
 
+func scheduledIDs(
+	ctx context.Context,
+	conn *sql.Conn,
+	user string,
+) (map[int64]struct{}, error) {
+	ids := map[int64]struct{}{}
+	rows, err := conn.QueryContext(ctx, scheduledIDsSQL, user)
+	if err != nil {
+		return nil, nil
+	}
+	defer rows.Close()
+	for rows.Next() {
+		var id int64
+		if err := rows.Scan(&id); err != nil {
+			return nil, err
+		}
+		ids[id] = struct{}{}
+	}
+	if err := rows.Err(); err != nil {
+		return nil, err
+	}
+	return ids, nil
+}
+
 func deleteUser(
 	_ interface{}, req *http.Request,
 	db *sql.Conn,
@@ -113,6 +142,16 @@
 		return
 	}
 
+	// Remove scheduled tasks.
+	ids, err2 := scheduledIDs(req.Context(), db, user)
+	if err2 == nil {
+		if len(ids) > 0 {
+			go func() { scheduler.UnbindByIDs(ids) }()
+		}
+	} else {
+		log.Printf("error: %v\n", err2)
+	}
+
 	var res sql.Result
 
 	if res, err = db.ExecContext(req.Context(), deleteUserSQL, user); err != nil {
@@ -129,7 +168,6 @@
 
 	// Running in a go routine should not be necessary.
 	go func() { auth.Sessions.Logout(user) }()
-	go func() { scheduler.UnbindUser(user) }()
 
 	jr = JSONResult{Code: http.StatusNoContent}
 	return
@@ -207,7 +245,6 @@
 	if user != newUser.User {
 		// Running in a go routine should not be necessary.
 		go func() { auth.Sessions.Logout(string(user)) }()
-		go func() { scheduler.UnbindUser(string(user)) }()
 	}
 
 	jr = JSONResult{
--- a/pkg/imports/gmsched.go	Thu Dec 20 18:14:28 2018 +0100
+++ b/pkg/imports/gmsched.go	Thu Dec 20 18:39:19 2018 +0100
@@ -23,7 +23,7 @@
 	scheduler.RegisterAction("gm", scheduledGM)
 }
 
-func scheduledGM(user string, id int64) {
+func scheduledGM(id int64) {
 	log.Println("info: scheduled GM import")
 	cfg, err := loadIDConfig(id)
 	if err != nil {
--- a/pkg/scheduler/boot.go	Thu Dec 20 18:14:28 2018 +0100
+++ b/pkg/scheduler/boot.go	Thu Dec 20 18:39:19 2018 +0100
@@ -57,7 +57,6 @@
 				var id int64
 				if err = rows.Scan(
 					&id,
-					&ba.User,
 					&ba.Name,
 					&ba.Spec,
 				); err != nil {
--- a/pkg/scheduler/scheduler.go	Thu Dec 20 18:14:28 2018 +0100
+++ b/pkg/scheduler/scheduler.go	Thu Dec 20 18:39:19 2018 +0100
@@ -24,12 +24,11 @@
 // ErrNoSuchAction if no fitting action was found.
 var ErrNoSuchAction = errors.New("No such action")
 
-// Action is called with a user and a configuration id.
-type Action func(user string, cfgID int64)
+// Action is called with a configuration id.
+type Action func(cfgID int64)
 
 type userAction struct {
 	scheduler *scheduler
-	user      string
 	name      string
 	cfgID     int64
 }
@@ -43,7 +42,7 @@
 // Run implements cron.Job.
 func (ua *userAction) Run() {
 	if a := ua.scheduler.action(ua.name); a != nil {
-		a(ua.user, ua.cfgID)
+		a(ua.cfgID)
 	} else {
 		log.Printf("warn: scheduled action '%s' not found.", ua.name)
 	}
@@ -65,12 +64,11 @@
 }
 
 // BoundAction is a complete set of infos for
-// an action to be bound to a user, schedule and
+// an action to be bound to a schedule and
 // configuration id.
 type BoundAction struct {
 	Name  string
 	Spec  string
-	User  string
 	CfgID int64
 }
 
@@ -99,8 +97,6 @@
 		}
 		job := &userAction{
 			scheduler: s,
-			user:      ba.User,
-			name:      ba.Name,
 			cfgID:     ba.CfgID,
 		}
 		cr.Schedule(schedule, job)
@@ -116,16 +112,16 @@
 	return nil
 }
 
-// BindAction binds a named action to a user, a cron spec and
+// BindAction binds a named action to a cron spec and
 // a configuration id.
-func BindAction(name, spec, user string, cfgID int64) error {
-	return global.bindAction(name, spec, user, cfgID)
+func BindAction(name, spec string, cfgID int64) error {
+	return global.bindAction(name, spec, cfgID)
 }
 
 // UnbindAction unbinds a named action from a user and
 // a configuration id.
-func UnbindAction(name, user string, cfgID int64) {
-	global.unbindAction(name, user, cfgID)
+func UnbindAction(name string, cfgID int64) {
+	global.unbindAction(name, cfgID)
 }
 
 // UnbindByID unbinds all schedules with a given id.
@@ -134,8 +130,8 @@
 }
 
 // UnbindUser unbinds all schedules for a given user.
-func UnbindUser(user string) {
-	global.unbindUser(user)
+func UnbindByIDs(ids map[int64]struct{}) {
+	global.unbindByIDs(ids)
 }
 
 // HasAction asks if there is an action with a given name.
@@ -149,7 +145,7 @@
 	return s.actions[name] != nil
 }
 
-func (s *scheduler) unbindUser(user string) {
+func (s *scheduler) unbindByIDs(ids map[int64]struct{}) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
@@ -162,8 +158,7 @@
 	var found bool
 	for _, entry := range entries {
 		ua := entry.Job.(*userAction)
-		if ua.user == user {
-			found = true
+		if _, found = ids[ua.cfgID]; found {
 			break
 		}
 	}
@@ -175,7 +170,7 @@
 	s.cr = cron.New()
 	for _, entry := range entries {
 		ua := entry.Job.(*userAction)
-		if ua.user != user {
+		if _, found := ids[ua.cfgID]; !found {
 			s.cr.Schedule(entry.Schedule, entry.Job)
 		}
 	}
@@ -212,7 +207,7 @@
 	s.cr.Start()
 }
 
-func (s *scheduler) unbindAction(name, user string, cfgID int64) {
+func (s *scheduler) unbindAction(name string, cfgID int64) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
@@ -221,8 +216,8 @@
 	var found *userAction
 	for _, entry := range entries {
 		ua := entry.Job.(*userAction)
-		if ua.name == name && ua.user == user && cfgID == ua.cfgID {
-			// Already have such a user/action/cfg tuple  -> re-schedule.
+		if ua.name == name && cfgID == ua.cfgID {
+			// Already have such a action/cfg tuple  -> re-schedule.
 			found = ua
 			break
 		}
@@ -243,7 +238,7 @@
 	s.cr.Start()
 }
 
-func (s *scheduler) bindAction(name, spec, user string, cfgID int64) error {
+func (s *scheduler) bindAction(name, spec string, cfgID int64) error {
 
 	schedule, err := cron.Parse(spec)
 	if err != nil {
@@ -258,7 +253,7 @@
 	var found *userAction
 	for _, entry := range entries {
 		ua := entry.Job.(*userAction)
-		if ua.name == name && ua.user == user && cfgID == ua.cfgID {
+		if ua.name == name && cfgID == ua.cfgID {
 			// Already have such a user/action/cfg tuple  -> re-schedule.
 			found = ua
 			break
@@ -267,7 +262,7 @@
 
 	if found == nil {
 		// Add to current plan.
-		job := &userAction{scheduler: s, user: user, name: name, cfgID: cfgID}
+		job := &userAction{scheduler: s, name: name, cfgID: cfgID}
 		s.cr.Schedule(schedule, job)
 	} else {
 		// If found re-build all.