view pkg/controllers/importconfig.go @ 2045:10a1e139d2e8 unify_imports

Imports: Re-enabled running configured imports.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Fri, 25 Jan 2019 18:01:35 +0100
parents d29ac997eb34
children 725884a4c89a
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package controllers

import (
	"database/sql"
	"net/http"
	"strconv"

	"github.com/gorilla/mux"

	"gemma.intevation.de/gemma/pkg/imports"
)

// SQL statements used by the import configuration handlers in this file.
// All statements use positional placeholders ($1, $2, ...).
const (
	// selectImportConfigurationPrefix is the common SELECT column list
	// shared by the list and by-id queries below. The column order is
	// id, username, kind, send_email, cron, url.
	selectImportConfigurationPrefix = `
SELECT
  id,
  username,
  kind,
  send_email,
  cron,
  url
FROM import.import_configuration`

	// selectImportConfigurationSQL selects all configurations ordered by id.
	selectImportConfigurationSQL = selectImportConfigurationPrefix + `
ORDER by id`

	// selectImportConfigurationIDSQL selects the configuration with id $1.
	selectImportConfigurationIDSQL = selectImportConfigurationPrefix + `
WHERE id = $1`

	// insertImportConfigurationSQL inserts a new configuration and returns
	// its id. Parameters: $1 username, $2 kind, $3 cron, $4 send_email.
	//
	// The number of placeholders has to equal the number of listed
	// columns; an extra VALUES expression makes the INSERT fail. The
	// column set mirrors updateImportConfigurationSQL.
	insertImportConfigurationSQL = `
INSERT INTO import.import_configuration
(username, kind, cron, send_email)
VALUES ($1, $2, $3, $4)
RETURNING id`

	// insertImportConfigurationAttributeSQL stores one key/value attribute.
	// Parameters: $1 configuration id, $2 key, $3 value.
	insertImportConfigurationAttributeSQL = `
INSERT INTO import.import_configuration_attributes
(import_configuration_id, k, v)
VALUES ($1, $2, $3)`

	// hasImportConfigurationSQL yields a single row containing true if a
	// configuration with id $1 exists and no row otherwise.
	hasImportConfigurationSQL = `
SELECT true FROM import.import_configuration
WHERE id = $1`

	// deleteImportConfiguationAttributesSQL removes all attributes that
	// belong to the configuration with id $1.
	deleteImportConfiguationAttributesSQL = `
DELETE FROM import.import_configuration_attributes
WHERE import_configuration_id = $1`

	// deleteImportConfiguationSQL removes the configuration with id $1.
	deleteImportConfiguationSQL = `
DELETE FROM import.import_configuration
WHERE id = $1`

	// updateImportConfigurationSQL overwrites the configuration with id $1.
	// Parameters: $2 username, $3 kind, $4 cron, $5 send_email.
	updateImportConfigurationSQL = `
UPDATE import.import_configuration SET
  username = $2,
  kind = $3,
  cron = $4,
  send_email = $5
WHERE id = $1
`
)

// runImportConfig starts the import configuration whose id is taken from
// the "id" route variable by calling imports.RunConfiguredImportContext.
//
// The input value is not used. On success the result has the code
// http.StatusCreated and points to a struct whose ID field (JSON key "id")
// holds the value returned by RunConfiguredImportContext. If that call
// fails, its error is returned unchanged with a zero JSONResult.
func runImportConfig(
	_ interface{},
	req *http.Request,
	conn *sql.Conn,
) (jr JSONResult, err error) {

	// NOTE(review): the parse error is discarded, so whatever value
	// ParseInt yields is used as id — presumably the route only matches
	// numeric ids; confirm against the route definition.
	configID, _ := strconv.ParseInt(mux.Vars(req)["id"], 10, 64)

	jobID, err := imports.RunConfiguredImportContext(req.Context(), conn, configID)
	if err != nil {
		return JSONResult{}, err
	}

	return JSONResult{
		Code: http.StatusCreated,
		Result: &struct {
			ID int64 `json:"id"`
		}{
			ID: jobID,
		},
	}, nil
}

// modifyImportConfig is currently a stub: its implementation is disabled
// (kept below as a block comment), so input, req and conn are ignored and
// the zero JSONResult is returned together with a nil error.
//
// The disabled implementation looks up the configuration named by the
// "id" route variable, updates its row with the values from input,
// replaces the stored attributes when input carries any, re-binds the
// scheduler entry and answers with the id.
//
// NOTE(review): points to resolve before re-enabling the body:
//   - the initial lookup runs on conn, not on the transaction tx;
//   - updateImportConfigurationSQL takes five parameters ($1..$5, no url),
//     but the ExecContext call passes six arguments with url before
//     SendEMail;
//   - scheduler.BindAction is called before tx.Commit, and a failing
//     commit does not undo the binding.
func modifyImportConfig(
	input interface{},
	req *http.Request,
	conn *sql.Conn,
) (jr JSONResult, err error) {

	/*

		ctx := req.Context()

		importConfig := input.(*imports.Config)

		id, _ := strconv.ParseInt(mux.Vars(req)["id"], 10, 64)

		var tx *sql.Tx

		if tx, err = conn.BeginTx(ctx, nil); err != nil {
			return
		}
		defer tx.Rollback()

		var (
			entry imports.IDConfig
			kind  string
			dummy sql.NullString
			url   sql.NullString
		)

		err = conn.QueryRowContext(ctx, selectImportConfigurationIDSQL, id).Scan(
			&entry.ID,
			&entry.User,
			&kind,
			&entry.SendEMail,
			&dummy,
			&url,
		)

		switch {
		case err == sql.ErrNoRows:
			err = JSONError{
				Code:    http.StatusNotFound,
				Message: fmt.Sprintf("No schedule %d found", id),
			}
			return
		case err != nil:
			return
		}

		session, _ := auth.GetSession(req)

		entry.SendEMail = importConfig.SendEMail

		// We always take the cron spec from the input.
		// If there is no spec remove schedule.
		var cron sql.NullString
		if importConfig.Cron != nil {
			cron = sql.NullString{String: string(*importConfig.Cron), Valid: true}
		}

		if importConfig.URL != nil {
			url = sql.NullString{String: *importConfig.URL, Valid: true}
		}

		if _, err = tx.ExecContext(ctx, updateImportConfigurationSQL,
			id,
			session.User,
			string(importConfig.Kind),
			cron,
			url,
			importConfig.SendEMail,
		); err != nil {
			return
		}

		if importConfig.Attributes != nil {
			if _, err = tx.ExecContext(ctx, deleteImportConfiguationAttributesSQL, id); err != nil {
				return
			}
			if err = storeConfigAttributes(ctx, tx, id, importConfig.Attributes); err != nil {
				return
			}
		}

		scheduler.UnbindByID(id)

		if cron.Valid {
			if err = scheduler.BindAction(
				string(importConfig.Kind),
				cron.String,
				id,
			); err != nil {
				return
			}
		}

		if err = tx.Commit(); err != nil {
			return
		}

		var result = struct {
			ID int64 `json:"id"`
		}{
			ID: id,
		}

		jr = JSONResult{Result: &result}
	*/
	return
}

// infoImportConfig is currently a stub: its implementation is disabled
// (kept below as a block comment), so req and conn are ignored and the
// zero JSONResult is returned together with a nil error.
//
// The disabled implementation loads the configuration named by the "id"
// route variable via imports.LoadIDConfigContext and answers with it, or
// with a JSONError carrying http.StatusNotFound when no entry is returned.
func infoImportConfig(
	_ interface{},
	req *http.Request,
	conn *sql.Conn,
) (jr JSONResult, err error) {

	/*

		ctx := req.Context()

		id, _ := strconv.ParseInt(mux.Vars(req)["id"], 10, 64)

		var entry *imports.IDConfig

		entry, err = imports.LoadIDConfigContext(ctx, conn, id)
		switch {
		case err != nil:
			return
		case entry == nil:
			err = JSONError{
				Code:    http.StatusNotFound,
				Message: fmt.Sprintf("No schedule %d found", id),
			}
			return
		}

		jr = JSONResult{Result: &entry}
	*/
	return
}

// deleteImportConfig is currently a stub: its implementation is disabled
// (kept below as a block comment), so req and conn are ignored and the
// zero JSONResult is returned together with a nil error.
//
// The disabled implementation checks inside a transaction that the
// configuration named by the "id" route variable exists, deletes its
// attributes and then the configuration itself, unbinds it from the
// scheduler, commits and answers with the id.
//
// NOTE(review): in the disabled body scheduler.UnbindByID runs before
// tx.Commit, so a failing commit leaves the entry unbound although the
// row still exists — confirm whether that is acceptable before
// re-enabling.
func deleteImportConfig(
	_ interface{},
	req *http.Request,
	conn *sql.Conn,
) (jr JSONResult, err error) {

	/*

		ctx := req.Context()

		id, _ := strconv.ParseInt(mux.Vars(req)["id"], 10, 64)

		var tx *sql.Tx
		if tx, err = conn.BeginTx(ctx, nil); err != nil {
			return
		}
		defer tx.Rollback()

		var found bool
		err = tx.QueryRowContext(ctx, hasImportConfigurationSQL, id).Scan(&found)
		switch {
		case err == sql.ErrNoRows:
			err = JSONError{
				Code:    http.StatusNotFound,
				Message: fmt.Sprintf("No schedule %d found", id),
			}
			return
		case err != nil:
			return
		case !found:
			err = errors.New("Unexpected result")
			return
		}

		if _, err = tx.ExecContext(ctx, deleteImportConfiguationAttributesSQL, id); err != nil {
			return
		}

		if _, err = tx.ExecContext(ctx, deleteImportConfiguationSQL, id); err != nil {
			return
		}

		// Remove from running scheduler.
		scheduler.UnbindByID(id)

		if err = tx.Commit(); err != nil {
			return
		}

		var result = struct {
			ID int64 `json:"id"`
		}{
			ID: id,
		}

		jr = JSONResult{Result: &result}

	*/
	return
}

/*

func storeConfigAttributes(
	ctx context.Context,
	tx *sql.Tx,
	id int64,
	attrs common.Attributes,
) error {
	if len(attrs) == 0 {
		return nil
	}
	attrStmt, err := tx.PrepareContext(ctx, insertImportConfigurationAttributeSQL)
	if err != nil {
		return err
	}
	defer attrStmt.Close()
	// Sort to make it deterministic
	keys := make([]string, len(attrs))
	i := 0
	for key := range attrs {
		keys[i] = key
		i++
	}
	sort.Strings(keys)
	for _, key := range keys {
		if _, err := attrStmt.ExecContext(ctx, id, key, attrs[key]); err != nil {
			return err
		}
	}
	return nil
}

*/

// addImportConfig is currently a stub: its implementation is disabled
// (kept below as a block comment), so input, req and conn are ignored and
// the zero JSONResult is returned together with a nil error.
//
// The disabled implementation inserts a new configuration for the user of
// the current session inside a transaction, stores its attributes, binds
// a scheduler action when a cron spec is given, commits and answers with
// http.StatusCreated and the new id.
//
// NOTE(review): points to resolve before re-enabling the body:
//   - insertImportConfigurationSQL names four columns (username, kind,
//     cron, send_email), but the QueryRowContext call passes five
//     arguments, the last one being url;
//   - the helper storeConfigAttributes is itself commented out in this
//     file.
func addImportConfig(
	input interface{},
	req *http.Request,
	conn *sql.Conn,
) (jr JSONResult, err error) {

	/*

		importConfig := input.(*imports.Config)

		session, _ := auth.GetSession(req)

		var cron, url sql.NullString

		if importConfig.Cron != nil {
			cron = sql.NullString{String: string(*importConfig.Cron), Valid: true}
		}
		if importConfig.URL != nil {
			url = sql.NullString{String: *importConfig.URL, Valid: true}
		}

		ctx := req.Context()

		var tx *sql.Tx

		if tx, err = conn.BeginTx(ctx, nil); err != nil {
			return
		}
		defer tx.Rollback()

		var id int64
		if err = tx.QueryRowContext(
			ctx,
			insertImportConfigurationSQL,
			session.User,
			string(importConfig.Kind),
			cron,
			importConfig.SendEMail,
			url,
		).Scan(&id); err != nil {
			return
		}

		// Store extra attributes
		if err = storeConfigAttributes(ctx, tx, id, importConfig.Attributes); err != nil {
			return
		}

		// Need to start a scheduler job right away?
		if importConfig.Cron != nil {
			if err = scheduler.BindAction(
				string(importConfig.Kind),
				string(*importConfig.Cron),
				id,
			); err != nil {
				return
			}
		}

		if err = tx.Commit(); err != nil {
			scheduler.UnbindByID(id)
			return
		}

		var result = struct {
			ID int64 `json:"id"`
		}{
			ID: id,
		}

		jr = JSONResult{
			Code:   http.StatusCreated,
			Result: &result,
		}
	*/
	return
}

// listImportConfigs is currently a stub: its implementation is disabled
// (kept below as a block comment), so req and conn are ignored and the
// zero JSONResult is returned together with a nil error.
//
// The disabled implementation runs selectImportConfigurationSQL, turns
// every row into an imports.IDConfig (cron and url are only set when the
// column is not NULL) and answers with the resulting list, which is an
// empty, non-nil slice when there are no rows.
func listImportConfigs(
	_ interface{},
	req *http.Request,
	conn *sql.Conn,
) (jr JSONResult, err error) {

	/*

		ctx := req.Context()
		var rows *sql.Rows

		if rows, err = conn.QueryContext(ctx, selectImportConfigurationSQL); err != nil {
			return
		}
		defer rows.Close()

		list := []*imports.IDConfig{}

		for rows.Next() {
			var (
				entry imports.IDConfig
				kind  string
				cron  sql.NullString
				url   sql.NullString
			)
			if err = rows.Scan(
				&entry.ID,
				&entry.User,
				&kind,
				&entry.SendEMail,
				&cron,
				&url,
			); err != nil {
				return
			}
			entry.Kind = imports.ImportKind(kind)
			if cron.Valid {
				cs := models.CronSpec(cron.String)
				entry.Cron = &cs
			}
			if url.Valid {
				entry.URL = &url.String
			}
			list = append(list, &entry)
		}

		if err = rows.Err(); err != nil {
			return
		}

		jr = JSONResult{Result: list}
	*/
	return
}