view pkg/controllers/importqueue.go @ 2549:9bf6b767a56a

client: refactored and improved splitscreen for diagrams To make different diagrams possible, the splitscreen view needed to be decoupled from the cross profiles. Also the style has changed to make it more consistent with the rest of the app. The standard box header is now used and there are collapse and expand animations.
author Markus Kottlaender <markus@intevation.de>
date Fri, 08 Mar 2019 08:50:47 +0100
parents 20d9b71f4125
children 3a242e6aa56d
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package controllers

import (
	"database/sql"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"strconv"
	"strings"

	"github.com/gorilla/mux"
	"github.com/jackc/pgx/pgtype"

	"gemma.intevation.de/gemma/pkg/auth"
	"gemma.intevation.de/gemma/pkg/imports"
	"gemma.intevation.de/gemma/pkg/models"
)

const (
	// selectImportsSQL is the head of the import listing query. It yields
	// id, state, enqueue time, kind, user name, signer and summary of each
	// import plus a has_warnings flag telling if at least one log entry
	// of kind 'warn' exists for it. It carries no WHERE, ORDER BY, LIMIT
	// or OFFSET clause; queryImportListStmt appends them.
	selectImportsSQL = `
SELECT
  id,
  state::varchar,
  enqueued,
  kind,
  username,
  signer,
  summary,
  EXISTS (
    SELECT true FROM import.import_logs
      WHERE kind = 'warn'::log_type AND
            import_id = import.imports.id
  ) AS has_warnings
FROM import.imports
`

	// selectHasImportSQL yields a row if an import with id $1 exists.
	selectHasImportSQL = `
SELECT true FROM import.imports WHERE id = $1`

	// selectHasNoRunningImportSQL yields a row if an import with id $1
	// exists and is not in state 'running'.
	selectHasNoRunningImportSQL = `
SELECT true FROM import.imports
WHERE id = $1 AND state <> 'running'::import_state`

	// selectImportLogsSQL fetches time, kind and message of the log
	// entries of import $1 ordered by time.
	selectImportLogsSQL = `
SELECT
  time,
  kind::varchar,
  msg
FROM import.import_logs
WHERE import_id = $1
ORDER BY time`

	// deleteLogsSQL removes all log entries of import $1.
	deleteLogsSQL = `
DELETE FROM import.import_logs WHERE import_id = $1`

	// deleteImportSQL removes the import with id $1.
	deleteImportSQL = `
DELETE FROM import.imports WHERE id = $1`
)

// toInt8Array converts a comma separated list of integers into a
// pgtype.Int8Array. Every item is stripped of surrounding white space
// and parsed as a base 10 int64; items which fail to parse are skipped
// silently. If storing the collected values into the array fails a
// warning is logged and nil is returned.
//
// NOTE(review): unlike toTextArray this does not return nil if no item
// was usable; what the array looks like in that case depends on
// pgtype's Set — confirm this is intended.
func toInt8Array(txt string) *pgtype.Int8Array {
	var values []int64
	for _, field := range strings.Split(txt, ",") {
		if v, err := strconv.ParseInt(strings.TrimSpace(field), 10, 64); err == nil {
			values = append(values, v)
		}
	}
	arr := new(pgtype.Int8Array)
	if err := arr.Set(values); err != nil {
		log.Printf("warn: %v\n", err)
		return nil
	}
	return arr
}

// toTextArray converts a comma separated list of names into a
// pgtype.TextArray. Every item is stripped of surrounding white space
// and lower cased; empty items and items not contained in allowed are
// dropped. It returns nil if no item is left or if storing the names
// into the array fails (the latter is logged as a warning).
func toTextArray(txt string, allowed []string) *pgtype.TextArray {
	var accepted []string
	for _, field := range strings.Split(txt, ",") {
		name := strings.ToLower(strings.TrimSpace(field))
		if name == "" {
			continue
		}
		for i := range allowed {
			if allowed[i] == name {
				accepted = append(accepted, name)
				break
			}
		}
	}
	if len(accepted) == 0 {
		return nil
	}
	arr := new(pgtype.TextArray)
	if err := arr.Set(accepted); err != nil {
		log.Printf("warn: %v\n", err)
		return nil
	}
	return arr
}

func queryImportListStmt(
	conn *sql.Conn,
	req *http.Request,
) (*sql.Rows, error) {

	var (
		stmt   strings.Builder
		args   []interface{}
		states *pgtype.TextArray
		kinds  *pgtype.TextArray
		ids    *pgtype.Int8Array
	)

	arg := func(format string, v interface{}) {
		fmt.Fprintf(&stmt, format, len(args)+1)
		args = append(args, v)
	}

	if st := req.FormValue("states"); st != "" {
		states = toTextArray(st, imports.ImportStateNames)
	}

	if ks := req.FormValue("kinds"); ks != "" {
		kinds = toTextArray(ks, imports.ImportKindNames())
	}

	if idss := req.FormValue("ids"); idss != "" {
		ids = toInt8Array(idss)
	}

	stmt.WriteString(selectImportsSQL)
	if states != nil || kinds != nil || ids != nil {
		stmt.WriteString(" WHERE ")
	}

	if states != nil {
		arg(" state = ANY($%d) ", states)
	}

	if states != nil && (kinds != nil || ids != nil) {
		stmt.WriteString("AND")
	}

	if kinds != nil {
		arg(" kind = ANY($%d) ", kinds)
	}

	if (states != nil || kinds != nil) && ids != nil {
		stmt.WriteString("AND")
	}

	if ids != nil {
		arg(" id = ANY($%d) ", ids)
	}

	stmt.WriteString(" ORDER BY enqueued DESC ")

	if lim := req.FormValue("limit"); lim != "" {
		limit, err := strconv.ParseInt(lim, 10, 64)
		if err != nil {
			return nil, err
		}
		arg(" LIMIT $%d ", limit)
	}

	if ofs := req.FormValue("offset"); ofs != "" {
		offset, err := strconv.ParseInt(ofs, 10, 64)
		if err != nil {
			return nil, err
		}
		arg(" OFFSET $%d ", offset)
	}

	return conn.QueryContext(req.Context(), stmt.String(), args...)
}

// listImports is the handler listing the imports selected by the
// filter and paging form values of the request (see
// queryImportListStmt). The result is a JSON object with the matching
// imports stored under the key "imports".
func listImports(
	_ interface{},
	req *http.Request,
	conn *sql.Conn,
) (jr JSONResult, err error) {

	rows, err := queryImportListStmt(conn, req)
	if err != nil {
		return jr, err
	}
	defer rows.Close()

	list := make([]*models.Import, 0, 20)

	// signer and summary may be NULL in the database.
	var signer, summary sql.NullString

	for rows.Next() {
		item := new(models.Import)
		err = rows.Scan(
			&item.ID,
			&item.State,
			&item.Enqueued,
			&item.Kind,
			&item.User,
			&signer,
			&summary,
			&item.Warnings,
		)
		if err != nil {
			return jr, err
		}
		if signer.Valid {
			item.Signer = signer.String
		}
		if summary.Valid {
			// The summary column holds JSON text.
			dec := json.NewDecoder(strings.NewReader(summary.String))
			if err = dec.Decode(&item.Summary); err != nil {
				return jr, err
			}
		}
		list = append(list, item)
	}

	if err = rows.Err(); err != nil {
		return jr, err
	}

	jr.Result = struct {
		Imports []*models.Import `json:"imports"`
	}{
		Imports: list,
	}
	return jr, nil
}

// importLogs is the handler delivering the log entries of the import
// identified by the "id" route variable. It answers with a 404
// JSONError if there is no such import. Otherwise the result is a
// JSON object with the log entries, ordered by time, stored under the
// key "entries".
func importLogs(
	_ interface{},
	req *http.Request,
	conn *sql.Conn,
) (jr JSONResult, err error) {

	ctx := req.Context()

	// The parse error is ignored; id is 0 if the variable is not a
	// valid integer.
	// NOTE(review): presumably the route only admits numeric ids — verify.
	id, _ := strconv.ParseInt(mux.Vars(req)["id"], 10, 64)

	// Check if we have such an import job first.
	var exists bool
	err = conn.QueryRowContext(ctx, selectHasImportSQL, id).Scan(&exists)
	if err == sql.ErrNoRows {
		return jr, JSONError{
			Code:    http.StatusNotFound,
			Message: fmt.Sprintf("Cannot find import #%d.", id),
		}
	}
	if err != nil {
		return jr, err
	}

	// We have it -> collect its log entries.
	rows, err := conn.QueryContext(ctx, selectImportLogsSQL, id)
	if err != nil {
		return jr, err
	}
	defer rows.Close()

	entries := make([]*models.ImportLogEntry, 0, 10)

	for rows.Next() {
		entry := new(models.ImportLogEntry)
		err = rows.Scan(&entry.Time, &entry.Kind, &entry.Message)
		if err != nil {
			return jr, err
		}
		entries = append(entries, entry)
	}

	if err = rows.Err(); err != nil {
		return jr, err
	}

	jr.Result = struct {
		Entries []*models.ImportLogEntry `json:"entries"`
	}{
		Entries: entries,
	}
	return jr, nil
}

// deleteImport is the handler removing the import identified by the
// "id" route variable together with its log entries. Both deletions
// happen in one transaction. It answers with a 404 JSONError if there
// is no import with this id which is not in state 'running' and with
// the status "no content" on success.
func deleteImport(
	_ interface{},
	req *http.Request,
	conn *sql.Conn,
) (jr JSONResult, err error) {

	ctx := req.Context()

	// The parse error is ignored; id is 0 if the variable is not a
	// valid integer.
	id, _ := strconv.ParseInt(mux.Vars(req)["id"], 10, 64)

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return jr, err
	}
	// Undoes everything unless the commit below was reached.
	defer tx.Rollback()

	// Check if we have such an import job which is not running first.
	var deletable bool
	err = tx.QueryRowContext(ctx, selectHasNoRunningImportSQL, id).Scan(&deletable)
	if err == sql.ErrNoRows {
		return jr, JSONError{
			Code:    http.StatusNotFound,
			Message: fmt.Sprintf("Cannot find import #%d.", id),
		}
	}
	if err != nil {
		return jr, err
	}

	// The log entries are removed before the import itself.
	for _, stmt := range []string{deleteLogsSQL, deleteImportSQL} {
		if _, err = tx.ExecContext(ctx, stmt, id); err != nil {
			return jr, err
		}
	}

	if err = tx.Commit(); err != nil {
		return jr, err
	}

	return JSONResult{Code: http.StatusNoContent}, nil
}

const (
	// isPendingSQL yields for the import with id $1 if it is in state
	// 'pending' together with its kind.
	isPendingSQL = `
SELECT state = 'pending'::import_state, kind
FROM import.imports
WHERE id = $1`

	// reviewSQL sets state ($1) and signer ($2) of the import with
	// id $3.
	reviewSQL = `
UPDATE import.imports SET
  state = $1::import_state,
  signer = $2
WHERE id = $3`

	// deleteImportDataSQL calls the database function import.del_import
	// for import $1. decideImport runs it for imports which are not
	// accepted.
	// NOTE(review): presumably it removes the data staged by the
	// import — confirm against the database schema.
	deleteImportDataSQL = `SELECT import.del_import($1)`

	// deleteImportTrackSQL removes the entries of import $1 from
	// import.track_imports.
	deleteImportTrackSQL = `
DELETE FROM import.track_imports WHERE import_id = $1`

	// logDecisionSQL adds the message $2 to the log of import $1.
	logDecisionSQL = `
INSERT INTO import.import_logs (import_id, msg) VALUES ($1, $2)`
)

// reviewImports is the handler applying a list of review decisions.
// reviews has to be a *[]models.Review. Each review is decided on its
// own via decideImport, so a failing review does not stop the others.
// The result is a list with one entry per review, in input order,
// carrying the import id and either a success message or the error
// text.
func reviewImports(
	reviews interface{},
	req *http.Request,
	conn *sql.Conn,
) (JSONResult, error) {

	type reviewResult struct {
		ID      int64  `json:"id"`
		Message string `json:"message,omitempty"`
		Error   string `json:"error,omitempty"`
	}

	rs := *reviews.(*[]models.Review)

	results := make([]reviewResult, 0, len(rs))

	for _, rev := range rs {
		res := reviewResult{ID: rev.ID}
		msg, err := decideImport(req, conn, rev.ID, string(rev.State))
		if err != nil {
			res.Error = err.Error()
		}
		res.Message = msg
		results = append(results, res)
	}

	return JSONResult{Result: results}, nil
}

// reviewImport is the handler applying a single review decision. The
// import id and the new state are taken from the route variables "id"
// and "state". The result is a JSON object with the success message
// of decideImport stored under the key "message".
func reviewImport(
	_ interface{},
	req *http.Request,
	conn *sql.Conn,
) (jr JSONResult, err error) {

	vars := mux.Vars(req)

	// The parse error is ignored; id is 0 if the variable is not a
	// valid integer.
	id, _ := strconv.ParseInt(vars["id"], 10, 64)

	msg, err := decideImport(req, conn, id, vars["state"])
	if err != nil {
		return jr, err
	}

	jr.Result = &struct {
		Message string `json:"message"`
	}{
		Message: msg,
	}
	return jr, nil
}

func decideImport(
	req *http.Request,
	conn *sql.Conn,
	id int64,
	state string,
) (message string, err error) {
	ctx := req.Context()
	var tx *sql.Tx
	if tx, err = conn.BeginTx(ctx, nil); err != nil {
		return
	}
	defer tx.Rollback()

	var pending bool
	var kind string

	err = tx.QueryRowContext(ctx, isPendingSQL, id).Scan(&pending, &kind)
	switch {
	case err == sql.ErrNoRows:
		err = fmt.Errorf("cannot find import #%d", id)
		return
	case err != nil:
		return
	case !pending:
		err = fmt.Errorf("import %d is not pending", id)
		return
	}

	if state == "accepted" {
		if jc := imports.FindJobCreator(imports.JobKind(kind)); jc != nil {
			if err = jc.StageDone(ctx, tx, id); err != nil {
				return
			}
		}

	} else {
		if _, err = tx.ExecContext(ctx, deleteImportDataSQL, id); err != nil {
			return
		}
	}

	// Remove the import track
	if _, err = tx.ExecContext(ctx, deleteImportTrackSQL, id); err != nil {
		return
	}

	// Log the decision and set the final state.
	session, _ := auth.GetSession(req)
	who := session.User

	if _, err = tx.ExecContext(ctx, logDecisionSQL, id,
		fmt.Sprintf("User '%s' %s import %d.", who, state, id)); err != nil {
		return
	}

	if _, err = tx.ExecContext(ctx, reviewSQL, state, who, id); err != nil {
		return
	}

	if err = tx.Commit(); err != nil {
		return
	}

	message = fmt.Sprintf(
		"Import #%d successfully changed to state '%s'.", id, state)

	return
}