Mercurial > gemma
view pkg/misc/scheduler.go @ 1528:5874cedd7f91
Sounding result import: Accept *.txt files for XYZ data, too.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Fri, 07 Dec 2018 12:21:55 +0100 |
parents | 08e1b38a4a8b |
children |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> package misc import ( "container/heap" "sync" "time" ) type schedulerFn struct { kind interface{} d time.Duration fn func(*Scheduler) } type entry struct { kind interface{} due time.Time fn func(*Scheduler) } type entries []*entry // Scheduler manages function to be called later. // They are keyed with a kind to it is possible // To cancel them before they are executed. type Scheduler struct { notify chan struct{} cancel chan interface{} add chan schedulerFn queue entries } var ( scheduler *Scheduler schedulerOnce sync.Once ) func startScheduler() { scheduler = &Scheduler{ notify: make(chan struct{}), cancel: make(chan interface{}), add: make(chan schedulerFn), } go scheduler.run() } // ScheduleFunc adds a keyed function to the global scheduler // to be executed after a specified duration. func ScheduleFunc(key interface{}, d time.Duration, fn func(*Scheduler)) { schedulerOnce.Do(startScheduler) scheduler.Schedule(key, d, fn) } // Cancel cancels all functions in the global schedule waiting for // execution with a given key. func ScheduleCancel(key interface{}) { schedulerOnce.Do(startScheduler) scheduler.Cancel(key) } // Schedule schedules a keyed function to be executed after a // specified duration. func (s *Scheduler) Schedule(kind interface{}, d time.Duration, fn func(*Scheduler)) { s.add <- schedulerFn{kind: kind, d: d, fn: fn} } // Cancel cancels all function waiting for execution with a given key. func (s *Scheduler) Cancel(kind interface{}) { s.cancel <- kind } func (es *entries) Len() int { return len(*es) } func (es *entries) Swap(i, j int) { (*es)[i], (*es)[j] = (*es)[j], (*es)[i] } func (es *entries) Less(i, j int) bool { return (*es)[i].due.Before((*es)[j].due) } func (es *entries) Push(x interface{}) { *es = append(*es, x.(*entry)) } func (es *entries) Pop() interface{} { n := len(*es) - 1 x := (*es)[n] (*es)[n] = nil *es = (*es)[:n] return x } func (s *Scheduler) run() { for { select { case sfn := <-s.add: e := &entry{ kind: sfn.kind, due: time.Now().Add(sfn.d), fn: sfn.fn, } heap.Push(&s.queue, e) go func() { time.Sleep(sfn.d) s.notify <- struct{}{} }() case <-s.notify: for len(s.queue) > 0 && !s.queue[0].due.After(time.Now()) { go func(fn func(*Scheduler)) { fn(s) }(s.queue[0].fn) heap.Pop(&s.queue) } case k := <-s.cancel: for i := len(s.queue) - 1; i >= 0; { if s.queue[i].kind == k { n := len(s.queue) - 1 if i < n { copy(s.queue[i:], s.queue[i+1:]) } s.queue[n] = nil s.queue = s.queue[:n] } } heap.Init(&s.queue) } } }