changeset 1557:62171cd9a42b

Import scheduler: Start scheduler a gemma boot time with configurations from database which have a schedule.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Tue, 11 Dec 2018 22:59:10 +0100
parents 6869eb94ead2
children 0ded4c56978e
files pkg/scheduler/boot.go pkg/scheduler/scheduler.go
diffstat 2 files changed, 129 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pkg/scheduler/boot.go	Tue Dec 11 22:59:10 2018 +0100
@@ -0,0 +1,77 @@
+// This is Free Software under GNU Affero General Public License v >= 3.0
+// without warranty, see README.md and license for details.
+//
+// SPDX-License-Identifier: AGPL-3.0-or-later
+// License-Filename: LICENSES/AGPL-3.0.txt
+//
+// Copyright (C) 2018 by via donau
+//   – Österreichische Wasserstraßen-Gesellschaft mbH
+// Software engineering by Intevation GmbH
+//
+// Author(s):
+//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
+
+package scheduler
+
+import (
+	"context"
+	"database/sql"
+	"log"
+
+	"gemma.intevation.de/gemma/pkg/auth"
+	"gemma.intevation.de/gemma/pkg/config"
+)
+
+const (
+	bootRole = "sys_admin"
+
+	selectImportConfSQL = `
+SELECT id, username, kind, cron
+FROM waterway.import_configuration
+WHERE cron IS NOT NULL`
+)
+
+func init() { go boot() }
+
+// boot starts the scheduler with the configurations from
+// the database which have a schedule.
+func boot() {
+	config.WaitReady()
+	log.Println("info: booting scheduler from database.")
+	ctx := context.Background()
+	err := auth.RunAs(
+		ctx, bootRole,
+		func(conn *sql.Conn) error {
+			rows, err := conn.QueryContext(ctx, selectImportConfSQL)
+			if err != nil {
+				return err
+			}
+			defer rows.Close()
+			err = BootActions(func(ba *BoundAction) (bool, error) {
+				if err != nil {
+					return false, err
+				}
+				if !rows.Next() {
+					return false, nil
+				}
+				var id int64
+				if err = rows.Scan(
+					&id,
+					&ba.User,
+					&ba.Name,
+					&ba.Spec,
+				); err != nil {
+					return false, err
+				}
+				ba.CfgID = &id
+				return true, nil
+			})
+			if err != nil {
+				return err
+			}
+			return rows.Err()
+		})
+	if err != nil {
+		log.Printf("error: %v\n", err)
+	}
+}
--- a/pkg/scheduler/scheduler.go	Tue Dec 11 20:47:11 2018 +0100
+++ b/pkg/scheduler/scheduler.go	Tue Dec 11 22:59:10 2018 +0100
@@ -64,6 +64,58 @@
 	global.unregisterAction(name)
 }
 
+// BoundAction is a complete set of infos for
+// an action to be bound to a user, schedule and
+// optional configuration id.
+type BoundAction struct {
+	Name  string
+	Spec  string
+	User  string
+	CfgID *int64
+}
+
+// BootActions setup the global scheduler with a set
+// of bound actions delivered by the next function.
+func BootActions(next func(*BoundAction) (bool, error)) error {
+	return global.bootActions(next)
+}
+
+func (s *scheduler) bootActions(next func(*BoundAction) (bool, error)) error {
+
+	cr := cron.New()
+
+	for {
+		var ba BoundAction
+		ok, err := next(&ba)
+		if err != nil {
+			return err
+		}
+		if !ok {
+			break
+		}
+		schedule, err := cron.Parse(ba.Spec)
+		if err != nil {
+			return err
+		}
+		job := &userAction{
+			scheduler: s,
+			user:      ba.User,
+			name:      ba.Name,
+			cfgID:     ba.CfgID,
+		}
+		cr.Schedule(schedule, job)
+	}
+
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	s.cr.Stop()
+	s.cr = cr
+	cr.Start()
+
+	return nil
+}
+
 // BindAction binds a named action to a user, a cron spec and
 // an optional configuration id.
 func BindAction(name, spec, user string, cfgID *int64) error {