Mercurial > gemma
view pkg/imports/queue.go @ 3163:d9903cb34842
Handle failing INSERTs gracefully during gauges import
Using the special table EXCLUDED in INSERT statements makes
functionally no difference, but makes editing of the statements easier.
Since reference water levels are not deleted all at once before
(re-)importing anymore, take the chance to report those that were
deleted.
author | Tom Gottfried <tom@intevation.de> |
---|---|
date | Mon, 06 May 2019 13:25:49 +0200 |
parents | bfea3f80ca9a |
children | 4acbee65275d |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"log"
	"runtime/debug"
	"strings"
	"sync"
	"time"

	"github.com/jackc/pgx/pgtype"

	"gemma.intevation.de/gemma/pkg/auth"
	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/config"
)

type (
	// Feedback is passed to the Do method of a Job to log
	// information, warnings or errors.
	Feedback interface {
		// Info logs an informational message.
		Info(fmt string, args ...interface{})
		// Warn logs a warning.
		Warn(fmt string, args ...interface{})
		// Error logs an error.
		Error(fmt string, args ...interface{})
	}

	// UnchangedError may be returned by Do of a Job to indicate
	// that the database has not changed. The import loop records
	// such an import with the state "unchanged" instead of "failed".
	UnchangedError string

	// Job is the central abstraction of an import job
	// run by the import queue.
	Job interface {
		// Do is called to do the actual import.
		// Bind transactions to ctx and conn, please.
		// id is the number of the import job.
		// feedback can be used to log the import process.
		// If no error is returned the import is assumed to
		// be successful. The non-error return value is
		// serialized as a JSON string into the database as
		// a summary of the import to be used by the review process.
		Do(ctx context.Context, id int64, conn *sql.Conn, feedback Feedback) (interface{}, error)
		// CleanUp is called to clean up resources held by the import.
		// The import loop skips it when a failed import is going
		// to be retried.
		CleanUp() error
	}

	// JobKind is the type of an import.
	// Choose a unique name for every import.
	JobKind string

	// JobCreator is used to bring a job to life as it is stored
	// in pure meta-data form in the database.
	JobCreator interface {
		// Description is the long name of the import.
		Description() string
		// Create builds the actual job.
		Create() Job
		// Depends returns a list of resources locked by this type of import.
		// Imports are run concurrently if they have disjoint sets
		// of dependencies.
		Depends() []string
		// StageDone is called if an import is positively reviewed
		// (state = accepted). This can be used to finalize the imported
		// data, e.g. to move it out of the staging area.
		StageDone(context.Context, *sql.Tx, int64) error
		// AutoAccept indicates that imports of this kind
		// don't need a review.
		AutoAccept() bool
	}

	// idJob is the in-memory form of one row of import.imports
	// as it is read by fetchJob via selectJobSQL.
	idJob struct {
		id        int64           // column id
		kind      JobKind         // column kind
		user      string          // column username; the import runs as this user
		waitRetry pgtype.Interval // column retry_wait; may be NULL
		trysLeft  sql.NullInt64   // column trys_left; may be NULL
		sendEmail bool            // column send_email
		data      string          // column data; JSON decoded into the Job
	}
)

// pollDuration is the longest time the import loop waits for a wake-up
// signal before it looks into the database for new jobs again.
const pollDuration = time.Second * 10

// importQueue holds the registered job creators and the set of
// dependencies currently locked by running imports.
type importQueue struct {
	// signalChan wakes up the import loop. Senders use non-blocking sends.
	signalChan chan struct{}
	// creatorsMu guards creators and usedDeps.
	creatorsMu sync.Mutex
	// creators maps the kind of an import to its creator.
	creators map[JobKind]JobCreator
	// usedDeps is the set of dependencies held by running imports.
	usedDeps map[string]struct{}
}

// iqueue is the global import queue of this package.
var iqueue = importQueue{
	signalChan: make(chan struct{}),
	creators:   map[JobKind]JobCreator{},
	usedDeps:   map[string]struct{}{},
}

var (
	// ImportStateNames is a list of the states a job can be in.
	ImportStateNames = []string{
		"queued",
		"running",
		"failed",
		"unchanged",
		"pending",
		"accepted",
		"declined",
	}
)

const (
	// queueUser is the database user the queue uses for its own
	// bookkeeping statements (fetching jobs, logging, storing states).
	queueUser = "sys_admin"

	// reEnqueueRunningSQL puts all imports in state 'running'
	// back into state 'queued'.
	reEnqueueRunningSQL = ` UPDATE import.imports SET state = 'queued'::import_state WHERE state = 'running'::import_state`

	// insertJobSQL stores a new import and returns its id.
	// A NULL due date is replaced by the current timestamp.
	insertJobSQL = ` INSERT INTO import.imports ( kind, due, trys_left, retry_wait, username, send_email, data ) VALUES ( $1, COALESCE($2, CURRENT_TIMESTAMP), $3, $4, $5, $6, $7 ) RETURNING id`

	// selectJobSQL picks at most one queued import that is due within
	// the next five seconds and whose enqueued timestamp equals the
	// oldest enqueued timestamp of the queued imports of the kinds in $1.
	// NOTE(review): the outer query does not repeat the kind filter, so
	// a queued import of another kind sharing that exact enqueued
	// timestamp could be selected as well -- confirm this is acceptable.
	selectJobSQL = ` SELECT id, kind, trys_left, retry_wait, username, send_email, data FROM import.imports WHERE due <= CURRENT_TIMESTAMP + interval '5 seconds' AND state = 'queued'::import_state AND enqueued IN ( SELECT min(enqueued) FROM import.imports WHERE state = 'queued'::import_state AND kind = ANY($1)) LIMIT 1`

	// updateStateSQL sets the state ($1) of the import with id $2.
	updateStateSQL = ` UPDATE import.imports SET state = $1::import_state WHERE id = $2`

	// updateStateSummarySQL sets the state ($1) and the summary ($2)
	// of the import with id $3.
	updateStateSummarySQL = ` UPDATE import.imports SET state = $1::import_state, summary = $2 WHERE id = $3`

	// logMessageSQL appends a log message ($3) of the log type $2
	// to the import with id $1.
	logMessageSQL = ` INSERT INTO import.import_logs ( import_id, kind, msg ) VALUES ( $1, $2::log_type, $3 )`
)

// init starts the loop of the global import queue in its own goroutine.
func init() {
	go iqueue.importLoop()
}

// Error makes UnchangedError an error.
func (ue UnchangedError) Error() string {
	return string(ue)
}

// registerJobCreator stores jc as the creator for imports of the given
// kind. An already registered creator of the same kind is replaced.
func (q *importQueue) registerJobCreator(kind JobKind, jc JobCreator) {
	q.creatorsMu.Lock()
	defer q.creatorsMu.Unlock()
	q.creators[kind] = jc
}

// FindJobCreator looks up a JobCreator in the global import queue.
// It returns nil if no creator is registered for the given kind.
func FindJobCreator(kind JobKind) JobCreator {
	return iqueue.jobCreator(kind)
}

// ImportKindNames is a list of the names of the imports the
// global import queue supports.
func ImportKindNames() []string {
	return iqueue.importKindNames()
}

// HasImportKindName checks if the import queue supports a given kind.
func HasImportKindName(kind string) bool { return iqueue.hasImportKindName(kind) } // func (q *importQueue) hasImportKindName(kind string) bool { q.creatorsMu.Lock() defer q.creatorsMu.Unlock() return q.creators[JobKind(kind)] != nil } // RegisterJobCreator adds a JobCreator to the global import queue. // This a good candidate to be called in a init function for // a particular JobCreator. func RegisterJobCreator(kind JobKind, jc JobCreator) { log.Printf("info: register import job creator for kind '%s'\n", kind) iqueue.registerJobCreator(kind, jc) } func (q *importQueue) importKindNames() []string { q.creatorsMu.Lock() defer q.creatorsMu.Unlock() names := make([]string, len(q.creators)) var i int for kind := range q.creators { names[i] = string(kind) i++ } // XXX: Consider using sort.Strings to make output deterministic. return names } func (idj *idJob) nextRetry(feedback Feedback) bool { switch { case idj.waitRetry.Status != pgtype.Present && !idj.trysLeft.Valid: return false case idj.waitRetry.Status == pgtype.Present && !idj.trysLeft.Valid: return true case idj.trysLeft.Valid: if idj.trysLeft.Int64 < 1 { feedback.Warn("import should be retried, but no retrys left") } else { idj.trysLeft.Int64-- feedback.Info("import failed but will be retried") return true } } return false } func (idj *idJob) nextDue() time.Time { now := time.Now() if idj.waitRetry.Status == pgtype.Present { var d time.Duration if err := idj.waitRetry.AssignTo(&d); err != nil { log.Printf("error: converting waitRetry failed: %v\n", err) } else { now = now.Add(d) } } return now } func (idj *idJob) trysLeftPointer() *int { if !idj.trysLeft.Valid { return nil } t := int(idj.trysLeft.Int64) return &t } func (idj *idJob) waitRetryPointer() *time.Duration { if idj.waitRetry.Status != pgtype.Present { return nil } d := new(time.Duration) if err := idj.waitRetry.AssignTo(d); err != nil { log.Printf("error: converting waitRetry failed: %v\n", err) return nil } return d } func (q *importQueue) 
jobCreator(kind JobKind) JobCreator { q.creatorsMu.Lock() defer q.creatorsMu.Unlock() return q.creators[kind] } func (q *importQueue) addJob( kind JobKind, due time.Time, trysLeft *int, waitRetry *time.Duration, user string, sendEmail bool, data string, ) (int64, error) { var id int64 if due.IsZero() { due = time.Now() } var tl sql.NullInt64 if trysLeft != nil { tl = sql.NullInt64{Int64: int64(*trysLeft), Valid: true} } var wr pgtype.Interval if waitRetry != nil { if err := wr.Set(*waitRetry); err != nil { return 0, err } } else { wr = pgtype.Interval{Status: pgtype.Null} } ctx := context.Background() err := auth.RunAs(ctx, user, func(conn *sql.Conn) error { return conn.QueryRowContext( ctx, insertJobSQL, string(kind), due, tl, &wr, user, sendEmail, data).Scan(&id) }) if err == nil { select { case q.signalChan <- struct{}{}: default: } } return id, err } // AddJob adds a job to the global import queue to be executed // as soon as possible after due. // This is gone in a separate Go routine // so this will not block. func AddJob( kind JobKind, due time.Time, trysLeft *int, waitRetry *time.Duration, user string, sendEmail bool, data string, ) (int64, error) { return iqueue.addJob( kind, due, trysLeft, waitRetry, user, sendEmail, data) } type logFeedback int64 func (lf logFeedback) log(kind, format string, args ...interface{}) { ctx := context.Background() err := auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error { _, err := conn.ExecContext( ctx, logMessageSQL, int64(lf), kind, fmt.Sprintf(format, args...)) return err }) if err != nil { log.Printf("error: logging failed: %v\n", err) } } func (lf logFeedback) Info(format string, args ...interface{}) { lf.log("info", format, args...) } func (lf logFeedback) Warn(format string, args ...interface{}) { lf.log("warn", format, args...) } func (lf logFeedback) Error(format string, args ...interface{}) { lf.log("error", format, args...) 
} func survive(fn func() error) func() error { return func() (err error) { defer func() { if p := recover(); p != nil { err = fmt.Errorf("%v: %s", p, string(debug.Stack())) } }() return fn() } } func reEnqueueRunning() error { ctx := context.Background() return auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error { _, err := conn.ExecContext(ctx, reEnqueueRunningSQL) return err }) } func (q *importQueue) fetchJob() (*idJob, error) { var which []string q.creatorsMu.Lock() nextCreator: for kind, jc := range q.creators { for _, d := range jc.Depends() { if _, found := q.usedDeps[d]; found { continue nextCreator } } which = append(which, string(kind)) } q.creatorsMu.Unlock() if len(which) == 0 { return nil, sql.ErrNoRows } var kinds pgtype.TextArray if err := kinds.Set(which); err != nil { return nil, err } var ji idJob ctx := context.Background() err := auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error { tx, err := conn.BeginTx(ctx, nil) if err != nil { return err } defer tx.Rollback() if err = tx.QueryRowContext(ctx, selectJobSQL, &kinds).Scan( &ji.id, &ji.kind, &ji.trysLeft, &ji.waitRetry, &ji.user, &ji.sendEmail, &ji.data, ); err != nil { return err } _, err = tx.ExecContext(ctx, updateStateSQL, "running", ji.id) if err == nil { err = tx.Commit() } return err }) switch { case err == sql.ErrNoRows: return nil, nil case err != nil: return nil, err } return &ji, nil } func tryHardToStoreState(ctx context.Context, fn func(*sql.Conn) error) error { // As it is important to keep the persistent model // in sync with the in-memory model try harder to store // the state. 
const maxTries = 10 var sleep = time.Second for try := 1; ; try++ { var err error if err = auth.RunAs(ctx, queueUser, fn); err == nil || try == maxTries { return err } log.Printf("warn: [try %d/%d] Storing state failed: %v (try again in %s).\n", try, maxTries, err, sleep) time.Sleep(sleep) if sleep < time.Minute { if sleep *= 2; sleep > time.Minute { sleep = time.Minute } } } } func updateStateSummary( ctx context.Context, id int64, state string, summary interface{}, ) error { var s sql.NullString if summary != nil { var b strings.Builder if err := json.NewEncoder(&b).Encode(summary); err != nil { return err } s = sql.NullString{String: b.String(), Valid: true} } return tryHardToStoreState(ctx, func(conn *sql.Conn) error { _, err := conn.ExecContext(ctx, updateStateSummarySQL, state, s, id) return err }) } func errorAndFail(id int64, format string, args ...interface{}) error { ctx := context.Background() return tryHardToStoreState(ctx, func(conn *sql.Conn) error { tx, err := conn.BeginTx(ctx, nil) if err != nil { return err } defer tx.Rollback() _, err = conn.ExecContext( ctx, logMessageSQL, id, "error", fmt.Sprintf(format, args...)) if err != nil { return err } _, err = conn.ExecContext( ctx, updateStateSQL, "failed", id) if err == nil { err = tx.Commit() } return err }) } func (q *importQueue) importLoop() { config.WaitReady() // re-enqueue the jobs that are in state running. // They where in progess when the server went down. if err := reEnqueueRunning(); err != nil { log.Printf("error: re-enqueuing failed: %v", err) } for { var idj *idJob var err error for { if idj, err = q.fetchJob(); err != nil && err != sql.ErrNoRows { log.Printf("error: db: %v\n", err) } if idj != nil { break } select { case <-q.signalChan: case <-time.After(pollDuration): } } log.Printf("info: starting import #%d\n", idj.id) jc := q.jobCreator(idj.kind) if jc == nil { errorAndFail(idj.id, "no creator for kind '%s' found", idj.kind) continue } // Lock dependencies. 
q.creatorsMu.Lock() for _, d := range jc.Depends() { q.usedDeps[d] = struct{}{} } q.creatorsMu.Unlock() go func(jc JobCreator, idj *idJob) { // Unlock the dependencies. defer func() { q.creatorsMu.Lock() for _, d := range jc.Depends() { delete(q.usedDeps, d) } q.creatorsMu.Unlock() select { case q.signalChan <- struct{}{}: default: } }() job := jc.Create() if err := common.FromJSONString(idj.data, job); err != nil { errorAndFail(idj.id, "failed to create job for import #%d: %v", idj.id, err) return } feedback := logFeedback(idj.id) feedback.Info("import #%d started", idj.id) ctx := context.Background() var summary interface{} errDo := survive(func() error { return auth.RunAs(ctx, idj.user, func(conn *sql.Conn) error { var err error summary, err = job.Do(ctx, idj.id, conn, feedback) return err }) })() var unchanged, retry bool if v, ok := errDo.(UnchangedError); ok { feedback.Info("unchanged: %s", v.Error()) unchanged = true } else if errDo != nil { feedback.Error("error in import: %v", errDo) retry = idj.nextRetry(feedback) } var errCleanup error if !retry { // cleanup debris if errCleanup = survive(job.CleanUp)(); errCleanup != nil { feedback.Error("error cleanup: %v", errCleanup) } } var state string switch { case unchanged: state = "unchanged" case errDo != nil || errCleanup != nil: state = "failed" case jc.AutoAccept(): state = "accepted" default: state = "pending" } if err := updateStateSummary(ctx, idj.id, state, summary); err != nil { log.Printf("error: setting state of job %d failed: %v\n", idj.id, err) } log.Printf("info: import #%d finished: %s\n", idj.id, state) if idj.sendEmail { go sendNotificationMail(idj.user, jc.Description(), state, idj.id) } if retry { nid, err := q.addJob( idj.kind, idj.nextDue(), idj.trysLeftPointer(), idj.waitRetryPointer(), idj.user, idj.sendEmail, idj.data) if err != nil { log.Printf("error: retry enqueue failed: %v\n", err) } else { log.Printf("info: re-enqueued job with id %d\n", nid) } } }(jc, idj) } }