view pkg/imports/scheduled.go @ 1856:5996b50d154a

Fairway dimension import: Added missing scheduled configuration.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Thu, 17 Jan 2019 11:58:46 +0100
parents bbd653a43a6a
children 427f86518097
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"log"

	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/scheduler"
)

// JobKindSetups maps JobKinds to special setup functions.
//
// Each setup function receives the stored configuration of a scheduled
// import and returns the import description for that kind, populated
// from the configured URL and attributes. The returned value is what
// runConfiguredImport serializes and hands over to the import queue.
//
// All setup functions dereference cfg.URL without checking it, so callers
// have to make sure that cfg.URL is not nil (runConfiguredImport does).
var JobKindSetups = map[JobKind]func(*IDConfig) (interface{}, error){

	// Gauge measurements: URL plus the 'insecure' flag attribute.
	GMJobKind: func(cfg *IDConfig) (interface{}, error) {
		log.Println("info: schedule 'gm' import")
		insecure := cfg.Attributes.Bool("insecure")
		return &GaugeMeasurement{
			URL:      *cfg.URL,
			Insecure: insecure,
		}, nil
	},

	// Fairway availability: URL plus the 'insecure' flag attribute.
	FAJobKind: func(cfg *IDConfig) (interface{}, error) {
		log.Println("info: schedule 'fa' import")
		insecure := cfg.Attributes.Bool("insecure")
		return &FairwayAvailability{
			URL:      *cfg.URL,
			Insecure: insecure,
		}, nil
	},

	// Bottlenecks: URL plus the 'insecure' flag attribute.
	BNJobKind: func(cfg *IDConfig) (interface{}, error) {
		log.Println("info: schedule 'bn' import")
		insecure := cfg.Attributes.Bool("insecure")
		return &Bottleneck{
			URL:      *cfg.URL,
			Insecure: insecure,
		}, nil
	},

	// Waterway axis: 'feature-type' and 'sort-by' are mandatory.
	WXJobKind: func(cfg *IDConfig) (interface{}, error) {
		log.Println("info: schedule 'wx' import")
		ft, found := cfg.Attributes.Get("feature-type")
		if !found {
			return nil, errors.New("cannot find 'feature-type' attribute")
		}
		sb, found := cfg.Attributes.Get("sort-by")
		if !found {
			return nil, errors.New("cannot find 'sort-by' attribute")
		}
		return &WaterwayAxis{
			URL:         *cfg.URL,
			FeatureType: ft,
			SortBy:      sb,
		}, nil
	},

	// Waterway area: 'feature-type' and 'sort-by' are mandatory.
	WAJobKind: func(cfg *IDConfig) (interface{}, error) {
		log.Println("info: schedule 'wa' import")
		ft, found := cfg.Attributes.Get("feature-type")
		if !found {
			return nil, errors.New("cannot find 'feature-type' attribute")
		}
		sb, found := cfg.Attributes.Get("sort-by")
		if !found {
			return nil, errors.New("cannot find 'sort-by' attribute")
		}
		return &WaterwayArea{
			URL:         *cfg.URL,
			FeatureType: ft,
			SortBy:      sb,
		}, nil
	},

	// Fairway dimension: all of 'feature-type', 'sort-by', 'los',
	// 'min-width', 'max-width', 'depth' and 'source-organization'
	// are mandatory. The first missing one is reported as an error.
	FDJobKind: func(cfg *IDConfig) (interface{}, error) {
		log.Println("info: schedule 'fd' import")
		ft, found := cfg.Attributes.Get("feature-type")
		if !found {
			return nil, errors.New("cannot find 'feature-type' attribute")
		}
		sb, found := cfg.Attributes.Get("sort-by")
		if !found {
			return nil, errors.New("cannot find 'sort-by' attribute")
		}
		los, found := cfg.Attributes.Int("los")
		if !found {
			return nil, errors.New("cannot find 'los' attribute")
		}
		minWidth, found := cfg.Attributes.Int("min-width")
		if !found {
			return nil, errors.New("cannot find 'min-width' attribute")
		}
		maxWidth, found := cfg.Attributes.Int("max-width")
		if !found {
			return nil, errors.New("cannot find 'max-width' attribute")
		}
		depth, found := cfg.Attributes.Int("depth")
		if !found {
			return nil, errors.New("cannot find 'depth' attribute")
		}
		sourceOrganization, found := cfg.Attributes.Get("source-organization")
		if !found {
			return nil, errors.New("cannot find 'source-organization' attribute")
		}

		return &FairwayDimension{
			URL:                *cfg.URL,
			FeatureType:        ft,
			SortBy:             sb,
			LOS:                los,
			MinWidth:           minWidth,
			MaxWidth:           maxWidth,
			Depth:              depth,
			SourceOrganization: sourceOrganization,
		}, nil
	},

	// Waterway gauges: 'username' and 'password' are read without
	// checking for their presence, so a missing credential is not
	// treated as an error here.
	WGJobKind: func(cfg *IDConfig) (interface{}, error) {
		log.Println("info: schedule 'wg' import")
		username, _ := cfg.Attributes.Get("username")
		password, _ := cfg.Attributes.Get("password")
		insecure := cfg.Attributes.Bool("insecure")
		return &WaterwayGauge{
			URL:      *cfg.URL,
			Username: username,
			Password: password,
			Insecure: insecure,
		}, nil
	},

	// Distance marks virtual: 'username' and 'password' are read without
	// checking for their presence, so a missing credential is not
	// treated as an error here.
	DMVJobKind: func(cfg *IDConfig) (interface{}, error) {
		log.Println("info: schedule 'dmv' import")
		username, _ := cfg.Attributes.Get("username")
		password, _ := cfg.Attributes.Get("password")
		insecure := cfg.Attributes.Bool("insecure")
		return &DistanceMarksVirtual{
			URL:      *cfg.URL,
			Username: username,
			Password: password,
			Insecure: insecure,
		}, nil
	},
}

// init registers one scheduler action per entry of JobKindSetups. All
// kinds share the same action: it runs the configured import with the
// given configuration ID and logs the outcome.
func init() {
	enqueue := func(configID int64) {
		id, err := RunConfiguredImport(configID)
		if err == nil {
			log.Printf("info: added import #%d to queue\n", id)
			return
		}
		// The action has no way to report back, so the failure is
		// only logged.
		log.Printf("error: running scheduled import failed: %v\n", err)
	}

	for jobKind := range JobKindSetups {
		scheduler.RegisterAction(string(jobKind), enqueue)
	}
}

// RunConfiguredImportContext loads the import configuration with the
// given id via LoadIDConfigContext, using ctx and conn, and passes the
// result on to runConfiguredImport. It returns whatever
// runConfiguredImport returns: the ID of the created job or an error.
func RunConfiguredImportContext(ctx context.Context, conn *sql.Conn, id int64) (int64, error) {
	config, loadErr := LoadIDConfigContext(ctx, conn, id)
	return runConfiguredImport(id, config, loadErr)
}

// RunConfiguredImport loads the import configuration with the given id
// via loadIDConfig and passes the result on to runConfiguredImport.
// It returns whatever runConfiguredImport returns: the ID of the
// created job or an error.
func RunConfiguredImport(id int64) (int64, error) {
	config, loadErr := loadIDConfig(id)
	return runConfiguredImport(id, config, loadErr)
}

// runConfiguredImport turns a loaded import configuration into a queued
// import job and returns the ID of that job.
//
// id is only used for error reporting. cfg and err are the results of
// the preceding configuration load; a non-nil err is returned unchanged.
// An error is also returned if cfg is nil, if cfg has no URL, if the
// kind of cfg has no entry in JobKindSetups, if the setup function
// fails, if the import description cannot be serialized to JSON or if
// AddJob fails. In all error cases the returned job ID is 0.
func runConfiguredImport(id int64, cfg *IDConfig, err error) (int64, error) {

	if err != nil {
		return 0, err
	}
	if cfg == nil {
		// No trailing punctuation or newline: callers embed this
		// message into their own log lines.
		return 0, fmt.Errorf("no config found for id %d", id)
	}
	// The setup functions dereference cfg.URL unconditionally.
	if cfg.URL == nil {
		return 0, errors.New("error: No URL specified")
	}

	kind := JobKind(cfg.Kind)

	setup := JobKindSetups[kind]
	if setup == nil {
		return 0, fmt.Errorf("unknown job kind: %s", cfg.Kind)
	}

	what, err := setup(cfg)
	if err != nil {
		return 0, err
	}

	serialized, err := common.ToJSONString(what)
	if err != nil {
		return 0, err
	}

	// 'due' is optional; whether it was found is deliberately ignored and
	// the value is passed on as returned.
	due, _ := cfg.Attributes.Time("due")

	// 'retries' is optional, too. -1 is passed on when it is not
	// configured (presumably meaning "no limit" -- confirm against AddJob).
	retries, found := cfg.Attributes.Int("retries")
	if !found {
		retries = -1
	}

	jobID, err := AddJob(
		kind,
		due, retries,
		cfg.User,
		cfg.SendEMail,
		serialized,
	)
	if err != nil {
		return 0, err
	}

	return jobID, nil
}