Mercurial > gemma
view pkg/imports/queue.go @ 1068:c933665b0193
merge
author | Bernhard Reiter <bernhard@intevation.de> |
---|---|
date | Fri, 26 Oct 2018 09:36:13 +0200 |
parents | a244b18cb916 |
children | a5069da2f0b7 |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> package imports import ( "context" "database/sql" "fmt" "log" "runtime/debug" "sync" "time" "gemma.intevation.de/gemma/pkg/auth" "gemma.intevation.de/gemma/pkg/config" ) type ( Feedback interface { Info(fmt string, args ...interface{}) Warn(fmt string, args ...interface{}) Error(fmt string, args ...interface{}) } Job interface { Do(*sql.Conn, Feedback) error CleanUp() error } JobKind string JobCreator func(kind JobKind, data string) (Job, error) idJob struct { id int64 kind JobKind user string data string } ) const pollDuration = time.Second * 10 var ( signalChan = make(chan struct{}) creatorsMu sync.Mutex creators = map[JobKind]JobCreator{} ) const ( queueUser = "sys_admin" reEnqueueRunningSQL = ` UPDATE waterway.imports SET state = 'queued'::waterway.import_state WHERE state = 'running'::waterway.import_state ` insertJobSQL = ` INSERT INTO waterway.imports ( kind, username, data ) VALUES ( $1, $2, $3 ) RETURNING id` selectJobSQL = ` SELECT id, kind, username, data FROM waterway.imports WHERE state = 'queued'::waterway.import_state AND enqueued IN ( SELECT min(enqueued) FROM waterway.imports WHERE state = 'queued'::waterway.import_state ) LIMIT 1 ` updateStateSQL = ` UPDATE waterway.imports SET state = $1::waterway.import_state WHERE id = $2 ` logMessageSQL = ` INSERT INTO waterway.import_logs ( import_id, kind, msg ) VALUES ( $1, $2::waterway.log_type, $3 )` ) func init() { go importLoop() } func RegisterJobCreator(kind JobKind, jc JobCreator) { log.Printf("info: register import job creator for kind '%s'\n", kind) creatorsMu.Lock() defer creatorsMu.Unlock() creators[kind] = jc } func jobCreator(kind JobKind) JobCreator { creatorsMu.Lock() defer creatorsMu.Unlock() return creators[kind] } func AddJob(kind JobKind, user, data string) (int64, error) { ctx := context.Background() var id int64 err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error { return conn.QueryRowContext(ctx, insertJobSQL, string(kind), user, data).Scan(&id) }) if err == nil { select { case signalChan <- struct{}{}: default: } } return id, err } type logFeedback int64 func (lf logFeedback) log(kind, format string, args ...interface{}) { ctx := context.Background() err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error { _, err := conn.ExecContext( ctx, logMessageSQL, int64(lf), kind, fmt.Sprintf(format, args...)) return err }) if err != nil { log.Printf("logging failed: %v\n", err) } } func (lf logFeedback) Info(format string, args ...interface{}) { lf.log("info", format, args...) } func (lf logFeedback) Warn(format string, args ...interface{}) { lf.log("warn", format, args...) } func (lf logFeedback) Error(format string, args ...interface{}) { lf.log("error", format, args...) } func survive(fn func() error) func() error { return func() error { errCh := make(chan error) go func() { defer func() { if err := recover(); err != nil { errCh <- fmt.Errorf("%v: %s", err, string(debug.Stack())) } }() errCh <- fn() }() return <-errCh } } func reEnqueueRunning() error { ctx := context.Background() return auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error { _, err := conn.ExecContext(ctx, reEnqueueRunningSQL) return err }) } func fetchJob() (*idJob, error) { var ji idJob ctx := context.Background() err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error { tx, err := conn.BeginTx(ctx, nil) if err != nil { return err } defer tx.Rollback() if err = tx.QueryRowContext(ctx, selectJobSQL).Scan( &ji.id, &ji.kind, &ji.user, &ji.data); err != nil { return err } _, err = tx.ExecContext(ctx, updateStateSQL, "running", ji.id) if err == nil { err = tx.Commit() } return err }) switch { case err == sql.ErrNoRows: return nil, nil case err != nil: return nil, err } return &ji, nil } func updateState(id int64, state string) error { ctx := context.Background() return auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error { _, err := conn.ExecContext(ctx, updateStateSQL, state, id) return err }) } func errorAndFail(id int64, format string, args ...interface{}) error { ctx := context.Background() err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error { tx, err := conn.BeginTx(ctx, nil) if err != nil { return err } defer tx.Rollback() _, err = conn.ExecContext( ctx, logMessageSQL, id, "error", fmt.Sprintf(format, args...)) if err != nil { return err } _, err = conn.ExecContext( ctx, updateStateSQL, "failed", id) if err == nil { err = tx.Commit() } return err }) return err } func importLoop() { config.WaitReady() // re-enqueue the jobs that are in state running. // They where in progess when the server went down. if err := reEnqueueRunning(); err != nil { log.Printf("re-enqueuing failed: %v", err) } for { var idj *idJob var err error for { if idj, err = fetchJob(); err != nil { log.Printf("db error: %v\n", err) } if idj != nil { break } select { case <-signalChan: case <-time.After(pollDuration): } } log.Printf("starting import #%d\n", idj.id) jc := jobCreator(idj.kind) if jc == nil { errorAndFail(idj.id, "no creator for kind '%s' found", idj.kind) continue } job, err := jc(idj.kind, idj.data) if err != nil { errorAndFail(idj.id, "failed to create job for import #%d: %v", idj.id, err) continue } feedback := logFeedback(idj.id) feedback.Info("import #%d started\n", idj.id) errDo := survive(func() error { return auth.RunAs(idj.user, context.Background(), func(conn *sql.Conn) error { return job.Do(conn, feedback) }) })() if errDo != nil { feedback.Error("error do: %v\n", errDo) } errCleanup := survive(job.CleanUp)() if errCleanup != nil { feedback.Error("error cleanup: %v\n", errCleanup) } var state string if errDo != nil || errCleanup != nil { state = "failed" } else { state = "successful" } if err := updateState(idj.id, state); err != nil { log.Printf("setting state of job %d failed: %v\n", idj.id, err) } log.Printf("import #%d finished: %s\n", idj.id, state) } }