Mercurial > gemma
changeset 4680:976aedc195e5
Be more clever when triggering diff calculations.
If a diff calc is started bring it to its end.
Before starting look if it is done and in db notice it as ready.
When a calculation is on its way add to list of waiting calls
to avoid starting a calculation again.
Only do two diff calcs at a time.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Tue, 15 Oct 2019 18:25:40 +0200 |
parents | 6c1dd2fbe2ee |
children | 2690933e404c |
files | go.mod go.sum pkg/controllers/diff.go pkg/controllers/routes.go |
diffstat | 4 files changed, 184 insertions(+), 83 deletions(-) [+] |
line wrap: on
line diff
--- a/go.mod Tue Oct 15 17:31:54 2019 +0200 +++ b/go.mod Tue Oct 15 18:25:40 2019 +0200 @@ -30,7 +30,6 @@ go.etcd.io/bbolt v1.3.3 // indirect golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392 // indirect golang.org/x/net v0.0.0-20190921015927-1a5e07d1ff72 - golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e golang.org/x/text v0.3.2 // indirect gonum.org/v1/gonum v0.0.0-20190922162417-bcfb93e04962 gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
--- a/go.sum Tue Oct 15 17:31:54 2019 +0200 +++ b/go.sum Tue Oct 15 18:25:40 2019 +0200 @@ -184,8 +184,6 @@ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY= -golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
--- a/pkg/controllers/diff.go Tue Oct 15 17:31:54 2019 +0200 +++ b/pkg/controllers/diff.go Tue Oct 15 18:25:40 2019 +0200 @@ -14,15 +14,17 @@ package controllers import ( + "context" "database/sql" + "errors" "fmt" "log" "net/http" + "runtime" "sync" "time" - "golang.org/x/sync/semaphore" - + "gemma.intevation.de/gemma/pkg/auth" "gemma.intevation.de/gemma/pkg/common" "gemma.intevation.de/gemma/pkg/models" "gemma.intevation.de/gemma/pkg/octree" @@ -88,48 +90,56 @@ ` ) -// Only allow three diffence calculation at once. -// TODO: Make this configurable? -var diffCalculationSemaphore = semaphore.NewWeighted(int64(3)) - -func diffCalculation(req *http.Request) (jr mw.JSONResult, err error) { - - begin := time.Now() - start := begin - - dci := mw.JSONInput(req).(*models.DiffCalculationInput) - - ctx := req.Context() - - conn := mw.JSONConn(req) - - var id int64 - err = conn.QueryRowContext( - ctx, - diffIDSQL, - dci.Bottleneck, - dci.Minuend.Time, - dci.Subtrahend.Time, - ).Scan(&id) - - switch { - case err == sql.ErrNoRows: - // We need to calculate it. - case err != nil: - return - default: - // We already have this diff - jr = mw.JSONResult{ - Result: map[string]int64{"id": id}, - } - return +type ( + diffResult struct { + id int64 + err error } - // DoS counter measure. - if err = diffCalculationSemaphore.Acquire(ctx, 1); err != nil { - return + diffCalculationManager struct { + waiting map[string][]chan diffResult + cmds chan func(*diffCalculationManager) + jobs chan *models.DiffCalculationInput + } +) + +var diffCalcMng = startDiffCalculationManager() + +func diffCalcHash(dci *models.DiffCalculationInput) string { + return dci.Bottleneck + "/" + + dci.Minuend.Format(common.DateFormat) + "/" + + dci.Subtrahend.Format(common.DateFormat) +} + +func startDiffCalculationManager() *diffCalculationManager { + dcm := diffCalculationManager{ + waiting: make(map[string][]chan diffResult), + cmds: make(chan func(*diffCalculationManager)), + jobs: make(chan *models.DiffCalculationInput), } - defer diffCalculationSemaphore.Release(1) + + go dcm.run() + return &dcm +} + +func (dcm *diffCalculationManager) run() { + n := runtime.NumCPU() + if n > 2 { + n = 2 + } + for i := 0; i < n; i++ { + go dcm.calculate() + } + + for cmd := range dcm.cmds { + cmd(dcm) + } +} + +func doDiff(ctx context.Context, conn *sql.Conn, dci *models.DiffCalculationInput) (int64, error) { + + start := time.Now() + begin := start minuendTree, err := octree.FromCache( ctx, conn, @@ -137,17 +147,13 @@ log.Printf("info: loading minuend mesh took %s\n", time.Since(start)) if err != nil { - return + return 0, err } if minuendTree == nil { - err = mw.JSONError{ - Code: http.StatusNotFound, - Message: fmt.Sprintf("Cannot find survey for %s/%s.", - dci.Bottleneck, - dci.Minuend.Format(common.DateFormat)), - } - return + return 0, fmt.Errorf("Cannot find survey for %s/%s.", + dci.Bottleneck, + dci.Minuend.Format(common.DateFormat)) } start = time.Now() @@ -158,28 +164,20 @@ log.Printf("info: loading subtrahend mesh took %s\n", time.Since(start)) if err != nil { - return + return 0, err } if subtrahendTree == nil { - err = mw.JSONError{ - Code: http.StatusNotFound, - Message: fmt.Sprintf("Cannot find survey for %s/%s.", - dci.Bottleneck, - dci.Subtrahend.Format(common.DateFormat)), - } - return + return 0, fmt.Errorf("Cannot find survey for %s/%s.", + dci.Bottleneck, + dci.Subtrahend.Format(common.DateFormat)) } // We need a slow path implementation for this. epsg := minuendTree.EPSG() if epsg != subtrahendTree.EPSG() { - err = mw.JSONError{ - Code: http.StatusInternalServerError, - Message: "Calculating differences between two different " + - "EPSG code meshes are not supported, yet.", - } - return + return 0, errors.New("Calculating differences between two different " + + "EPSG code meshes are not supported, yet.") } start = time.Now() @@ -232,14 +230,11 @@ switch { case triErr != nil && clipErr != nil: - err = fmt.Errorf("%v %v", triErr, clipErr) - return + return 0, fmt.Errorf("%v %v", triErr, clipErr) case triErr != nil: - err = triErr - return + return 0, triErr case clipErr != nil: - err = clipErr - return + return 0, clipErr } start = time.Now() @@ -262,7 +257,7 @@ // XXX: Maybe we should start this transaction earlier!? var tx *sql.Tx if tx, err = conn.BeginTx(ctx, nil); err != nil { - return + return 0, err } defer tx.Rollback() @@ -273,6 +268,7 @@ "morphology_classbreaks_compare") if err != nil { log.Printf("warn: Loading class breaks failed: %v\n", err) + err = nil heights = octree.SampleDiffHeights(tin.Min.Z, tin.Max.Z, contourStep) } else { heights = octree.ExtrapolateClassBreaks(heights, tin.Min.Z, tin.Max.Z) @@ -283,10 +279,12 @@ var isoStmt *sql.Stmt if isoStmt, err = tx.PrepareContext(ctx, insertDiffIsoAreasQL); err != nil { - return + return 0, err } defer isoStmt.Close() + var id int64 + if err = tx.QueryRowContext( ctx, insertDiffSQL, @@ -294,7 +292,7 @@ dci.Minuend.Time, dci.Subtrahend.Time, ).Scan(&id); err != nil { - return + return 0, err } heights = common.DedupFloat64s(heights) @@ -315,7 +313,7 @@ wkb, contourTolerance, ); err != nil { - return + return 0, err } } @@ -325,20 +323,125 @@ log.Printf("info: calculating and storing iso lines took %v\n", time.Since(start)) - if err != nil { - return - } - if err = tx.Commit(); err != nil { log.Printf("info: difference calculation failed after %v\n", time.Since(begin)) - return + return 0, err } log.Printf("info: difference calculation succeed after %v\n", time.Since(begin)) - jr = mw.JSONResult{ - Result: map[string]int64{"id": id}, + return id, nil +} + +func (dcm *diffCalculationManager) calculate() { + for dci := range dcm.jobs { + ctx := context.Background() + var id int64 + var err error + err = auth.RunAs(ctx, "sys_admin", func(conn *sql.Conn) error { + id, err = doDiff(ctx, conn, dci) + return err + }) + dcm.cmds <- func(dcm *diffCalculationManager) { + msg := diffResult{id: id, err: err} + key := diffCalcHash(dci) + list := dcm.waiting[key] + delete(dcm.waiting, key) + for _, ch := range list { + ch <- msg + } + } } +} + +func (dcm *diffCalculationManager) enque(dci *models.DiffCalculationInput) chan diffResult { + ch := make(chan diffResult) + + dcm.cmds <- func(dcm *diffCalculationManager) { + // When already running add to list of waitings + key := diffCalcHash(dci) + if list := dcm.waiting[key]; list != nil { + dcm.waiting[key] = append(list, ch) + return + } + // look if it is in database + var id int64 + ctx := context.Background() + + err := auth.RunAs(ctx, "sys_admin", func(conn *sql.Conn) error { + return conn.QueryRowContext( + ctx, + diffIDSQL, + dci.Bottleneck, + dci.Minuend.Time, + dci.Subtrahend.Time, + ).Scan(&id) + }) + + switch { + case err == sql.ErrNoRows: + // We need to calculate it. + case err != nil: // error + go func() { ch <- diffResult{err: err} }() + return + default: // we have it + go func() { ch <- diffResult{id: id} }() + return + } + + // add to waiting + dcm.waiting[key] = []chan diffResult{ch} + // feed to workers + dcm.jobs <- dci + } + + return ch +} + +func (dcm *diffCalculationManager) unregister(dci *models.DiffCalculationInput, ch chan diffResult) { + dcm.cmds <- func(dcm *diffCalculationManager) { + key := diffCalcHash(dci) + list := dcm.waiting[key] + for i, x := range list { + if x == ch { + if i != len(list)-1 { + copy(list[i:], list[i+1:]) + } + list[len(list)-1] = nil + list = list[:len(list)-1] + if len(list) == 0 { + delete(dcm.waiting, key) + } else { + dcm.waiting[key] = list + } + return + } + } + } +} + +func diffCalculation(req *http.Request) (jr mw.JSONResult, err error) { + + dci := mw.JSONInput(req).(*models.DiffCalculationInput) + + ctx := req.Context() + + resultCh := diffCalcMng.enque(dci) + defer diffCalcMng.unregister(dci, resultCh) + + select { + case res := <-resultCh: + if res.err != nil { + err = res.err + return + } + jr = mw.JSONResult{ + Result: map[string]int64{"id": res.id}, + } + case <-ctx.Done(): + log.Println("request canceled") + } + return }
--- a/pkg/controllers/routes.go Tue Oct 15 17:31:54 2019 +0200 +++ b/pkg/controllers/routes.go Tue Oct 15 18:25:40 2019 +0200 @@ -185,6 +185,7 @@ api.Handle("/diff", any(&mw.JSONHandler{ Input: func(*http.Request) interface{} { return new(models.DiffCalculationInput) }, Handle: diffCalculation, + NoConn: true, })).Methods(http.MethodPost) // Cross sections