Mercurial > gemma
changeset 1557:62171cd9a42b
Import scheduler: Start scheduler a gemma boot time with configurations from database which have a schedule.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Tue, 11 Dec 2018 22:59:10 +0100 |
parents | 6869eb94ead2 |
children | 0ded4c56978e |
files | pkg/scheduler/boot.go pkg/scheduler/scheduler.go |
diffstat | 2 files changed, 129 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pkg/scheduler/boot.go Tue Dec 11 22:59:10 2018 +0100 @@ -0,0 +1,77 @@ +// This is Free Software under GNU Affero General Public License v >= 3.0 +// without warranty, see README.md and license for details. +// +// SPDX-License-Identifier: AGPL-3.0-or-later +// License-Filename: LICENSES/AGPL-3.0.txt +// +// Copyright (C) 2018 by via donau +// – Österreichische Wasserstraßen-Gesellschaft mbH +// Software engineering by Intevation GmbH +// +// Author(s): +// * Sascha L. Teichmann <sascha.teichmann@intevation.de> + +package scheduler + +import ( + "context" + "database/sql" + "log" + + "gemma.intevation.de/gemma/pkg/auth" + "gemma.intevation.de/gemma/pkg/config" +) + +const ( + bootRole = "sys_admin" + + selectImportConfSQL = ` +SELECT id, username, kind, cron +FROM waterway.import_configuration +WHERE cron IS NOT NULL` +) + +func init() { go boot() } + +// boot starts the scheduler with the configurations from +// the database which have a schedule. +func boot() { + config.WaitReady() + log.Println("info: booting scheduler from database.") + ctx := context.Background() + err := auth.RunAs( + ctx, bootRole, + func(conn *sql.Conn) error { + rows, err := conn.QueryContext(ctx, selectImportConfSQL) + if err != nil { + return err + } + defer rows.Close() + err = BootActions(func(ba *BoundAction) (bool, error) { + if err != nil { + return false, err + } + if !rows.Next() { + return false, nil + } + var id int64 + if err = rows.Scan( + &id, + &ba.User, + &ba.Name, + &ba.Spec, + ); err != nil { + return false, err + } + ba.CfgID = &id + return true, nil + }) + if err != nil { + return err + } + return rows.Err() + }) + if err != nil { + log.Printf("error: %v\n", err) + } +}
--- a/pkg/scheduler/scheduler.go Tue Dec 11 20:47:11 2018 +0100 +++ b/pkg/scheduler/scheduler.go Tue Dec 11 22:59:10 2018 +0100 @@ -64,6 +64,58 @@ global.unregisterAction(name) } +// BoundAction is a complete set of infos for +// an action to be bound to a user, schedule and +// optional configuration id. +type BoundAction struct { + Name string + Spec string + User string + CfgID *int64 +} + +// BootActions setup the global scheduler with a set +// of bound actions delivered by the next function. +func BootActions(next func(*BoundAction) (bool, error)) error { + return global.bootActions(next) +} + +func (s *scheduler) bootActions(next func(*BoundAction) (bool, error)) error { + + cr := cron.New() + + for { + var ba BoundAction + ok, err := next(&ba) + if err != nil { + return err + } + if !ok { + break + } + schedule, err := cron.Parse(ba.Spec) + if err != nil { + return err + } + job := &userAction{ + scheduler: s, + user: ba.User, + name: ba.Name, + cfgID: ba.CfgID, + } + cr.Schedule(schedule, job) + } + + s.mu.Lock() + defer s.mu.Unlock() + + s.cr.Stop() + s.cr = cr + cr.Start() + + return nil +} + // BindAction binds a named action to a user, a cron spec and // an optional configuration id. func BindAction(name, spec, user string, cfgID *int64) error {