view pkg/scheduler/scheduler.go @ 5490:5f47eeea988d logging

Use own logging package.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Mon, 20 Sep 2021 17:45:39 +0200
parents 5826d4de0e40
children
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package scheduler

import (
	"sort"
	"strings"
	"sync"

	cron "gopkg.in/robfig/cron.v1"

	"gemma.intevation.de/gemma/pkg/log"
)

// Action is the signature of a function that can be registered with
// the scheduler under a name. When a job bound to that name fires,
// the function is called with the configuration id of the binding.
type Action func(cfgID int64)

// userAction is the job handed to cron for a single binding.
// It only stores the name of the action; the function itself is
// looked up in the owning scheduler each time the job runs.
type userAction struct {
	// scheduler is the scheduler whose registry is consulted in Run.
	scheduler *scheduler
	// name is the name under which the action is looked up.
	name      string
	// cfgID is the configuration id passed to the action.
	cfgID     int64
}

// scheduler couples a cron instance with a registry of named actions.
type scheduler struct {
	// cr is the current cron instance. The methods in this file
	// replace it with a fresh instance whenever the set of jobs
	// has to be rebuilt.
	cr      *cron.Cron
	// actions maps registered names to their functions.
	actions map[string]Action
	// mu is held by the methods in this file while they access
	// cr or actions.
	mu      sync.Mutex
}

// Run implements cron.Job.
// The action is resolved by name at the time the job fires and is
// called with the bound configuration id. If no action is registered
// under the name a warning is logged instead.
func (ua *userAction) Run() {
	fn := ua.scheduler.action(ua.name)
	if fn == nil {
		log.Warnf("scheduled action '%s' not found.", ua.name)
		return
	}
	fn(ua.cfgID)
}

// global is the scheduler instance used by the exported functions
// of this package. Its cron instance is created here but not started.
var global = scheduler{
	cr:      cron.New(),
	actions: make(map[string]Action),
}

// RegisterAction registers action under the given name in the global
// scheduler. An action already registered under that name is replaced.
func RegisterAction(name string, action Action) {
	global.registerAction(name, action)
}

// UnregisterAction unregisters a named action from the global scheduler.
// Jobs already bound to that name are not removed from the schedule;
// they log a warning when they fire and find no action.
func UnregisterAction(name string) {
	global.unregisterAction(name)
}

// BoundAction is a complete set of infos for
// an action to be bound to a schedule and
// configuration id.
type BoundAction struct {
	// Name is the name of the action to be looked up when the job fires.
	Name  string
	// Spec is the cron specification; it is parsed with cron.Parse.
	Spec  string
	// CfgID is the configuration id handed to the action.
	CfgID int64
}

// BootActions sets up the global scheduler with the set of bound
// actions delivered by the next function, replacing all jobs
// scheduled so far.
//
// next is called repeatedly with a pointer to a zeroed BoundAction to
// fill. It returns true if it delivered a bound action and false if
// there are no more. If next returns an error, or a delivered spec
// cannot be parsed, that error is returned and the current schedule
// is left untouched.
func BootActions(next func(*BoundAction) (bool, error)) error {
	return global.bootActions(next)
}

// bootActions replaces the whole job plan of the scheduler with the
// bound actions delivered by next.
//
// next is called with a pointer to a zeroed BoundAction until it
// reports false or an error. An error from next or from parsing a
// cron spec is returned as is; in that case the currently running
// plan is not touched.
func (s *scheduler) bootActions(next func(*BoundAction) (bool, error)) error {

	// The new plan is assembled on the side so that a failure while
	// fetching or parsing leaves the active cron as it is.
	fresh := cron.New()
	count := 0

fetch:
	for {
		bound := new(BoundAction)
		switch more, err := next(bound); {
		case err != nil:
			return err
		case !more:
			break fetch
		}

		when, err := cron.Parse(bound.Spec)
		if err != nil {
			return err
		}
		fresh.Schedule(when, &userAction{
			scheduler: s,
			name:      bound.Name,
			cfgID:     bound.CfgID,
		})
		count++
	}

	log.Infof("booting %d scheduler jobs from database.\n", count)

	s.mu.Lock()
	defer s.mu.Unlock()

	// Swap the plans: retire the old cron and run the new one.
	s.cr.Stop()
	s.cr = fresh
	s.cr.Start()

	return nil
}

// BindAction binds a named action to a cron spec and
// a configuration id in the global scheduler.
// If a binding with the same name and configuration id already
// exists, its schedule is replaced by the new one.
// An error is returned if spec cannot be parsed.
func BindAction(name, spec string, cfgID int64) error {
	return global.bindAction(name, spec, cfgID)
}

// UnbindAction removes the binding of a named action with the given
// configuration id from the global scheduler.
// It does nothing if there is no such binding.
func UnbindAction(name string, cfgID int64) {
	global.unbindAction(name, cfgID)
}

// UnbindByID unbinds all schedules with a given configuration id,
// regardless of the name of the bound action.
func UnbindByID(cfgID int64) {
	global.unbindByID(cfgID)
}

// UnbindByIDs unbinds all schedules whose configuration id
// is a key of the given set of ids.
func UnbindByIDs(ids map[int64]struct{}) {
	global.unbindByIDs(ids)
}

// HasAction reports whether an action with the given name is
// registered in the global scheduler.
func HasAction(name string) bool {
	return global.hasAction(name)
}

// hasAction reports whether a non-nil action is registered
// under the given name.
func (s *scheduler) hasAction(name string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	fn, known := s.actions[name]
	return known && fn != nil
}

// unbindByIDs removes all jobs whose configuration id is a key of ids.
// The cron is only stopped and rebuilt if at least one job is affected.
func (s *scheduler) unbindByIDs(ids map[int64]struct{}) {
	s.mu.Lock()
	defer s.mu.Unlock()

	plan := s.cr.Entries()

	// Look for the first affected job; without one there is nothing to do.
	affected := false
	for i := 0; i < len(plan) && !affected; i++ {
		_, affected = ids[plan[i].Job.(*userAction).cfgID]
	}
	if !affected {
		return
	}

	// Rebuild the plan from the jobs which are not affected.
	s.cr.Stop()
	s.cr = cron.New()
	for _, e := range plan {
		if _, drop := ids[e.Job.(*userAction).cfgID]; drop {
			continue
		}
		s.cr.Schedule(e.Schedule, e.Job)
	}
	s.cr.Start()
}

// unbindByID removes all jobs bound to the given configuration id.
// The cron is only stopped and rebuilt if such a job exists.
func (s *scheduler) unbindByID(cfgID int64) {
	s.mu.Lock()
	defer s.mu.Unlock()

	plan := s.cr.Entries()

	// bound tells if a job belongs to the configuration id in question.
	bound := func(job cron.Job) bool {
		return job.(*userAction).cfgID == cfgID
	}

	// Advance to the first job to remove; hitting the end means
	// there is none and the running plan can stay as it is.
	first := 0
	for first < len(plan) && !bound(plan[first].Job) {
		first++
	}
	if first == len(plan) {
		return
	}

	// Rebuild the plan without the jobs of the configuration id.
	s.cr.Stop()
	s.cr = cron.New()
	for _, e := range plan {
		if !bound(e.Job) {
			s.cr.Schedule(e.Schedule, e.Job)
		}
	}
	s.cr.Start()
}

// unbindAction removes the first job found that is bound to the given
// action name and configuration id. It does nothing if there is no
// such job.
func (s *scheduler) unbindAction(name string, cfgID int64) {
	s.mu.Lock()
	defer s.mu.Unlock()

	plan := s.cr.Entries()

	// Search the job to drop; the scan ends at the first match.
	var victim *userAction
	for i := 0; victim == nil && i < len(plan); i++ {
		if ua := plan[i].Job.(*userAction); ua.cfgID == cfgID && ua.name == name {
			victim = ua
		}
	}
	if victim == nil {
		return
	}

	// Rebuild the plan with every job except the one found.
	s.cr.Stop()
	s.cr = cron.New()
	for _, e := range plan {
		if e.Job.(*userAction) == victim {
			continue
		}
		s.cr.Schedule(e.Schedule, e.Job)
	}
	s.cr.Start()
}

// bindAction binds the named action to a cron spec and a
// configuration id. If a job with the same name and configuration id
// is already planned its schedule is replaced, otherwise a new job
// is added. An error is returned if spec cannot be parsed; the plan
// is not touched in that case.
func (s *scheduler) bindAction(name, spec string, cfgID int64) error {

	when, err := cron.Parse(spec)
	if err != nil {
		return err
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	plan := s.cr.Entries()

	// Is there already a job for this action/cfg tuple?
	var existing *userAction
	for _, e := range plan {
		if ua := e.Job.(*userAction); ua.cfgID == cfgID && ua.name == name {
			existing = ua
			break
		}
	}

	if existing == nil {
		// Unknown tuple: simply extend the current plan.
		s.cr.Schedule(when, &userAction{scheduler: s, name: name, cfgID: cfgID})
		// The cron may not have been started yet.
		// NOTE(review): relies on Start being harmless on a cron
		// that is already running -- confirm against the cron docs.
		s.cr.Start()
		return nil
	}

	// Known tuple: rebuild the whole plan, handing the new schedule
	// to the existing job and keeping all others as they are.
	s.cr.Stop()
	s.cr = cron.New()
	for _, e := range plan {
		sch := e.Schedule
		if e.Job.(*userAction) == existing {
			sch = when
		}
		s.cr.Schedule(sch, e.Job)
	}
	s.cr.Start()

	return nil
}

// action returns the action registered under the given name
// or nil if there is none.
func (s *scheduler) action(name string) Action {
	s.mu.Lock()
	defer s.mu.Unlock()
	if fn, known := s.actions[name]; known {
		return fn
	}
	return nil
}

// LogActionNames writes the names of all actions registered to the
// global scheduler to the log, sorted and separated by commas.
func LogActionNames() {
	sorted := global.actionNames()
	sort.Strings(sorted)
	joined := strings.Join(sorted, ", ")
	log.Infof("actions registered to scheduler: %s\n", joined)
}

// actionNames returns the names of all registered actions.
// The order of the names is the iteration order of the registry map
// and therefore not defined.
func (s *scheduler) actionNames() []string {
	s.mu.Lock()
	defer s.mu.Unlock()
	collected := make([]string, 0, len(s.actions))
	for name := range s.actions {
		collected = append(collected, name)
	}
	return collected
}

// registerAction stores action under name in the registry, replacing
// any action previously stored under the same name.
func (s *scheduler) registerAction(name string, action Action) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.actions[name] = action
}

// unregisterAction removes the action stored under name from the
// registry. Jobs bound to that name are not removed from the cron.
func (s *scheduler) unregisterAction(name string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.actions, name)
}