comparison pkg/misc/scheduler.go @ 1515:43a32358a016

Backend: Added a scheduler which allows to schedule the execution of functions after a specified duration. The functions are keyed so execution can be canceled before the actual execution. This is useful for configurable schedules. Futhermore this scheduler could be a building block to unify all regular schedules in the gemma server.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Thu, 06 Dec 2018 12:38:08 +0100
parents
children 08e1b38a4a8b
comparison
equal deleted inserted replaced
1514:cea6ad2be9ee 1515:43a32358a016
1 // This is Free Software under GNU Affero General Public License v >= 3.0
2 // without warranty, see README.md and license for details.
3 //
4 // SPDX-License-Identifier: AGPL-3.0-or-later
5 // License-Filename: LICENSES/AGPL-3.0.txt
6 //
7 // Copyright (C) 2018 by via donau
8 // – Österreichische Wasserstraßen-Gesellschaft mbH
9 // Software engineering by Intevation GmbH
10 //
11 // Author(s):
12 // * Sascha L. Teichmann <sascha.teichmann@intevation.de>
13
14 package misc
15
16 import (
17 "container/heap"
18 "sync"
19 "time"
20 )
21
22 type schedulerFn struct {
23 kind interface{}
24 d time.Duration
25 fn func(*Scheduler)
26 }
27
28 type entry struct {
29 kind interface{}
30 due time.Time
31 fn func(*Scheduler)
32 }
33
34 type entries []*entry
35
36 // Scheduler manages function to be called later.
37 // They are keyed with a kind to it is possible
38 // To cancel them before they are executed.
39 type Scheduler struct {
40 notify chan struct{}
41 cancel chan interface{}
42 add chan schedulerFn
43 queue entries
44 }
45
46 var (
47 scheduler *Scheduler
48 schedulerOnce sync.Once
49 )
50
51 func startScheduler() {
52 scheduler = &Scheduler{
53 notify: make(chan struct{}),
54 cancel: make(chan interface{}),
55 add: make(chan schedulerFn),
56 }
57 go scheduler.run()
58 }
59
60 // ScheduleFunc adds a keyed function to the global scheduler
61 // to be executed after a specified duration.
62 func ScheduleFunc(key interface{}, d time.Duration, fn func(*Scheduler)) {
63 schedulerOnce.Do(startScheduler)
64 scheduler.Schedule(key, d, fn)
65 }
66
67 // Cancel cancels all functions in the global schedule waiting for
68 // execution with a given key.
69 func ScheduleCancel(key interface{}) {
70 schedulerOnce.Do(startScheduler)
71 scheduler.Cancel(key)
72 }
73
74 // Schedule schedules a keyed function to be executed after a
75 // specified duration.
76 func (s *Scheduler) Schedule(kind interface{}, d time.Duration, fn func(*Scheduler)) {
77 s.add <- schedulerFn{kind: kind, d: d, fn: fn}
78 }
79
80 // Cancel cancels all function waiting for execution with a given key.
81 func (s *Scheduler) Cancel(kind interface{}) {
82 s.cancel <- kind
83 }
84
85 func (es *entries) Len() int {
86 return len(*es)
87 }
88
89 func (es *entries) Swap(i, j int) {
90 (*es)[i], (*es)[j] = (*es)[j], (*es)[i]
91 }
92
93 func (es *entries) Less(i, j int) bool {
94 return (*es)[i].due.Before((*es)[j].due)
95 }
96
97 func (es *entries) Push(x interface{}) {
98 *es = append(*es, x.(*entry))
99 }
100
101 func (es *entries) Pop() interface{} {
102 n := len(*es) - 1
103 x := (*es)[n]
104 (*es)[n] = nil
105 *es = (*es)[:n]
106 return x
107 }
108
109 func (s *Scheduler) run() {
110 for {
111 select {
112 case sfn := <-s.add:
113 e := &entry{
114 kind: sfn.kind,
115 due: time.Now().Add(sfn.d),
116 fn: sfn.fn,
117 }
118 heap.Push(&s.queue, e)
119 go func() {
120 time.Sleep(sfn.d)
121 s.notify <- struct{}{}
122 }()
123
124 case <-s.notify:
125 for len(s.queue) > 0 && !s.queue[0].due.After(time.Now()) {
126 go func(fn func(*Scheduler)) { fn(s) }(s.queue[0].fn)
127 heap.Pop(&s.queue)
128 }
129
130 case k := <-s.cancel:
131 for i := len(s.queue) - 1; i >= 0; {
132 if s.queue[i].kind == k {
133 copy(s.queue[i:], s.queue[i+1:])
134 s.queue[len(s.queue)-1] = nil
135 s.queue = s.queue[:len(s.queue)-1]
136 }
137 }
138 heap.Init(&s.queue)
139 }
140 }
141 }