Mercurial > gemma
changeset 1515:43a32358a016
Backend: Added a scheduler which allows to schedule the execution of functions
after a specified duration. The functions are keyed so execution can
be canceled before the actual execution.
This is useful for configurable schedules.
Futhermore this scheduler could be a building block to unify all regular
schedules in the gemma server.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Thu, 06 Dec 2018 12:38:08 +0100 |
parents | cea6ad2be9ee |
children | 6b3756676bbe |
files | pkg/misc/scheduler.go |
diffstat | 1 files changed, 141 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pkg/misc/scheduler.go Thu Dec 06 12:38:08 2018 +0100 @@ -0,0 +1,141 @@ +// This is Free Software under GNU Affero General Public License v >= 3.0 +// without warranty, see README.md and license for details. +// +// SPDX-License-Identifier: AGPL-3.0-or-later +// License-Filename: LICENSES/AGPL-3.0.txt +// +// Copyright (C) 2018 by via donau +// – Österreichische Wasserstraßen-Gesellschaft mbH +// Software engineering by Intevation GmbH +// +// Author(s): +// * Sascha L. Teichmann <sascha.teichmann@intevation.de> + +package misc + +import ( + "container/heap" + "sync" + "time" +) + +type schedulerFn struct { + kind interface{} + d time.Duration + fn func(*Scheduler) +} + +type entry struct { + kind interface{} + due time.Time + fn func(*Scheduler) +} + +type entries []*entry + +// Scheduler manages function to be called later. +// They are keyed with a kind to it is possible +// To cancel them before they are executed. +type Scheduler struct { + notify chan struct{} + cancel chan interface{} + add chan schedulerFn + queue entries +} + +var ( + scheduler *Scheduler + schedulerOnce sync.Once +) + +func startScheduler() { + scheduler = &Scheduler{ + notify: make(chan struct{}), + cancel: make(chan interface{}), + add: make(chan schedulerFn), + } + go scheduler.run() +} + +// ScheduleFunc adds a keyed function to the global scheduler +// to be executed after a specified duration. +func ScheduleFunc(key interface{}, d time.Duration, fn func(*Scheduler)) { + schedulerOnce.Do(startScheduler) + scheduler.Schedule(key, d, fn) +} + +// Cancel cancels all functions in the global schedule waiting for +// execution with a given key. +func ScheduleCancel(key interface{}) { + schedulerOnce.Do(startScheduler) + scheduler.Cancel(key) +} + +// Schedule schedules a keyed function to be executed after a +// specified duration. +func (s *Scheduler) Schedule(kind interface{}, d time.Duration, fn func(*Scheduler)) { + s.add <- schedulerFn{kind: kind, d: d, fn: fn} +} + +// Cancel cancels all function waiting for execution with a given key. +func (s *Scheduler) Cancel(kind interface{}) { + s.cancel <- kind +} + +func (es *entries) Len() int { + return len(*es) +} + +func (es *entries) Swap(i, j int) { + (*es)[i], (*es)[j] = (*es)[j], (*es)[i] +} + +func (es *entries) Less(i, j int) bool { + return (*es)[i].due.Before((*es)[j].due) +} + +func (es *entries) Push(x interface{}) { + *es = append(*es, x.(*entry)) +} + +func (es *entries) Pop() interface{} { + n := len(*es) - 1 + x := (*es)[n] + (*es)[n] = nil + *es = (*es)[:n] + return x +} + +func (s *Scheduler) run() { + for { + select { + case sfn := <-s.add: + e := &entry{ + kind: sfn.kind, + due: time.Now().Add(sfn.d), + fn: sfn.fn, + } + heap.Push(&s.queue, e) + go func() { + time.Sleep(sfn.d) + s.notify <- struct{}{} + }() + + case <-s.notify: + for len(s.queue) > 0 && !s.queue[0].due.After(time.Now()) { + go func(fn func(*Scheduler)) { fn(s) }(s.queue[0].fn) + heap.Pop(&s.queue) + } + + case k := <-s.cancel: + for i := len(s.queue) - 1; i >= 0; { + if s.queue[i].kind == k { + copy(s.queue[i:], s.queue[i+1:]) + s.queue[len(s.queue)-1] = nil + s.queue = s.queue[:len(s.queue)-1] + } + } + heap.Init(&s.queue) + } + } +}