changeset 1541:03fcad10104a

Added an internal cron job executor which binds actions to tuples of cron schedule, user name and an optional configuation id.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Mon, 10 Dec 2018 12:35:49 +0100
parents 251ee25accce
children 31c6c7bd6190
files pkg/scheduler/scheduler.go
diffstat 1 files changed, 134 insertions(+), 15 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/scheduler/scheduler.go	Fri Dec 07 18:34:53 2018 +0100
+++ b/pkg/scheduler/scheduler.go	Mon Dec 10 12:35:49 2018 +0100
@@ -20,43 +20,162 @@
 	"github.com/robfig/cron"
 )
 
+// ErrNoSuchAction if no fitting action was found.
 var ErrNoSuchAction = errors.New("No such action")
 
-type funcJob struct {
-	bound bool
-	fn    func()
+// Action is called with a user and an optional configuration id.
+type Action func(user string, cfgID *int64)
+
+type userAction struct {
+	scheduler *scheduler
+	user      string
+	name      string
+	cfgID     *int64
 }
 
 type scheduler struct {
 	cr      *cron.Cron
-	actions map[string]*funcJob
+	actions map[string]Action
 	mu      sync.Mutex
 }
 
+// Run implements cron.Job.
+func (ua *userAction) Run() {
+	if a := ua.scheduler.action(ua.name); a != nil {
+		a(ua.user, ua.cfgID)
+	}
+}
+
 var global = scheduler{
 	cr:      cron.New(),
-	actions: make(map[string]*funcJob),
+	actions: make(map[string]Action),
+}
+
+// RegisterAction registers a named action to the global scheduler.
+func RegisterAction(name string, action Action) {
+	global.registerAction(name, action)
+}
+
+// UnregisterAction ungesiters a named action from the global scheduler.
+func UnregisterAction(name string) {
+	global.unregisterAction(name)
+}
+
+// BindAction binds a named action to a user, a cron spec and
+// an optional configuration id.
+func BindAction(name, spec, user string, cfgID *int64) error {
+	return global.bindAction(name, spec, user, cfgID)
+}
+
+// UnbindAction unbins a named action from a user and and
+// an optional configuration id.
+func UnbindAction(name, user string, cfgID *int64) error {
+	return global.unbindAction(name, user, cfgID)
+}
+
+func sameCfgID(a, b *int64) bool {
+	return (a == nil && b == nil) || (a != nil && b != nil && *a == *b)
 }
 
-func AddAction(name string, fn func()) {
-	global.addAction(name, fn)
-}
+func (s *scheduler) unbindAction(name, user string, cfgID *int64) error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	entries := s.cr.Entries()
+
+	var found *userAction
+	for _, entry := range entries {
+		ua := entry.Job.(*userAction)
+		if ua.name == name && ua.user == user && sameCfgID(cfgID, ua.cfgID) {
+			// Already have such a user/action/cfg tuple  -> re-schedule.
+			found = ua
+			break
+		}
+	}
 
-func AddSchedule(spec, action string) error {
-	return global.addAction(spec, action)
+	if found == nil {
+		return ErrNoSuchAction
+	}
+
+	s.cr.Stop()
+	s.cr = cron.New()
+	for _, entry := range entries {
+		ua := entry.Job.(*userAction)
+		if ua != found {
+			s.cr.Schedule(entry.Schedule, entry.Job)
+		}
+	}
+	s.cr.Start()
+
+	return nil
 }
 
-func (s *scheduler) addSchedule(spec, action string) error {
+func (s *scheduler) bindAction(name, spec, user string, cfgID *int64) error {
+
+	schedule, err := cron.Parse(spec)
+	if err != nil {
+		return err
+	}
+
 	s.mu.Lock()
 	defer s.mu.Unlock()
-	if s.actions[action] == nil {
+
+	action := s.actions[name]
+	if action == nil {
 		return ErrNoSuchAction
 	}
-	// TODO: Implement me!
+
+	entries := s.cr.Entries()
+
+	var found *userAction
+	for _, entry := range entries {
+		ua := entry.Job.(*userAction)
+		if ua.name == name && ua.user == user && sameCfgID(cfgID, ua.cfgID) {
+			// Already have such a user/action/cfg tuple  -> re-schedule.
+			found = ua
+			break
+		}
+	}
+
+	if found == nil {
+		// Add to current plan.
+		job := &userAction{scheduler: s, user: user, name: name, cfgID: cfgID}
+		s.cr.Schedule(schedule, job)
+	} else {
+		// If found re-build all.
+		s.cr.Stop()
+		s.cr = cron.New()
+		for _, entry := range entries {
+			ua := entry.Job.(*userAction)
+			var sch cron.Schedule
+			if found == ua {
+				// replace with new schedule.
+				sch = schedule
+			} else {
+				sch = entry.Schedule
+			}
+			s.cr.Schedule(sch, entry.Job)
+		}
+	}
+	s.cr.Start()
+
+	return nil
 }
 
-func (s *scheduler) addAction(name string, fn func()) {
+func (s *scheduler) action(name string) Action {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return s.actions[name]
+}
+
+func (s *scheduler) registerAction(name string, action Action) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
-	s.actions[name] = &funcJob{fn: fn}
+	s.actions[name] = action
 }
+
+func (s *scheduler) unregisterAction(name string) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	delete(s.actions, name)
+}