Mercurial > gemma
view pkg/imports/queue.go @ 2130:f3aabc05f9b2
Fix constraints on waterway profiles
staging_done in the UNIQUE constraint had no effect, because the
exclusion constraint prevented two rows with equal location and
validity anyhow. Adding staging_done to the exclusion constraint
makes the check done by the UNIQUE constraint only a corner case of what
the exclusion constraint checks. Thus, remove the UNIQUE constraint.
Casting staging_done to int is needed because there is no appropriate
operator class for booleans. Casting to smallint or even bit would have
been better (i.e. it should result in a smaller index), but that would
have additionally required creating such a CAST.
author | Tom Gottfried <tom@intevation.de> |
---|---|
date | Wed, 06 Feb 2019 15:42:32 +0100 |
parents | 59055c8301df |
children | b868cb653c4d |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> package imports import ( "context" "database/sql" "encoding/json" "fmt" "log" "runtime/debug" "strings" "sync" "time" "github.com/jackc/pgx/pgtype" "gemma.intevation.de/gemma/pkg/auth" "gemma.intevation.de/gemma/pkg/config" ) type ( // Feedback is passed to the Do method of a Job to log // informations, warnings or errors. Feedback interface { // Info logs informations. Info(fmt string, args ...interface{}) // Warn logs warnings. Warn(fmt string, args ...interface{}) // Error logs errors. Error(fmt string, args ...interface{}) } // UnchangedError may be issued by Do of a Job to indicate // That the database has not changed. UnchangedError string // Job is the central abstraction of an import job // run by the import queue. Job interface { // Do is called to do the actual import. // Bind transactions to ctx and conn, please- // id is the number of the import job. // feedback can be used to log the import process. // If no error is return the import is assumed to // be successfull. The non-error return value is // serialized as a JSON string into the database as // a summary to the import to be used by the review process. Do(ctx context.Context, id int64, conn *sql.Conn, feedback Feedback) (interface{}, error) // CleanUp is called to clean up ressources hold by the import. // It is called whether the import succeeded or not. CleanUp() error } // JobKind is the type of an import. // Choose a unique name for every import. JobKind string // JobCreator is used to bring a job to life as it is stored // in pure meta-data form to the database. 
JobCreator interface { // Description is the long name of the import. Description() string // Create build the actual job. // kind is the name of the import type. // data is a free form string to pass arguments to the creation // process. This is useful to tell e.g. where to find data // in the file system to be used for importing. Create(kind JobKind, data string) (Job, error) // Depends returns a list of ressources locked by this type of import. // Imports are run concurrently if they have disjoint sets // of dependencies. Depends() []string // StageDone is called if an import is positively reviewed // (state = accepted). This can be used to finalize the imported // data to move it e.g from the staging area. StageDone(context.Context, *sql.Tx, int64) error // AutoAccept indicates that imports of this kind // don't need a review. AutoAccept() bool } idJob struct { id int64 kind JobKind user string waitRetry pgtype.Interval trysLeft sql.NullInt64 sendEmail bool data string } ) const pollDuration = time.Second * 10 type importQueue struct { signalChan chan struct{} creatorsMu sync.Mutex creators map[JobKind]JobCreator usedDeps map[string]struct{} } var iqueue = importQueue{ signalChan: make(chan struct{}), creators: map[JobKind]JobCreator{}, usedDeps: map[string]struct{}{}, } var ( // ImportStateNames is a list of the states a job can be in. 
ImportStateNames = []string{ "queued", "running", "failed", "unchanged", "pending", "accepted", "declined", } ) const ( queueUser = "sys_admin" reEnqueueRunningSQL = ` UPDATE import.imports SET state = 'queued'::import_state WHERE state = 'running'::import_state` insertJobSQL = ` INSERT INTO import.imports ( kind, due, trys_left, retry_wait, username, send_email, data ) VALUES ( $1, COALESCE($2, CURRENT_TIMESTAMP), $3, $4, $5, $6, $7 ) RETURNING id` selectJobSQL = ` SELECT id, kind, trys_left, retry_wait, username, send_email, data FROM import.imports WHERE due <= CURRENT_TIMESTAMP + interval '5 seconds' AND state = 'queued'::import_state AND enqueued IN ( SELECT min(enqueued) FROM import.imports WHERE state = 'queued'::import_state AND kind = ANY($1)) LIMIT 1` updateStateSQL = ` UPDATE import.imports SET state = $1::import_state WHERE id = $2` updateStateSummarySQL = ` UPDATE import.imports SET state = $1::import_state, summary = $2 WHERE id = $3` logMessageSQL = ` INSERT INTO import.import_logs ( import_id, kind, msg ) VALUES ( $1, $2::log_type, $3 )` ) func init() { go iqueue.importLoop() } // Error makes UnchangedError an error. func (ue UnchangedError) Error() string { return string(ue) } func (q *importQueue) registerJobCreator(kind JobKind, jc JobCreator) { q.creatorsMu.Lock() defer q.creatorsMu.Unlock() q.creators[kind] = jc } // FindJobCreator looks up a JobCreator in the global import queue. func FindJobCreator(kind JobKind) JobCreator { return iqueue.jobCreator(kind) } // ImportKindNames is a list of the names of the imports the // global import queue supports. func ImportKindNames() []string { return iqueue.importKindNames() } // HasImportKindName checks if the import queue supports a given kind. 
func HasImportKindName(kind string) bool { return iqueue.hasImportKindName(kind) } // func (q *importQueue) hasImportKindName(kind string) bool { q.creatorsMu.Lock() defer q.creatorsMu.Unlock() return q.creators[JobKind(kind)] != nil } // RegisterJobCreator adds a JobCreator to the global import queue. // This a good candidate to be called in a init function for // a particular JobCreator. func RegisterJobCreator(kind JobKind, jc JobCreator) { log.Printf("info: register import job creator for kind '%s'\n", kind) iqueue.registerJobCreator(kind, jc) } func (q *importQueue) importKindNames() []string { q.creatorsMu.Lock() defer q.creatorsMu.Unlock() names := make([]string, len(q.creators)) var i int for kind := range q.creators { names[i] = string(kind) i++ } // XXX: Consider using sort.Strings to make output deterministic. return names } func (idj *idJob) nextRetry(feedback Feedback) bool { switch { case idj.waitRetry.Status != pgtype.Present && !idj.trysLeft.Valid: return false case idj.waitRetry.Status == pgtype.Present && !idj.trysLeft.Valid: return true case idj.trysLeft.Valid: if idj.trysLeft.Int64 < 1 { feedback.Warn("import should be retried, but no retrys left") } else { idj.trysLeft.Int64-- feedback.Info("import failed but will be retried") return true } } return false } func (idj *idJob) nextDue() time.Time { now := time.Now() if idj.waitRetry.Status == pgtype.Present { var d time.Duration if err := idj.waitRetry.AssignTo(&d); err != nil { log.Printf("error: converting waitRetry failed: %v\n", err) } else { now = now.Add(d) } } return now } func (idj *idJob) trysLeftPointer() *int { if !idj.trysLeft.Valid { return nil } t := int(idj.trysLeft.Int64) return &t } func (idj *idJob) waitRetryPointer() *time.Duration { if idj.waitRetry.Status != pgtype.Present { return nil } d := new(time.Duration) if err := idj.waitRetry.AssignTo(d); err != nil { log.Printf("error: converting waitRetry failed: %v\n", err) return nil } return d } func (q *importQueue) 
jobCreator(kind JobKind) JobCreator { q.creatorsMu.Lock() defer q.creatorsMu.Unlock() return q.creators[kind] } func (q *importQueue) addJob( kind JobKind, due time.Time, trysLeft *int, waitRetry *time.Duration, user string, sendEmail bool, data string, ) (int64, error) { var id int64 if due.IsZero() { due = time.Now() } var tl sql.NullInt64 if trysLeft != nil { tl = sql.NullInt64{Int64: int64(*trysLeft), Valid: true} } var wr pgtype.Interval if waitRetry != nil { if err := wr.Set(*waitRetry); err != nil { return 0, err } } else { wr = pgtype.Interval{Status: pgtype.Null} } ctx := context.Background() err := auth.RunAs(ctx, user, func(conn *sql.Conn) error { return conn.QueryRowContext( ctx, insertJobSQL, string(kind), due, tl, &wr, user, sendEmail, data).Scan(&id) }) if err == nil { select { case q.signalChan <- struct{}{}: default: } } return id, err } // AddJob adds a job to the global import queue to be executed // as soon as possible after due. // This is gone in a separate Go routine // so this will not block. func AddJob( kind JobKind, due time.Time, trysLeft *int, waitRetry *time.Duration, user string, sendEmail bool, data string, ) (int64, error) { return iqueue.addJob( kind, due, trysLeft, waitRetry, user, sendEmail, data) } type logFeedback int64 func (lf logFeedback) log(kind, format string, args ...interface{}) { ctx := context.Background() err := auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error { _, err := conn.ExecContext( ctx, logMessageSQL, int64(lf), kind, fmt.Sprintf(format, args...)) return err }) if err != nil { log.Printf("error: logging failed: %v\n", err) } } func (lf logFeedback) Info(format string, args ...interface{}) { lf.log("info", format, args...) } func (lf logFeedback) Warn(format string, args ...interface{}) { lf.log("warn", format, args...) } func (lf logFeedback) Error(format string, args ...interface{}) { lf.log("error", format, args...) 
} func survive(fn func() error) func() error { return func() (err error) { defer func() { if p := recover(); p != nil { err = fmt.Errorf("%v: %s", p, string(debug.Stack())) } }() return fn() } } func reEnqueueRunning() error { ctx := context.Background() return auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error { _, err := conn.ExecContext(ctx, reEnqueueRunningSQL) return err }) } func (q *importQueue) fetchJob() (*idJob, error) { var which []string q.creatorsMu.Lock() nextCreator: for kind, jc := range q.creators { for _, d := range jc.Depends() { if _, found := q.usedDeps[d]; found { continue nextCreator } } which = append(which, string(kind)) } q.creatorsMu.Unlock() if len(which) == 0 { return nil, sql.ErrNoRows } var kinds pgtype.TextArray if err := kinds.Set(which); err != nil { return nil, err } var ji idJob ctx := context.Background() err := auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error { tx, err := conn.BeginTx(ctx, nil) if err != nil { return err } defer tx.Rollback() if err = tx.QueryRowContext(ctx, selectJobSQL, &kinds).Scan( &ji.id, &ji.kind, &ji.trysLeft, &ji.waitRetry, &ji.user, &ji.sendEmail, &ji.data, ); err != nil { return err } _, err = tx.ExecContext(ctx, updateStateSQL, "running", ji.id) if err == nil { err = tx.Commit() } return err }) switch { case err == sql.ErrNoRows: return nil, nil case err != nil: return nil, err } return &ji, nil } func updateStateSummary( ctx context.Context, id int64, state string, summary interface{}, ) error { var s sql.NullString if summary != nil { var b strings.Builder if err := json.NewEncoder(&b).Encode(summary); err != nil { return err } s = sql.NullString{String: b.String(), Valid: true} } return auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error { _, err := conn.ExecContext(ctx, updateStateSummarySQL, state, s, id) return err }) } func errorAndFail(id int64, format string, args ...interface{}) error { ctx := context.Background() err := auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error { tx, 
err := conn.BeginTx(ctx, nil) if err != nil { return err } defer tx.Rollback() _, err = conn.ExecContext( ctx, logMessageSQL, id, "error", fmt.Sprintf(format, args...)) if err != nil { return err } _, err = conn.ExecContext( ctx, updateStateSQL, "failed", id) if err == nil { err = tx.Commit() } return err }) return err } func (q *importQueue) importLoop() { config.WaitReady() // re-enqueue the jobs that are in state running. // They where in progess when the server went down. if err := reEnqueueRunning(); err != nil { log.Printf("error: re-enqueuing failed: %v", err) } for { var idj *idJob var err error for { if idj, err = q.fetchJob(); err != nil && err != sql.ErrNoRows { log.Printf("error: db: %v\n", err) } if idj != nil { break } select { case <-q.signalChan: case <-time.After(pollDuration): } } log.Printf("info: starting import #%d\n", idj.id) jc := q.jobCreator(idj.kind) if jc == nil { errorAndFail(idj.id, "no creator for kind '%s' found", idj.kind) continue } // Lock dependencies. q.creatorsMu.Lock() for _, d := range jc.Depends() { q.usedDeps[d] = struct{}{} } q.creatorsMu.Unlock() go func(jc JobCreator, idj *idJob) { // Unlock the dependencies. 
defer func() { q.creatorsMu.Lock() for _, d := range jc.Depends() { delete(q.usedDeps, d) } q.creatorsMu.Unlock() select { case q.signalChan <- struct{}{}: default: } }() job, err := jc.Create(idj.kind, idj.data) if err != nil { errorAndFail(idj.id, "failed to create job for import #%d: %v", idj.id, err) return } feedback := logFeedback(idj.id) feedback.Info("import #%d started", idj.id) ctx := context.Background() var summary interface{} errDo := survive(func() error { return auth.RunAs(ctx, idj.user, func(conn *sql.Conn) error { var err error summary, err = job.Do(ctx, idj.id, conn, feedback) return err }) })() var unchanged, retry bool if v, ok := errDo.(UnchangedError); ok { feedback.Info("unchanged: %s", v.Error()) unchanged = true } else if errDo != nil { feedback.Error("error in import: %v", errDo) retry = idj.nextRetry(feedback) } var errCleanup error if retry { // cleanup debris if errCleanup = survive(job.CleanUp)(); errCleanup != nil { feedback.Error("error cleanup: %v", errCleanup) } } var state string switch { case unchanged: state = "unchanged" case errDo != nil || errCleanup != nil: state = "failed" case jc.AutoAccept(): state = "accepted" default: state = "pending" } if err := updateStateSummary(ctx, idj.id, state, summary); err != nil { log.Printf("error: setting state of job %d failed: %v\n", idj.id, err) } log.Printf("info: import #%d finished: %s\n", idj.id, state) if idj.sendEmail { go sendNotificationMail(idj.user, jc.Description(), state, idj.id) } if retry { nid, err := q.addJob( idj.kind, idj.nextDue(), idj.trysLeftPointer(), idj.waitRetryPointer(), idj.user, idj.sendEmail, idj.data) if err != nil { log.Printf("error: retry enqueue failed: %v\n", err) } else { log.Printf("info: re-enqueued job with id %d\n", nid) } } }(jc, idj) } }