view pkg/controllers/importqueue.go @ 5563:3be1d79ad3a3

Log export: Limit log lines loading to 'sr' imports.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Sat, 12 Feb 2022 17:36:20 +0100
parents 5152b4db40cc
children aaa9e658cabd
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package controllers

import (
	"context"
	"database/sql"
	"encoding/csv"
	"encoding/json"
	"fmt"
	"net/http"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/gorilla/mux"
	"github.com/jackc/pgx/pgtype"

	"gemma.intevation.de/gemma/pkg/auth"
	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/imports"
	"gemma.intevation.de/gemma/pkg/log"
	"gemma.intevation.de/gemma/pkg/models"

	mw "gemma.intevation.de/gemma/pkg/middleware"
)

const (
	// selectImportsCountSQL counts matching imports; buildFilters
	// appends the actual condition after the trailing WHERE.
	selectImportsCountSQL = `
SELECT count(*)
FROM import.imports
WHERE
`
	// selectImportsSQL is the default listing projection, including
	// flags for summary presence and error/warning log entries.
	// A condition is appended after the trailing WHERE.
	selectImportsSQL = `
SELECT
  imports.id AS id,
  state::varchar,
  enqueued,
  changed,
  kind,
  username,
  signer,
  summary IS NOT NULL AS has_summary,
  EXISTS(SELECT 1 FROM import.import_logs
    WHERE kind = 'error'::log_type and import_id = imports.id) AS has_errors,
  EXISTS(SELECT 1 FROM import.import_logs
    WHERE kind = 'warn'::log_type and import_id = imports.id) AS has_warnings
FROM import.imports
WHERE
`
	// XXX: Consider not only supporting 'sr' for log message parsing.
	// selectExportSQL is the CSV export projection. Log messages
	// (msgs) are only fetched for imports of kind 'sr'; for all
	// other kinds msgs is NULL. A condition is appended after WHERE.
	selectExportSQL = `
SELECT
  imports.id AS id,
  state::varchar,
  enqueued,
  changed,
  imports.kind,
  username,
  (SELECT country FROM users.list_users lu
    WHERE lu.username = import.imports.username) AS country,
  signer,
  EXISTS(SELECT 1 FROM import.import_logs
    WHERE kind = 'warn'::log_type and import_id = id) AS has_warnings,
  data,
  CASE WHEN kind = 'sr' THEN ARRAY(SELECT msg FROM import.import_logs WHERE import_id = id)
  ELSE NULL
  END AS msgs
FROM import.imports
WHERE
`
	// selectEnqueuedSQL fetches only the enqueued timestamps; used
	// with ORDER BY/LIMIT to find prev/next page anchors.
	selectEnqueuedSQL = `
SELECT enqueued FROM import.imports
WHERE
`
	// selectImportSummarySQL loads the (possibly NULL) JSON summary
	// and enqueue time of a single import.
	selectImportSummarySQL = `
SELECT summary, enqueued FROM import.imports WHERE id = $1`

	// selectHasNoRunningImportSQL yields a row iff the import exists
	// and is not in state 'running'.
	selectHasNoRunningImportSQL = `
SELECT true FROM import.imports
WHERE id = $1 AND state <> 'running'::import_state`

	// selectImportLogsSQL loads the log entries of one import in
	// chronological order.
	selectImportLogsSQL = `
SELECT
  time,
  kind::varchar,
  msg
FROM import.import_logs
WHERE import_id = $1
ORDER BY time`

	// deleteLogsSQL removes the log entries of one import.
	deleteLogsSQL = `
DELETE FROM import.import_logs WHERE import_id = $1`

	// deleteImportSQL removes the import row itself.
	deleteImportSQL = `
DELETE FROM import.imports WHERE id = $1`
)

// filledStmt pairs an incrementally built SQL statement with the
// argument values collected for its placeholders.
type filledStmt struct {
	stmt strings.Builder // SQL text, assembled piece by piece
	args []interface{}   // values for the placeholders in stmt
}

// buildFilters translates the filter form values of req (query, cc,
// states, kinds, ids, from, to, warnings, errors, count) into up to
// three SQL statements:
//
//   - the list statement: the given projection (or, if empty, a
//     default listing/count projection chosen by the "count" form
//     value) followed by all filter conditions,
//   - a "before" statement finding the latest enqueued time before
//     the requested time window (nil if no "from" was given),
//   - an "after" statement finding the earliest enqueued time after
//     the window (nil if no "to" was given).
//
// The $%d inside the terms are printf verbs; presumably they are
// numbered positionally when the filter trees are serialized
// (filterTerm/filterAnd/serialize are defined elsewhere) — TODO
// confirm against their implementation.
func buildFilters(projection string, req *http.Request) (*filledStmt, *filledStmt, *filledStmt, error) {

	// l: conditions for the listing itself,
	// a: conditions for the "after" neighbor query,
	// b: conditions for the "before" neighbor query.
	var l, a, b filterAnd

	var noBefore, noAfter bool

	// cond adds the same term to all three condition lists.
	cond := func(format string, args ...interface{}) {
		term := &filterTerm{format: format, args: args}
		l = append(l, term)
		a = append(a, term)
		b = append(b, term)
	}

	// Always filter review jobs. They are only for internal use.
	// (%% renders as a literal % when the term is formatted.)
	cond(` NOT imports.kind LIKE '%%` + imports.ReviewJobSuffix + `'`)

	// Free-text search over kind, user, signer and log messages.
	if query := req.FormValue("query"); query != "" {
		query = "%" + query + "%"
		cond(` (imports.kind ILIKE $%d OR username ILIKE $%d OR signer ILIKE $%d OR `+
			`id IN (SELECT import_id FROM import.import_logs WHERE msg ILIKE $%d)) `,
			query, query, query, query)
	}

	// Restrict to users from the given country codes.
	if cc := req.FormValue("cc"); cc != "" {
		codes := sliceToTextArray(splitUpper(cc))
		cond(" username IN "+
			"(SELECT username FROM internal.user_profiles "+
			"WHERE country = ANY($%d)) ",
			codes)
	}

	// Restrict to the given import states.
	if st := req.FormValue("states"); st != "" {
		states := toTextArray(st, imports.ImportStateNames)
		cond(" state = ANY($%d) ", states)
	}

	// Restrict to the given import kinds.
	if ks := req.FormValue("kinds"); ks != "" {
		kinds := toTextArray(ks, imports.ImportKindNames())
		cond(" imports.kind = ANY($%d) ", kinds)
	}

	// Restrict to an explicit list of import ids.
	if idss := req.FormValue("ids"); idss != "" {
		ids := toInt8Array(idss)
		cond(" id = ANY($%d) ", ids)
	}

	// Time window start: the list keeps entries at/after fromTime,
	// while the "before" query inverts the comparison to find the
	// nearest entry preceding the window.
	if from := req.FormValue("from"); from != "" {
		fromTime, err := common.ParseTime(from)
		if err != nil {
			return nil, nil, nil, err
		}
		l = append(l, buildFilterTerm("enqueued >= $%d", fromTime))
		b = append(b, buildFilterTerm("enqueued < $%d", fromTime))
	} else {
		noBefore = true
	}

	// Time window end; analogous inversion for the "after" query.
	if to := req.FormValue("to"); to != "" {
		toTime, err := common.ParseTime(to)
		if err != nil {
			return nil, nil, nil, err
		}
		l = append(l, buildFilterTerm("enqueued <= $%d", toTime))
		a = append(a, buildFilterTerm("enqueued > $%d", toTime))
	} else {
		noAfter = true
	}

	// Optionally keep only imports that have warning log entries.
	switch warn := strings.ToLower(req.FormValue("warnings")); warn {
	case "1", "t", "true":
		cond(` EXISTS(SELECT 1 FROM import.import_logs
                 WHERE kind = 'warn'::log_type and import_id = imports.id)`)
	}

	// Optionally keep only imports that have error log entries.
	switch errors := strings.ToLower(req.FormValue("errors")); errors {
	case "1", "t", "true":
		cond(` EXISTS(SELECT 1 FROM import.import_logs
                 WHERE kind = 'error'::log_type and import_id = imports.id)`)
	}

	fl := &filledStmt{}
	fa := &filledStmt{}
	fb := &filledStmt{}

	fa.stmt.WriteString(selectEnqueuedSQL)
	fb.stmt.WriteString(selectEnqueuedSQL)

	var counting bool

	// Choose the projection for the list statement: explicit
	// projection wins; otherwise "count" selects the counting one.
	if projection != "" {
		fl.stmt.WriteString(projection)
	} else {
		switch count := strings.ToLower(req.FormValue("count")); count {
		case "1", "t", "true":
			counting = true
			fl.stmt.WriteString(selectImportsCountSQL)
		default:
			fl.stmt.WriteString(selectImportsSQL)
		}
	}

	// Each statement ends in a dangling WHERE, so an empty filter
	// list must still produce a valid condition.
	if len(l) == 0 {
		fl.stmt.WriteString(" TRUE ")
	} else {
		l.serialize(&fl.stmt, &fl.args)
	}

	if len(b) == 0 {
		fb.stmt.WriteString(" TRUE ")
	} else {
		b.serialize(&fb.stmt, &fb.args)
	}

	if len(a) == 0 {
		fa.stmt.WriteString(" TRUE ")
	} else {
		a.serialize(&fa.stmt, &fa.args)
	}

	// Counting needs no ordering; the neighbor queries pick exactly
	// the closest entry on each side of the window.
	if !counting {
		fl.stmt.WriteString(" ORDER BY enqueued DESC ")
		fa.stmt.WriteString(" ORDER BY enqueued LIMIT 1")
		fb.stmt.WriteString(" ORDER BY enqueued DESC LIMIT 1")
	}

	if noBefore {
		fb = nil
	}
	if noAfter {
		fa = nil
	}

	return fl, fb, fa, nil
}

// neighbored runs the given single-row query and returns the enqueued
// timestamp it yields, converted to UTC. It returns nil when no row
// matches or when the query fails; failures are merely logged as
// warnings.
func neighbored(ctx context.Context, conn *sql.Conn, fb *filledStmt) *models.ImportTime {

	var t time.Time
	if err := conn.QueryRowContext(ctx, fb.stmt.String(), fb.args...).Scan(&t); err != nil {
		if err != sql.ErrNoRows {
			log.Warnf("%v\n", err)
		}
		return nil
	}
	return &models.ImportTime{Time: t.UTC()}
}

// exportImports streams the filtered import queue as CSV to the
// client. Filters are taken from the request's form values (see
// buildFilters). For 'sr' imports the SQL projection also delivers
// the log messages, which are handed to the job's Description hook
// to fill the "#source" column.
func exportImports(rw http.ResponseWriter, req *http.Request) {

	list, _, _, err := buildFilters(selectExportSQL, req)
	if err != nil {
		http.Error(rw, "error: "+err.Error(), http.StatusBadRequest)
		return
	}

	rw.Header().Add("Content-Type", "text/csv")
	out := csv.NewWriter(rw)

	// record serves as the CSV header first and is then reused for
	// every data row written by store below.
	record := []string{
		"#id",
		"#kind",
		"#enqueued",
		"#changed",
		"#user",
		"#country",
		"#signer",
		"#state",
		"#warnings",
		"#source",
	}

	if err := out.Write(record); err != nil {
		// Too late for HTTP status message.
		log.Errorf("%v\n", err)
		return
	}

	start := time.Now()

	conn := mw.GetDBConn(req)
	ctx := req.Context()
	var rows *sql.Rows
	if rows, err = conn.QueryContext(ctx, list.stmt.String(), list.args...); err != nil {
		log.Errorf("%v\n", err)
		return
	}
	defer rows.Close()

	// Extract some meta infos from the import.
	// Jobs implementing this interface can render a human readable
	// description from their log messages.
	type Description interface {
		Description([]string) (string, error)
	}

	// dataset mirrors one row of selectExportSQL.
	type dataset struct {
		id       int64
		state    string
		enqueued time.Time
		changed  time.Time
		kind     string
		user     string
		country  string
		signer   sql.NullString
		warnings bool
		data     string
		msgs     pgtype.TextArray // NULL unless kind == 'sr'
	}

	// Log unsupported description interfaces per kind only once.
	unsupported := make(map[string]bool)

	// store formats one dataset into record and writes it as a CSV
	// row.
	store := func(ds *dataset) error {

		var description string

		// Do some introspection on the job to be more verbose.
		if jc := imports.FindJobCreator(imports.JobKind(ds.kind)); jc != nil {
			job := jc.Create()
			if err := common.FromJSONString(ds.data, job); err != nil {
				log.Errorf("%v\n", err)
			} else if desc, ok := job.(Description); ok {
				var ms []string
				if ds.msgs.Status == pgtype.Present {
					if err := ds.msgs.AssignTo(&ms); err != nil {
						return err
					}
				}
				// NOTE(review): assigns the outer err of
				// exportImports; harmless here since the error is
				// returned immediately.
				if description, err = desc.Description(ms); err != nil {
					return err
				}
				// Commas would break the CSV column.
				description = strings.Replace(description, ",", "|", -1)
			} else {
				if !unsupported[ds.kind] {
					unsupported[ds.kind] = true
					log.Debugf("%s: description not supported\n", ds.kind)
				}
			}
		}

		var signer string
		if ds.signer.Valid {
			signer = ds.signer.String
		}

		record[0] = strconv.FormatInt(ds.id, 10)
		record[1] = ds.kind
		record[2] = ds.enqueued.UTC().Format(common.TimeFormat)
		record[3] = ds.changed.UTC().Format(common.TimeFormat)
		record[4] = ds.user
		record[5] = ds.country
		record[6] = signer
		record[7] = ds.state
		record[8] = strconv.FormatBool(ds.warnings)
		record[9] = description

		return out.Write(record)
	}

	for rows.Next() {
		var curr dataset

		if err := rows.Scan(
			&curr.id,
			&curr.state,
			&curr.enqueued,
			&curr.changed,
			&curr.kind,
			&curr.user,
			&curr.country,
			&curr.signer,
			&curr.warnings,
			&curr.data,
			&curr.msgs,
		); err != nil {
			log.Errorf("%v\n", err)
			return
		}

		if err := store(&curr); err != nil {
			log.Errorf("%v\n", err)
			return
		}
	}

	if err := rows.Err(); err != nil {
		log.Errorf("%v\n", err)
		return
	}

	out.Flush()
	if err := out.Error(); err != nil {
		log.Errorf("%v\n", err)
	}

	log.Debugf("Export took: %v\n", time.Since(start))
}

// listImports answers the import queue listing request. Depending on
// the "count" form value it either returns the bare number of
// matching imports or the full list together with the enqueue times
// of the nearest entries before and after the requested time window.
func listImports(req *http.Request) (jr mw.JSONResult, err error) {

	var list, before, after *filledStmt
	if list, before, after, err = buildFilters("", req); err != nil {
		return
	}

	ctx := req.Context()
	conn := mw.JSONConn(req)

	// Fast path: only the count was asked for.
	switch strings.ToLower(req.FormValue("count")) {
	case "1", "t", "true":
		var total int64
		err = conn.QueryRowContext(ctx, list.stmt.String(), list.args...).Scan(&total)
		if err == sql.ErrNoRows {
			total, err = 0, nil
		}
		if err != nil {
			return
		}
		jr = mw.JSONResult{Result: total}
		return
	}

	// Full listing.
	var rows *sql.Rows
	if rows, err = conn.QueryContext(ctx, list.stmt.String(), list.args...); err != nil {
		return
	}
	defer rows.Close()

	jobs := make([]*models.Import, 0, 20)

	for rows.Next() {
		var (
			it                models.Import
			signer            sql.NullString
			enqueued, changed time.Time
		)
		if err = rows.Scan(
			&it.ID,
			&it.State,
			&enqueued,
			&changed,
			&it.Kind,
			&it.User,
			&signer,
			&it.Summary,
			&it.Errors,
			&it.Warnings,
		); err != nil {
			return
		}
		if signer.Valid {
			it.Signer = signer.String
		}
		it.Enqueued = models.ImportTime{Time: enqueued.UTC()}
		it.Changed = models.ImportTime{Time: changed.UTC()}
		jobs = append(jobs, &it)
	}

	if err = rows.Err(); err != nil {
		return
	}

	// Resolve the neighboring enqueue times (pagination anchors).
	var prev, next *models.ImportTime
	if before != nil {
		prev = neighbored(ctx, conn, before)
	}
	if after != nil {
		next = neighbored(ctx, conn, after)
	}

	jr = mw.JSONResult{
		Result: struct {
			Prev    *models.ImportTime `json:"prev,omitempty"`
			Next    *models.ImportTime `json:"next,omitempty"`
			Imports []*models.Import   `json:"imports"`
		}{
			Imports: jobs,
			Prev:    prev,
			Next:    next,
		},
	}
	return
}

// importLogs returns the log entries of the import job addressed by
// the "id" URL variable, together with its enqueue time and — if
// present — its decoded JSON summary. A missing job yields a 404
// JSON error.
func importLogs(req *http.Request) (jr mw.JSONResult, err error) {

	ctx := req.Context()
	id, _ := strconv.ParseInt(mux.Vars(req)["id"], 10, 64)
	conn := mw.JSONConn(req)

	// Check if he have such a import job first.
	var (
		summary  sql.NullString
		enqueued time.Time
	)
	err = conn.QueryRowContext(ctx, selectImportSummarySQL, id).Scan(&summary, &enqueued)
	if err == sql.ErrNoRows {
		err = mw.JSONError{
			Code:    http.StatusNotFound,
			Message: fmt.Sprintf("Cannot find import #%d.", id),
		}
	}
	if err != nil {
		return
	}
	enqueued = enqueued.UTC()

	// Decode the optional summary document.
	var sum interface{}
	if summary.Valid {
		if err = json.NewDecoder(
			strings.NewReader(summary.String)).Decode(&sum); err != nil {
			return
		}
	}

	// We have it -> generate log entries.
	var rows *sql.Rows
	if rows, err = conn.QueryContext(ctx, selectImportLogsSQL, id); err != nil {
		return
	}
	defer rows.Close()

	entries := make([]*models.ImportLogEntry, 0, 10)

	for rows.Next() {
		var (
			entry models.ImportLogEntry
			when  time.Time
		)
		if err = rows.Scan(&when, &entry.Kind, &entry.Message); err != nil {
			return
		}
		entry.Time = models.ImportTime{Time: when.UTC()}
		entries = append(entries, &entry)
	}

	if err = rows.Err(); err != nil {
		return
	}

	jr = mw.JSONResult{
		Result: struct {
			Enqueued models.ImportTime        `json:"enqueued"`
			Summary  interface{}              `json:"summary,omitempty"`
			Entries  []*models.ImportLogEntry `json:"entries"`
		}{
			Enqueued: models.ImportTime{Time: enqueued},
			Summary:  sum,
			Entries:  entries,
		},
	}
	return
}

// deleteImport removes the import job addressed by the "id" URL
// variable together with its log entries in one transaction. Jobs
// that do not exist or are currently running yield a 404 JSON error.
func deleteImport(req *http.Request) (jr mw.JSONResult, err error) {

	ctx := req.Context()
	id, _ := strconv.ParseInt(mux.Vars(req)["id"], 10, 64)

	var tx *sql.Tx
	if tx, err = mw.JSONConn(req).BeginTx(ctx, nil); err != nil {
		return
	}
	defer tx.Rollback()

	// Only an existing, non-running job produces a row here.
	var exists bool
	err = tx.QueryRowContext(ctx, selectHasNoRunningImportSQL, id).Scan(&exists)
	if err == sql.ErrNoRows {
		err = mw.JSONError{
			Code:    http.StatusNotFound,
			Message: fmt.Sprintf("Cannot find import #%d.", id),
		}
	}
	if err != nil {
		return
	}

	// Delete the dependent log entries before the import itself.
	if _, err = tx.ExecContext(ctx, deleteLogsSQL, id); err != nil {
		return
	}
	if _, err = tx.ExecContext(ctx, deleteImportSQL, id); err != nil {
		return
	}

	if err = tx.Commit(); err != nil {
		return
	}

	jr = mw.JSONResult{Code: http.StatusNoContent}
	return
}

// reviewImports decides a batch of import reviews concurrently. Each
// review is handled in its own goroutine; after at most five seconds
// the handler answers with whatever results are available — entries
// still running keep their pre-filled "in progress" message.
func reviewImports(req *http.Request) (mw.JSONResult, error) {

	rs := *mw.JSONInput(req).(*[]models.Review)

	// reviewResult is the per-import outcome reported to the client.
	type reviewResult struct {
		ID      int64  `json:"id"`
		Message string `json:"message,omitempty"`
		Error   string `json:"error,omitempty"`
	}

	results := make([]reviewResult, len(rs))

	// Pre-fill placeholder messages; they survive in the response for
	// any decision that has not finished before the timeout below.
	for i := range results {
		results[i].ID = rs[i].ID
		results[i].Message = fmt.Sprintf("Finalizing import #%d in progress.", rs[i].ID)
	}

	var wg sync.WaitGroup
	// mu guards results against concurrent writes from the decision
	// goroutines and the copy taken at the end.
	var mu sync.Mutex

	for i := range rs {
		wg.Add(1)
		go func(idx int) {
			defer wg.Done()
			rev := &rs[idx]
			msg, err := decideImport(req, rev.ID, string(rev.State))
			mu.Lock()
			if err != nil {
				results[idx].Error = err.Error()
			}
			results[idx].Message = msg
			mu.Unlock()
		}(i)
	}

	// Signal completion of all decisions via a closed channel so it
	// can be raced against the timeout.
	done := make(chan struct{})
	go func() {
		defer close(done)
		wg.Wait()
	}()

	// Wait for all goroutines, but never longer than five seconds.
	select {
	case <-time.After(5 * time.Second):
	case <-done:
	}

	// Snapshot the results under the lock; late goroutines may still
	// write to results after we return.
	out := make([]reviewResult, len(rs))
	mu.Lock()
	copy(out, results)
	mu.Unlock()

	return mw.JSONResult{Result: out}, nil
}

// reviewImport decides a single import review. Import id and the
// target state are taken from the URL variables "id" and "state";
// the resulting message is returned as a JSON document.
func reviewImport(req *http.Request) (jr mw.JSONResult, err error) {

	vars := mux.Vars(req)
	id, _ := strconv.ParseInt(vars["id"], 10, 64)

	var msg string
	if msg, err = decideImport(req, id, vars["state"]); err != nil {
		return
	}

	jr = mw.JSONResult{
		Result: &struct {
			Message string `json:"message"`
		}{
			Message: msg,
		},
	}
	return
}

// decideImport finalizes the review of the import with the given id
// on behalf of the session user. Any state other than "accepted"
// counts as a decline. It returns a human readable result message;
// a re-scheduled finalization is reported as an error.
func decideImport(
	req *http.Request,
	id int64,
	state string,
) (message string, err error) {

	session, _ := auth.GetSession(req)

	err = imports.DecideImport(req.Context(), id, state == "accepted", session.User)
	if err == imports.ErrRetrying {
		return "", fmt.Errorf("Finalizing import #%d is re-scheduled.", id)
	}
	if err != nil {
		return "", err
	}

	return fmt.Sprintf("Import #%d is %s.", id, state), nil
}