Mercurial > gemma
view pkg/scheduler/scheduler.go @ 4645:946689a56fc2
Forgot a bbox check when evaluating STRTree speeding it significantly.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Sun, 13 Oct 2019 16:01:06 +0200 |
parents | 5826d4de0e40 |
children | 5f47eeea988d |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> package scheduler import ( "log" "sort" "strings" "sync" cron "gopkg.in/robfig/cron.v1" ) // Action is called with a configuration id. type Action func(cfgID int64) type userAction struct { scheduler *scheduler name string cfgID int64 } type scheduler struct { cr *cron.Cron actions map[string]Action mu sync.Mutex } // Run implements cron.Job. func (ua *userAction) Run() { if a := ua.scheduler.action(ua.name); a != nil { a(ua.cfgID) } else { log.Printf("warn: scheduled action '%s' not found.", ua.name) } } var global = scheduler{ cr: cron.New(), actions: make(map[string]Action), } // RegisterAction registers a named action to the global scheduler. func RegisterAction(name string, action Action) { global.registerAction(name, action) } // UnregisterAction ungesiters a named action from the global scheduler. func UnregisterAction(name string) { global.unregisterAction(name) } // BoundAction is a complete set of infos for // an action to be bound to a schedule and // configuration id. type BoundAction struct { Name string Spec string CfgID int64 } // BootActions setup the global scheduler with a set // of bound actions delivered by the next function. func BootActions(next func(*BoundAction) (bool, error)) error { return global.bootActions(next) } func (s *scheduler) bootActions(next func(*BoundAction) (bool, error)) error { cr := cron.New() var numJobs int for { var ba BoundAction ok, err := next(&ba) if err != nil { return err } if !ok { break } schedule, err := cron.Parse(ba.Spec) if err != nil { return err } job := &userAction{ scheduler: s, name: ba.Name, cfgID: ba.CfgID, } cr.Schedule(schedule, job) numJobs++ } log.Printf("info: booting %d scheduler jobs from database.\n", numJobs) s.mu.Lock() defer s.mu.Unlock() s.cr.Stop() s.cr = cr cr.Start() return nil } // BindAction binds a named action to a cron spec and // a configuration id. func BindAction(name, spec string, cfgID int64) error { return global.bindAction(name, spec, cfgID) } // UnbindAction unbinds a named action from a user and // a configuration id. func UnbindAction(name string, cfgID int64) { global.unbindAction(name, cfgID) } // UnbindByID unbinds all schedules with a given id. func UnbindByID(cfgID int64) { global.unbindByID(cfgID) } // UnbindByIDs unbinds all schedules for a given user. func UnbindByIDs(ids map[int64]struct{}) { global.unbindByIDs(ids) } // HasAction asks if there is an action with a given name. func HasAction(name string) bool { return global.hasAction(name) } func (s *scheduler) hasAction(name string) bool { s.mu.Lock() defer s.mu.Unlock() return s.actions[name] != nil } func (s *scheduler) unbindByIDs(ids map[int64]struct{}) { s.mu.Lock() defer s.mu.Unlock() entries := s.cr.Entries() if len(entries) == 0 { return } var found bool for _, entry := range entries { ua := entry.Job.(*userAction) if _, found = ids[ua.cfgID]; found { break } } if !found { return } s.cr.Stop() s.cr = cron.New() for _, entry := range entries { ua := entry.Job.(*userAction) if _, found := ids[ua.cfgID]; !found { s.cr.Schedule(entry.Schedule, entry.Job) } } s.cr.Start() } func (s *scheduler) unbindByID(cfgID int64) { s.mu.Lock() defer s.mu.Unlock() entries := s.cr.Entries() var found bool for _, entry := range entries { ua := entry.Job.(*userAction) if ua.cfgID == cfgID { found = true break } } if !found { return } s.cr.Stop() s.cr = cron.New() for _, entry := range entries { ua := entry.Job.(*userAction) if ua.cfgID != cfgID { s.cr.Schedule(entry.Schedule, entry.Job) } } s.cr.Start() } func (s *scheduler) unbindAction(name string, cfgID int64) { s.mu.Lock() defer s.mu.Unlock() entries := s.cr.Entries() var found *userAction for _, entry := range entries { ua := entry.Job.(*userAction) if ua.name == name && cfgID == ua.cfgID { // Already have such a action/cfg tuple -> re-schedule. found = ua break } } if found == nil { return } s.cr.Stop() s.cr = cron.New() for _, entry := range entries { ua := entry.Job.(*userAction) if ua != found { s.cr.Schedule(entry.Schedule, entry.Job) } } s.cr.Start() } func (s *scheduler) bindAction(name, spec string, cfgID int64) error { schedule, err := cron.Parse(spec) if err != nil { return err } s.mu.Lock() defer s.mu.Unlock() entries := s.cr.Entries() var found *userAction for _, entry := range entries { ua := entry.Job.(*userAction) if ua.name == name && cfgID == ua.cfgID { // Already have such a user/action/cfg tuple -> re-schedule. found = ua break } } if found == nil { // Add to current plan. job := &userAction{scheduler: s, name: name, cfgID: cfgID} s.cr.Schedule(schedule, job) } else { // If found re-build all. s.cr.Stop() s.cr = cron.New() for _, entry := range entries { ua := entry.Job.(*userAction) var sch cron.Schedule if found == ua { // replace with new schedule. sch = schedule } else { sch = entry.Schedule } s.cr.Schedule(sch, entry.Job) } } s.cr.Start() return nil } func (s *scheduler) action(name string) Action { s.mu.Lock() defer s.mu.Unlock() return s.actions[name] } // LogActionNames logs a sorted list of registered scheduler functions. func LogActionNames() { names := global.actionNames() sort.Strings(names) log.Printf("info: actions registered to scheduler: %s\n", strings.Join(names, ", ")) } func (s *scheduler) actionNames() []string { s.mu.Lock() defer s.mu.Unlock() names := make([]string, len(s.actions)) var i int for k := range s.actions { names[i] = k i++ } return names } func (s *scheduler) registerAction(name string, action Action) { s.mu.Lock() defer s.mu.Unlock() s.actions[name] = action } func (s *scheduler) unregisterAction(name string) { s.mu.Lock() defer s.mu.Unlock() delete(s.actions, name) }