Mercurial > gemma
view pkg/imports/queue.go @ 2307:e1aa9bb65da6
import_schedule: send email when manually triggering import
author | Thomas Junk <thomas.junk@intevation.de> |
---|---|
date | Mon, 18 Feb 2019 13:32:44 +0100 |
parents | 7c83b5277c1c |
children | 054f5d61452d |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> package imports import ( "context" "database/sql" "encoding/json" "fmt" "log" "runtime/debug" "strings" "sync" "time" "github.com/jackc/pgx/pgtype" "gemma.intevation.de/gemma/pkg/auth" "gemma.intevation.de/gemma/pkg/common" "gemma.intevation.de/gemma/pkg/config" ) type ( // Feedback is passed to the Do method of a Job to log // informations, warnings or errors. Feedback interface { // Info logs informations. Info(fmt string, args ...interface{}) // Warn logs warnings. Warn(fmt string, args ...interface{}) // Error logs errors. Error(fmt string, args ...interface{}) } // UnchangedError may be issued by Do of a Job to indicate // That the database has not changed. UnchangedError string // Job is the central abstraction of an import job // run by the import queue. Job interface { // Do is called to do the actual import. // Bind transactions to ctx and conn, please- // id is the number of the import job. // feedback can be used to log the import process. // If no error is return the import is assumed to // be successfull. The non-error return value is // serialized as a JSON string into the database as // a summary to the import to be used by the review process. Do(ctx context.Context, id int64, conn *sql.Conn, feedback Feedback) (interface{}, error) // CleanUp is called to clean up ressources hold by the import. // It is called whether the import succeeded or not. CleanUp() error } // JobKind is the type of an import. // Choose a unique name for every import. JobKind string // JobCreator is used to bring a job to life as it is stored // in pure meta-data form to the database. JobCreator interface { // Description is the long name of the import. Description() string // Create build the actual job. Create() Job // Depends returns a list of ressources locked by this type of import. // Imports are run concurrently if they have disjoint sets // of dependencies. Depends() []string // StageDone is called if an import is positively reviewed // (state = accepted). This can be used to finalize the imported // data to move it e.g from the staging area. StageDone(context.Context, *sql.Tx, int64) error // AutoAccept indicates that imports of this kind // don't need a review. AutoAccept() bool } idJob struct { id int64 kind JobKind user string waitRetry pgtype.Interval trysLeft sql.NullInt64 sendEmail bool data string } ) const pollDuration = time.Second * 10 type importQueue struct { signalChan chan struct{} creatorsMu sync.Mutex creators map[JobKind]JobCreator usedDeps map[string]struct{} } var iqueue = importQueue{ signalChan: make(chan struct{}), creators: map[JobKind]JobCreator{}, usedDeps: map[string]struct{}{}, } var ( // ImportStateNames is a list of the states a job can be in. ImportStateNames = []string{ "queued", "running", "failed", "unchanged", "pending", "accepted", "declined", } ) const ( queueUser = "sys_admin" reEnqueueRunningSQL = ` UPDATE import.imports SET state = 'queued'::import_state WHERE state = 'running'::import_state` insertJobSQL = ` INSERT INTO import.imports ( kind, due, trys_left, retry_wait, username, send_email, data ) VALUES ( $1, COALESCE($2, CURRENT_TIMESTAMP), $3, $4, $5, $6, $7 ) RETURNING id` selectJobSQL = ` SELECT id, kind, trys_left, retry_wait, username, send_email, data FROM import.imports WHERE due <= CURRENT_TIMESTAMP + interval '5 seconds' AND state = 'queued'::import_state AND enqueued IN ( SELECT min(enqueued) FROM import.imports WHERE state = 'queued'::import_state AND kind = ANY($1)) LIMIT 1` updateStateSQL = ` UPDATE import.imports SET state = $1::import_state WHERE id = $2` updateStateSummarySQL = ` UPDATE import.imports SET state = $1::import_state, summary = $2 WHERE id = $3` logMessageSQL = ` INSERT INTO import.import_logs ( import_id, kind, msg ) VALUES ( $1, $2::log_type, $3 )` ) func init() { go iqueue.importLoop() } // Error makes UnchangedError an error. func (ue UnchangedError) Error() string { return string(ue) } func (q *importQueue) registerJobCreator(kind JobKind, jc JobCreator) { q.creatorsMu.Lock() defer q.creatorsMu.Unlock() q.creators[kind] = jc } // FindJobCreator looks up a JobCreator in the global import queue. func FindJobCreator(kind JobKind) JobCreator { return iqueue.jobCreator(kind) } // ImportKindNames is a list of the names of the imports the // global import queue supports. func ImportKindNames() []string { return iqueue.importKindNames() } // HasImportKindName checks if the import queue supports a given kind. func HasImportKindName(kind string) bool { return iqueue.hasImportKindName(kind) } // func (q *importQueue) hasImportKindName(kind string) bool { q.creatorsMu.Lock() defer q.creatorsMu.Unlock() return q.creators[JobKind(kind)] != nil } // RegisterJobCreator adds a JobCreator to the global import queue. // This a good candidate to be called in a init function for // a particular JobCreator. func RegisterJobCreator(kind JobKind, jc JobCreator) { log.Printf("info: register import job creator for kind '%s'\n", kind) iqueue.registerJobCreator(kind, jc) } func (q *importQueue) importKindNames() []string { q.creatorsMu.Lock() defer q.creatorsMu.Unlock() names := make([]string, len(q.creators)) var i int for kind := range q.creators { names[i] = string(kind) i++ } // XXX: Consider using sort.Strings to make output deterministic. return names } func (idj *idJob) nextRetry(feedback Feedback) bool { switch { case idj.waitRetry.Status != pgtype.Present && !idj.trysLeft.Valid: return false case idj.waitRetry.Status == pgtype.Present && !idj.trysLeft.Valid: return true case idj.trysLeft.Valid: if idj.trysLeft.Int64 < 1 { feedback.Warn("import should be retried, but no retrys left") } else { idj.trysLeft.Int64-- feedback.Info("import failed but will be retried") return true } } return false } func (idj *idJob) nextDue() time.Time { now := time.Now() if idj.waitRetry.Status == pgtype.Present { var d time.Duration if err := idj.waitRetry.AssignTo(&d); err != nil { log.Printf("error: converting waitRetry failed: %v\n", err) } else { now = now.Add(d) } } return now } func (idj *idJob) trysLeftPointer() *int { if !idj.trysLeft.Valid { return nil } t := int(idj.trysLeft.Int64) return &t } func (idj *idJob) waitRetryPointer() *time.Duration { if idj.waitRetry.Status != pgtype.Present { return nil } d := new(time.Duration) if err := idj.waitRetry.AssignTo(d); err != nil { log.Printf("error: converting waitRetry failed: %v\n", err) return nil } return d } func (q *importQueue) jobCreator(kind JobKind) JobCreator { q.creatorsMu.Lock() defer q.creatorsMu.Unlock() return q.creators[kind] } func (q *importQueue) addJob( kind JobKind, due time.Time, trysLeft *int, waitRetry *time.Duration, user string, sendEmail bool, data string, ) (int64, error) { var id int64 if due.IsZero() { due = time.Now() } var tl sql.NullInt64 if trysLeft != nil { tl = sql.NullInt64{Int64: int64(*trysLeft), Valid: true} } var wr pgtype.Interval if waitRetry != nil { if err := wr.Set(*waitRetry); err != nil { return 0, err } } else { wr = pgtype.Interval{Status: pgtype.Null} } ctx := context.Background() err := auth.RunAs(ctx, user, func(conn *sql.Conn) error { return conn.QueryRowContext( ctx, insertJobSQL, string(kind), due, tl, &wr, user, sendEmail, data).Scan(&id) }) if err == nil { select { case q.signalChan <- struct{}{}: default: } } return id, err } // AddJob adds a job to the global import queue to be executed // as soon as possible after due. // This is gone in a separate Go routine // so this will not block. func AddJob( kind JobKind, due time.Time, trysLeft *int, waitRetry *time.Duration, user string, sendEmail bool, data string, ) (int64, error) { return iqueue.addJob( kind, due, trysLeft, waitRetry, user, sendEmail, data) } type logFeedback int64 func (lf logFeedback) log(kind, format string, args ...interface{}) { ctx := context.Background() err := auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error { _, err := conn.ExecContext( ctx, logMessageSQL, int64(lf), kind, fmt.Sprintf(format, args...)) return err }) if err != nil { log.Printf("error: logging failed: %v\n", err) } } func (lf logFeedback) Info(format string, args ...interface{}) { lf.log("info", format, args...) } func (lf logFeedback) Warn(format string, args ...interface{}) { lf.log("warn", format, args...) } func (lf logFeedback) Error(format string, args ...interface{}) { lf.log("error", format, args...) } func survive(fn func() error) func() error { return func() (err error) { defer func() { if p := recover(); p != nil { err = fmt.Errorf("%v: %s", p, string(debug.Stack())) } }() return fn() } } func reEnqueueRunning() error { ctx := context.Background() return auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error { _, err := conn.ExecContext(ctx, reEnqueueRunningSQL) return err }) } func (q *importQueue) fetchJob() (*idJob, error) { var which []string q.creatorsMu.Lock() nextCreator: for kind, jc := range q.creators { for _, d := range jc.Depends() { if _, found := q.usedDeps[d]; found { continue nextCreator } } which = append(which, string(kind)) } q.creatorsMu.Unlock() if len(which) == 0 { return nil, sql.ErrNoRows } var kinds pgtype.TextArray if err := kinds.Set(which); err != nil { return nil, err } var ji idJob ctx := context.Background() err := auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error { tx, err := conn.BeginTx(ctx, nil) if err != nil { return err } defer tx.Rollback() if err = tx.QueryRowContext(ctx, selectJobSQL, &kinds).Scan( &ji.id, &ji.kind, &ji.trysLeft, &ji.waitRetry, &ji.user, &ji.sendEmail, &ji.data, ); err != nil { return err } _, err = tx.ExecContext(ctx, updateStateSQL, "running", ji.id) if err == nil { err = tx.Commit() } return err }) switch { case err == sql.ErrNoRows: return nil, nil case err != nil: return nil, err } return &ji, nil } func updateStateSummary( ctx context.Context, id int64, state string, summary interface{}, ) error { var s sql.NullString if summary != nil { var b strings.Builder if err := json.NewEncoder(&b).Encode(summary); err != nil { return err } s = sql.NullString{String: b.String(), Valid: true} } return auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error { _, err := conn.ExecContext(ctx, updateStateSummarySQL, state, s, id) return err }) } func errorAndFail(id int64, format string, args ...interface{}) error { ctx := context.Background() err := auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error { tx, err := conn.BeginTx(ctx, nil) if err != nil { return err } defer tx.Rollback() _, err = conn.ExecContext( ctx, logMessageSQL, id, "error", fmt.Sprintf(format, args...)) if err != nil { return err } _, err = conn.ExecContext( ctx, updateStateSQL, "failed", id) if err == nil { err = tx.Commit() } return err }) return err } func (q *importQueue) importLoop() { config.WaitReady() // re-enqueue the jobs that are in state running. // They where in progess when the server went down. if err := reEnqueueRunning(); err != nil { log.Printf("error: re-enqueuing failed: %v", err) } for { var idj *idJob var err error for { if idj, err = q.fetchJob(); err != nil && err != sql.ErrNoRows { log.Printf("error: db: %v\n", err) } if idj != nil { break } select { case <-q.signalChan: case <-time.After(pollDuration): } } log.Printf("info: starting import #%d\n", idj.id) jc := q.jobCreator(idj.kind) if jc == nil { errorAndFail(idj.id, "no creator for kind '%s' found", idj.kind) continue } // Lock dependencies. q.creatorsMu.Lock() for _, d := range jc.Depends() { q.usedDeps[d] = struct{}{} } q.creatorsMu.Unlock() go func(jc JobCreator, idj *idJob) { // Unlock the dependencies. defer func() { q.creatorsMu.Lock() for _, d := range jc.Depends() { delete(q.usedDeps, d) } q.creatorsMu.Unlock() select { case q.signalChan <- struct{}{}: default: } }() job := jc.Create() if err := common.FromJSONString(idj.data, job); err != nil { errorAndFail(idj.id, "failed to create job for import #%d: %v", idj.id, err) return } feedback := logFeedback(idj.id) feedback.Info("import #%d started", idj.id) ctx := context.Background() var summary interface{} errDo := survive(func() error { return auth.RunAs(ctx, idj.user, func(conn *sql.Conn) error { var err error summary, err = job.Do(ctx, idj.id, conn, feedback) return err }) })() var unchanged, retry bool if v, ok := errDo.(UnchangedError); ok { feedback.Info("unchanged: %s", v.Error()) unchanged = true } else if errDo != nil { feedback.Error("error in import: %v", errDo) retry = idj.nextRetry(feedback) } var errCleanup error if retry { // cleanup debris if errCleanup = survive(job.CleanUp)(); errCleanup != nil { feedback.Error("error cleanup: %v", errCleanup) } } var state string switch { case unchanged: state = "unchanged" case errDo != nil || errCleanup != nil: state = "failed" case jc.AutoAccept(): state = "accepted" default: state = "pending" } if err := updateStateSummary(ctx, idj.id, state, summary); err != nil { log.Printf("error: setting state of job %d failed: %v\n", idj.id, err) } log.Printf("info: import #%d finished: %s\n", idj.id, state) if idj.sendEmail { go sendNotificationMail(idj.user, jc.Description(), state, idj.id) } if retry { nid, err := q.addJob( idj.kind, idj.nextDue(), idj.trysLeftPointer(), idj.waitRetryPointer(), idj.user, idj.sendEmail, idj.data) if err != nil { log.Printf("error: retry enqueue failed: %v\n", err) } else { log.Printf("info: re-enqueued job with id %d\n", nid) } } }(jc, idj) } }