view pkg/scheduler/scheduler.go @ 1590:2fdd8e57542d

Added DELETE /imports/scheduler/{id:[0-9]+} to delete a schedule from the scheduler.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Fri, 14 Dec 2018 12:34:12 +0100
parents 62171cd9a42b
children f39957ea08aa
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package scheduler

import (
	"errors"
	"log"
	"sync"

	"github.com/robfig/cron"
)

// ErrNoSuchAction is the sentinel error for the case that no fitting
// action was found. Nothing in the visible part of this file returns it;
// presumably it is meant for users of this package (verify against callers).
var ErrNoSuchAction = errors.New("No such action")

// Action is the signature of a task that can be registered under a name
// and run by the scheduler. It is called with the user the schedule is
// bound to and an optional configuration id (a nil pointer if there is none).
type Action func(user string, cfgID *int64)

// userAction is the job that is handed to cron. It binds the name of an
// action to a user and an optional configuration id. The action itself is
// not stored here: it is looked up by name in scheduler every time the
// job runs.
type userAction struct {
	scheduler *scheduler
	user      string
	name      string
	cfgID     *int64
}

// scheduler couples a cron instance with a registry of named actions.
// cr is the current plan; it is replaced by a new cron instance whenever
// the plan has to be rebuilt. actions maps action names to the functions
// to call. mu guards both cr and actions.
type scheduler struct {
	cr      *cron.Cron
	actions map[string]Action
	mu      sync.Mutex
}

// Run implements cron.Job. The action is resolved by name at the moment
// the job fires; if no action is registered under that name only a
// warning is logged.
func (ua *userAction) Run() {
	run := ua.scheduler.action(ua.name)
	if run == nil {
		log.Printf("warn: scheduled action '%s' not found.", ua.name)
		return
	}
	run(ua.user, ua.cfgID)
}

// global is the package wide scheduler the exported functions operate on.
// Its cron is created here but not started; it is started by the methods
// that build or modify the plan.
var global = scheduler{
	cr:      cron.New(),
	actions: make(map[string]Action),
}

// RegisterAction registers a named action to the global scheduler.
// An action already registered under the same name is replaced.
func RegisterAction(name string, action Action) {
	global.registerAction(name, action)
}

// UnregisterAction unregisters a named action from the global scheduler.
// Schedules bound to this name are not removed by this call.
func UnregisterAction(name string) {
	global.unregisterAction(name)
}

// BoundAction is a complete set of infos for
// an action to be bound to a user, schedule and
// optional configuration id.
//
// Name is the name of the action, Spec the cron specification of the
// schedule (it is parsed with cron.Parse), User the user the action is
// run for and CfgID the optional configuration id (nil if there is none).
type BoundAction struct {
	Name  string
	Spec  string
	User  string
	CfgID *int64
}

// BootActions sets up the global scheduler with a set
// of bound actions delivered by the next function.
//
// next is called repeatedly with a BoundAction to fill in and has to
// report false when there are no more actions. The first error returned
// by next or caused by an unparsable cron spec is returned; in that case
// the plan of the global scheduler is left as it was.
func BootActions(next func(*BoundAction) (bool, error)) error {
	return global.bootActions(next)
}

// bootActions builds a fresh plan from the bound actions handed out by
// next and swaps it in for the current one.
//
// next is called with an empty BoundAction until it reports false.
// If next fails or a cron spec cannot be parsed the error is returned
// and the current plan stays untouched, as the new plan is only
// installed after all actions were scheduled successfully.
func (s *scheduler) bootActions(next func(*BoundAction) (bool, error)) error {
	plan := cron.New()

	for {
		var bound BoundAction
		if more, err := next(&bound); err != nil {
			return err
		} else if !more {
			break
		}

		when, err := cron.Parse(bound.Spec)
		if err != nil {
			return err
		}

		plan.Schedule(when, &userAction{
			scheduler: s,
			user:      bound.User,
			name:      bound.Name,
			cfgID:     bound.CfgID,
		})
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	// Retire the old plan, then install and start the new one.
	s.cr.Stop()
	s.cr = plan
	plan.Start()

	return nil
}

// BindAction binds a named action to a user, a cron spec and
// an optional configuration id in the global scheduler.
// If the same combination of name, user and configuration id is
// already scheduled it is re-scheduled with the new spec.
// An error is returned if spec cannot be parsed.
func BindAction(name, spec, user string, cfgID *int64) error {
	return global.bindAction(name, spec, user, cfgID)
}

// UnbindAction unbinds a named action from a user and
// an optional configuration id in the global scheduler.
// It does nothing if no such binding is scheduled.
func UnbindAction(name, user string, cfgID *int64) {
	global.unbindAction(name, user, cfgID)
}

// UnbindByID unbinds all schedules with a given configuration id
// from the global scheduler. Schedules without a configuration id
// are never affected.
func UnbindByID(cfgID int64) {
	global.unbindByID(cfgID)
}

// UnbindUser unbinds all schedules for a given user
// from the global scheduler.
func UnbindUser(user string) {
	global.unbindUser(user)
}

// HasAction reports whether an action with the given name
// is registered in the global scheduler.
func HasAction(name string) bool {
	return global.hasAction(name)
}

// hasAction reports whether a non-nil action is registered under name.
// The lookup, including the locking, is done by action.
func (s *scheduler) hasAction(name string) bool {
	return s.action(name) != nil
}

// sameCfgID reports whether two optional configuration ids are equal.
// Two absent ids (nil) are equal, an absent id never equals a present
// one and two present ids are compared by value, not by pointer.
func sameCfgID(a, b *int64) bool {
	if a == nil || b == nil {
		// At least one is absent: equal only if both are.
		return a == b
	}
	return *a == *b
}

// unbindUser removes every scheduled entry that belongs to user.
// The removal is done by replacing the current cron with a new one
// that carries over all other entries. If the user owns no entry
// the running cron is left alone.
func (s *scheduler) unbindUser(user string) {
	s.mu.Lock()
	defer s.mu.Unlock()

	old := s.cr.Entries()

	// Look for the first entry of the user; an empty plan has none.
	owns := false
	for i := 0; i < len(old) && !owns; i++ {
		owns = old[i].Job.(*userAction).user == user
	}
	if !owns {
		return
	}

	// Rebuild the plan without the entries of the user.
	s.cr.Stop()
	fresh := cron.New()
	s.cr = fresh
	for _, e := range old {
		if e.Job.(*userAction).user == user {
			continue
		}
		fresh.Schedule(e.Schedule, e.Job)
	}
	fresh.Start()
}

// unbindByID removes every scheduled entry that carries the given
// configuration id. Entries without a configuration id never match.
// The removal is done by replacing the current cron with a new one
// that carries over all other entries. If no entry matches the
// running cron is left alone.
func (s *scheduler) unbindByID(cfgID int64) {
	s.mu.Lock()
	defer s.mu.Unlock()

	old := s.cr.Entries()

	// &cfgID is never nil, so sameCfgID is false for entries without id.
	matches := func(ua *userAction) bool {
		return sameCfgID(&cfgID, ua.cfgID)
	}

	// Look for the first entry with this id.
	hit := false
	for i := 0; i < len(old) && !hit; i++ {
		hit = matches(old[i].Job.(*userAction))
	}
	if !hit {
		return
	}

	// Rebuild the plan without the matching entries.
	s.cr.Stop()
	fresh := cron.New()
	s.cr = fresh
	for _, e := range old {
		if matches(e.Job.(*userAction)) {
			continue
		}
		fresh.Schedule(e.Schedule, e.Job)
	}
	fresh.Start()
}

// unbindAction removes the scheduled entry bound to the given
// combination of action name, user and optional configuration id.
// Only the first matching entry is removed. The removal is done by
// replacing the current cron with a new one that carries over all
// other entries. If no entry matches the running cron is left alone.
func (s *scheduler) unbindAction(name, user string, cfgID *int64) {
	s.mu.Lock()
	defer s.mu.Unlock()

	old := s.cr.Entries()

	// Locate the first job bound to exactly this name/user/cfg tuple.
	var victim *userAction
	for i := 0; i < len(old) && victim == nil; i++ {
		ua := old[i].Job.(*userAction)
		if ua.name == name && ua.user == user && sameCfgID(cfgID, ua.cfgID) {
			victim = ua
		}
	}
	if victim == nil {
		// Nothing to unbind.
		return
	}

	// Rebuild the plan without the located job.
	s.cr.Stop()
	fresh := cron.New()
	s.cr = fresh
	for _, e := range old {
		if e.Job.(*userAction) == victim {
			continue
		}
		fresh.Schedule(e.Schedule, e.Job)
	}
	fresh.Start()
}

// bindAction schedules the action called name for user and the optional
// configuration id according to the cron spec.
//
// If no entry for this name/user/cfg tuple exists a new job is added to
// the current plan. Otherwise the plan is rebuilt with the first
// matching job moved to the new schedule. In both cases the cron is
// started afterwards, which also covers the initial cron that is
// created unstarted.
//
// An error is returned, and nothing is changed, if spec cannot be parsed.
func (s *scheduler) bindAction(name, spec, user string, cfgID *int64) error {
	when, err := cron.Parse(spec)
	if err != nil {
		return err
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	old := s.cr.Entries()

	// Is there already a job for this name/user/cfg tuple?
	var existing *userAction
	for i := 0; i < len(old) && existing == nil; i++ {
		ua := old[i].Job.(*userAction)
		if ua.name == name && ua.user == user && sameCfgID(cfgID, ua.cfgID) {
			existing = ua
		}
	}

	if existing == nil {
		// New binding: extend the current plan.
		s.cr.Schedule(when, &userAction{
			scheduler: s,
			user:      user,
			name:      name,
			cfgID:     cfgID,
		})
		s.cr.Start()
		return nil
	}

	// Re-binding: rebuild the plan, the existing job gets the new schedule.
	s.cr.Stop()
	fresh := cron.New()
	s.cr = fresh
	for _, e := range old {
		sch := e.Schedule
		if e.Job.(*userAction) == existing {
			sch = when
		}
		fresh.Schedule(sch, e.Job)
	}
	fresh.Start()

	return nil
}

// action returns the action registered under name or nil if there
// is none. The registry is read while holding the lock.
func (s *scheduler) action(name string) Action {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.actions[name]
}

// registerAction stores action under name in the registry, replacing
// an action previously stored under the same name. The registration
// is logged before the lock is taken.
func (s *scheduler) registerAction(name string, action Action) {
	log.Printf("info: register action '%s' in scheduler.", name)
	s.mu.Lock()
	defer s.mu.Unlock()
	s.actions[name] = action
}

// unregisterAction removes the action stored under name from the
// registry. Scheduled entries referring to this name are kept; they
// only log a warning when they run and find no action.
func (s *scheduler) unregisterAction(name string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.actions, name)
}