Mercurial > gemma
changeset 1541:03fcad10104a
Added an internal cron job executor which binds actions to tuples of cron schedule, user name and an optional configuation id.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Mon, 10 Dec 2018 12:35:49 +0100 |
parents | 251ee25accce |
children | 31c6c7bd6190 |
files | pkg/scheduler/scheduler.go |
diffstat | 1 files changed, 134 insertions(+), 15 deletions(-) [+] |
line wrap: on
line diff
--- a/pkg/scheduler/scheduler.go Fri Dec 07 18:34:53 2018 +0100 +++ b/pkg/scheduler/scheduler.go Mon Dec 10 12:35:49 2018 +0100 @@ -20,43 +20,162 @@ "github.com/robfig/cron" ) +// ErrNoSuchAction if no fitting action was found. var ErrNoSuchAction = errors.New("No such action") -type funcJob struct { - bound bool - fn func() +// Action is called with a user and an optional configuration id. +type Action func(user string, cfgID *int64) + +type userAction struct { + scheduler *scheduler + user string + name string + cfgID *int64 } type scheduler struct { cr *cron.Cron - actions map[string]*funcJob + actions map[string]Action mu sync.Mutex } +// Run implements cron.Job. +func (ua *userAction) Run() { + if a := ua.scheduler.action(ua.name); a != nil { + a(ua.user, ua.cfgID) + } +} + var global = scheduler{ cr: cron.New(), - actions: make(map[string]*funcJob), + actions: make(map[string]Action), +} + +// RegisterAction registers a named action to the global scheduler. +func RegisterAction(name string, action Action) { + global.registerAction(name, action) +} + +// UnregisterAction ungesiters a named action from the global scheduler. +func UnregisterAction(name string) { + global.unregisterAction(name) +} + +// BindAction binds a named action to a user, a cron spec and +// an optional configuration id. +func BindAction(name, spec, user string, cfgID *int64) error { + return global.bindAction(name, spec, user, cfgID) +} + +// UnbindAction unbins a named action from a user and and +// an optional configuration id. +func UnbindAction(name, user string, cfgID *int64) error { + return global.unbindAction(name, user, cfgID) +} + +func sameCfgID(a, b *int64) bool { + return (a == nil && b == nil) || (a != nil && b != nil && *a == *b) } -func AddAction(name string, fn func()) { - global.addAction(name, fn) -} +func (s *scheduler) unbindAction(name, user string, cfgID *int64) error { + s.mu.Lock() + defer s.mu.Unlock() + + entries := s.cr.Entries() + + var found *userAction + for _, entry := range entries { + ua := entry.Job.(*userAction) + if ua.name == name && ua.user == user && sameCfgID(cfgID, ua.cfgID) { + // Already have such a user/action/cfg tuple -> re-schedule. + found = ua + break + } + } -func AddSchedule(spec, action string) error { - return global.addAction(spec, action) + if found == nil { + return ErrNoSuchAction + } + + s.cr.Stop() + s.cr = cron.New() + for _, entry := range entries { + ua := entry.Job.(*userAction) + if ua != found { + s.cr.Schedule(entry.Schedule, entry.Job) + } + } + s.cr.Start() + + return nil } -func (s *scheduler) addSchedule(spec, action string) error { +func (s *scheduler) bindAction(name, spec, user string, cfgID *int64) error { + + schedule, err := cron.Parse(spec) + if err != nil { + return err + } + s.mu.Lock() defer s.mu.Unlock() - if s.actions[action] == nil { + + action := s.actions[name] + if action == nil { return ErrNoSuchAction } - // TODO: Implement me! + + entries := s.cr.Entries() + + var found *userAction + for _, entry := range entries { + ua := entry.Job.(*userAction) + if ua.name == name && ua.user == user && sameCfgID(cfgID, ua.cfgID) { + // Already have such a user/action/cfg tuple -> re-schedule. + found = ua + break + } + } + + if found == nil { + // Add to current plan. + job := &userAction{scheduler: s, user: user, name: name, cfgID: cfgID} + s.cr.Schedule(schedule, job) + } else { + // If found re-build all. + s.cr.Stop() + s.cr = cron.New() + for _, entry := range entries { + ua := entry.Job.(*userAction) + var sch cron.Schedule + if found == ua { + // replace with new schedule. + sch = schedule + } else { + sch = entry.Schedule + } + s.cr.Schedule(sch, entry.Job) + } + } + s.cr.Start() + + return nil } -func (s *scheduler) addAction(name string, fn func()) { +func (s *scheduler) action(name string) Action { + s.mu.Lock() + defer s.mu.Unlock() + return s.actions[name] +} + +func (s *scheduler) registerAction(name string, action Action) { s.mu.Lock() defer s.mu.Unlock() - s.actions[name] = &funcJob{fn: fn} + s.actions[name] = action } + +func (s *scheduler) unregisterAction(name string) { + s.mu.Lock() + defer s.mu.Unlock() + delete(s.actions, name) +}