view pkg/imports/scheduled.go @ 1708:49e047c2106e

Imports: Made imports re-runnable if they fail.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Tue, 08 Jan 2019 13:35:44 +0100
parents dcbe2a7dc532
children 74f7d4c531bc
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"errors"
	"log"

	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/scheduler"
)

// init wires one setup function per scheduled import kind ('gm', 'fa',
// 'bn' and 'wx') into the scheduler via registerAction. Each setup
// function logs that the import is being scheduled and turns the stored
// configuration into the matching import job value.
func init() {
	// 'gm': needs the URL and the optional "insecure" flag.
	registerAction(GMJobKind, func(cfg *IDConfig) (interface{}, error) {
		log.Println("info: schedule 'gm' import")
		job := &GaugeMeasurement{Insecure: cfg.Attributes.Bool("insecure")}
		job.URL = *cfg.URL
		return job, nil
	})

	// 'fa': needs the URL and the optional "insecure" flag.
	registerAction(FAJobKind, func(cfg *IDConfig) (interface{}, error) {
		log.Println("info: schedule 'fa' import")
		job := &FairwayAvailability{Insecure: cfg.Attributes.Bool("insecure")}
		job.URL = *cfg.URL
		return job, nil
	})

	// 'bn': needs the URL and the optional "insecure" flag.
	registerAction(BNJobKind, func(cfg *IDConfig) (interface{}, error) {
		log.Println("info: schedule 'bn' import")
		job := &Bottleneck{Insecure: cfg.Attributes.Bool("insecure")}
		job.URL = *cfg.URL
		return job, nil
	})

	// 'wx': both "feature-type" and "sort-by" are mandatory attributes;
	// setup fails if either one is missing.
	registerAction(WXJobKind, func(cfg *IDConfig) (interface{}, error) {
		log.Println("info: schedule 'wx' import")

		featureType, ok := cfg.Attributes.Get("feature-type")
		if !ok {
			return nil, errors.New("cannot find 'feature-type' attribute")
		}

		sortBy, ok := cfg.Attributes.Get("sort-by")
		if !ok {
			return nil, errors.New("cannot find 'sort-by' attribute")
		}

		job := &WaterwayAxis{
			URL:         *cfg.URL,
			FeatureType: featureType,
			SortBy:      sortBy,
		}
		return job, nil
	})
}

// registerAction registers a scheduler action under the name of the given
// job kind. When the scheduler invokes the action with a configuration id,
// the action
//
//  1. loads the configuration for that id and bails out if loading fails,
//     no configuration exists, or it carries no URL,
//  2. lets setup build the import job value from the configuration,
//  3. serializes that value to JSON, and
//  4. enqueues it with AddJob together with the "due" and "retries"
//     attributes and the user / e-mail / auto-accept settings.
//
// The action has no return value; every failure is reported through the
// standard logger and ends the action early.
func registerAction(kind JobKind, setup func(cfg *IDConfig) (interface{}, error)) {

	scheduler.RegisterAction(string(kind), func(id int64) {
		cfg, err := loadIDConfig(id)

		// Guard clauses: the cases are tried top to bottom, so cfg.URL
		// is only inspected once cfg is known to be non-nil.
		switch {
		case err != nil:
			log.Printf("error: %v\n", err)
			return
		case cfg == nil:
			log.Printf("error: No config found for id %d.\n", id)
			return
		case cfg.URL == nil:
			log.Println("error: No URL specified")
			return
		}

		job, err := setup(cfg)
		if err != nil {
			log.Printf("error: setup failed %v.\n", err)
			return
		}

		payload, err := common.ToJSONString(job)
		if err != nil {
			log.Printf("error: %v\n", err)
			return
		}

		// The "found" flag of "due" is intentionally dropped: whatever
		// value Time yields is handed to AddJob as is.
		due, _ := cfg.Attributes.Time("due")

		// A missing "retries" attribute is passed on as -1.
		retries, ok := cfg.Attributes.Int("retries")
		if !ok {
			retries = -1
		}

		jobID, err := AddJob(
			kind,
			due, retries,
			cfg.User,
			cfg.SendEMail, cfg.AutoAccept,
			payload,
		)
		if err != nil {
			log.Printf("error: %v\n", err)
			return
		}

		log.Printf("info: added import #%d to queue\n", jobID)
	})
}