changeset 1515:43a32358a016

Backend: Added a scheduler which allows to schedule the execution of functions after a specified duration. The functions are keyed so execution can be canceled before the actual execution. This is useful for configurable schedules. Futhermore this scheduler could be a building block to unify all regular schedules in the gemma server.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Thu, 06 Dec 2018 12:38:08 +0100
parents cea6ad2be9ee
children 6b3756676bbe
files pkg/misc/scheduler.go
diffstat 1 files changed, 141 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pkg/misc/scheduler.go	Thu Dec 06 12:38:08 2018 +0100
@@ -0,0 +1,141 @@
+// This is Free Software under GNU Affero General Public License v >= 3.0
+// without warranty, see README.md and license for details.
+//
+// SPDX-License-Identifier: AGPL-3.0-or-later
+// License-Filename: LICENSES/AGPL-3.0.txt
+//
+// Copyright (C) 2018 by via donau
+//   – Österreichische Wasserstraßen-Gesellschaft mbH
+// Software engineering by Intevation GmbH
+//
+// Author(s):
+//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
+
+package misc
+
+import (
+	"container/heap"
+	"sync"
+	"time"
+)
+
+type schedulerFn struct {
+	kind interface{}
+	d    time.Duration
+	fn   func(*Scheduler)
+}
+
+type entry struct {
+	kind interface{}
+	due  time.Time
+	fn   func(*Scheduler)
+}
+
+type entries []*entry
+
+// Scheduler manages function to be called later.
+// They are keyed with a kind to it is possible
+// To cancel them before they are executed.
+type Scheduler struct {
+	notify chan struct{}
+	cancel chan interface{}
+	add    chan schedulerFn
+	queue  entries
+}
+
+var (
+	scheduler     *Scheduler
+	schedulerOnce sync.Once
+)
+
+func startScheduler() {
+	scheduler = &Scheduler{
+		notify: make(chan struct{}),
+		cancel: make(chan interface{}),
+		add:    make(chan schedulerFn),
+	}
+	go scheduler.run()
+}
+
+// ScheduleFunc adds a keyed function to the global scheduler
+// to be executed after a specified duration.
+func ScheduleFunc(key interface{}, d time.Duration, fn func(*Scheduler)) {
+	schedulerOnce.Do(startScheduler)
+	scheduler.Schedule(key, d, fn)
+}
+
+// Cancel cancels all functions in the global schedule waiting for
+// execution with a given key.
+func ScheduleCancel(key interface{}) {
+	schedulerOnce.Do(startScheduler)
+	scheduler.Cancel(key)
+}
+
+// Schedule schedules a keyed function to be executed after a
+// specified duration.
+func (s *Scheduler) Schedule(kind interface{}, d time.Duration, fn func(*Scheduler)) {
+	s.add <- schedulerFn{kind: kind, d: d, fn: fn}
+}
+
+// Cancel cancels all function waiting for execution with a given key.
+func (s *Scheduler) Cancel(kind interface{}) {
+	s.cancel <- kind
+}
+
+func (es *entries) Len() int {
+	return len(*es)
+}
+
+func (es *entries) Swap(i, j int) {
+	(*es)[i], (*es)[j] = (*es)[j], (*es)[i]
+}
+
+func (es *entries) Less(i, j int) bool {
+	return (*es)[i].due.Before((*es)[j].due)
+}
+
+func (es *entries) Push(x interface{}) {
+	*es = append(*es, x.(*entry))
+}
+
+func (es *entries) Pop() interface{} {
+	n := len(*es) - 1
+	x := (*es)[n]
+	(*es)[n] = nil
+	*es = (*es)[:n]
+	return x
+}
+
+func (s *Scheduler) run() {
+	for {
+		select {
+		case sfn := <-s.add:
+			e := &entry{
+				kind: sfn.kind,
+				due:  time.Now().Add(sfn.d),
+				fn:   sfn.fn,
+			}
+			heap.Push(&s.queue, e)
+			go func() {
+				time.Sleep(sfn.d)
+				s.notify <- struct{}{}
+			}()
+
+		case <-s.notify:
+			for len(s.queue) > 0 && !s.queue[0].due.After(time.Now()) {
+				go func(fn func(*Scheduler)) { fn(s) }(s.queue[0].fn)
+				heap.Pop(&s.queue)
+			}
+
+		case k := <-s.cancel:
+			for i := len(s.queue) - 1; i >= 0; {
+				if s.queue[i].kind == k {
+					copy(s.queue[i:], s.queue[i+1:])
+					s.queue[len(s.queue)-1] = nil
+					s.queue = s.queue[:len(s.queue)-1]
+				}
+			}
+			heap.Init(&s.queue)
+		}
+	}
+}