view pkg/misc/scheduler.go @ 1528:5874cedd7f91

Sounding result import: Accept *.txt files for XYZ data, too.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Fri, 07 Dec 2018 12:21:55 +0100
parents 08e1b38a4a8b
children
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package misc

import (
	"container/heap"
	"sync"
	"time"
)

type schedulerFn struct {
	kind interface{}
	d    time.Duration
	fn   func(*Scheduler)
}

type entry struct {
	kind interface{}
	due  time.Time
	fn   func(*Scheduler)
}

type entries []*entry

// Scheduler manages function to be called later.
// They are keyed with a kind to it is possible
// To cancel them before they are executed.
type Scheduler struct {
	notify chan struct{}
	cancel chan interface{}
	add    chan schedulerFn
	queue  entries
}

var (
	scheduler     *Scheduler
	schedulerOnce sync.Once
)

func startScheduler() {
	scheduler = &Scheduler{
		notify: make(chan struct{}),
		cancel: make(chan interface{}),
		add:    make(chan schedulerFn),
	}
	go scheduler.run()
}

// ScheduleFunc adds a keyed function to the global scheduler
// to be executed after a specified duration.
func ScheduleFunc(key interface{}, d time.Duration, fn func(*Scheduler)) {
	schedulerOnce.Do(startScheduler)
	scheduler.Schedule(key, d, fn)
}

// Cancel cancels all functions in the global schedule waiting for
// execution with a given key.
func ScheduleCancel(key interface{}) {
	schedulerOnce.Do(startScheduler)
	scheduler.Cancel(key)
}

// Schedule schedules a keyed function to be executed after a
// specified duration.
func (s *Scheduler) Schedule(kind interface{}, d time.Duration, fn func(*Scheduler)) {
	s.add <- schedulerFn{kind: kind, d: d, fn: fn}
}

// Cancel cancels all function waiting for execution with a given key.
func (s *Scheduler) Cancel(kind interface{}) {
	s.cancel <- kind
}

func (es *entries) Len() int {
	return len(*es)
}

func (es *entries) Swap(i, j int) {
	(*es)[i], (*es)[j] = (*es)[j], (*es)[i]
}

func (es *entries) Less(i, j int) bool {
	return (*es)[i].due.Before((*es)[j].due)
}

func (es *entries) Push(x interface{}) {
	*es = append(*es, x.(*entry))
}

func (es *entries) Pop() interface{} {
	n := len(*es) - 1
	x := (*es)[n]
	(*es)[n] = nil
	*es = (*es)[:n]
	return x
}

func (s *Scheduler) run() {
	for {
		select {
		case sfn := <-s.add:
			e := &entry{
				kind: sfn.kind,
				due:  time.Now().Add(sfn.d),
				fn:   sfn.fn,
			}
			heap.Push(&s.queue, e)
			go func() {
				time.Sleep(sfn.d)
				s.notify <- struct{}{}
			}()

		case <-s.notify:
			for len(s.queue) > 0 && !s.queue[0].due.After(time.Now()) {
				go func(fn func(*Scheduler)) { fn(s) }(s.queue[0].fn)
				heap.Pop(&s.queue)
			}

		case k := <-s.cancel:
			for i := len(s.queue) - 1; i >= 0; {
				if s.queue[i].kind == k {
					n := len(s.queue) - 1
					if i < n {
						copy(s.queue[i:], s.queue[i+1:])
					}
					s.queue[n] = nil
					s.queue = s.queue[:n]
				}
			}
			heap.Init(&s.queue)
		}
	}
}