# HG changeset patch # User Sascha L. Teichmann # Date 1541688628 -3600 # Node ID a5069da2f0b713e799c24c4588436f1e56bfa0c0 # Parent 19a04b150b6cde1de8e65037e6128aacfefd0c07 Independent imports in terms of affected tables/dependencies are now run concurrently. diff -r 19a04b150b6c -r a5069da2f0b7 pkg/imports/queue.go --- a/pkg/imports/queue.go Wed Nov 07 23:03:59 2018 +0100 +++ b/pkg/imports/queue.go Thu Nov 08 15:50:28 2018 +0100 @@ -22,6 +22,8 @@ "sync" "time" + "github.com/jackc/pgx/pgtype" + "gemma.intevation.de/gemma/pkg/auth" "gemma.intevation.de/gemma/pkg/config" ) @@ -40,7 +42,10 @@ JobKind string - JobCreator func(kind JobKind, data string) (Job, error) + JobCreator interface { + Create(kind JobKind, data string) (Job, error) + Depends() []string + } idJob struct { id int64 @@ -52,12 +57,18 @@ const pollDuration = time.Second * 10 -var ( - signalChan = make(chan struct{}) +type importQueue struct { + signalChan chan struct{} + creatorsMu sync.Mutex + creators map[JobKind]JobCreator + usedDeps map[string]struct{} +} - creatorsMu sync.Mutex - creators = map[JobKind]JobCreator{} -) +var iqueue = importQueue{ + signalChan: make(chan struct{}), + creators: map[JobKind]JobCreator{}, + usedDeps: map[string]struct{}{}, +} const ( queueUser = "sys_admin" @@ -88,7 +99,8 @@ WHERE state = 'queued'::waterway.import_state AND enqueued IN ( SELECT min(enqueued) FROM waterway.imports - WHERE state = 'queued'::waterway.import_state + WHERE state = 'queued'::waterway.import_state AND + kind = ANY($1) ) LIMIT 1 ` @@ -109,23 +121,27 @@ ) func init() { - go importLoop() + go iqueue.importLoop() +} + +func (q *importQueue) registerJobCreator(kind JobKind, jc JobCreator) { + q.creatorsMu.Lock() + defer q.creatorsMu.Unlock() + q.creators[kind] = jc } func RegisterJobCreator(kind JobKind, jc JobCreator) { log.Printf("info: register import job creator for kind '%s'\n", kind) - creatorsMu.Lock() - defer creatorsMu.Unlock() - creators[kind] = jc + iqueue.registerJobCreator(kind, jc) } -func jobCreator(kind JobKind) JobCreator { - creatorsMu.Lock() - defer creatorsMu.Unlock() - return creators[kind] +func (q *importQueue) jobCreator(kind JobKind) JobCreator { + q.creatorsMu.Lock() + defer q.creatorsMu.Unlock() + return q.creators[kind] } -func AddJob(kind JobKind, user, data string) (int64, error) { +func (q *importQueue) addJob(kind JobKind, user, data string) (int64, error) { ctx := context.Background() var id int64 err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error { @@ -133,13 +149,17 @@ }) if err == nil { select { - case signalChan <- struct{}{}: + case q.signalChan <- struct{}{}: default: } } return id, err } +func AddJob(kind JobKind, user, data string) (int64, error) { + return iqueue.addJob(kind, user, data) +} + type logFeedback int64 func (lf logFeedback) log(kind, format string, args ...interface{}) { @@ -167,18 +187,13 @@ } func survive(fn func() error) func() error { - return func() error { - errCh := make(chan error) - go func() { - defer func() { - if err := recover(); err != nil { - errCh <- fmt.Errorf("%v: %s", - err, string(debug.Stack())) - } - }() - errCh <- fn() + return func() (err error) { + defer func() { + if p := recover(); p != nil { + err = fmt.Errorf("%v: %s", p, string(debug.Stack())) + } }() - return <-errCh + return fn() } } @@ -190,7 +205,31 @@ }) } -func fetchJob() (*idJob, error) { +func (q *importQueue) fetchJob() (*idJob, error) { + + var which []string + + q.creatorsMu.Lock() +nextCreator: + for kind, jc := range q.creators { + for _, d := range jc.Depends() { + if _, found := q.usedDeps[d]; found { + continue nextCreator + } + } + which = append(which, string(kind)) + } + q.creatorsMu.Unlock() + + if len(which) == 0 { + return nil, sql.ErrNoRows + } + + var kinds pgtype.TextArray + if err := kinds.Set(which); err != nil { + return nil, err + } + var ji idJob ctx := context.Background() err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error { @@ -199,7 +238,7 @@ return err } defer tx.Rollback() - if err = tx.QueryRowContext(ctx, selectJobSQL).Scan( + if err = tx.QueryRowContext(ctx, selectJobSQL, &kinds).Scan( &ji.id, &ji.kind, &ji.user, &ji.data); err != nil { return err } @@ -249,7 +288,7 @@ return err } -func importLoop() { +func (q *importQueue) importLoop() { config.WaitReady() // re-enqueue the jobs that are in state running. // They where in progess when the server went down. @@ -262,58 +301,81 @@ var err error for { - if idj, err = fetchJob(); err != nil { + if idj, err = q.fetchJob(); err != nil && err != sql.ErrNoRows { log.Printf("db error: %v\n", err) } if idj != nil { break } select { - case <-signalChan: + case <-q.signalChan: case <-time.After(pollDuration): } } log.Printf("starting import #%d\n", idj.id) - jc := jobCreator(idj.kind) + jc := q.jobCreator(idj.kind) if jc == nil { errorAndFail(idj.id, "no creator for kind '%s' found", idj.kind) continue } - job, err := jc(idj.kind, idj.data) - if err != nil { - errorAndFail(idj.id, "failed to create job for import #%d: %v", - idj.id, err) - continue + // Lock dependencies. + q.creatorsMu.Lock() + for _, d := range jc.Depends() { + q.usedDeps[d] = struct{}{} } + q.creatorsMu.Unlock() - feedback := logFeedback(idj.id) + go func(jc JobCreator, idj *idJob) { - feedback.Info("import #%d started\n", idj.id) + // Unlock the dependencies. + defer func() { + q.creatorsMu.Lock() + for _, d := range jc.Depends() { + delete(q.usedDeps, d) + } + q.creatorsMu.Unlock() + select { + case q.signalChan <- struct{}{}: + default: + } + }() - errDo := survive(func() error { - return auth.RunAs(idj.user, context.Background(), - func(conn *sql.Conn) error { return job.Do(conn, feedback) }) - })() - if errDo != nil { - feedback.Error("error do: %v\n", errDo) - } - errCleanup := survive(job.CleanUp)() - if errCleanup != nil { - feedback.Error("error cleanup: %v\n", errCleanup) - } + job, err := jc.Create(idj.kind, idj.data) + if err != nil { + errorAndFail(idj.id, "failed to create job for import #%d: %v", + idj.id, err) + return + } + + feedback := logFeedback(idj.id) + + feedback.Info("import #%d started\n", idj.id) - var state string - if errDo != nil || errCleanup != nil { - state = "failed" - } else { - state = "successful" - } - if err := updateState(idj.id, state); err != nil { - log.Printf("setting state of job %d failed: %v\n", idj.id, err) - } - log.Printf("import #%d finished: %s\n", idj.id, state) + errDo := survive(func() error { + return auth.RunAs(idj.user, context.Background(), + func(conn *sql.Conn) error { return job.Do(conn, feedback) }) + })() + if errDo != nil { + feedback.Error("error do: %v\n", errDo) + } + errCleanup := survive(job.CleanUp)() + if errCleanup != nil { + feedback.Error("error cleanup: %v\n", errCleanup) + } + + var state string + if errDo != nil || errCleanup != nil { + state = "failed" + } else { + state = "successful" + } + if err := updateState(idj.id, state); err != nil { + log.Printf("setting state of job %d failed: %v\n", idj.id, err) + } + log.Printf("import #%d finished: %s\n", idj.id, state) + }(jc, idj) } } diff -r 19a04b150b6c -r a5069da2f0b7 pkg/imports/sr.go --- a/pkg/imports/sr.go Wed Nov 07 23:03:59 2018 +0100 +++ b/pkg/imports/sr.go Thu Nov 08 15:50:28 2018 +0100 @@ -61,10 +61,22 @@ const SRJobKind JobKind = "sr" +type srJobCreator struct{} + +func (srJobCreator) Create(_ JobKind, data string) (Job, error) { + return SoundingResult(data), nil +} + +func (srJobCreator) Depends() []string { + return []string{ + "waterway.sounding_results", + "waterway.sounding_results_contour_lines", + "waterway.bottlenecks", + } +} + func init() { - RegisterJobCreator(SRJobKind, func(_ JobKind, data string) (Job, error) { - return SoundingResult(data), nil - }) + RegisterJobCreator(SRJobKind, srJobCreator{}) } const (