Mercurial > gemma
view pkg/imports/queue.go @ 5736:55892008ec96 default tip
Fixed a bunch of corner cases in WG import.
author | Sascha Wilde <wilde@sha-bang.de> |
---|---|
date | Wed, 29 May 2024 19:02:42 +0200 |
parents | 4c8652a61eab |
children |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"runtime/debug"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/jackc/pgx/pgtype"

	"gemma.intevation.de/gemma/pkg/auth"
	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/config"
	"gemma.intevation.de/gemma/pkg/log"
	"gemma.intevation.de/gemma/pkg/pgxutils"
)

type (
	// Feedback is passed to the Do method of a Job so that the job
	// can log informational messages, warnings and errors about
	// the progress of the import.
	Feedback interface {
		// Info logs an informational message.
		Info(fmt string, args ...any)
		// Warn logs a warning.
		Warn(fmt string, args ...any)
		// Error logs an error.
		Error(fmt string, args ...any)
	}

	// UnchangedError may be returned by the Do method of a Job to
	// indicate that the import did not change the database.
	// The queue then records the import in state "unchanged"
	// instead of "failed".
	UnchangedError string

	// Job is the central abstraction of an import job
	// run by the import queue.
	Job interface {
		// Do is called to do the actual import.
		// Bind transactions to ctx and conn, please.
		// id is the number of the import job.
		// feedback can be used to log the import process.
		// If no error is returned the import is assumed to
		// be successful. The non-error return value is
		// serialized as JSON into the database as a summary
		// of the import to be used by the review process.
		Do(ctx context.Context, id int64, conn *sql.Conn, feedback Feedback) (any, error)
		// CleanUp is called to clean up resources held by the import.
		// It is called after Do has finished, whether the import
		// succeeded or not, except when the failed job is going
		// to be retried.
		CleanUp() error
	}

	// FeedbackJob is a Job that creates its own Feedback for the
	// import with the given id. For other jobs the queue logs into
	// the import.import_logs table.
	FeedbackJob interface {
		Job
		CreateFeedback(int64) Feedback
	}

	// JobKind is the type of an import.
	// Choose a unique name for every import.
	// The review jobs of an import kind use the kind with
	// ReviewJobSuffix appended.
	JobKind string

	// JobCreator turns a job, which is stored as pure meta data
	// in the database, back into a runnable Job and describes
	// how jobs of its kind are scheduled.
	JobCreator interface {
		// Description is the long name of the import.
		Description() string
		// Create builds the actual job.
		Create() Job
		// Depends returns two lists of resources locked by this type
		// of import. Imports run concurrently as long as their
		// dependencies do not conflict:
		// a resource in the first list is locked exclusively;
		// a resource in the second list may be shared by several
		// running imports as long as no import locks it exclusively.
		Depends() [2][]string
		// StageDone is called if an import is positively reviewed
		// (state = accepted). It runs inside a transaction of the
		// user who started the import and can be used to finalize
		// the imported data, e.g. to move it out of the staging area.
		StageDone(context.Context, *sql.Tx, int64, Feedback) error
		// AutoAccept indicates that imports of this kind
		// don't need a review.
		AutoAccept() bool
	}

	// JobRemover is an extended JobCreator. If RemoveJob returns
	// true, the job is deleted from the database after it has run
	// instead of having its final state recorded.
	JobRemover interface {
		JobCreator
		RemoveJob() bool
	}

	// idJob is a job as fetched from the import.imports table.
	idJob struct {
		id        int64
		kind      JobKind
		user      string
		waitRetry pgtype.Interval // NULL: no wait between retries configured.
		triesLeft sql.NullInt64   // NULL: no retry limit configured.
		sendEmail bool
		data      string // JSON representation of the Job.
	}
)

const (
	// pollDuration is how long the idle import loop waits for a
	// command before it polls the database for new jobs again.
	pollDuration = time.Second * 10
	// runExclusive marks a resource in usedDeps as exclusively locked.
	runExclusive = -66666
)

const (
	// ReviewJobSuffix is the suffix appended to the kind of an import
	// to form the kind of its review jobs.
	ReviewJobSuffix = "#review"

	// Retry policy of review jobs.
	reviewJobRetries = 200
	reviewJobWait    = 10 * time.Minute
)

const (
	// Upper limit of retries and lower limit of the wait between
	// retries of a job.
	hardMaxTries = 200
	minWaitRetry = 5 * time.Second
)

// ErrRetrying is used to signal a retry.
var ErrRetrying = errors.New("retrying")

type importQueue struct {
	// cmdCh transports functions executed by the import loop goroutine.
	cmdCh chan func(*importQueue)

	// creatorsMu guards creators and usedDeps.
	creatorsMu sync.Mutex
	creators   map[JobKind]JobCreator
	// usedDeps counts the users of a resource,
	// or holds runExclusive if it is locked exclusively.
	usedDeps map[string]int

	// waiting holds the channels of synchronously added jobs.
	// It is only accessed from functions sent over cmdCh.
	waiting map[int64]chan struct{}
}

var iqueue = importQueue{
	cmdCh:    make(chan func(*importQueue)),
	creators: map[JobKind]JobCreator{},
	usedDeps: map[string]int{},
	waiting:  make(map[int64]chan struct{}),
}

var (
	// ImportStateNames is a list of the states a job can be in.
	ImportStateNames = []string{
		"queued",
		"running",
		"failed",
		"unchanged",
		"pending",
		"accepted",
		"declined",
		"reviewed",
	}
)

const (
	// queueUser is the database role used by the queue itself.
	queueUser = "sys_admin"

	// Put jobs left in state running (e.g. by a server shutdown)
	// back into the queue.
	reEnqueueRunningSQL = ` UPDATE import.imports SET state = 'queued'::import_state, changed = CURRENT_TIMESTAMP WHERE state = 'running'::import_state`

	insertJobSQL = ` INSERT INTO import.imports ( kind, due, trys_left, retry_wait, username, send_email, data ) VALUES ( $1, COALESCE($2, CURRENT_TIMESTAMP), $3, $4, $5, $6, $7 ) RETURNING id`

	// Select oldest queued job but prioritize review jobs.
	// Jobs due within the next five seconds are already considered.
	// $1 is the list of kinds whose dependencies are currently free.
	selectJobSQL = ` SELECT DISTINCT ON (kind LIKE '%` + ReviewJobSuffix + `') id, kind, trys_left, retry_wait, username, send_email, data FROM import.imports WHERE due <= CURRENT_TIMESTAMP + interval '5 seconds' AND state = 'queued'::import_state AND kind = ANY($1) ORDER BY kind LIKE '%` + ReviewJobSuffix + `' DESC, enqueued LIMIT 1`

	updateStateSQL = ` UPDATE import.imports SET state = $1::import_state, changed = CURRENT_TIMESTAMP WHERE id = $2`

	updateStateSummarySQL = ` UPDATE import.imports SET state = $1::import_state, changed = CURRENT_TIMESTAMP, summary = $2 WHERE id = $3`

	deleteJobSQL = ` DELETE FROM import.imports WHERE id = $1`

	logMessageSQL = ` INSERT INTO import.import_logs ( import_id, kind, msg ) VALUES ( $1, $2::log_type, $3 )`
)

// init starts the goroutine running the global import queue.
func init() { go iqueue.importLoop() }
func (ue UnchangedError) Error() string { return string(ue) } type reviewedJobCreator struct { jobCreator JobCreator } func (*reviewedJobCreator) AutoAccept() bool { return true } func (*reviewedJobCreator) RemoveJob() bool { return true } func (rjc *reviewedJobCreator) Depends() [2][]string { return rjc.jobCreator.Depends() } func (rjc *reviewedJobCreator) Description() string { return rjc.jobCreator.Description() + ReviewJobSuffix } func (*reviewedJobCreator) StageDone(context.Context, *sql.Tx, int64, Feedback) error { return nil } type reviewedJob struct { ID int64 `json:"id"` Accepted bool `json:"accepted"` } func (*reviewedJobCreator) Create() Job { return new(reviewedJob) } func (*reviewedJob) CleanUp() error { return nil } func (rj *reviewedJob) CreateFeedback(int64) Feedback { return logFeedback(rj.ID) } func (rj *reviewedJob) Do( ctx context.Context, importID int64, conn *sql.Conn, _ Feedback, ) (any, error) { tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } defer tx.Rollback() var signer string if err := tx.QueryRowContext(ctx, selectUserSQL, importID).Scan(&signer); err != nil { return nil, err } var user, kind string if err := tx.QueryRowContext(ctx, selectUserKindSQL, rj.ID).Scan(&user, &kind); err != nil { return nil, err } jc := FindJobCreator(JobKind(kind)) if jc == nil { return nil, fmt.Errorf("no job creator found for '%s'", kind) } importFeedback := logFeedback(rj.ID) if err := auth.RunAs(ctx, user, func(conn *sql.Conn) error { userTx, err := conn.BeginTx(ctx, nil) if err != nil { return err } defer userTx.Rollback() if rj.Accepted { err = jc.StageDone(ctx, userTx, rj.ID, importFeedback) } else { _, err = userTx.ExecContext(ctx, deleteImportDataSQL, rj.ID) } if err == nil { err = userTx.Commit() } return err }); err != nil { return nil, err } // Remove the import track if _, err := tx.ExecContext(ctx, deleteImportTrackSQL, rj.ID); err != nil { return nil, err } var state string if rj.Accepted { state = "accepted" } else { state 
= "declined" } if _, err := tx.ExecContext(ctx, reviewSQL, state, signer, rj.ID); err != nil { return nil, err } importFeedback.Info("User '%s' %s import %d.", signer, state, rj.ID) return nil, tx.Commit() } func (q *importQueue) registerJobCreator(kind JobKind, jc JobCreator) { q.creatorsMu.Lock() defer q.creatorsMu.Unlock() q.creators[kind] = jc q.creators[kind+ReviewJobSuffix] = &reviewedJobCreator{jobCreator: jc} } // FindJobCreator looks up a JobCreator in the global import queue. func FindJobCreator(kind JobKind) JobCreator { return iqueue.jobCreator(kind) } // ImportKindNames is a list of the names of the imports the // global import queue supports. func ImportKindNames() []string { return iqueue.importKindNames() } // LogImportKindNames logs a list of importer types registered // to the global import queue. func LogImportKindNames() { kinds := ImportKindNames() sort.Strings(kinds) log.Infof("registered import kinds: %s", strings.Join(kinds, ", ")) } // HasImportKindName checks if the import queue supports a given kind. func HasImportKindName(kind string) bool { return iqueue.hasImportKindName(kind) } func (q *importQueue) hasImportKindName(kind string) bool { q.creatorsMu.Lock() defer q.creatorsMu.Unlock() return q.creators[JobKind(kind)] != nil } // RegisterJobCreator adds a JobCreator to the global import queue. // This a good candidate to be called in a init function for // a particular JobCreator. func RegisterJobCreator(kind JobKind, jc JobCreator) { iqueue.registerJobCreator(kind, jc) } func (q *importQueue) importKindNames() []string { q.creatorsMu.Lock() defer q.creatorsMu.Unlock() names := make([]string, len(q.creators)) var i int for kind := range q.creators { names[i] = string(kind) i++ } // XXX: Consider using sort.Strings to make output deterministic. return names } // String implements [fmt/Stringer]. 
func (idj *idJob) String() string { return fmt.Sprintf("job %s [%d]", idj.kind, idj.id) } func (idj *idJob) nextRetry(feedback Feedback) bool { switch { case idj.waitRetry.Status != pgtype.Present && !idj.triesLeft.Valid: return false case idj.waitRetry.Status == pgtype.Present && !idj.triesLeft.Valid: return true case idj.triesLeft.Valid: if idj.triesLeft.Int64 < 1 { feedback.Warn("no retries left") } else { idj.triesLeft.Int64-- feedback.Info("failed but will retry") return true } } return false } func (idj *idJob) nextDue() time.Time { now := time.Now() if idj.waitRetry.Status == pgtype.Present { var d time.Duration if err := idj.waitRetry.AssignTo(&d); err != nil { log.Errorf("converting waitRetry failed: %v\n", err) } else { now = now.Add(d) } } return now } func (idj *idJob) triesLeftPointer() *int { if !idj.triesLeft.Valid { return nil } t := int(idj.triesLeft.Int64) return &t } func (idj *idJob) waitRetryPointer() *time.Duration { if idj.waitRetry.Status != pgtype.Present { return nil } d := new(time.Duration) if err := idj.waitRetry.AssignTo(d); err != nil { log.Errorf("converting waitRetry failed: %v\n", err) return nil } return d } func (q *importQueue) lockDependencies(jc JobCreator) { deps := jc.Depends() q.creatorsMu.Lock() defer q.creatorsMu.Unlock() for _, d := range deps[0] { q.usedDeps[d] = runExclusive } for _, d := range deps[1] { q.usedDeps[d]++ } } func (q *importQueue) unlockDependencies(jc JobCreator) { deps := jc.Depends() q.creatorsMu.Lock() defer q.creatorsMu.Unlock() for _, d := range deps[0] { q.usedDeps[d] = 0 } for _, d := range deps[1] { q.usedDeps[d]-- } } func (q *importQueue) jobCreator(kind JobKind) JobCreator { q.creatorsMu.Lock() defer q.creatorsMu.Unlock() return q.creators[kind] } func (q *importQueue) addJob( kind JobKind, due time.Time, triesLeft *int, waitRetry *time.Duration, user string, sendEmail bool, data string, sync bool, ) (int64, chan struct{}, error) { var id int64 if due.IsZero() { due = time.Now() } due = 
due.UTC() var tl sql.NullInt64 if triesLeft != nil { var many int64 if *triesLeft > hardMaxTries || *triesLeft < 0 { many = hardMaxTries } else { many = int64(*triesLeft) } tl = sql.NullInt64{Int64: many, Valid: true} } var wr pgtype.Interval if waitRetry != nil { var howLong time.Duration if minWaitRetry > *waitRetry { howLong = minWaitRetry } else { howLong = *waitRetry } if err := wr.Set(howLong); err != nil { return 0, nil, err } } else { wr = pgtype.Interval{Status: pgtype.Null} } errCh := make(chan error) var done chan struct{} q.cmdCh <- func(q *importQueue) { ctx := context.Background() errCh <- auth.RunAs(ctx, user, func(conn *sql.Conn) error { err := conn.QueryRowContext( ctx, insertJobSQL, string(kind), due, tl, &wr, user, sendEmail, data).Scan(&id) if err == nil && sync { log.Infof("register wait for %d\n", id) done = make(chan struct{}) q.waiting[id] = done } return err }) } return id, done, <-errCh } // AddJob adds a job to the global import queue to be executed // as soon as possible after due. // This is gone in a separate Go routine // so this will not block. 
func AddJob( kind JobKind, due time.Time, triesLeft *int, waitRetry *time.Duration, user string, sendEmail bool, data string, ) (int64, error) { id, _, err := iqueue.addJob( kind, due, triesLeft, waitRetry, user, sendEmail, data, false) return id, err } const ( isPendingSQL = ` SELECT state = 'pending'::import_state, kind FROM import.imports WHERE id = $1` selectUserSQL = ` SELECT username from import.imports WHERE ID = $1` selectUserKindSQL = ` SELECT username, kind from import.imports WHERE ID = $1` reviewSQL = ` UPDATE import.imports SET state = $1::import_state, changed = CURRENT_TIMESTAMP, signer = $2 WHERE id = $3` deleteImportDataSQL = `SELECT import.del_import($1)` deleteImportTrackSQL = ` DELETE FROM import.track_imports WHERE import_id = $1` ) func (q *importQueue) decideImportTx( ctx context.Context, tx *sql.Tx, id int64, accepted bool, reviewer string, ) (chan struct{}, error) { var ( pending bool kind string ) switch err := tx.QueryRowContext(ctx, isPendingSQL, id).Scan(&pending, &kind); { case err == sql.ErrNoRows: return nil, fmt.Errorf("cannot find import #%d", id) case err != nil: return nil, err case !pending: return nil, fmt.Errorf("#%d is not pending", id) } jc := q.jobCreator(JobKind(kind)) if jc == nil { return nil, fmt.Errorf("no job creator for kind '%s'", kind) } r := &reviewedJob{ ID: id, Accepted: accepted, } serialized, err := common.ToJSONString(r) if err != nil { return nil, err } // Try a little harder to persist the decision. 
tries := reviewJobRetries wait := reviewJobWait rID, done, err := q.addJob( JobKind(kind+ReviewJobSuffix), time.Now(), &tries, &wait, reviewer, false, serialized, true) if err != nil { return nil, err } log.Infof("add review job %d\n", rID) _, err = tx.ExecContext(ctx, updateStateSQL, "reviewed", id) if err != nil && done != nil { go func() { q.cmdCh <- func(q *importQueue) { delete(q.waiting, rID) } }() done = nil } return done, err } func (q *importQueue) decideImport( ctx context.Context, id int64, accepted bool, reviewer string, ) error { if ctx == nil { ctx = context.Background() } var done chan struct{} if err := auth.RunAs(ctx, reviewer, func(conn *sql.Conn) error { tx, err := conn.BeginTx(ctx, nil) if err != nil { return err } defer tx.Rollback() done, err = q.decideImportTx(ctx, tx, id, accepted, reviewer) if err == nil { err = tx.Commit() } return err }); err != nil { return err } _, retry := <-done if retry { return ErrRetrying } return nil } // DecideImport decides if a given import is accepted or not. func DecideImport( ctx context.Context, id int64, accepted bool, reviewer string, ) error { return iqueue.decideImport(ctx, id, accepted, reviewer) } func (q *importQueue) All(fn func(JobKind, JobCreator)) { q.creatorsMu.Lock() defer q.creatorsMu.Unlock() for k, v := range q.creators { fn(k, v) } } // All reports all configured job creators and there kind // to the given function. func All(fn func(JobKind, JobCreator)) { iqueue.All(fn) } type logFeedback int64 func (lf logFeedback) log(kind, format string, args ...any) { ctx := context.Background() err := auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error { _, err := conn.ExecContext( ctx, logMessageSQL, int64(lf), kind, fmt.Sprintf(format, args...)) return err }) if err != nil { log.Errorf("logging failed: %v\n", err) } } func (lf logFeedback) Info(format string, args ...any) { lf.log("info", format, args...) } func (lf logFeedback) Warn(format string, args ...any) { lf.log("warn", format, args...) 
} func (lf logFeedback) Error(format string, args ...any) { lf.log("error", format, args...) } func survive(fn func() error) func() error { return func() (err error) { defer func() { if p := recover(); p != nil { err = fmt.Errorf("%v: %s", p, string(debug.Stack())) } }() return fn() } } func reEnqueueRunning() error { ctx := context.Background() return auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error { _, err := conn.ExecContext(ctx, reEnqueueRunningSQL) return err }) } func (q *importQueue) fetchJob() (*idJob, error) { var which []string q.creatorsMu.Lock() nextCreator: for kind, jc := range q.creators { deps := jc.Depends() for _, d := range deps[0] { if q.usedDeps[d] != 0 { continue nextCreator } } for _, d := range deps[1] { if q.usedDeps[d] == runExclusive { continue nextCreator } } which = append(which, string(kind)) } q.creatorsMu.Unlock() if len(which) == 0 { return nil, sql.ErrNoRows } var kinds pgtype.TextArray if err := kinds.Set(which); err != nil { return nil, err } var ji idJob ctx := context.Background() err := auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error { tx, err := conn.BeginTx(ctx, nil) if err != nil { return err } defer tx.Rollback() if err = tx.QueryRowContext(ctx, selectJobSQL, &kinds).Scan( &ji.id, &ji.kind, &ji.triesLeft, &ji.waitRetry, &ji.user, &ji.sendEmail, &ji.data, ); err != nil { return err } _, err = tx.ExecContext(ctx, updateStateSQL, "running", ji.id) if err == nil { if err = tx.Commit(); err != nil { return err } } // Clip repetition back to allowd values. 
if ji.waitRetry.Status == pgtype.Present { var d time.Duration ji.waitRetry.AssignTo(&d) if d < minWaitRetry { ji.waitRetry.Set(minWaitRetry) } } if ji.triesLeft.Valid { if ji.triesLeft.Int64 < 0 || ji.triesLeft.Int64 > hardMaxTries { ji.triesLeft.Int64 = hardMaxTries } } return nil }) switch { case err == sql.ErrNoRows: return nil, nil case err != nil: return nil, err } return &ji, nil } func tryHardToStoreState(ctx context.Context, fn func(*sql.Conn) error) error { // As it is important to keep the persistent model // in sync with the in-memory model try harder to store // the state. const maxTries = 10 var sleep = time.Second for try := 1; ; try++ { var err error if err = auth.RunAs(ctx, queueUser, fn); err == nil || try == maxTries { return err } log.Warnf("[try %d/%d] Storing state failed: %v (try again in %s).\n", try, maxTries, err, sleep) time.Sleep(sleep) if sleep < time.Minute { if sleep *= 2; sleep > time.Minute { sleep = time.Minute } } } } func updateStateSummary( ctx context.Context, id int64, state string, summary any, ) error { var s sql.NullString if summary != nil { var b strings.Builder if err := json.NewEncoder(&b).Encode(summary); err != nil { return err } s = sql.NullString{String: b.String(), Valid: true} } return tryHardToStoreState(ctx, func(conn *sql.Conn) error { _, err := conn.ExecContext(ctx, updateStateSummarySQL, state, s, id) return err }) } func deleteJob(ctx context.Context, id int64) error { return tryHardToStoreState(ctx, func(conn *sql.Conn) error { _, err := conn.ExecContext(ctx, deleteJobSQL, id) return err }) } func errorAndFail(id int64, format string, args ...any) error { ctx := context.Background() return tryHardToStoreState(ctx, func(conn *sql.Conn) error { tx, err := conn.BeginTx(ctx, nil) if err != nil { return err } defer tx.Rollback() _, err = conn.ExecContext( ctx, logMessageSQL, id, "error", fmt.Sprintf(format, args...)) if err != nil { return err } _, err = conn.ExecContext( ctx, updateStateSQL, "failed", id) if 
err == nil { err = tx.Commit() } return err }) } func (q *importQueue) importLoop() { config.WaitReady() // re-enqueue the jobs that are in state running. // They where in progess when the server went down. if err := reEnqueueRunning(); err != nil { log.Errorf("re-enqueuing failed: %v", err) } for { var idj *idJob var err error for { if idj, err = q.fetchJob(); err != nil && err != sql.ErrNoRows { log.Errorf("error: db: %v\n", err) } if idj != nil { break } select { case cmd := <-q.cmdCh: cmd(q) case <-time.After(pollDuration): } } log.Infof("starting import #%d\n", idj.id) jc := q.jobCreator(idj.kind) if jc == nil { errorAndFail(idj.id, "no creator for kind '%s' found", idj.kind) continue } // Lock dependencies. q.lockDependencies(jc) go func(jc JobCreator, idj *idJob) { var retry bool defer func() { // Unlock the dependencies. q.unlockDependencies(jc) // Unlock waiting. q.cmdCh <- func(q *importQueue) { if w := q.waiting[idj.id]; w != nil { log.Infof("unlock waiting %d\n", idj.id) if retry { w <- struct{}{} } else { close(w) } delete(q.waiting, idj.id) } } }() job := jc.Create() if err := common.FromJSONString(idj.data, job); err != nil { errorAndFail(idj.id, "failed to create job for import #%d: %v", idj.id, err) return } var feedback Feedback if fc, ok := job.(FeedbackJob); ok { feedback = fc.CreateFeedback(idj.id) } else { feedback = logFeedback(idj.id) } ctx := context.Background() var summary any errDo := survive(func() error { return auth.RunAs(ctx, idj.user, func(conn *sql.Conn) error { var err error summary, err = job.Do(ctx, idj.id, conn, feedback) return err }) })() var unchanged bool if v, ok := errDo.(UnchangedError); ok { feedback.Info("unchanged: %s", v.Error()) unchanged = true } else if errDo != nil { feedback.Error("error in import: %v %v", idj, pgxutils.ReadableError{Err: errDo}) retry = idj.nextRetry(feedback) } var errCleanup error if !retry { // cleanup debris if errCleanup = survive(job.CleanUp)(); errCleanup != nil { feedback.Error("error 
cleanup: %v %v", idj, errCleanup) } } var remove bool if remover, ok := jc.(JobRemover); ok { remove = remover.RemoveJob() } var state string switch { case unchanged: state = "unchanged" case errDo != nil || errCleanup != nil: state = "failed" case jc.AutoAccept(): state = "accepted" default: state = "pending" } if !remove { if err := updateStateSummary(ctx, idj.id, state, summary); err != nil { log.Errorf("setting state of job %d failed: %v\n", idj.id, err) } log.Infof("info: import #%d finished: %s\n", idj.id, state) } if idj.sendEmail { go sendNotificationMail(idj.user, jc.Description(), state, idj.id) } if retry { nid, _, err := q.addJob( idj.kind, idj.nextDue(), idj.triesLeftPointer(), idj.waitRetryPointer(), idj.user, idj.sendEmail, idj.data, false) if err != nil { log.Errorf("retry enqueue failed: %v\n", err) } else { log.Infof("re-enqueued job with id %d\n", nid) } } if remove { if err := deleteJob(ctx, idj.id); err != nil { log.Errorf("deleting job %d failed: %v\n", idj.id, err) } } }(jc, idj) } }