view pkg/controllers/importqueue.go @ 1189:3d50f558870c

REST GET call to /imports now has the ability to be filtered by kinds or states. Something like &states=queued,running,failed,pending,accepted&kinds=sr Note: The states are not the current states of the import queue, yet. The import queue will be adjusted soon.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Fri, 16 Nov 2018 12:08:46 +0100
parents a04126989d91
children 3afa71405b87
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package controllers

import (
	"context"
	"database/sql"
	"fmt"
	"log"
	"net/http"
	"strconv"
	"strings"

	"gemma.intevation.de/gemma/pkg/imports"
	"gemma.intevation.de/gemma/pkg/models"
	"github.com/gorilla/mux"
	"github.com/jackc/pgx/pgtype"
)

// SQL statements used by the import queue REST handlers in this file.
const (
	// selectImportsSQL is the base query for listing imports. It has no
	// WHERE, ORDER BY, LIMIT or OFFSET clause of its own; those are
	// appended at runtime by queryImportListStmt. The state column is
	// cast to varchar in the result.
	selectImportsSQL = `
SELECT
  id,
  state::varchar,
  enqueued,
  kind,
  username
FROM waterway.imports
`

	// selectHasImportSQL yields a single row if an import with the
	// id given as $1 exists and no row otherwise.
	selectHasImportSQL = `
SELECT true FROM Waterway.imports WHERE id = $1`

	// selectHasNoRunningImportSQL yields a single row if an import with
	// the id given as $1 exists and is not in state 'running'. It yields
	// no row if the import is missing or currently running.
	selectHasNoRunningImportSQL = `
SELECT true FROM Waterway.imports
WHERE id = $1 AND state <> 'running'::waterway.import_state`

	// selectImportLogsSQL fetches the log entries (time, kind cast to
	// varchar, message) of the import with the id given as $1, ordered
	// by time.
	selectImportLogsSQL = `
SELECT
  time,
  kind::varchar,
  msg
FROM waterway.import_logs
WHERE import_id = $1
ORDER BY time`

	// deleteLogsSQL removes all log entries of the import with the
	// id given as $1.
	deleteLogsSQL = `
DELETE FROM waterway.import_logs WHERE import_id = $1`

	// deleteImportSQL removes the import with the id given as $1.
	deleteImportSQL = `
DELETE FROM waterway.imports WHERE id = $1`
)

// toTextArray turns a comma separated list of names into a
// pgtype.TextArray. Each item is trimmed and lower-cased. Empty
// items and items not contained in allowed are dropped; duplicates
// are kept. It returns nil if nothing remains or if the array
// cannot be built (the latter is logged as a warning).
func toTextArray(txt string, allowed []string) *pgtype.TextArray {
	// isAllowed reports whether s is one of the allowed names.
	isAllowed := func(s string) bool {
		for i := range allowed {
			if allowed[i] == s {
				return true
			}
		}
		return false
	}

	var kept []string
	for _, raw := range strings.Split(txt, ",") {
		name := strings.ToLower(strings.TrimSpace(raw))
		if name != "" && isAllowed(name) {
			kept = append(kept, name)
		}
	}
	if len(kept) == 0 {
		return nil
	}

	result := new(pgtype.TextArray)
	if err := result.Set(kept); err != nil {
		log.Printf("warn: %v\n", err)
		return nil
	}
	return result
}

// queryImportListStmt builds and runs the query listing the imports.
//
// The statement starts with selectImportsSQL and is extended depending
// on the entries found in vars:
//
//	"states"  comma separated list of import states to filter by
//	"kinds"   comma separated list of import kinds to filter by
//	"limit"   maximum number of rows to return
//	"offset"  number of rows to skip
//
// States and kinds are validated against imports.ImportStateNames and
// imports.ImportKindNames; a filter without any valid name is
// ignored. The result is always ordered by enqueued time, newest first.
// All user supplied values are passed as positional arguments, never
// interpolated into the SQL text.
//
// It returns the open rows, which the caller has to close, or the
// error of the query.
func queryImportListStmt(
	conn *sql.Conn,
	ctx context.Context,
	vars map[string]string,
) (*sql.Rows, error) {

	var (
		stmt   strings.Builder
		args   []interface{}
		states *pgtype.TextArray
		kinds  *pgtype.TextArray
	)

	if st, found := vars["states"]; found {
		states = toTextArray(st, imports.ImportStateNames)
	}

	if ks, found := vars["kinds"]; found {
		kinds = toTextArray(ks, imports.ImportKindNames)
	}

	stmt.WriteString(selectImportsSQL)
	if states != nil || kinds != nil {
		stmt.WriteString(" WHERE ")
	}

	if states != nil {
		// The column is named "state" (singular), see selectImportsSQL.
		fmt.Fprintf(&stmt, " state = ANY($%d) ", len(args)+1)
		args = append(args, states)
	}

	if states != nil && kinds != nil {
		stmt.WriteString("AND")
	}

	if kinds != nil {
		fmt.Fprintf(&stmt, " kind = ANY($%d) ", len(args)+1)
		args = append(args, kinds)
	}

	stmt.WriteString(" ORDER BY enqueued DESC ")

	// The placeholder number is derived from the number of arguments
	// collected so far, so the optional clauses can be combined freely.
	if lim, found := vars["limit"]; found {
		fmt.Fprintf(&stmt, " LIMIT $%d ", len(args)+1)
		// The parse error is deliberately discarded; the value
		// returned by ParseInt is bound as is.
		// NOTE(review): presumably the route only admits numeric
		// values here - confirm against the route definition.
		limit, _ := strconv.ParseInt(lim, 10, 64)
		args = append(args, limit)
	}

	if ofs, found := vars["offset"]; found {
		fmt.Fprintf(&stmt, " OFFSET $%d ", len(args)+1)
		// Parse error discarded, see the note on "limit" above.
		offset, _ := strconv.ParseInt(ofs, 10, 64)
		args = append(args, offset)
	}

	return conn.QueryContext(ctx, stmt.String(), args...)
}

// listImports is the REST handler listing the imports of the queue.
// Filtering and paging are driven by the mux variables of the request
// and handled by queryImportListStmt. On success the result carries
// the found imports under the JSON key "imports".
func listImports(
	_ interface{},
	req *http.Request,
	conn *sql.Conn,
) (jr JSONResult, err error) {

	rows, err := queryImportListStmt(conn, req.Context(), mux.Vars(req))
	if err != nil {
		return jr, err
	}
	defer rows.Close()

	// Named "list" to avoid shadowing the imported package "imports".
	list := make([]*models.Import, 0, 20)

	for rows.Next() {
		item := new(models.Import)
		err = rows.Scan(
			&item.ID,
			&item.State,
			&item.Enqueued,
			&item.Kind,
			&item.User,
		)
		if err != nil {
			return jr, err
		}
		list = append(list, item)
	}

	// Report errors which ended the iteration prematurely.
	if err = rows.Err(); err != nil {
		return jr, err
	}

	jr.Result = struct {
		Imports []*models.Import `json:"imports"`
	}{
		Imports: list,
	}
	return jr, nil
}

// importLogs is the REST handler delivering the log entries of a
// single import. The import is identified by the mux variable "id".
// If no such import exists a JSONError with status 404 is returned.
// On success the result carries the log entries, ordered by time,
// under the JSON key "entries".
func importLogs(
	_ interface{},
	req *http.Request,
	conn *sql.Conn,
) (jr JSONResult, err error) {

	ctx := req.Context()

	// The parse error is discarded.
	// NOTE(review): presumably the route only admits numeric
	// ids - confirm against the route definition.
	id, _ := strconv.ParseInt(mux.Vars(req)["id"], 10, 64)

	// Check if we have such an import job first.
	var exists bool
	err = conn.QueryRowContext(ctx, selectHasImportSQL, id).Scan(&exists)
	if err == sql.ErrNoRows {
		err = JSONError{
			Code:    http.StatusNotFound,
			Message: fmt.Sprintf("Cannot find import #%d.", id),
		}
		return jr, err
	}
	if err != nil {
		return jr, err
	}

	// The import exists -> collect its log entries.
	rows, err := conn.QueryContext(ctx, selectImportLogsSQL, id)
	if err != nil {
		return jr, err
	}
	defer rows.Close()

	logs := make([]*models.ImportLogEntry, 0, 10)

	for rows.Next() {
		e := new(models.ImportLogEntry)
		if err = rows.Scan(&e.Time, &e.Kind, &e.Message); err != nil {
			return jr, err
		}
		logs = append(logs, e)
	}

	// Report errors which ended the iteration prematurely.
	if err = rows.Err(); err != nil {
		return jr, err
	}

	jr.Result = struct {
		Entries []*models.ImportLogEntry `json:"entries"`
	}{
		Entries: logs,
	}
	return jr, nil
}

// deleteImport is the REST handler removing a single import together
// with its log entries. The import is identified by the mux variable
// "id". Both deletions happen inside one transaction. If the import
// does not exist or is currently running a JSONError with status 404
// is returned. On success the result has the status 204 (no content).
func deleteImport(
	_ interface{},
	req *http.Request,
	conn *sql.Conn,
) (jr JSONResult, err error) {

	ctx := req.Context()

	// The parse error is discarded.
	// NOTE(review): presumably the route only admits numeric
	// ids - confirm against the route definition.
	id, _ := strconv.ParseInt(mux.Vars(req)["id"], 10, 64)

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return jr, err
	}
	// Undo everything on early exits.
	defer tx.Rollback()

	// Check if we have such an import job which is not running first.
	var deletable bool
	err = tx.QueryRowContext(ctx, selectHasNoRunningImportSQL, id).Scan(&deletable)
	if err == sql.ErrNoRows {
		err = JSONError{
			Code:    http.StatusNotFound,
			Message: fmt.Sprintf("Cannot find import #%d.", id),
		}
		return jr, err
	}
	if err != nil {
		return jr, err
	}

	// Remove the log entries before the import itself.
	for _, stmt := range []string{deleteLogsSQL, deleteImportSQL} {
		if _, err = tx.ExecContext(ctx, stmt, id); err != nil {
			return jr, err
		}
	}

	if err = tx.Commit(); err != nil {
		return jr, err
	}

	jr.Code = http.StatusNoContent
	return jr, nil
}