changeset 1136:a5069da2f0b7

Independent imports in terms of affected tables/dependencies are now run concurrently.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Thu, 08 Nov 2018 15:50:28 +0100
parents 19a04b150b6c
children 2fb6c0a6ec8a
files pkg/imports/queue.go pkg/imports/sr.go
diffstat 2 files changed, 139 insertions(+), 65 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/imports/queue.go	Wed Nov 07 23:03:59 2018 +0100
+++ b/pkg/imports/queue.go	Thu Nov 08 15:50:28 2018 +0100
@@ -22,6 +22,8 @@
 	"sync"
 	"time"
 
+	"github.com/jackc/pgx/pgtype"
+
 	"gemma.intevation.de/gemma/pkg/auth"
 	"gemma.intevation.de/gemma/pkg/config"
 )
@@ -40,7 +42,10 @@
 
 	JobKind string
 
-	JobCreator func(kind JobKind, data string) (Job, error)
+	JobCreator interface {
+		Create(kind JobKind, data string) (Job, error)
+		Depends() []string
+	}
 
 	idJob struct {
 		id   int64
@@ -52,12 +57,18 @@
 
 const pollDuration = time.Second * 10
 
-var (
-	signalChan = make(chan struct{})
+type importQueue struct {
+	signalChan chan struct{}
+	creatorsMu sync.Mutex
+	creators   map[JobKind]JobCreator
+	usedDeps   map[string]struct{}
+}
 
-	creatorsMu sync.Mutex
-	creators   = map[JobKind]JobCreator{}
-)
+var iqueue = importQueue{
+	signalChan: make(chan struct{}),
+	creators:   map[JobKind]JobCreator{},
+	usedDeps:   map[string]struct{}{},
+}
 
 const (
 	queueUser = "sys_admin"
@@ -88,7 +99,8 @@
 WHERE state = 'queued'::waterway.import_state AND enqueued IN (
   SELECT min(enqueued)
   FROM waterway.imports 
-  WHERE state = 'queued'::waterway.import_state
+  WHERE state = 'queued'::waterway.import_state AND
+  kind = ANY($1)
 )
 LIMIT 1
 `
@@ -109,23 +121,27 @@
 )
 
 func init() {
-	go importLoop()
+	go iqueue.importLoop()
+}
+
+func (q *importQueue) registerJobCreator(kind JobKind, jc JobCreator) {
+	q.creatorsMu.Lock()
+	defer q.creatorsMu.Unlock()
+	q.creators[kind] = jc
 }
 
 func RegisterJobCreator(kind JobKind, jc JobCreator) {
 	log.Printf("info: register import job creator for kind '%s'\n", kind)
-	creatorsMu.Lock()
-	defer creatorsMu.Unlock()
-	creators[kind] = jc
+	iqueue.registerJobCreator(kind, jc)
 }
 
-func jobCreator(kind JobKind) JobCreator {
-	creatorsMu.Lock()
-	defer creatorsMu.Unlock()
-	return creators[kind]
+func (q *importQueue) jobCreator(kind JobKind) JobCreator {
+	q.creatorsMu.Lock()
+	defer q.creatorsMu.Unlock()
+	return q.creators[kind]
 }
 
-func AddJob(kind JobKind, user, data string) (int64, error) {
+func (q *importQueue) addJob(kind JobKind, user, data string) (int64, error) {
 	ctx := context.Background()
 	var id int64
 	err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error {
@@ -133,13 +149,17 @@
 	})
 	if err == nil {
 		select {
-		case signalChan <- struct{}{}:
+		case q.signalChan <- struct{}{}:
 		default:
 		}
 	}
 	return id, err
 }
 
+func AddJob(kind JobKind, user, data string) (int64, error) {
+	return iqueue.addJob(kind, user, data)
+}
+
 type logFeedback int64
 
 func (lf logFeedback) log(kind, format string, args ...interface{}) {
@@ -167,18 +187,13 @@
 }
 
 func survive(fn func() error) func() error {
-	return func() error {
-		errCh := make(chan error)
-		go func() {
-			defer func() {
-				if err := recover(); err != nil {
-					errCh <- fmt.Errorf("%v: %s",
-						err, string(debug.Stack()))
-				}
-			}()
-			errCh <- fn()
+	return func() (err error) {
+		defer func() {
+			if p := recover(); p != nil {
+				err = fmt.Errorf("%v: %s", p, string(debug.Stack()))
+			}
 		}()
-		return <-errCh
+		return fn()
 	}
 }
 
@@ -190,7 +205,31 @@
 	})
 }
 
-func fetchJob() (*idJob, error) {
+func (q *importQueue) fetchJob() (*idJob, error) {
+
+	var which []string
+
+	q.creatorsMu.Lock()
+nextCreator:
+	for kind, jc := range q.creators {
+		for _, d := range jc.Depends() {
+			if _, found := q.usedDeps[d]; found {
+				continue nextCreator
+			}
+		}
+		which = append(which, string(kind))
+	}
+	q.creatorsMu.Unlock()
+
+	if len(which) == 0 {
+		return nil, sql.ErrNoRows
+	}
+
+	var kinds pgtype.TextArray
+	if err := kinds.Set(which); err != nil {
+		return nil, err
+	}
+
 	var ji idJob
 	ctx := context.Background()
 	err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error {
@@ -199,7 +238,7 @@
 			return err
 		}
 		defer tx.Rollback()
-		if err = tx.QueryRowContext(ctx, selectJobSQL).Scan(
+		if err = tx.QueryRowContext(ctx, selectJobSQL, &kinds).Scan(
 			&ji.id, &ji.kind, &ji.user, &ji.data); err != nil {
 			return err
 		}
@@ -249,7 +288,7 @@
 	return err
 }
 
-func importLoop() {
+func (q *importQueue) importLoop() {
 	config.WaitReady()
 	// re-enqueue the jobs that are in state running.
 	// They were in progress when the server went down.
@@ -262,58 +301,81 @@
 		var err error
 
 		for {
-			if idj, err = fetchJob(); err != nil {
+			if idj, err = q.fetchJob(); err != nil && err != sql.ErrNoRows {
 				log.Printf("db error: %v\n", err)
 			}
 			if idj != nil {
 				break
 			}
 			select {
-			case <-signalChan:
+			case <-q.signalChan:
 			case <-time.After(pollDuration):
 			}
 		}
 
 		log.Printf("starting import #%d\n", idj.id)
 
-		jc := jobCreator(idj.kind)
+		jc := q.jobCreator(idj.kind)
 		if jc == nil {
 			errorAndFail(idj.id, "no creator for kind '%s' found", idj.kind)
 			continue
 		}
 
-		job, err := jc(idj.kind, idj.data)
-		if err != nil {
-			errorAndFail(idj.id, "failed to create job for import #%d: %v",
-				idj.id, err)
-			continue
+		// Lock dependencies.
+		q.creatorsMu.Lock()
+		for _, d := range jc.Depends() {
+			q.usedDeps[d] = struct{}{}
 		}
+		q.creatorsMu.Unlock()
 
-		feedback := logFeedback(idj.id)
+		go func(jc JobCreator, idj *idJob) {
 
-		feedback.Info("import #%d started\n", idj.id)
+			// Unlock the dependencies.
+			defer func() {
+				q.creatorsMu.Lock()
+				for _, d := range jc.Depends() {
+					delete(q.usedDeps, d)
+				}
+				q.creatorsMu.Unlock()
+				select {
+				case q.signalChan <- struct{}{}:
+				default:
+				}
+			}()
 
-		errDo := survive(func() error {
-			return auth.RunAs(idj.user, context.Background(),
-				func(conn *sql.Conn) error { return job.Do(conn, feedback) })
-		})()
-		if errDo != nil {
-			feedback.Error("error do: %v\n", errDo)
-		}
-		errCleanup := survive(job.CleanUp)()
-		if errCleanup != nil {
-			feedback.Error("error cleanup: %v\n", errCleanup)
-		}
+			job, err := jc.Create(idj.kind, idj.data)
+			if err != nil {
+				errorAndFail(idj.id, "failed to create job for import #%d: %v",
+					idj.id, err)
+				return
+			}
+
+			feedback := logFeedback(idj.id)
+
+			feedback.Info("import #%d started\n", idj.id)
 
-		var state string
-		if errDo != nil || errCleanup != nil {
-			state = "failed"
-		} else {
-			state = "successful"
-		}
-		if err := updateState(idj.id, state); err != nil {
-			log.Printf("setting state of job %d failed: %v\n", idj.id, err)
-		}
-		log.Printf("import #%d finished: %s\n", idj.id, state)
+			errDo := survive(func() error {
+				return auth.RunAs(idj.user, context.Background(),
+					func(conn *sql.Conn) error { return job.Do(conn, feedback) })
+			})()
+			if errDo != nil {
+				feedback.Error("error do: %v\n", errDo)
+			}
+			errCleanup := survive(job.CleanUp)()
+			if errCleanup != nil {
+				feedback.Error("error cleanup: %v\n", errCleanup)
+			}
+
+			var state string
+			if errDo != nil || errCleanup != nil {
+				state = "failed"
+			} else {
+				state = "successful"
+			}
+			if err := updateState(idj.id, state); err != nil {
+				log.Printf("setting state of job %d failed: %v\n", idj.id, err)
+			}
+			log.Printf("import #%d finished: %s\n", idj.id, state)
+		}(jc, idj)
 	}
 }
--- a/pkg/imports/sr.go	Wed Nov 07 23:03:59 2018 +0100
+++ b/pkg/imports/sr.go	Thu Nov 08 15:50:28 2018 +0100
@@ -61,10 +61,22 @@
 
 const SRJobKind JobKind = "sr"
 
+type srJobCreator struct{}
+
+func (srJobCreator) Create(_ JobKind, data string) (Job, error) {
+	return SoundingResult(data), nil
+}
+
+func (srJobCreator) Depends() []string {
+	return []string{
+		"waterway.sounding_results",
+		"waterway.sounding_results_contour_lines",
+		"waterway.bottlenecks",
+	}
+}
+
 func init() {
-	RegisterJobCreator(SRJobKind, func(_ JobKind, data string) (Job, error) {
-		return SoundingResult(data), nil
-	})
+	RegisterJobCreator(SRJobKind, srJobCreator{})
 }
 
 const (