Mercurial > gemma
view pkg/scheduler/scheduler.go @ 1547:d4b7a6d054cd
Add table to store import configuration.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Mon, 10 Dec 2018 15:55:10 +0100 |
parents | 5f80ec319a4f |
children | fe633765e05b |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> package scheduler import ( "errors" "sync" "github.com/robfig/cron" ) // ErrNoSuchAction if no fitting action was found. var ErrNoSuchAction = errors.New("No such action") // Action is called with a user and an optional configuration id. type Action func(user string, cfgID *int64) type userAction struct { scheduler *scheduler user string name string cfgID *int64 } type scheduler struct { cr *cron.Cron actions map[string]Action mu sync.Mutex } // Run implements cron.Job. func (ua *userAction) Run() { if a := ua.scheduler.action(ua.name); a != nil { a(ua.user, ua.cfgID) } } var global = scheduler{ cr: cron.New(), actions: make(map[string]Action), } // RegisterAction registers a named action to the global scheduler. func RegisterAction(name string, action Action) { global.registerAction(name, action) } // UnregisterAction ungesiters a named action from the global scheduler. func UnregisterAction(name string) { global.unregisterAction(name) } // BindAction binds a named action to a user, a cron spec and // an optional configuration id. func BindAction(name, spec, user string, cfgID *int64) error { return global.bindAction(name, spec, user, cfgID) } // UnbindAction unbins a named action from a user and and // an optional configuration id. func UnbindAction(name, user string, cfgID *int64) error { return global.unbindAction(name, user, cfgID) } func UnbindUser(user string) { global.unbindUser(user) } func sameCfgID(a, b *int64) bool { return (a == nil && b == nil) || (a != nil && b != nil && *a == *b) } func (s *scheduler) unbindUser(user string) { s.mu.Lock() defer s.mu.Unlock() entries := s.cr.Entries() if len(entries) == 0 { return } var found bool for _, entry := range entries { ua := entry.Job.(*userAction) if ua.user == user { found = true break } } if !found { return } s.cr.Stop() s.cr = cron.New() for _, entry := range entries { ua := entry.Job.(*userAction) if ua.user != user { s.cr.Schedule(entry.Schedule, entry.Job) } } s.cr.Start() } func (s *scheduler) unbindAction(name, user string, cfgID *int64) error { s.mu.Lock() defer s.mu.Unlock() entries := s.cr.Entries() var found *userAction for _, entry := range entries { ua := entry.Job.(*userAction) if ua.name == name && ua.user == user && sameCfgID(cfgID, ua.cfgID) { // Already have such a user/action/cfg tuple -> re-schedule. found = ua break } } if found == nil { return ErrNoSuchAction } s.cr.Stop() s.cr = cron.New() for _, entry := range entries { ua := entry.Job.(*userAction) if ua != found { s.cr.Schedule(entry.Schedule, entry.Job) } } s.cr.Start() return nil } func (s *scheduler) bindAction(name, spec, user string, cfgID *int64) error { schedule, err := cron.Parse(spec) if err != nil { return err } s.mu.Lock() defer s.mu.Unlock() action := s.actions[name] if action == nil { return ErrNoSuchAction } entries := s.cr.Entries() var found *userAction for _, entry := range entries { ua := entry.Job.(*userAction) if ua.name == name && ua.user == user && sameCfgID(cfgID, ua.cfgID) { // Already have such a user/action/cfg tuple -> re-schedule. found = ua break } } if found == nil { // Add to current plan. job := &userAction{scheduler: s, user: user, name: name, cfgID: cfgID} s.cr.Schedule(schedule, job) } else { // If found re-build all. s.cr.Stop() s.cr = cron.New() for _, entry := range entries { ua := entry.Job.(*userAction) var sch cron.Schedule if found == ua { // replace with new schedule. sch = schedule } else { sch = entry.Schedule } s.cr.Schedule(sch, entry.Job) } } s.cr.Start() return nil } func (s *scheduler) action(name string) Action { s.mu.Lock() defer s.mu.Unlock() return s.actions[name] } func (s *scheduler) registerAction(name string, action Action) { s.mu.Lock() defer s.mu.Unlock() s.actions[name] = action } func (s *scheduler) unregisterAction(name string) { s.mu.Lock() defer s.mu.Unlock() delete(s.actions, name) }