changeset 3405:2b5c22f6bb1f

available fairway depth: Implemented backend for sections and stretches.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Thu, 23 May 2019 12:53:19 +0200
parents 0c9467003d14
children 1ba669cbbee6
files pkg/controllers/bottlenecks.go pkg/controllers/routes.go pkg/controllers/stretches.go
diffstat 3 files changed, 440 insertions(+), 29 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/controllers/bottlenecks.go	Thu May 23 12:48:55 2019 +0200
+++ b/pkg/controllers/bottlenecks.go	Thu May 23 12:53:19 2019 +0200
@@ -565,10 +565,26 @@
 	return
 }
 
+func intervalMode(mode string) int {
+	switch strings.ToLower(mode) {
+	case "monthly":
+		return 0
+	case "quarterly":
+		return 1
+	case "yearly":
+		return 2
+	default:
+		return 0
+	}
+}
+
 func bottleneckAvailableFairwayDepth(rw http.ResponseWriter, req *http.Request) {
 
+	mode := intervalMode(req.FormValue("mode"))
+	var from, to time.Time
+	var los int
+
 	bn := mux.Vars(req)["objnam"]
-
 	if bn == "" {
 		http.Error(
 			rw, "Missing objnam of bottleneck",
@@ -576,25 +592,6 @@
 		return
 	}
 
-	var mode int
-	if m := req.FormValue("mode"); m != "" {
-		switch strings.ToLower(m) {
-		case "monthly":
-			mode = 0
-		case "quarterly":
-			mode = 1
-		case "yearly":
-			mode = 2
-		default:
-			http.Error(
-				rw, fmt.Sprintf("Unknown 'mode' value %s.", m),
-				http.StatusBadRequest)
-			return
-		}
-	}
-
-	var from, to time.Time
-
 	if f := req.FormValue("from"); f != "" {
 		var err error
 		if from, err = time.Parse(common.TimeFormat, f); err != nil {
@@ -625,9 +622,6 @@
 		to, from = from, to
 	}
 
-	log.Printf("info: time interval: (%v - %v)\n", from, to)
-
-	var los int
 	if l := req.FormValue("los"); l != "" {
 		var err error
 		if los, err = strconv.Atoi(l); err != nil {
@@ -660,6 +654,8 @@
 
 	access := limitingFactor(limiting)
 
+	log.Printf("info: time interval: (%v - %v)\n", from, to)
+
 	// load the measurements
 	ms, err := loadDepthValues(ctx, conn, bn, los, from, to)
 	if err != nil {
--- a/pkg/controllers/routes.go	Thu May 23 12:48:55 2019 +0200
+++ b/pkg/controllers/routes.go	Thu May 23 12:53:19 2019 +0200
@@ -314,11 +314,11 @@
 		Handle: bottleneckAvailabilty,
 	})).Methods(http.MethodGet)
 
-	api.Handle("/data/{kind:stretch|section}/availability/{objnam}", any(&JSONHandler{
+	api.Handle("/data/{kind:stretch|section}/availability/{name}", any(&JSONHandler{
 		Handle: stretchAvailabilty,
 	})).Methods(http.MethodGet)
 
-	api.Handle("/data/{kind:stretch|section}/fairway-depth/{objnam}", any(
+	api.Handle("/data/{kind:stretch|section}/fairway-depth/{name}", any(
 		middleware.DBConn(http.HandlerFunc(stretchAvailableFairwayDepth)))).Methods(http.MethodGet)
 
 	api.Handle("/data/bottleneck/fairway-depth/{objnam}", any(
--- a/pkg/controllers/stretches.go	Thu May 23 12:48:55 2019 +0200
+++ b/pkg/controllers/stretches.go	Thu May 23 12:53:19 2019 +0200
@@ -14,10 +14,429 @@
 package controllers
 
 import (
+	"context"
 	"database/sql"
+	"encoding/csv"
+	"fmt"
+	"log"
 	"net/http"
+	"runtime"
+	"strconv"
+	"strings"
+	"sync"
+	"time"
+
+	"gemma.intevation.de/gemma/pkg/common"
+	"gemma.intevation.de/gemma/pkg/middleware"
+	"github.com/gorilla/mux"
 )
 
+const (
+	selectSectionBottlenecks = `
+SELECT
+  distinct(b.objnam),
+  b.limiting
+FROM waterway.sections s, waterway.bottlenecks b
+WHERE ST_Intersects(b.area, s.area) AND s.name = $1`
+
+	selectStretchBottlenecks = `
+SELECT
+  distinct(b.objnam),
+  b.limiting
+FROM waterway.stretches s, waterway.bottlenecks b
+WHERE ST_Intersects(b.area, s.area) AND s.name = $1`
+)
+
+type (
+	stretchBottleneck struct {
+		name     string
+		limiting string
+	}
+
+	stretchBottlenecks []stretchBottleneck
+
+	fullStretchBottleneck struct {
+		*stretchBottleneck
+		measurements availMeasurements
+		ldc          []referenceValue
+		breaks       []referenceValue
+		access       func(*availMeasurement) float64
+	}
+)
+
+func (bns stretchBottlenecks) contains(limiting string) bool {
+	for i := range bns {
+		if bns[i].limiting == limiting {
+			return true
+		}
+	}
+	return false
+}
+
+func loadFullStretchBottleneck(
+	ctx context.Context,
+	conn *sql.Conn,
+	bn *stretchBottleneck,
+	los int,
+	from, to time.Time,
+	depthbreaks, widthbreaks []referenceValue,
+) (*fullStretchBottleneck, error) {
+	measurements, err := loadDepthValues(ctx, conn, bn.name, los, from, to)
+	if err != nil {
+		return nil, err
+	}
+	ldc, err := loadLDCReferenceValue(ctx, conn, bn.name)
+	if err != nil {
+		return nil, err
+	}
+
+	var access func(*availMeasurement) float64
+	var breaks []referenceValue
+
+	switch bn.limiting {
+	case "width":
+		access = (*availMeasurement).getWidth
+		breaks = widthbreaks
+	case "depth":
+		access = (*availMeasurement).getDepth
+		breaks = depthbreaks
+	default:
+		log.Printf(
+			"warn: unknown limitation '%s'. default to 'depth'.\n",
+			bn.limiting)
+		access = (*availMeasurement).getDepth
+		breaks = depthbreaks
+	}
+
+	return &fullStretchBottleneck{
+		stretchBottleneck: bn,
+		measurements:      measurements,
+		ldc:               ldc,
+		breaks:            breaks,
+		access:            access,
+	}, nil
+}
+
+func loadStretchBottlenecks(
+	ctx context.Context,
+	conn *sql.Conn,
+	stretch bool,
+	name string,
+) (stretchBottlenecks, error) {
+	var sql string
+	if stretch {
+		sql = selectStretchBottlenecks
+	} else {
+		sql = selectSectionBottlenecks
+	}
+
+	rows, err := conn.QueryContext(ctx, sql, name)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+
+	var bns stretchBottlenecks
+
+	for rows.Next() {
+		var bn stretchBottleneck
+		if err := rows.Scan(
+			&bn.name,
+			&bn.limiting,
+		); err != nil {
+			return nil, err
+		}
+		bns = append(bns, bn)
+	}
+
+	if err := rows.Err(); err != nil {
+		return nil, err
+	}
+
+	return bns, nil
+}
+
+func stretchAvailableFairwayDepth(rw http.ResponseWriter, req *http.Request) {
+
+	vars := mux.Vars(req)
+	stretch := vars["kind"] == "stretch"
+	name := vars["name"]
+
+	mode := intervalMode(req.FormValue("mode"))
+	var from, to time.Time
+	var los int
+
+	depthbreaks, widthbreaks := afdRefs, afdRefs
+
+	if f := req.FormValue("from"); f != "" {
+		var err error
+		if from, err = time.Parse(common.TimeFormat, f); err != nil {
+			http.Error(
+				rw, fmt.Sprintf("Invalid format for 'from': %v.", err),
+				http.StatusBadRequest)
+			return
+		}
+	} else {
+		from = time.Now().AddDate(-1, 0, 0)
+	}
+	from = from.UTC()
+
+	if t := req.FormValue("to"); t != "" {
+		var err error
+		if to, err = time.Parse(common.TimeFormat, t); err != nil {
+			http.Error(
+				rw, fmt.Sprintf("Invalid format for 'to': %v.", err),
+				http.StatusBadRequest)
+			return
+		}
+	} else {
+		to = from.AddDate(1, 0, 0)
+	}
+	to = to.UTC()
+
+	if to.Before(from) {
+		to, from = from, to
+	}
+
+	if l := req.FormValue("los"); l != "" {
+		var err error
+		if los, err = strconv.Atoi(l); err != nil {
+			http.Error(
+				rw, fmt.Sprintf("Invalid format for 'los': %v.", err),
+				http.StatusBadRequest)
+			return
+		}
+	} else {
+		los = 1
+	}
+
+	conn := middleware.GetDBConn(req)
+	ctx := req.Context()
+
+	bns, err := loadStretchBottlenecks(ctx, conn, stretch, name)
+	if err != nil {
+		http.Error(
+			rw, fmt.Sprintf("DB error: %v.", err),
+			http.StatusInternalServerError)
+		return
+	}
+	if len(bns) == 0 {
+		http.Error(rw, "No bottlenecks found.", http.StatusNotFound)
+		return
+	}
+
+	if b := req.FormValue("depthbreaks"); b != "" {
+		depthbreaks = breaksToReferenceValue(b)
+	}
+
+	if b := req.FormValue("widthbreaks"); b != "" {
+		widthbreaks = breaksToReferenceValue(b)
+	}
+
+	useDepth, useWidth := bns.contains("depth"), bns.contains("width")
+
+	if useDepth && useWidth && len(widthbreaks) != len(depthbreaks) {
+		http.Error(
+			rw,
+			fmt.Sprintf("class breaks lengths differ: %d != %d",
+				len(widthbreaks), len(depthbreaks)),
+			http.StatusBadRequest,
+		)
+		return
+	}
+
+	log.Printf("info: time interval: (%v - %v)\n", from, to)
+
+	var loaded []*fullStretchBottleneck
+	var errors []error
+
+	for i := range bns {
+		l, err := loadFullStretchBottleneck(
+			ctx,
+			conn,
+			&bns[i],
+			los,
+			from, to,
+			depthbreaks, widthbreaks,
+		)
+		if err != nil {
+			errors = append(errors, err)
+			continue
+		}
+		loaded = append(loaded, l)
+	}
+
+	if len(loaded) == 0 {
+		http.Error(
+			rw,
+			fmt.Sprintf("No bottleneck loaded: %v", joinErrors(errors)),
+			http.StatusInternalServerError,
+		)
+		return
+	}
+
+	type (
+		result struct {
+			label       string
+			from, to    time.Time
+			ldc, breaks []time.Duration
+		}
+
+		calculation struct {
+			result      *result
+			ldc, breaks []time.Duration
+		}
+
+		job struct {
+			result *result
+			bn     *fullStretchBottleneck
+		}
+	)
+
+	n := runtime.NumCPU() / 2
+	if n == 0 {
+		n = 1
+	}
+
+	jobCh := make(chan job)
+	calcCh := make(chan calculation, n)
+	done := make(chan struct{})
+
+	var wg sync.WaitGroup
+
+	go func() {
+		defer close(done)
+		for calc := range calcCh {
+			ldc := calc.result.ldc
+			for i, v := range calc.ldc {
+				ldc[i] += v
+			}
+			breaks := calc.result.breaks
+			for i, v := range calc.breaks {
+				breaks[i] += v
+			}
+		}
+	}()
+
+	for i := 0; i < n; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for job := range jobCh {
+				bn := job.bn
+				res := job.result
+				ldc := bn.measurements.classify(
+					res.from, res.to,
+					bn.ldc,
+					bn.access,
+				)
+				breaks := bn.measurements.classify(
+					res.from, res.to,
+					bn.breaks,
+					bn.access,
+				)
+				calcCh <- calculation{
+					result: res,
+					breaks: breaks,
+					ldc:    ldc,
+				}
+			}
+		}()
+	}
+
+	var results []*result
+
+	interval := intervals[mode](from, to)
+
+	var breaks []referenceValue
+
+	if useDepth {
+		breaks = depthbreaks
+	} else {
+		breaks = widthbreaks
+	}
+
+	for pfrom, pto, label := interval(); label != ""; pfrom, pto, label = interval() {
+
+		res := &result{
+			label:  label,
+			from:   pfrom,
+			to:     pto,
+			ldc:    make([]time.Duration, 2),
+			breaks: make([]time.Duration, len(breaks)+1),
+		}
+		results = append(results, res)
+
+		for _, bn := range loaded {
+			jobCh <- job{bn: bn, result: res}
+		}
+	}
+	close(jobCh)
+	wg.Wait()
+	close(calcCh)
+	<-done
+
+	rw.Header().Add("Content-Type", "text/csv")
+
+	out := csv.NewWriter(rw)
+
+	// label, classes, lnwl
+	record := make([]string, 1+1+len(breaks)+1)
+	record[0] = "#label"
+	record[1] = "# >= LDC [h]"
+	for i, v := range breaks {
+		if useDepth && useWidth {
+			if i == 0 {
+				record[2] = "# < break_1 [h]"
+			}
+			record[i+3] = fmt.Sprintf("# >= break_%d", i+1)
+		} else {
+			if i == 0 {
+				record[2] = fmt.Sprintf("# < %.2f [h]", v.value)
+			}
+			record[i+3] = fmt.Sprintf("# >= %.2f [h]", v.value)
+		}
+	}
+
+	if err := out.Write(record); err != nil {
+		// Too late for HTTP status message.
+		log.Printf("error: %v\n", err)
+		return
+	}
+
+	for _, r := range results {
+		record[0] = r.label
+		record[1] = fmt.Sprintf("%.3f", r.ldc[1].Hours())
+
+		for i, d := range r.breaks {
+			record[2+i] = fmt.Sprintf("%.3f", d.Hours())
+		}
+
+		if err := out.Write(record); err != nil {
+			// Too late for HTTP status message.
+			log.Printf("error: %v\n", err)
+			return
+		}
+	}
+
+	out.Flush()
+	if err := out.Error(); err != nil {
+		// Too late for HTTP status message.
+		log.Printf("error: %v\n", err)
+	}
+}
+
+func joinErrors(errors []error) string {
+	var b strings.Builder
+	for _, err := range errors {
+		if b.Len() > 0 {
+			b.WriteString(", ")
+		}
+		b.WriteString(err.Error())
+	}
+	return b.String()
+}
+
 func stretchAvailabilty(
 	_ interface{},
 	req *http.Request,
@@ -26,7 +445,3 @@
 	// TODO: Implement me!
 	return
 }
-
-func stretchAvailableFairwayDepth(rw http.ResponseWriter, req *http.Request) {
-	// TODO: Implement me!
-}