view pkg/imports/queue.go @ 1138:443fc80a315f

Don't issue new lines at end of log messages when importing.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Fri, 09 Nov 2018 10:57:49 +0100
parents a5069da2f0b7
children 930fdd8b474f
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"fmt"
	"log"
	"runtime/debug"
	"sync"
	"time"

	"github.com/jackc/pgx/pgtype"

	"gemma.intevation.de/gemma/pkg/auth"
	"gemma.intevation.de/gemma/pkg/config"
)

// Feedback receives the progress messages of a running Job at info, warn
// and error level. Each method takes a printf-style format and arguments.
type Feedback interface {
	Info(fmt string, args ...interface{})
	Warn(fmt string, args ...interface{})
	Error(fmt string, args ...interface{})
}

// Job is a single unit of import work. Do performs the import on the given
// database connection and reports through the Feedback. The import queue
// calls CleanUp after Do, whether or not Do succeeded.
type Job interface {
	Do(*sql.Conn, Feedback) error
	CleanUp() error
}

// JobKind names a type of import job and selects the JobCreator that is
// registered for it.
type JobKind string

// JobCreator builds Jobs of a registered kind from their stored data.
// Depends returns dependency names; the import queue does not start a job
// while another running job holds one of the same dependency names.
type JobCreator interface {
	Create(kind JobKind, data string) (Job, error)
	Depends() []string
}

// idJob is a queued import as read back from the waterway.imports table.
type idJob struct {
	id   int64
	kind JobKind
	user string
	data string
}

// pollDuration is how long the import loop waits for a wake-up signal
// before polling the database for queued jobs again.
const pollDuration = time.Second * 10

// importQueue dispatches queued import jobs to their registered creators.
// signalChan wakes the import loop before the next poll. creatorsMu guards
// both creators and usedDeps; usedDeps holds the dependency names of the
// jobs that are currently running.
type importQueue struct {
	signalChan chan struct{}
	creatorsMu sync.Mutex
	creators   map[JobKind]JobCreator
	usedDeps   map[string]struct{}
}

// iqueue is the package-wide import queue.
//
// signalChan is buffered with capacity 1: wake-ups are sent without
// blocking (select with default), so with an unbuffered channel a signal
// sent while the loop is busy would be dropped and the new work would only
// be noticed at the next poll. One buffered slot keeps a pending wake-up.
var iqueue = importQueue{
	signalChan: make(chan struct{}, 1),
	creators:   map[JobKind]JobCreator{},
	usedDeps:   map[string]struct{}{},
}

const (
	// queueUser is the role passed to auth.RunAs for the queue's own
	// bookkeeping statements.
	queueUser = "sys_admin"

	// reEnqueueRunningSQL moves every import in state 'running' back to
	// state 'queued'.
	reEnqueueRunningSQL = `
UPDATE waterway.imports SET state = 'queued'::waterway.import_state
WHERE state = 'running'::waterway.import_state
`

	// insertJobSQL enqueues a new import ($1 kind, $2 username, $3 data)
	// and returns its id.
	insertJobSQL = `
INSERT INTO waterway.imports (
  kind,
  username,
  data
) VALUES (
  $1,
  $2,
  $3
) RETURNING id`

	// selectJobSQL picks the oldest queued import whose kind is in the
	// array $1. The kind filter must apply to the returned row itself:
	// filtering only while computing the oldest timestamp would allow a
	// queued import of an excluded kind with the same timestamp to be
	// returned and started while its dependencies are in use.
	selectJobSQL = `
SELECT
  id,
  kind,
  username,
  data
FROM waterway.imports
WHERE state = 'queued'::waterway.import_state AND
  kind = ANY($1)
ORDER BY enqueued
LIMIT 1
`
	// updateStateSQL sets the state ($1) of the import with id $2.
	updateStateSQL = `
UPDATE waterway.imports SET state = $1::waterway.import_state
WHERE id = $2
`
	// logMessageSQL appends a log message ($3) of level $2 to import $1.
	logMessageSQL = `
INSERT INTO waterway.import_logs (
  import_id,
  kind,
  msg
) VALUES (
  $1,
  $2::waterway.log_type,
  $3
)`
)

// init starts the package-wide import loop in its own goroutine when the
// package is loaded; the loop itself first calls config.WaitReady.
func init() {
	loop := iqueue.importLoop
	go loop()
}

// registerJobCreator stores jc as the creator for jobs of the given kind,
// replacing any creator stored for that kind before.
func (q *importQueue) registerJobCreator(kind JobKind, jc JobCreator) {
	mu := &q.creatorsMu
	mu.Lock()
	defer mu.Unlock()
	q.creators[kind] = jc
}

// RegisterJobCreator registers jc as the creator for import jobs of the
// given kind, replacing any creator registered before for that kind.
// The registration is written to the server log.
func RegisterJobCreator(kind JobKind, jc JobCreator) {
	// log.Printf appends the trailing newline itself.
	log.Printf("info: register import job creator for kind '%s'", kind)
	iqueue.registerJobCreator(kind, jc)
}

// jobCreator returns the creator registered for kind, or nil if there is
// none.
func (q *importQueue) jobCreator(kind JobKind) JobCreator {
	q.creatorsMu.Lock()
	defer q.creatorsMu.Unlock()
	if jc, ok := q.creators[kind]; ok {
		return jc
	}
	return nil
}

// addJob inserts a new queued import and returns the id assigned by the
// database. On success the import loop is signalled without blocking, so
// it can look for work before its next poll.
func (q *importQueue) addJob(kind JobKind, user, data string) (int64, error) {
	var id int64
	ctx := context.Background()
	insert := func(conn *sql.Conn) error {
		row := conn.QueryRowContext(ctx, insertJobSQL, string(kind), user, data)
		return row.Scan(&id)
	}
	if err := auth.RunAs(queueUser, ctx, insert); err != nil {
		return id, err
	}
	select {
	case q.signalChan <- struct{}{}:
	default:
	}
	return id, nil
}

// AddJob enqueues an import job of the given kind for user, with its
// job-specific data, and returns the id of the new job.
func AddJob(kind JobKind, user, data string) (int64, error) {
	q := &iqueue
	return q.addJob(kind, user, data)
}

// logFeedback implements Feedback by storing each message in
// waterway.import_logs for the import whose id it holds.
type logFeedback int64

// log stores a message of the given level ("info", "warn" or "error") for
// the import. The message is built from format and args like fmt.Sprintf.
// If storing fails, the error is written to the server log together with
// the message, so the message is not lost.
func (lf logFeedback) log(kind, format string, args ...interface{}) {
	msg := fmt.Sprintf(format, args...)
	ctx := context.Background()
	err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error {
		_, err := conn.ExecContext(ctx, logMessageSQL, int64(lf), kind, msg)
		return err
	})
	if err != nil {
		log.Printf("logging failed: %v (import #%d, %s: %s)",
			err, int64(lf), kind, msg)
	}
}

// Info stores an info-level message for the import.
func (lf logFeedback) Info(format string, args ...interface{}) {
	lf.log("info", format, args...)
}

// Warn stores a warn-level message for the import.
func (lf logFeedback) Warn(format string, args ...interface{}) {
	lf.log("warn", format, args...)
}

// Error stores an error-level message for the import.
func (lf logFeedback) Error(format string, args ...interface{}) {
	lf.log("error", format, args...)
}

// survive wraps fn so that a panic inside it is recovered and returned as
// an error of the form "<panic value>: <stack trace>". If fn does not
// panic, its own result is returned unchanged.
func survive(fn func() error) func() error {
	guarded := func() (err error) {
		defer func() {
			p := recover()
			if p == nil {
				return
			}
			err = fmt.Errorf("%v: %s", p, string(debug.Stack()))
		}()
		return fn()
	}
	return guarded
}

func reEnqueueRunning() error {
	ctx := context.Background()
	return auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error {
		_, err := conn.ExecContext(ctx, reEnqueueRunningSQL)
		return err
	})
}

// fetchJob claims the next import that may run now.
//
// A kind is eligible when none of its creator's dependencies is held by a
// running job. If no kind is eligible, fetchJob returns sql.ErrNoRows
// without touching the database. Otherwise it runs selectJobSQL with the
// eligible kinds and marks the returned import 'running', both in one
// transaction. It returns (nil, nil) when no matching import is queued.
func (q *importQueue) fetchJob() (*idJob, error) {

	// blocked reports whether jc needs a dependency that is in use.
	// It must be called with creatorsMu held.
	blocked := func(jc JobCreator) bool {
		for _, d := range jc.Depends() {
			if _, busy := q.usedDeps[d]; busy {
				return true
			}
		}
		return false
	}

	var which []string
	q.creatorsMu.Lock()
	for kind, jc := range q.creators {
		if !blocked(jc) {
			which = append(which, string(kind))
		}
	}
	q.creatorsMu.Unlock()

	if len(which) == 0 {
		return nil, sql.ErrNoRows
	}

	var kinds pgtype.TextArray
	if err := kinds.Set(which); err != nil {
		return nil, err
	}

	var ji idJob
	ctx := context.Background()
	claim := func(conn *sql.Conn) error {
		tx, err := conn.BeginTx(ctx, nil)
		if err != nil {
			return err
		}
		defer tx.Rollback()
		row := tx.QueryRowContext(ctx, selectJobSQL, &kinds)
		if err := row.Scan(&ji.id, &ji.kind, &ji.user, &ji.data); err != nil {
			return err
		}
		if _, err := tx.ExecContext(ctx, updateStateSQL, "running", ji.id); err != nil {
			return err
		}
		return tx.Commit()
	}

	err := auth.RunAs(queueUser, ctx, claim)
	if err == sql.ErrNoRows {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}
	return &ji, nil
}

func updateState(id int64, state string) error {
	ctx := context.Background()
	return auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error {
		_, err := conn.ExecContext(ctx, updateStateSQL, state, id)
		return err
	})
}

// errorAndFail stores an error-level log message for import id (built from
// format and args like fmt.Sprintf) and marks the import as 'failed'.
// Both statements run in one transaction, so either both take effect or
// neither does.
func errorAndFail(id int64, format string, args ...interface{}) error {
	ctx := context.Background()
	return auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error {
		tx, err := conn.BeginTx(ctx, nil)
		if err != nil {
			return err
		}
		defer tx.Rollback()
		// Issue the statements through tx (not conn) so they belong to the
		// transaction that is committed below.
		if _, err := tx.ExecContext(
			ctx, logMessageSQL, id, "error", fmt.Sprintf(format, args...)); err != nil {
			return err
		}
		if _, err := tx.ExecContext(ctx, updateStateSQL, "failed", id); err != nil {
			return err
		}
		return tx.Commit()
	})
}

// importLoop is the dispatcher of the import queue. It waits for the
// configuration, re-queues imports left in state 'running', and then
// repeatedly claims the next runnable import and runs it in its own
// goroutine. It never returns.
func (q *importQueue) importLoop() {
	config.WaitReady()
	// Re-enqueue the jobs that are in state running.
	// They were in progress when the server went down.
	if err := reEnqueueRunning(); err != nil {
		log.Printf("re-enqueuing failed: %v", err)
	}

	for {
		var idj *idJob
		var err error

		// Block until an import has been claimed. Between polls wait for a
		// wake-up signal or for pollDuration to pass.
		for {
			if idj, err = q.fetchJob(); err != nil && err != sql.ErrNoRows {
				log.Printf("db error: %v", err)
			}
			if idj != nil {
				break
			}
			select {
			case <-q.signalChan:
			case <-time.After(pollDuration):
			}
		}

		log.Printf("starting import #%d", idj.id)

		jc := q.jobCreator(idj.kind)
		if jc == nil {
			if err := errorAndFail(idj.id, "no creator for kind '%s' found", idj.kind); err != nil {
				log.Printf("marking import #%d as failed: %v", idj.id, err)
			}
			continue
		}

		// Lock dependencies. The same slice is used for unlocking, so
		// exactly the names locked here are released again.
		deps := jc.Depends()
		q.creatorsMu.Lock()
		for _, d := range deps {
			q.usedDeps[d] = struct{}{}
		}
		q.creatorsMu.Unlock()

		go func(jc JobCreator, idj *idJob, deps []string) {

			// Unlock the dependencies and wake the loop: imports blocked by
			// them may be runnable now.
			defer func() {
				q.creatorsMu.Lock()
				for _, d := range deps {
					delete(q.usedDeps, d)
				}
				q.creatorsMu.Unlock()
				select {
				case q.signalChan <- struct{}{}:
				default:
				}
			}()

			// Create runs under survive so that a panicking creator fails
			// this import instead of crashing the whole server.
			var job Job
			errCreate := survive(func() (err error) {
				job, err = jc.Create(idj.kind, idj.data)
				return err
			})()
			if errCreate != nil {
				if err := errorAndFail(idj.id, "failed to create job for import #%d: %v",
					idj.id, errCreate); err != nil {
					log.Printf("marking import #%d as failed: %v", idj.id, err)
				}
				return
			}

			feedback := logFeedback(idj.id)

			feedback.Info("import #%d started", idj.id)

			errDo := survive(func() error {
				return auth.RunAs(idj.user, context.Background(),
					func(conn *sql.Conn) error { return job.Do(conn, feedback) })
			})()
			if errDo != nil {
				feedback.Error("error do: %v", errDo)
			}
			// Call CleanUp inside the closure: taking the method value
			// job.CleanUp of a nil job would panic outside survive.
			errCleanup := survive(func() error { return job.CleanUp() })()
			if errCleanup != nil {
				feedback.Error("error cleanup: %v", errCleanup)
			}

			var state string
			if errDo != nil || errCleanup != nil {
				state = "failed"
			} else {
				state = "successful"
			}
			if err := updateState(idj.id, state); err != nil {
				log.Printf("setting state of job %d failed: %v", idj.id, err)
			}
			log.Printf("import #%d finished: %s", idj.id, state)
		}(jc, idj, deps)
	}
}