view pkg/scheduler/scheduler.go @ 5591:0011f50cf216 surveysperbottleneckid

Removed the no longer used alternative API for the surveys/ endpoint. As bottlenecks in the summary for SR imports are now identified by their id and no longer by the (not guaranteed to be unique!) name, there is no longer a need to request survey data by the name+date tuple (which isn't reliable anyway). So the workaround has now been reverted.
author Sascha Wilde <wilde@sha-bang.de>
date Wed, 06 Apr 2022 13:30:29 +0200
parents 5f47eeea988d
children
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package scheduler

import (
	"sort"
	"strings"
	"sync"

	cron "gopkg.in/robfig/cron.v1"

	"gemma.intevation.de/gemma/pkg/log"
)

// Action is the signature of a function that can be registered under
// a name with the scheduler. When a schedule bound to that name fires,
// the function is called with the configuration id of the binding.
type Action func(cfgID int64)

// userAction is the job handed to cron. It only stores the name of
// the action and looks the actual function up in its scheduler
// when it is run.
type userAction struct {
	// scheduler is the scheduler used to resolve name into an Action.
	scheduler *scheduler
	// name is the name the action is registered under.
	name string
	// cfgID is the configuration id passed to the action.
	cfgID int64
}

// scheduler combines a cron instance with a registry of named actions.
type scheduler struct {
	// cr is the currently active cron. It is replaced by a fresh
	// instance whenever the set of scheduled entries is rebuilt.
	cr *cron.Cron
	// actions maps action names to their implementations.
	actions map[string]Action
	// mu is held by the methods of scheduler while they access
	// cr and actions.
	mu sync.Mutex
}

// Run implements cron.Job. The action is looked up by name at the
// time the job fires and is called with the bound configuration id.
// If no action is registered under the name a warning is logged.
func (ua *userAction) Run() {
	fn := ua.scheduler.action(ua.name)
	if fn == nil {
		log.Warnf("scheduled action '%s' not found.", ua.name)
		return
	}
	fn(ua.cfgID)
}

// global is the package wide scheduler the exported functions
// of this package delegate to.
var global = scheduler{
	actions: map[string]Action{},
	cr:      cron.New(),
}

// RegisterAction registers a named action to the global scheduler.
// An action already registered under the same name is replaced.
func RegisterAction(name string, action Action) {
	global.registerAction(name, action)
}

// UnregisterAction unregisters a named action from the global scheduler.
func UnregisterAction(name string) {
	global.unregisterAction(name)
}

// BoundAction is the complete set of information needed
// to bind an action to a schedule and a configuration id.
type BoundAction struct {
	// Name is the name of the action to be called.
	Name string
	// Spec is the schedule specification. It is parsed with cron.Parse.
	Spec string
	// CfgID is the configuration id passed to the action.
	CfgID int64
}

// BootActions sets up the global scheduler with the set of bound
// actions delivered by the next function. next is called repeatedly
// with a BoundAction to fill until it returns false or an error.
func BootActions(next func(*BoundAction) (bool, error)) error {
	return global.bootActions(next)
}

// bootActions builds a fresh cron from the bound actions delivered
// by next and swaps it in for the currently active one.
// next is called until it reports false or an error. If next fails
// or a spec cannot be parsed the error is returned and the currently
// active cron is left untouched.
func (s *scheduler) bootActions(next func(*BoundAction) (bool, error)) error {

	fresh := cron.New()
	count := 0

collect:
	for {
		var bound BoundAction
		switch more, err := next(&bound); {
		case err != nil:
			return err
		case !more:
			break collect
		}

		when, err := cron.Parse(bound.Spec)
		if err != nil {
			return err
		}

		fresh.Schedule(when, &userAction{
			scheduler: s,
			name:      bound.Name,
			cfgID:     bound.CfgID,
		})
		count++
	}

	log.Infof("booting %d scheduler jobs from database.\n", count)

	s.mu.Lock()
	defer s.mu.Unlock()

	// Retire the old cron before the new one takes over.
	s.cr.Stop()
	s.cr = fresh
	s.cr.Start()

	return nil
}

// BindAction binds a named action to a cron spec and
// a configuration id in the global scheduler.
// An error is returned if the spec cannot be parsed.
func BindAction(name, spec string, cfgID int64) error {
	return global.bindAction(name, spec, cfgID)
}

// UnbindAction unbinds a named action from
// a configuration id in the global scheduler.
func UnbindAction(name string, cfgID int64) {
	global.unbindAction(name, cfgID)
}

// UnbindByID unbinds all schedules with a given configuration id
// from the global scheduler.
func UnbindByID(cfgID int64) {
	global.unbindByID(cfgID)
}

// UnbindByIDs unbinds all schedules from the global scheduler
// whose configuration id is contained in the given set of ids.
func UnbindByIDs(ids map[int64]struct{}) {
	global.unbindByIDs(ids)
}

// HasAction reports whether there is an action registered
// under the given name in the global scheduler.
func HasAction(name string) bool {
	return global.hasAction(name)
}

// hasAction reports whether a non-nil action is
// registered under the given name.
func (s *scheduler) hasAction(name string) bool {
	s.mu.Lock()
	fn := s.actions[name]
	s.mu.Unlock()
	return fn != nil
}

// unbindByIDs removes all scheduled entries whose configuration id
// is a key of ids. If no entry is affected the running cron is left
// alone. Otherwise it is stopped and replaced by a new cron
// carrying only the remaining entries.
func (s *scheduler) unbindByIDs(ids map[int64]struct{}) {
	s.mu.Lock()
	defer s.mu.Unlock()

	current := s.cr.Entries()

	// Look for the first entry that has to go.
	affected := false
	for i := 0; i < len(current) && !affected; i++ {
		_, affected = ids[current[i].Job.(*userAction).cfgID]
	}
	if !affected {
		return
	}

	// Rebuild the plan without the unbound entries.
	s.cr.Stop()
	rebuilt := cron.New()
	for _, e := range current {
		if _, drop := ids[e.Job.(*userAction).cfgID]; drop {
			continue
		}
		rebuilt.Schedule(e.Schedule, e.Job)
	}
	s.cr = rebuilt
	rebuilt.Start()
}

// unbindByID removes all scheduled entries with the given
// configuration id. If there is no such entry the running cron is
// left alone. Otherwise it is stopped and replaced by a new cron
// carrying only the remaining entries.
func (s *scheduler) unbindByID(cfgID int64) {
	s.mu.Lock()
	defer s.mu.Unlock()

	current := s.cr.Entries()

	// Collect the survivors into a slice of their own
	// (zero capacity forces append to allocate).
	kept := current[:0:0]
	for _, e := range current {
		if e.Job.(*userAction).cfgID != cfgID {
			kept = append(kept, e)
		}
	}

	// Nothing filtered out -> nothing to do.
	if len(kept) == len(current) {
		return
	}

	s.cr.Stop()
	s.cr = cron.New()
	for _, e := range kept {
		s.cr.Schedule(e.Schedule, e.Job)
	}
	s.cr.Start()
}

// unbindAction removes the first scheduled job matching the given
// action name and configuration id. If there is no such job the
// running cron is left alone. Otherwise it is stopped and replaced
// by a new cron carrying all entries except the ones of this job.
func (s *scheduler) unbindAction(name string, cfgID int64) {
	s.mu.Lock()
	defer s.mu.Unlock()

	current := s.cr.Entries()

	// Locate the first entry with this action/cfg tuple.
	pos := -1
	for i, e := range current {
		if ua := e.Job.(*userAction); ua.cfgID == cfgID && ua.name == name {
			pos = i
			break
		}
	}
	if pos < 0 {
		return
	}
	victim := current[pos].Job.(*userAction)

	// Rebuild the plan without the unbound job.
	s.cr.Stop()
	s.cr = cron.New()
	for _, e := range current {
		if e.Job.(*userAction) == victim {
			continue
		}
		s.cr.Schedule(e.Schedule, e.Job)
	}
	s.cr.Start()
}

// bindAction schedules the action registered under name to be called
// with cfgID according to the cron spec. An error is returned if the
// spec cannot be parsed. In that case the scheduler is not modified.
//
// If a job for the same name and configuration id is already
// scheduled it is re-scheduled with the new spec by rebuilding
// the cron. Otherwise a new job is added to the current cron.
func (s *scheduler) bindAction(name, spec string, cfgID int64) error {

	parsed, err := cron.Parse(spec)
	if err != nil {
		return err
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	current := s.cr.Entries()

	// Is there already a job for this action/cfg tuple?
	var existing *userAction
	for _, e := range current {
		if ua := e.Job.(*userAction); ua.cfgID == cfgID && ua.name == name {
			existing = ua
			break
		}
	}

	if existing == nil {
		// Not known yet -> add to the current plan.
		// NOTE(review): Start is called on a cron that may
		// already be running -- confirm the cron library tolerates this.
		s.cr.Schedule(parsed, &userAction{scheduler: s, name: name, cfgID: cfgID})
		s.cr.Start()
		return nil
	}

	// Known -> re-build the whole plan with
	// the schedule of the existing job swapped.
	s.cr.Stop()
	s.cr = cron.New()
	for _, e := range current {
		when := e.Schedule
		if e.Job.(*userAction) == existing {
			when = parsed
		}
		s.cr.Schedule(when, e.Job)
	}
	s.cr.Start()

	return nil
}

// action returns the action registered under the given name
// or nil if there is none.
func (s *scheduler) action(name string) Action {
	s.mu.Lock()
	fn := s.actions[name]
	s.mu.Unlock()
	return fn
}

// LogActionNames writes the names of all actions registered to the
// global scheduler to the info log, sorted and comma separated.
func LogActionNames() {
	sorted := global.actionNames()
	sort.Strings(sorted)
	joined := strings.Join(sorted, ", ")
	log.Infof("actions registered to scheduler: %s\n", joined)
}

// actionNames returns the names of all registered actions.
// The order of the names is the iteration order of the
// underlying map and therefore not defined.
func (s *scheduler) actionNames() []string {
	s.mu.Lock()
	defer s.mu.Unlock()
	names := make([]string, 0, len(s.actions))
	for name := range s.actions {
		names = append(names, name)
	}
	return names
}

// registerAction stores action under the given name.
// An action already stored under that name is replaced.
func (s *scheduler) registerAction(name string, action Action) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.actions[name] = action
}

// unregisterAction removes the action stored under the given name.
// Entries already scheduled for that name are not touched here.
func (s *scheduler) unregisterAction(name string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.actions, name)
}