view pkg/controllers/importqueue.go @ 3190:54a3e40cfbed

controllers: moved filter builder to a separate file.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Wed, 08 May 2019 10:50:14 +0200
parents 813309225e35
children eeff2cc4ff9d
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package controllers

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"strconv"
	"strings"
	"time"

	"github.com/gorilla/mux"
	"github.com/jackc/pgx/pgtype"

	"gemma.intevation.de/gemma/pkg/auth"
	"gemma.intevation.de/gemma/pkg/imports"
	"gemma.intevation.de/gemma/pkg/models"
)

const (
	// warningSQLPrefix is a common table expression named "warned"
	// holding the distinct ids of all imports which have at least
	// one log entry of kind 'warn'. It is prepended to the list,
	// count and neighbor queries below.
	warningSQLPrefix = `
WITH warned AS (
  SELECT distinct(import_id) AS id
  FROM import.import_logs
  WHERE kind = 'warn'::log_type
)`
	// selectImportsCountSQL counts imports. It ends with an open
	// WHERE clause; the conditions are appended by buildFilters.
	selectImportsCountSQL = warningSQLPrefix + `
SELECT count(*)
FROM import.imports
WHERE
`
	// selectImportsSQL lists imports together with two derived flags:
	// has_summary (summary is not NULL) and has_warnings (id is in
	// the "warned" set). It ends with an open WHERE clause; the
	// conditions and the ordering are appended by buildFilters.
	selectImportsSQL = warningSQLPrefix + `
SELECT
  imports.id AS id,
  state::varchar,
  enqueued,
  kind,
  username,
  signer,
  summary IS NOT NULL AS has_summary,
  imports.id IN (SELECT id FROM warned) AS has_warnings
FROM import.imports
WHERE
`
	// selectBeforeSQL is the stem of the query for the enqueue time of
	// the neighbor before the listed range. Its text is identical to
	// selectAfterSQL; buildFilters appends the differing conditions
	// and ordering.
	selectBeforeSQL = warningSQLPrefix + `
SELECT enqueued FROM import.imports
WHERE
`
	// selectAfterSQL is the stem of the query for the enqueue time of
	// the neighbor after the listed range (see selectBeforeSQL).
	selectAfterSQL = warningSQLPrefix + `
SELECT enqueued FROM import.imports
WHERE
`
	// selectImportSummarySQL fetches summary and enqueue time
	// of a single import identified by its id.
	selectImportSummarySQL = `
SELECT summary, enqueued FROM import.imports WHERE id = $1`

	// selectHasNoRunningImportSQL yields one row (true) only if an
	// import with the given id exists and is not in state 'running'.
	// A missing and a running import both yield no row.
	selectHasNoRunningImportSQL = `
SELECT true FROM import.imports
WHERE id = $1 AND state <> 'running'::import_state`

	// selectImportLogsSQL fetches the log entries of a single import
	// ordered by their time stamps.
	selectImportLogsSQL = `
SELECT
  time,
  kind::varchar,
  msg
FROM import.import_logs
WHERE import_id = $1
ORDER BY time`

	// deleteLogsSQL removes all log entries of a single import.
	deleteLogsSQL = `
DELETE FROM import.import_logs WHERE import_id = $1`

	// deleteImportSQL removes the import itself.
	deleteImportSQL = `
DELETE FROM import.imports WHERE id = $1`
)

// toInt8Array turns a comma separated list of integers into
// a pgtype.Int8Array. Items are trimmed of surrounding white space;
// items which are not valid base 10 int64 values are silently dropped.
// If the array cannot be built a warning is logged and nil is returned.
func toInt8Array(txt string) *pgtype.Int8Array {
	var values []int64
	for _, field := range strings.Split(txt, ",") {
		if v, err := strconv.ParseInt(strings.TrimSpace(field), 10, 64); err == nil {
			values = append(values, v)
		}
	}
	array := new(pgtype.Int8Array)
	if err := array.Set(values); err != nil {
		log.Printf("warn: %v\n", err)
		return nil
	}
	return array
}

// toTextArray turns a comma separated list of words into
// a pgtype.TextArray. Each word is trimmed and lower cased and is
// only kept if it is exactly equal to one of the allowed values.
// Empty words are skipped. It returns nil if no word was accepted
// or if the array cannot be built (a warning is logged in the
// latter case).
func toTextArray(txt string, allowed []string) *pgtype.TextArray {
	var accepted []string

words:
	for _, raw := range strings.Split(txt, ",") {
		word := strings.ToLower(strings.TrimSpace(raw))
		if word == "" {
			continue
		}
		for i := range allowed {
			if allowed[i] == word {
				accepted = append(accepted, word)
				continue words
			}
		}
	}

	if len(accepted) == 0 {
		return nil
	}

	array := new(pgtype.TextArray)
	if err := array.Set(accepted); err != nil {
		log.Printf("warn: %v\n", err)
		return nil
	}
	return array
}

// buildFilters translates the form values of the request into
// three SQL statements with their arguments:
//
//	l lists (or, if "count" is set, counts) the matching imports,
//	b looks for the enqueue time of the nearest import before "from",
//	a looks for the enqueue time of the nearest import after "to".
//
// The form values "query", "states", "kinds", "ids" and "warnings"
// restrict all three statements. "from" and "to" bound the list and
// are the pivots of the before and after statements. If "from" is
// missing b is returned as nil, if "to" is missing a is returned as nil.
// An error is returned if "from" or "to" cannot be parsed with
// models.ImportTimeFormat.
func buildFilters(req *http.Request) (l, b, a *filterBuilder, err error) {

	l, b, a = new(filterBuilder), new(filterBuilder), new(filterBuilder)

	// isTrue reports if a form value is one of the accepted
	// spellings of "true".
	isTrue := func(key string) bool {
		switch strings.ToLower(req.FormValue(key)) {
		case "1", "t", "true":
			return true
		}
		return false
	}

	counting := isTrue("count")
	if counting {
		l.stmt.WriteString(selectImportsCountSQL)
	} else {
		l.stmt.WriteString(selectImportsSQL)
	}
	a.stmt.WriteString(selectAfterSQL)
	b.stmt.WriteString(selectBeforeSQL)

	all := []*filterBuilder{l, a, b}

	// cond adds the same condition to all three statements.
	cond := func(format string, v ...interface{}) {
		for _, fb := range all {
			fb.and(format, v...)
		}
	}

	if query := req.FormValue("query"); query != "" {
		pattern := "%" + query + "%"
		cond(` (kind ILIKE $%d OR username ILIKE $%d OR signer ILIKE $%d OR `+
			`id IN (SELECT import_id FROM import.import_logs WHERE msg ILIKE $%d)) `,
			pattern, pattern, pattern, pattern)
	}

	if st := req.FormValue("states"); st != "" {
		cond(" state = ANY($%d) ", toTextArray(st, imports.ImportStateNames))
	}

	if ks := req.FormValue("kinds"); ks != "" {
		cond(" kind = ANY($%d) ", toTextArray(ks, imports.ImportKindNames()))
	}

	if idss := req.FormValue("ids"); idss != "" {
		cond(" id = ANY($%d) ", toInt8Array(idss))
	}

	// Lower bound of the list is the pivot of the "before" statement.
	from := req.FormValue("from")
	noBefore := from == ""
	if !noBefore {
		var fromTime time.Time
		if fromTime, err = time.Parse(models.ImportTimeFormat, from); err != nil {
			return
		}
		l.and(" enqueued >= $%d ", fromTime)
		b.and(" enqueued < $%d", fromTime)
	}

	// Upper bound of the list is the pivot of the "after" statement.
	to := req.FormValue("to")
	noAfter := to == ""
	if !noAfter {
		var toTime time.Time
		if toTime, err = time.Parse(models.ImportTimeFormat, to); err != nil {
			return
		}
		l.and(" enqueued <= $%d ", toTime)
		a.and(" enqueued > $%d", toTime)
	}

	if isTrue("warnings") {
		cond(" id IN (SELECT id FROM warned) ")
	}

	// The statement stems end with an open WHERE clause,
	// so close it if no condition was added.
	for _, fb := range all {
		if !fb.hasCond {
			fb.stmt.WriteString(" TRUE ")
		}
	}

	if !counting {
		l.stmt.WriteString(" ORDER BY enqueued DESC ")
		a.stmt.WriteString(" ORDER BY enqueued LIMIT 1")
		b.stmt.WriteString(" ORDER BY enqueued DESC LIMIT 1")
	}

	if noBefore {
		b = nil
	}
	if noAfter {
		a = nil
	}
	return
}

// neighbored runs the statement of the given filter builder and
// returns the single time stamp it yields wrapped as an import time.
// It returns nil if the statement yields no row or if it fails;
// failures other than sql.ErrNoRows are logged as warnings.
func neighbored(ctx context.Context, conn *sql.Conn, fb *filterBuilder) *models.ImportTime {

	var when time.Time
	row := conn.QueryRowContext(ctx, fb.stmt.String(), fb.args...)
	if err := row.Scan(&when); err != nil {
		if err != sql.ErrNoRows {
			log.Printf("warn: %v\n", err)
		}
		return nil
	}
	return &models.ImportTime{Time: when}
}

// listImports is the handler listing the imports matching the filters
// given as form values of the request (see buildFilters).
//
// If the form value "count" is set to "1", "t" or "true" only the
// number of matching imports is delivered. Otherwise the result
// contains the matching imports and, if they exist, the enqueue times
// of the nearest imports before ("prev") and after ("next") the
// listed range.
//
// Unparsable filters are reported as a JSONError with status
// 400 (Bad Request). Database errors are passed through unchanged.
func listImports(
	_ interface{},
	req *http.Request,
	conn *sql.Conn,
) (jr JSONResult, err error) {

	var list, before, after *filterBuilder

	if list, before, after, err = buildFilters(req); err != nil {
		// buildFilters only fails on malformed "from"/"to" values,
		// which is a fault of the client. Report it with an explicit
		// status like the other client side errors in this file.
		// NOTE(review): assumes plain errors are not already mapped
		// to a 4xx status by the calling JSON handler -- confirm.
		err = JSONError{
			Code:    http.StatusBadRequest,
			Message: fmt.Sprintf("Invalid filter: %v", err),
		}
		return
	}

	ctx := req.Context()

	// Fast path for counting

	switch strings.ToLower(req.FormValue("count")) {
	case "1", "t", "true":
		var count int64
		err = conn.QueryRowContext(ctx, list.stmt.String(), list.args...).Scan(&count)
		switch {
		case err == sql.ErrNoRows:
			count, err = 0, nil
		case err != nil:
			return
		}
		jr = JSONResult{Result: count}
		return
	}

	// Generate the list

	var rows *sql.Rows
	if rows, err = conn.QueryContext(ctx, list.stmt.String(), list.args...); err != nil {
		return
	}
	defer rows.Close()

	imports := make([]*models.Import, 0, 20)

	for rows.Next() {
		var (
			it       models.Import
			enqueued time.Time
			// The signer column may be NULL.
			signer sql.NullString
		)
		if err = rows.Scan(
			&it.ID,
			&it.State,
			&enqueued,
			&it.Kind,
			&it.User,
			&signer,
			&it.Summary,
			&it.Warnings,
		); err != nil {
			return
		}
		if signer.Valid {
			it.Signer = signer.String
		}
		it.Enqueued = models.ImportTime{Time: enqueued}
		imports = append(imports, &it)
	}

	if err = rows.Err(); err != nil {
		return
	}

	// The neighbor statements are only available
	// if the respective bound was given in the request.
	var prev, next *models.ImportTime

	if before != nil {
		prev = neighbored(ctx, conn, before)
	}

	if after != nil {
		next = neighbored(ctx, conn, after)
	}

	jr = JSONResult{
		Result: struct {
			Prev    *models.ImportTime `json:"prev,omitempty"`
			Next    *models.ImportTime `json:"next,omitempty"`
			Imports []*models.Import   `json:"imports"`
		}{
			Imports: imports,
			Prev:    prev,
			Next:    next,
		},
	}
	return
}

// importLogs is the handler delivering the enqueue time, the decoded
// summary (if there is one) and the log entries of the import given
// by the "id" variable of the route. A JSONError with status 404
// is returned if there is no such import.
func importLogs(
	_ interface{},
	req *http.Request,
	conn *sql.Conn,
) (jr JSONResult, err error) {

	ctx := req.Context()

	// The parse error is ignored; id is 0 if the variable is not
	// numeric. Presumably the route only matches numeric
	// ids -- TODO confirm.
	id, _ := strconv.ParseInt(mux.Vars(req)["id"], 10, 64)

	// Check if we have such an import job first.
	var (
		summary  sql.NullString
		enqueued time.Time
	)
	row := conn.QueryRowContext(ctx, selectImportSummarySQL, id)
	if err = row.Scan(&summary, &enqueued); err != nil {
		if err == sql.ErrNoRows {
			err = JSONError{
				Code:    http.StatusNotFound,
				Message: fmt.Sprintf("Cannot find import #%d.", id),
			}
		}
		return
	}

	// The summary is stored as JSON text; decode it into a generic value.
	var sum interface{}
	if summary.Valid {
		dec := json.NewDecoder(strings.NewReader(summary.String))
		if err = dec.Decode(&sum); err != nil {
			return
		}
	}

	// We have it -> generate log entries.
	var rows *sql.Rows
	if rows, err = conn.QueryContext(ctx, selectImportLogsSQL, id); err != nil {
		return
	}
	defer rows.Close()

	entries := make([]*models.ImportLogEntry, 0, 10)

	for rows.Next() {
		entry := new(models.ImportLogEntry)
		if err = rows.Scan(&entry.Time, &entry.Kind, &entry.Message); err != nil {
			return
		}
		entries = append(entries, entry)
	}

	if err = rows.Err(); err != nil {
		return
	}

	result := struct {
		Enqueued models.ImportTime        `json:"enqueued"`
		Summary  interface{}              `json:"summary,omitempty"`
		Entries  []*models.ImportLogEntry `json:"entries"`
	}{
		Enqueued: models.ImportTime{Time: enqueued},
		Summary:  sum,
		Entries:  entries,
	}

	jr = JSONResult{Result: result}
	return
}

// deleteImport is the handler removing the import given by the "id"
// variable of the route together with its log entries. Both deletes
// happen in one transaction. On success a result with status
// 204 (No Content) is returned.
//
// A JSONError with status 404 is returned if the import does not
// exist. The same error is returned if the import is currently
// running, as the guarding query does not distinguish the two cases.
func deleteImport(
	_ interface{},
	req *http.Request,
	conn *sql.Conn,
) (jr JSONResult, err error) {

	ctx := req.Context()

	// The parse error is ignored; id is 0 if the variable is not numeric.
	id, _ := strconv.ParseInt(mux.Vars(req)["id"], 10, 64)

	var tx *sql.Tx
	if tx, err = conn.BeginTx(ctx, nil); err != nil {
		return
	}
	// Undoes everything on the early returns below. After a successful
	// Commit the Rollback only returns sql.ErrTxDone, which is ignored.
	defer tx.Rollback()

	// Check if we have such an import job which is not running first.
	var exists bool
	row := tx.QueryRowContext(ctx, selectHasNoRunningImportSQL, id)
	if err = row.Scan(&exists); err != nil {
		if err == sql.ErrNoRows {
			err = JSONError{
				Code:    http.StatusNotFound,
				Message: fmt.Sprintf("Cannot find import #%d.", id),
			}
		}
		return
	}

	// Remove the log entries before the import they refer to.
	for _, stmt := range []string{deleteLogsSQL, deleteImportSQL} {
		if _, err = tx.ExecContext(ctx, stmt, id); err != nil {
			return
		}
	}

	if err = tx.Commit(); err == nil {
		jr = JSONResult{Code: http.StatusNoContent}
	}

	return
}

const (
	// isPendingSQL fetches if the import with the given id is
	// in state 'pending' together with the kind of the import.
	isPendingSQL = `
SELECT state = 'pending'::import_state, kind
FROM import.imports
WHERE id = $1`

	// reviewSQL stores the decided state ($1) and the
	// signer ($2) of the import with the given id ($3).
	reviewSQL = `
UPDATE import.imports SET
  state = $1::import_state,
  signer = $2
WHERE id = $3`

	// deleteImportDataSQL calls the database function import.del_import
	// for the given import id. Presumably this removes the data
	// introduced by the import -- the function is not defined here.
	deleteImportDataSQL = `SELECT import.del_import($1)`

	// deleteImportTrackSQL removes the tracking entries of an import.
	deleteImportTrackSQL = `
DELETE FROM import.track_imports WHERE import_id = $1`

	// logDecisionSQL adds a message to the log of an import.
	// All other columns are left to their defaults.
	logDecisionSQL = `
INSERT INTO import.import_logs (import_id, msg) VALUES ($1, $2)`
)

// reviewImports is the handler deciding a batch of imports at once.
// The input has to be a pointer to a slice of models.Review.
// Each review is decided on its own via decideImport; a failing
// decision does not stop the batch. The result holds one entry per
// review, in input order, with the id and either the success message
// or the error text. The handler itself never returns an error.
func reviewImports(
	reviews interface{},
	req *http.Request,
	conn *sql.Conn,
) (JSONResult, error) {

	type reviewResult struct {
		ID      int64  `json:"id"`
		Message string `json:"message,omitempty"`
		Error   string `json:"error,omitempty"`
	}

	rs := *reviews.(*[]models.Review)

	results := make([]reviewResult, 0, len(rs))

	for i := range rs {
		res := reviewResult{ID: rs[i].ID}
		var err error
		res.Message, err = decideImport(req, conn, rs[i].ID, string(rs[i].State))
		if err != nil {
			res.Error = err.Error()
		}
		results = append(results, res)
	}

	return JSONResult{Result: results}, nil
}

// reviewImport is the handler deciding a single import. The id of
// the import and the new state are taken from the "id" and "state"
// variables of the route. On success the result holds the message
// generated by decideImport; its error is passed through otherwise.
func reviewImport(
	_ interface{},
	req *http.Request,
	conn *sql.Conn,
) (jr JSONResult, err error) {

	vars := mux.Vars(req)
	state := vars["state"]
	// The parse error is ignored; id is 0 if the variable is not numeric.
	id, _ := strconv.ParseInt(vars["id"], 10, 64)

	msg, err := decideImport(req, conn, id, state)
	if err != nil {
		return
	}

	jr = JSONResult{
		Result: &struct {
			Message string `json:"message"`
		}{
			Message: msg,
		},
	}
	return
}

// decideImport sets the import with the given id to the given state.
// All steps run in one transaction:
//
//  1. The import has to exist (JSONError 404 otherwise) and has
//     to be pending (JSONError 409 otherwise).
//  2. If the state is "accepted" and a job creator is registered for
//     the kind of the import its StageDone is called. For every other
//     state the data of the import is removed in the database.
//  3. The tracking entries of the import are removed.
//  4. The decision is logged and the state and the signer are stored.
//
// On success a human readable message is returned.
func decideImport(
	req *http.Request,
	conn *sql.Conn,
	id int64,
	state string,
) (message string, err error) {
	ctx := req.Context()

	var tx *sql.Tx
	if tx, err = conn.BeginTx(ctx, nil); err != nil {
		return
	}
	// Undoes everything on the early returns below. After a successful
	// Commit the Rollback only returns sql.ErrTxDone, which is ignored.
	defer tx.Rollback()

	var (
		pending bool
		kind    string
	)

	row := tx.QueryRowContext(ctx, isPendingSQL, id)
	if err = row.Scan(&pending, &kind); err != nil {
		if err == sql.ErrNoRows {
			err = JSONError{
				Code:    http.StatusNotFound,
				Message: fmt.Sprintf("cannot find import #%d", id),
			}
		}
		return
	}
	if !pending {
		err = JSONError{
			Code:    http.StatusConflict,
			Message: fmt.Sprintf("import #%d is not pending", id),
		}
		return
	}

	if state != "accepted" {
		// Everything but an acceptance discards the imported data.
		if _, err = tx.ExecContext(ctx, deleteImportDataSQL, id); err != nil {
			return
		}
	} else if jc := imports.FindJobCreator(imports.JobKind(kind)); jc != nil {
		// Kinds without a registered job creator need no finalization.
		if err = jc.StageDone(ctx, tx, id); err != nil {
			return
		}
	}

	// Remove the import track
	if _, err = tx.ExecContext(ctx, deleteImportTrackSQL, id); err != nil {
		return
	}

	// Log the decision and set the final state.
	// NOTE(review): the second result of auth.GetSession is discarded.
	// Presumably this is only reached with a valid session attached
	// to the request -- confirm, as session is used unchecked.
	session, _ := auth.GetSession(req)
	who := session.User

	decision := fmt.Sprintf("User '%s' %s import %d.", who, state, id)
	if _, err = tx.ExecContext(ctx, logDecisionSQL, id, decision); err != nil {
		return
	}

	if _, err = tx.ExecContext(ctx, reviewSQL, state, who, id); err != nil {
		return
	}

	if err = tx.Commit(); err == nil {
		message = fmt.Sprintf(
			"Import #%d successfully changed to state '%s'.", id, state)
	}

	return
}