changeset 4680:976aedc195e5

Be smarter about triggering diff calculations. Once a diff calculation has been started, run it to completion. Before starting one, check whether the result is already in the database and, if so, report it as ready. While a calculation is in progress, add further requests for it to a list of waiting callers instead of starting the same calculation again. Run at most two diff calculations at a time.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Tue, 15 Oct 2019 18:25:40 +0200
parents 6c1dd2fbe2ee
children 2690933e404c
files go.mod go.sum pkg/controllers/diff.go pkg/controllers/routes.go
diffstat 4 files changed, 184 insertions(+), 83 deletions(-) [+]
line wrap: on
line diff
--- a/go.mod	Tue Oct 15 17:31:54 2019 +0200
+++ b/go.mod	Tue Oct 15 18:25:40 2019 +0200
@@ -30,7 +30,6 @@
 	go.etcd.io/bbolt v1.3.3 // indirect
 	golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392 // indirect
 	golang.org/x/net v0.0.0-20190921015927-1a5e07d1ff72
-	golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
 	golang.org/x/text v0.3.2 // indirect
 	gonum.org/v1/gonum v0.0.0-20190922162417-bcfb93e04962
 	gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
--- a/go.sum	Tue Oct 15 17:31:54 2019 +0200
+++ b/go.sum	Tue Oct 15 18:25:40 2019 +0200
@@ -184,8 +184,6 @@
 golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY=
-golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
--- a/pkg/controllers/diff.go	Tue Oct 15 17:31:54 2019 +0200
+++ b/pkg/controllers/diff.go	Tue Oct 15 18:25:40 2019 +0200
@@ -14,15 +14,17 @@
 package controllers
 
 import (
+	"context"
 	"database/sql"
+	"errors"
 	"fmt"
 	"log"
 	"net/http"
+	"runtime"
 	"sync"
 	"time"
 
-	"golang.org/x/sync/semaphore"
-
+	"gemma.intevation.de/gemma/pkg/auth"
 	"gemma.intevation.de/gemma/pkg/common"
 	"gemma.intevation.de/gemma/pkg/models"
 	"gemma.intevation.de/gemma/pkg/octree"
@@ -88,48 +90,56 @@
 `
 )
 
-// Only allow three diffence calculation at once.
-// TODO: Make this configurable?
-var diffCalculationSemaphore = semaphore.NewWeighted(int64(3))
-
-func diffCalculation(req *http.Request) (jr mw.JSONResult, err error) {
-
-	begin := time.Now()
-	start := begin
-
-	dci := mw.JSONInput(req).(*models.DiffCalculationInput)
-
-	ctx := req.Context()
-
-	conn := mw.JSONConn(req)
-
-	var id int64
-	err = conn.QueryRowContext(
-		ctx,
-		diffIDSQL,
-		dci.Bottleneck,
-		dci.Minuend.Time,
-		dci.Subtrahend.Time,
-	).Scan(&id)
-
-	switch {
-	case err == sql.ErrNoRows:
-		// We need to calculate it.
-	case err != nil:
-		return
-	default:
-		// We already have this diff
-		jr = mw.JSONResult{
-			Result: map[string]int64{"id": id},
-		}
-		return
+type (
+	diffResult struct {
+		id  int64
+		err error
 	}
 
-	// DoS counter measure.
-	if err = diffCalculationSemaphore.Acquire(ctx, 1); err != nil {
-		return
+	diffCalculationManager struct {
+		waiting map[string][]chan diffResult
+		cmds    chan func(*diffCalculationManager)
+		jobs    chan *models.DiffCalculationInput
+	}
+)
+
+var diffCalcMng = startDiffCalculationManager()
+
+func diffCalcHash(dci *models.DiffCalculationInput) string {
+	return dci.Bottleneck + "/" +
+		dci.Minuend.Format(common.DateFormat) + "/" +
+		dci.Subtrahend.Format(common.DateFormat)
+}
+
+func startDiffCalculationManager() *diffCalculationManager {
+	dcm := diffCalculationManager{
+		waiting: make(map[string][]chan diffResult),
+		cmds:    make(chan func(*diffCalculationManager)),
+		jobs:    make(chan *models.DiffCalculationInput),
 	}
-	defer diffCalculationSemaphore.Release(1)
+
+	go dcm.run()
+	return &dcm
+}
+
+func (dcm *diffCalculationManager) run() {
+	n := runtime.NumCPU()
+	if n > 2 {
+		n = 2
+	}
+	for i := 0; i < n; i++ {
+		go dcm.calculate()
+	}
+
+	for cmd := range dcm.cmds {
+		cmd(dcm)
+	}
+}
+
+func doDiff(ctx context.Context, conn *sql.Conn, dci *models.DiffCalculationInput) (int64, error) {
+
+	start := time.Now()
+	begin := start
 
 	minuendTree, err := octree.FromCache(
 		ctx, conn,
@@ -137,17 +147,13 @@
 
 	log.Printf("info: loading minuend mesh took %s\n", time.Since(start))
 	if err != nil {
-		return
+		return 0, err
 	}
 
 	if minuendTree == nil {
-		err = mw.JSONError{
-			Code: http.StatusNotFound,
-			Message: fmt.Sprintf("Cannot find survey for %s/%s.",
-				dci.Bottleneck,
-				dci.Minuend.Format(common.DateFormat)),
-		}
-		return
+		return 0, fmt.Errorf("Cannot find survey for %s/%s.",
+			dci.Bottleneck,
+			dci.Minuend.Format(common.DateFormat))
 	}
 
 	start = time.Now()
@@ -158,28 +164,20 @@
 
 	log.Printf("info: loading subtrahend mesh took %s\n", time.Since(start))
 	if err != nil {
-		return
+		return 0, err
 	}
 
 	if subtrahendTree == nil {
-		err = mw.JSONError{
-			Code: http.StatusNotFound,
-			Message: fmt.Sprintf("Cannot find survey for %s/%s.",
-				dci.Bottleneck,
-				dci.Subtrahend.Format(common.DateFormat)),
-		}
-		return
+		return 0, fmt.Errorf("Cannot find survey for %s/%s.",
+			dci.Bottleneck,
+			dci.Subtrahend.Format(common.DateFormat))
 	}
 
 	// We need a slow path implementation for this.
 	epsg := minuendTree.EPSG()
 	if epsg != subtrahendTree.EPSG() {
-		err = mw.JSONError{
-			Code: http.StatusInternalServerError,
-			Message: "Calculating differences between two different " +
-				"EPSG code meshes are not supported, yet.",
-		}
-		return
+		return 0, errors.New("Calculating differences between two different " +
+			"EPSG code meshes are not supported, yet.")
 	}
 
 	start = time.Now()
@@ -232,14 +230,11 @@
 
 	switch {
 	case triErr != nil && clipErr != nil:
-		err = fmt.Errorf("%v %v", triErr, clipErr)
-		return
+		return 0, fmt.Errorf("%v %v", triErr, clipErr)
 	case triErr != nil:
-		err = triErr
-		return
+		return 0, triErr
 	case clipErr != nil:
-		err = clipErr
-		return
+		return 0, clipErr
 	}
 
 	start = time.Now()
@@ -262,7 +257,7 @@
 	// XXX: Maybe we should start this transaction earlier!?
 	var tx *sql.Tx
 	if tx, err = conn.BeginTx(ctx, nil); err != nil {
-		return
+		return 0, err
 	}
 	defer tx.Rollback()
 
@@ -273,6 +268,7 @@
 		"morphology_classbreaks_compare")
 	if err != nil {
 		log.Printf("warn: Loading class breaks failed: %v\n", err)
+		err = nil
 		heights = octree.SampleDiffHeights(tin.Min.Z, tin.Max.Z, contourStep)
 	} else {
 		heights = octree.ExtrapolateClassBreaks(heights, tin.Min.Z, tin.Max.Z)
@@ -283,10 +279,12 @@
 
 	var isoStmt *sql.Stmt
 	if isoStmt, err = tx.PrepareContext(ctx, insertDiffIsoAreasQL); err != nil {
-		return
+		return 0, err
 	}
 	defer isoStmt.Close()
 
+	var id int64
+
 	if err = tx.QueryRowContext(
 		ctx,
 		insertDiffSQL,
@@ -294,7 +292,7 @@
 		dci.Minuend.Time,
 		dci.Subtrahend.Time,
 	).Scan(&id); err != nil {
-		return
+		return 0, err
 	}
 
 	heights = common.DedupFloat64s(heights)
@@ -315,7 +313,7 @@
 			wkb,
 			contourTolerance,
 		); err != nil {
-			return
+			return 0, err
 		}
 	}
 
@@ -325,20 +323,125 @@
 	log.Printf("info: calculating and storing iso lines took %v\n",
 		time.Since(start))
 
-	if err != nil {
-		return
-	}
-
 	if err = tx.Commit(); err != nil {
 		log.Printf("info: difference calculation failed after %v\n",
 			time.Since(begin))
-		return
+		return 0, err
 	}
 	log.Printf("info: difference calculation succeed after %v\n",
 		time.Since(begin))
 
-	jr = mw.JSONResult{
-		Result: map[string]int64{"id": id},
+	return id, nil
+}
+
+func (dcm *diffCalculationManager) calculate() {
+	for dci := range dcm.jobs {
+		ctx := context.Background()
+		var id int64
+		var err error
+		err = auth.RunAs(ctx, "sys_admin", func(conn *sql.Conn) error {
+			id, err = doDiff(ctx, conn, dci)
+			return err
+		})
+		dcm.cmds <- func(dcm *diffCalculationManager) {
+			msg := diffResult{id: id, err: err}
+			key := diffCalcHash(dci)
+			list := dcm.waiting[key]
+			delete(dcm.waiting, key)
+			for _, ch := range list {
+				ch <- msg
+			}
+		}
 	}
+}
+
+func (dcm *diffCalculationManager) enque(dci *models.DiffCalculationInput) chan diffResult {
+	ch := make(chan diffResult)
+
+	dcm.cmds <- func(dcm *diffCalculationManager) {
+		// If this calculation is already running, add the caller to its list of waiters.
+		key := diffCalcHash(dci)
+		if list := dcm.waiting[key]; list != nil {
+			dcm.waiting[key] = append(list, ch)
+			return
+		}
+		// Check whether the diff is already stored in the database.
+		var id int64
+		ctx := context.Background()
+
+		err := auth.RunAs(ctx, "sys_admin", func(conn *sql.Conn) error {
+			return conn.QueryRowContext(
+				ctx,
+				diffIDSQL,
+				dci.Bottleneck,
+				dci.Minuend.Time,
+				dci.Subtrahend.Time,
+			).Scan(&id)
+		})
+
+		switch {
+		case err == sql.ErrNoRows:
+			// We need to calculate it.
+		case err != nil: // error
+			go func() { ch <- diffResult{err: err} }()
+			return
+		default: // we have it
+			go func() { ch <- diffResult{id: id} }()
+			return
+		}
+
+		// add to waiting
+		dcm.waiting[key] = []chan diffResult{ch}
+		// feed to workers
+		dcm.jobs <- dci
+	}
+
+	return ch
+}
+
+func (dcm *diffCalculationManager) unregister(dci *models.DiffCalculationInput, ch chan diffResult) {
+	dcm.cmds <- func(dcm *diffCalculationManager) {
+		key := diffCalcHash(dci)
+		list := dcm.waiting[key]
+		for i, x := range list {
+			if x == ch {
+				if i != len(list)-1 {
+					copy(list[i:], list[i+1:])
+				}
+				list[len(list)-1] = nil
+				list = list[:len(list)-1]
+				if len(list) == 0 {
+					delete(dcm.waiting, key)
+				} else {
+					dcm.waiting[key] = list
+				}
+				return
+			}
+		}
+	}
+}
+
+func diffCalculation(req *http.Request) (jr mw.JSONResult, err error) {
+
+	dci := mw.JSONInput(req).(*models.DiffCalculationInput)
+
+	ctx := req.Context()
+
+	resultCh := diffCalcMng.enque(dci)
+	defer diffCalcMng.unregister(dci, resultCh)
+
+	select {
+	case res := <-resultCh:
+		if res.err != nil {
+			err = res.err
+			return
+		}
+		jr = mw.JSONResult{
+			Result: map[string]int64{"id": res.id},
+		}
+	case <-ctx.Done():
+		log.Println("request canceled")
+	}
+
 	return
 }
--- a/pkg/controllers/routes.go	Tue Oct 15 17:31:54 2019 +0200
+++ b/pkg/controllers/routes.go	Tue Oct 15 18:25:40 2019 +0200
@@ -185,6 +185,7 @@
 	api.Handle("/diff", any(&mw.JSONHandler{
 		Input:  func(*http.Request) interface{} { return new(models.DiffCalculationInput) },
 		Handle: diffCalculation,
+		NoConn: true,
 	})).Methods(http.MethodPost)
 
 	// Cross sections