view pkg/imports/queue.go @ 1646:a0982c38eac0

Import queue: Implemented email notifications.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Thu, 20 Dec 2018 15:33:29 +0100

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"html/template"
	"log"
	"runtime/debug"
	"strings"
	"sync"
	"time"

	"github.com/jackc/pgx/pgtype"

	"gemma.intevation.de/gemma/pkg/auth"
	"gemma.intevation.de/gemma/pkg/config"
	"gemma.intevation.de/gemma/pkg/misc"
)

type (
	// Feedback is passed to the Do method of a Job to log
	// information, warnings or errors.
	Feedback interface {
		// Info logs information.
		Info(fmt string, args ...interface{})
		// Warn logs warnings.
		Warn(fmt string, args ...interface{})
		// Error logs errors.
		Error(fmt string, args ...interface{})
	}

	// Job is the central abstraction of an import job
	// run by the import queue. A sketch of a concrete
	// implementation follows this type block.
	Job interface {
		// Do is called to do the actual import.
		// Bind transactions to ctx and conn, please.
		// id is the number of the import job.
		// feedback can be used to log the import process.
		// If no error is returned the import is assumed to
		// be successful. The non-error return value is
		// serialized as a JSON string into the database as
		// a summary of the import to be used by the review process.
		Do(ctx context.Context, id int64, conn *sql.Conn, feedback Feedback) (interface{}, error)
		// CleanUp is called to clean up resources held by the import.
		// It is called whether the import succeeded or not.
		CleanUp() error
	}

	// JobKind is the type of an import.
	// Choose a unique name for every import.
	JobKind string

	// JobCreator is used to bring a job to life, as jobs are stored
	// in pure meta-data form in the database.
	JobCreator interface {
		// Description is the long name of the import.
		Description() string
		// Create builds the actual job.
		// kind is the name of the import type.
		// data is a free-form string to pass arguments to the creation
		// process. This is useful to tell e.g. where to find data
		// in the file system to be used for importing.
		Create(kind JobKind, data string) (Job, error)
		// Depends returns a list of resources locked by this type of import.
		// Imports are run concurrently if they have disjoint sets
		// of dependencies.
		Depends() []string
		// StageDone is called if an import is positively reviewed
		// (state = accepted). This can be used to finalize the imported
		// data, e.g. to move it out of the staging area.
		StageDone(context.Context, *sql.Tx, int64) error
	}

	idJob struct {
		id         int64
		kind       JobKind
		user       string
		sendEmail  bool
		autoAccept bool
		data       string
	}
)
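
// A minimal sketch of how a concrete import could implement Job and
// JobCreator. The names dummyJob/dummyJobCreator and their behaviour are
// illustrative assumptions, not part of this package:
//
//	type dummyJob struct{ file string }
//
//	func (d *dummyJob) Do(
//		ctx context.Context,
//		id int64,
//		conn *sql.Conn,
//		feedback Feedback,
//	) (interface{}, error) {
//		feedback.Info("importing from %s", d.file)
//		// ... parse d.file and insert the data via conn, bound to ctx ...
//		return map[string]int{"count": 42}, nil // stored as JSON summary
//	}
//
//	func (d *dummyJob) CleanUp() error {
//		// Remove temporary files etc.
//		return nil
//	}
//
//	type dummyJobCreator struct{}
//
//	func (dummyJobCreator) Description() string { return "dummy import" }
//
//	func (dummyJobCreator) Create(_ JobKind, data string) (Job, error) {
//		return &dummyJob{file: data}, nil
//	}
//
//	func (dummyJobCreator) Depends() []string {
//		return []string{"dummy_table"}
//	}
//
//	func (dummyJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
//		return nil
//	}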

const pollDuration = time.Second * 10

type importQueue struct {
	signalChan chan struct{}
	creatorsMu sync.Mutex
	creators   map[JobKind]JobCreator
	usedDeps   map[string]struct{}
}

var iqueue = importQueue{
	signalChan: make(chan struct{}),
	creators:   map[JobKind]JobCreator{},
	usedDeps:   map[string]struct{}{},
}

var (
	// ImportStateNames is a list of the states a job can be in.
	ImportStateNames = []string{
		"queued",
		"running",
		"failed",
		"pending",
		"accepted",
		"declined",
	}
)

const (
	queueUser = "sys_admin"

	reEnqueueRunningSQL = `
UPDATE waterway.imports SET state = 'queued'::waterway.import_state
WHERE state = 'running'::waterway.import_state`

	insertJobSQL = `
INSERT INTO waterway.imports (
  kind,
  username,
  send_email,
  auto_accept,
  data
) VALUES (
  $1,
  $2,
  $3,
  $4,
  $5
) RETURNING id`

	selectJobSQL = `
SELECT
  id,
  kind,
  username,
  send_email,
  auto_accept,
  data
FROM waterway.imports
WHERE state = 'queued'::waterway.import_state AND enqueued IN (
  SELECT min(enqueued)
  FROM waterway.imports
  WHERE state = 'queued'::waterway.import_state AND
  kind = ANY($1)
)
LIMIT 1`

	updateStateSQL = `
UPDATE waterway.imports SET state = $1::waterway.import_state
WHERE id = $2`

	updateStateSummarySQL = `
UPDATE waterway.imports SET
   state = $1::waterway.import_state,
   summary = $2
WHERE id = $3`

	logMessageSQL = `
INSERT INTO waterway.import_logs (
  import_id,
  kind,
  msg
) VALUES (
  $1,
  $2::waterway.log_type,
  $3
)`
)

func init() {
	go iqueue.importLoop()
}

func (q *importQueue) registerJobCreator(kind JobKind, jc JobCreator) {
	q.creatorsMu.Lock()
	defer q.creatorsMu.Unlock()
	q.creators[kind] = jc
}

// FindJobCreator looks up a JobCreator in the global import queue.
func FindJobCreator(kind JobKind) JobCreator {
	return iqueue.jobCreator(kind)
}

// ImportKindNames returns a list of the names of the imports the
// global import queue supports.
func ImportKindNames() []string {
	return iqueue.importKindNames()
}

// HasImportKindName checks if the import queue supports a given kind.
	return iqueue.hasImportKindName(kind)
}

func (q *importQueue) hasImportKindName(kind string) bool {
	q.creatorsMu.Lock()
	defer q.creatorsMu.Unlock()
	return q.creators[JobKind(kind)] != nil
}

// RegisterJobCreator adds a JobCreator to the global import queue.
// This is a good candidate to be called in an init function of
// a particular JobCreator.
func RegisterJobCreator(kind JobKind, jc JobCreator) {
	log.Printf("info: register import job creator for kind '%s'\n", kind)
	iqueue.registerJobCreator(kind, jc)
}
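
// A typical registration happens in an init function next to the
// JobCreator definition, e.g. (sketch, reusing the illustrative
// dummyJobCreator from the comment above):
//
//	func init() {
//		RegisterJobCreator("dummy", dummyJobCreator{})
//	}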

func (q *importQueue) importKindNames() []string {
	q.creatorsMu.Lock()
	defer q.creatorsMu.Unlock()
	names := make([]string, len(q.creators))
	var i int
	for kind := range q.creators {
		names[i] = string(kind)
		i++
	}
	// XXX: Consider using sort.Strings to make output deterministic.
	return names
}

func (q *importQueue) jobCreator(kind JobKind) JobCreator {
	q.creatorsMu.Lock()
	defer q.creatorsMu.Unlock()
	return q.creators[kind]
}

func (q *importQueue) addJob(
	kind JobKind,
	user string,
	sendEmail, autoAccept bool,
	data string,
) (int64, error) {
	ctx := context.Background()
	var id int64
	err := auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error {
		return conn.QueryRowContext(
			ctx,
			insertJobSQL,
			string(kind),
			user,
			sendEmail,
			autoAccept,
			data).Scan(&id)
	})
	if err == nil {
		select {
		case q.signalChan <- struct{}{}:
		default:
		}
	}
	return id, err
}

// AddJob adds a job to the global import queue to be executed
// as soon as possible. The actual import is run in a separate
// goroutine, so this call does not block on the import itself.
func AddJob(kind JobKind, user string, sendEmail, autoAccept bool, data string) (int64, error) {
	return iqueue.addJob(kind, user, sendEmail, autoAccept, data)
}
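
// A caller (e.g. an HTTP handler) could enqueue an import like this
// (sketch; kind, user name and data payload are illustrative):
//
//	id, err := AddJob("dummy", "some_user", true, false, "/tmp/upload-123.csv")
//	if err != nil {
//		log.Printf("error: enqueueing import failed: %v\n", err)
//	} else {
//		log.Printf("info: enqueued import #%d\n", id)
//	}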

type logFeedback int64

func (lf logFeedback) log(kind, format string, args ...interface{}) {
	ctx := context.Background()
	err := auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error {
		_, err := conn.ExecContext(
			ctx, logMessageSQL, int64(lf), kind, fmt.Sprintf(format, args...))
		return err
	})
	if err != nil {
		log.Printf("logging failed: %v\n", err)
	}
}

func (lf logFeedback) Info(format string, args ...interface{}) {
	lf.log("info", format, args...)
}

func (lf logFeedback) Warn(format string, args ...interface{}) {
	lf.log("warn", format, args...)
}

func (lf logFeedback) Error(format string, args ...interface{}) {
	lf.log("error", format, args...)
}

func survive(fn func() error) func() error {
	return func() (err error) {
		defer func() {
			if p := recover(); p != nil {
				err = fmt.Errorf("%v: %s", p, string(debug.Stack()))
			}
		}()
		return fn()
	}
}
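
// survive shields the import loop from panicking jobs: the recovered
// panic is converted into an ordinary error that includes a stack trace.
// For example (sketch):
//
//	err := survive(func() error { panic("boom") })()
//	// err is non-nil; err.Error() starts with "boom" and
//	// contains the stack trace of the panicking goroutine.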

func reEnqueueRunning() error {
	ctx := context.Background()
	return auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error {
		_, err := conn.ExecContext(ctx, reEnqueueRunningSQL)
		return err
	})
}

func (q *importQueue) fetchJob() (*idJob, error) {

	var which []string

	q.creatorsMu.Lock()
nextCreator:
	for kind, jc := range q.creators {
		for _, d := range jc.Depends() {
			if _, found := q.usedDeps[d]; found {
				continue nextCreator
			}
		}
		which = append(which, string(kind))
	}
	q.creatorsMu.Unlock()

	if len(which) == 0 {
		return nil, sql.ErrNoRows
	}

	var kinds pgtype.TextArray
	if err := kinds.Set(which); err != nil {
		return nil, err
	}

	var ji idJob
	ctx := context.Background()
	err := auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error {
		tx, err := conn.BeginTx(ctx, nil)
		if err != nil {
			return err
		}
		defer tx.Rollback()
		if err = tx.QueryRowContext(ctx, selectJobSQL, &kinds).Scan(
			&ji.id,
			&ji.kind,
			&ji.user,
			&ji.sendEmail,
			&ji.autoAccept,
			&ji.data,
		); err != nil {
			return err
		}
		_, err = tx.ExecContext(ctx, updateStateSQL, "running", ji.id)
		if err == nil {
			err = tx.Commit()
		}
		return err
	})
	switch {
	case err == sql.ErrNoRows:
		return nil, nil
	case err != nil:
		return nil, err
	}
	return &ji, nil
}

func updateStateSummary(
	ctx context.Context,
	id int64,
	state string,
	summary interface{},
) error {
	var s sql.NullString
	if summary != nil {
		var b strings.Builder
		if err := json.NewEncoder(&b).Encode(summary); err != nil {
			return err
		}
		s = sql.NullString{String: b.String(), Valid: true}
	}
	return auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error {
		_, err := conn.ExecContext(ctx, updateStateSummarySQL, state, s, id)
		return err
	})
}

func errorAndFail(id int64, format string, args ...interface{}) error {
	ctx := context.Background()
	err := auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error {
		tx, err := conn.BeginTx(ctx, nil)
		if err != nil {
			return err
		}
		defer tx.Rollback()
		_, err = tx.ExecContext(
			ctx, logMessageSQL, id, "error", fmt.Sprintf(format, args...))
		if err != nil {
			return err
		}
		_, err = tx.ExecContext(
			ctx, updateStateSQL, "failed", id)
		if err == nil {
			err = tx.Commit()
		}
		return err
	})
	return err
}

func (q *importQueue) importLoop() {
	config.WaitReady()
	// Re-enqueue the jobs that are in state 'running'.
	// They were in progress when the server went down.
	if err := reEnqueueRunning(); err != nil {
		log.Printf("re-enqueuing failed: %v", err)
	}

	for {
		var idj *idJob
		var err error

		for {
			if idj, err = q.fetchJob(); err != nil && err != sql.ErrNoRows {
				log.Printf("db error: %v\n", err)
			}
			if idj != nil {
				break
			}
			select {
			case <-q.signalChan:
			case <-time.After(pollDuration):
			}
		}

		log.Printf("starting import #%d\n", idj.id)

		jc := q.jobCreator(idj.kind)
		if jc == nil {
			errorAndFail(idj.id, "no creator for kind '%s' found", idj.kind)
			continue
		}

		// Lock dependencies.
		q.creatorsMu.Lock()
		for _, d := range jc.Depends() {
			q.usedDeps[d] = struct{}{}
		}
		q.creatorsMu.Unlock()

		go func(jc JobCreator, idj *idJob) {

			// Unlock the dependencies.
			defer func() {
				q.creatorsMu.Lock()
				for _, d := range jc.Depends() {
					delete(q.usedDeps, d)
				}
				q.creatorsMu.Unlock()
				select {
				case q.signalChan <- struct{}{}:
				default:
				}
			}()

			job, err := jc.Create(idj.kind, idj.data)
			if err != nil {
				errorAndFail(idj.id, "failed to create job for import #%d: %v",
					idj.id, err)
				return
			}

			feedback := logFeedback(idj.id)

			feedback.Info("import #%d started", idj.id)

			ctx := context.Background()
			var summary interface{}

			errDo := survive(func() error {
				return auth.RunAs(ctx, idj.user,
					func(conn *sql.Conn) error {
						var err error
						summary, err = job.Do(ctx, idj.id, conn, feedback)
						return err
					})
			})()
			if errDo != nil {
				feedback.Error("error do: %v", errDo)
			}
			errCleanup := survive(job.CleanUp)()
			if errCleanup != nil {
				feedback.Error("error cleanup: %v", errCleanup)
			}

			var state string
			switch {
			case errDo != nil || errCleanup != nil:
				state = "failed"
			case idj.autoAccept:
				state = "accepted"
			default:
				state = "pending"
			}
			if err := updateStateSummary(ctx, idj.id, state, summary); err != nil {
				log.Printf("setting state of job %d failed: %v\n", idj.id, err)
			}
			log.Printf("import #%d finished: %s\n", idj.id, state)
			// Notify the user via email if requested.
			if idj.sendEmail {
				go sendNotificationMail(idj.user, jc.Description(), state, idj.id)
			}
		}(jc, idj)
	}
}

const (
	selectEmailSQL = `SELECT email_address FROM users.list_users WHERE username = $1`

	importNotificationMailSubject = `import notification mail`
)

var (
	importNotificationMailTmpl = template.Must(
		template.New("notification").Parse(`
Dear {{ .User }},

a {{ .Description }} import on server {{ .Server }} triggered
this email notification.

{{ if eq .State "accepted" }}The imported data were successfully integrated into the database.{{ end -}}
{{ if eq .State "failed" }}The import failed for some reasons.{{ end -}}
{{ if eq .State "pending" }}The imported data could be integrated into the database
but your final decision is needed.{{ end }}

Please follow this link to have a closer look at the details:

{{ .Server }}/#?review={{ .ID }}

Best regards
    Your service team`))
)
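
// With illustrative values (user "worker", a "bottleneck" import on
// https://example.com that ended in state "pending", import id 42) the
// rendered mail body would look roughly like:
//
//	Dear worker,
//
//	a bottleneck import on server https://example.com triggered
//	this email notification.
//
//	The imported data could be integrated into the database
//	but your final decision is needed.
//
//	Please follow this link to have a closer look at the details:
//
//	https://example.com/#?review=42
//
//	Best regards
//	    Your service team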

func sendNotificationMail(user, description, state string, id int64) {
	config.WaitReady()

	ctx := context.Background()
	var email string
	if err := auth.RunAs(ctx, user,
		func(conn *sql.Conn) error {
			return conn.QueryRowContext(ctx, selectEmailSQL, user).Scan(&email)
		},
	); err != nil {
		log.Printf("error: %v\n", err)
		return
	}

	data := struct {
		User        string
		Description string
		Server      string
		State       string
		ID          int64
	}{
		User:        user,
		Description: description,
		Server:      config.ExternalURL(),
		State:       state,
		ID:          id,
	}

	var body strings.Builder
	if err := importNotificationMailTmpl.Execute(&body, &data); err != nil {
		log.Printf("error: %v\n", err)
		return
	}

	if err := misc.SendMail(email, importNotificationMailSubject, body.String()); err != nil {
		log.Printf("error: %v\n", err)
	}
}