view pkg/controllers/stretches.go @ 3457:870812d8f247

client: spuc05: implemented new data format (csv) for diagrams on map
author Markus Kottlaender <markus@intevation.de>
date Fri, 24 May 2019 15:32:47 +0200
parents ae6c09fbc590
children 7265adcc5baa
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2099 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package controllers

import (
	"context"
	"database/sql"
	"encoding/csv"
	"fmt"
	"log"
	"net/http"
	"runtime"
	"strings"
	"sync"
	"time"

	"gemma.intevation.de/gemma/pkg/middleware"
	"github.com/gorilla/mux"
)

const (
	selectSectionBottlenecks = `
SELECT
  distinct(b.objnam),
  b.limiting
FROM waterway.sections s, waterway.bottlenecks b
WHERE ST_Intersects(b.area, s.area) AND s.name = $1`

	selectStretchBottlenecks = `
SELECT
  distinct(b.objnam),
  b.limiting
FROM waterway.stretches s, waterway.bottlenecks b
WHERE ST_Intersects(b.area, s.area) AND s.name = $1`
)

type (
	stretchBottleneck struct {
		name     string
		limiting string
	}

	stretchBottlenecks []stretchBottleneck

	fullStretchBottleneck struct {
		*stretchBottleneck
		measurements availMeasurements
		ldc          []float64
		breaks       []float64
		access       func(*availMeasurement) float64
	}

	availResult struct {
		label       string
		from, to    time.Time
		ldc, breaks []time.Duration
	}

	availCalculation struct {
		result      *availResult
		ldc, breaks []time.Duration
	}

	availJob struct {
		result *availResult
		bn     *fullStretchBottleneck
	}
)

func (r *availResult) add(c availCalculation) {
	for i, v := range c.ldc {
		r.ldc[i] += v
	}
	for i, v := range c.breaks {
		r.breaks[i] += v
	}
}

func (bns stretchBottlenecks) contains(limiting string) bool {
	for i := range bns {
		if bns[i].limiting == limiting {
			return true
		}
	}
	return false
}

func loadFullStretchBottleneck(
	ctx context.Context,
	conn *sql.Conn,
	bn *stretchBottleneck,
	los int,
	from, to time.Time,
	depthbreaks, widthbreaks []float64,
) (*fullStretchBottleneck, error) {
	measurements, err := loadDepthValues(ctx, conn, bn.name, los, from, to)
	if err != nil {
		return nil, err
	}
	ldc, err := loadLDCReferenceValue(ctx, conn, bn.name)
	if err != nil {
		return nil, err
	}

	var access func(*availMeasurement) float64
	var breaks []float64

	switch bn.limiting {
	case "width":
		access = (*availMeasurement).getWidth
		breaks = widthbreaks
	case "depth":
		access = (*availMeasurement).getDepth
		breaks = depthbreaks
	default:
		log.Printf(
			"warn: unknown limitation '%s'. default to 'depth'.\n",
			bn.limiting)
		access = (*availMeasurement).getDepth
		breaks = depthbreaks
	}

	return &fullStretchBottleneck{
		stretchBottleneck: bn,
		measurements:      measurements,
		ldc:               ldc,
		breaks:            breaks,
		access:            access,
	}, nil
}

func loadStretchBottlenecks(
	ctx context.Context,
	conn *sql.Conn,
	stretch bool,
	name string,
) (stretchBottlenecks, error) {
	var sql string
	if stretch {
		sql = selectStretchBottlenecks
	} else {
		sql = selectSectionBottlenecks
	}

	rows, err := conn.QueryContext(ctx, sql, name)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var bns stretchBottlenecks

	for rows.Next() {
		var bn stretchBottleneck
		if err := rows.Scan(
			&bn.name,
			&bn.limiting,
		); err != nil {
			return nil, err
		}
		bns = append(bns, bn)
	}

	if err := rows.Err(); err != nil {
		return nil, err
	}

	return bns, nil
}

func stretchAvailableFairwayDepth(rw http.ResponseWriter, req *http.Request) {

	vars := mux.Vars(req)
	stretch := vars["kind"] == "stretch"
	name := vars["name"]
	mode := intervalMode(req.FormValue("mode"))

	depthbreaks, widthbreaks := afdRefs, afdRefs

	from, ok := parseFormTime(rw, req, "from", time.Now().AddDate(-1, 0, 0))
	if !ok {
		return
	}

	to, ok := parseFormTime(rw, req, "to", from.AddDate(1, 0, 0))
	if !ok {
		return
	}

	if to.Before(from) {
		to, from = from, to
	}

	los, ok := parseFormInt(rw, req, "los", 1)
	if !ok {
		return
	}

	conn := middleware.GetDBConn(req)
	ctx := req.Context()

	bns, err := loadStretchBottlenecks(ctx, conn, stretch, name)
	if err != nil {
		http.Error(
			rw, fmt.Sprintf("DB error: %v.", err),
			http.StatusInternalServerError)
		return
	}

	if len(bns) == 0 {
		http.Error(rw, "No bottlenecks found.", http.StatusNotFound)
		return
	}

	if b := req.FormValue("depthbreaks"); b != "" {
		depthbreaks = breaksToReferenceValue(b)
	}

	if b := req.FormValue("widthbreaks"); b != "" {
		widthbreaks = breaksToReferenceValue(b)
	}

	useDepth, useWidth := bns.contains("depth"), bns.contains("width")

	if useDepth && useWidth && len(widthbreaks) != len(depthbreaks) {
		http.Error(
			rw,
			fmt.Sprintf("class breaks lengths differ: %d != %d",
				len(widthbreaks), len(depthbreaks)),
			http.StatusBadRequest,
		)
		return
	}

	log.Printf("info: time interval: (%v - %v)\n", from, to)

	var loaded []*fullStretchBottleneck
	var errors []error

	for i := range bns {
		l, err := loadFullStretchBottleneck(
			ctx,
			conn,
			&bns[i],
			los,
			from, to,
			depthbreaks, widthbreaks,
		)
		if err != nil {
			log.Printf("error: %v\n", err)
			errors = append(errors, err)
			continue
		}
		loaded = append(loaded, l)
	}

	if len(loaded) == 0 {
		http.Error(
			rw,
			fmt.Sprintf("No bottleneck loaded: %v", joinErrors(errors)),
			http.StatusInternalServerError,
		)
		return
	}

	n := runtime.NumCPU() / 2
	if n == 0 {
		n = 1
	}

	jobCh := make(chan availJob)
	calcCh := make(chan availCalculation, n)
	done := make(chan struct{})

	var wg sync.WaitGroup

	go func() {
		defer close(done)
		for calc := range calcCh {
			calc.result.add(calc)
		}
	}()

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for job := range jobCh {
				bn := job.bn
				res := job.result
				ldc := bn.measurements.classify(
					res.from, res.to,
					bn.ldc,
					bn.access,
				)
				breaks := bn.measurements.classify(
					res.from, res.to,
					bn.breaks,
					bn.access,
				)
				calcCh <- availCalculation{
					result: res,
					breaks: breaks,
					ldc:    ldc,
				}
			}
		}()
	}

	var results []*availResult

	interval := intervals[mode](from, to)

	var breaks []float64

	if useDepth {
		breaks = depthbreaks
	} else {
		breaks = widthbreaks
	}

	for pfrom, pto, label := interval(); label != ""; pfrom, pto, label = interval() {

		res := &availResult{
			label:  label,
			from:   pfrom,
			to:     pto,
			ldc:    make([]time.Duration, 2),
			breaks: make([]time.Duration, len(breaks)+1),
		}
		results = append(results, res)

		for _, bn := range loaded {
			jobCh <- availJob{bn: bn, result: res}
		}
	}

	close(jobCh)
	wg.Wait()
	close(calcCh)
	<-done

	rw.Header().Add("Content-Type", "text/csv")

	out := csv.NewWriter(rw)

	// label, lnwl, classes
	record := make([]string, 1+2+len(breaks)+1)
	record[0] = "# time"
	record[1] = "# < LDC [h]"
	record[2] = "# >= LDC [h]"
	for i, v := range breaks {
		if useDepth && useWidth {
			if i == 0 {
				record[3] = "# < break_1 [h]"
			}
			record[i+4] = fmt.Sprintf("# >= break_%d", i+1)
		} else {
			if i == 0 {
				record[3] = fmt.Sprintf("# < %.2f [h]", v)
			}
			record[i+4] = fmt.Sprintf("# >= %.2f [h]", v)
		}
	}

	if err := out.Write(record); err != nil {
		// Too late for HTTP status message.
		log.Printf("error: %v\n", err)
		return
	}

	// Normalize to look like as we have only one bottleneck.
	scale := 1 / float64(len(loaded))

	for _, r := range results {
		record[0] = r.label
		for i, v := range r.ldc {
			record[1+i] = fmt.Sprintf("%.3f", v.Hours()*scale)
		}

		for i, d := range r.breaks {
			record[3+i] = fmt.Sprintf("%.3f", d.Hours()*scale)
		}

		if err := out.Write(record); err != nil {
			// Too late for HTTP status message.
			log.Printf("error: %v\n", err)
			return
		}
	}

	out.Flush()
	if err := out.Error(); err != nil {
		// Too late for HTTP status message.
		log.Printf("error: %v\n", err)
	}
}

func joinErrors(errors []error) string {
	var b strings.Builder
	for _, err := range errors {
		if b.Len() > 0 {
			b.WriteString(", ")
		}
		b.WriteString(err.Error())
	}
	return b.String()
}

func stretchAvailabilty(rw http.ResponseWriter, req *http.Request) {

	vars := mux.Vars(req)
	stretch := vars["kind"] == "stretch"
	name := vars["name"]
	mode := intervalMode(req.FormValue("mode"))

	if name == "" {
		http.Error(
			rw,
			fmt.Sprintf("Missing %s name", vars["kind"]),
			http.StatusBadRequest,
		)
		return
	}

	from, ok := parseFormTime(rw, req, "form", time.Now().AddDate(-1, 0, 0))
	if !ok {
		return
	}

	to, ok := parseFormTime(rw, req, "to", from.AddDate(1, 0, 0))
	if !ok {
		return
	}

	if to.Before(from) {
		to, from = from, to
	}

	los, ok := parseFormInt(rw, req, "los", 1)
	if !ok {
		return
	}

	depthbreaks, widthbreaks := afdRefs, afdRefs

	if b := req.FormValue("depthbreaks"); b != "" {
		depthbreaks = breaksToReferenceValue(b)
	}

	if b := req.FormValue("widthbreaks"); b != "" {
		widthbreaks = breaksToReferenceValue(b)
	}

	conn := middleware.GetDBConn(req)
	ctx := req.Context()

	bns, err := loadStretchBottlenecks(ctx, conn, stretch, name)
	if err != nil {
		http.Error(
			rw, fmt.Sprintf("DB error: %v.", err),
			http.StatusInternalServerError)
		return
	}

	if len(bns) == 0 {
		http.Error(
			rw,
			"No bottlenecks found.",
			http.StatusNotFound,
		)
		return
	}

	useDepth, useWidth := bns.contains("depth"), bns.contains("width")

	if useDepth && useWidth && len(widthbreaks) != len(depthbreaks) {
		http.Error(
			rw,
			fmt.Sprintf("class breaks lengths differ: %d != %d",
				len(widthbreaks), len(depthbreaks)),
			http.StatusBadRequest,
		)
		return
	}

	log.Printf("info: time interval: (%v - %v)\n", from, to)

	var loaded []*fullStretchBottleneck
	var errors []error

	for i := range bns {
		l, err := loadFullStretchBottleneck(
			ctx,
			conn,
			&bns[i],
			los,
			from, to,
			depthbreaks, widthbreaks,
		)
		if err != nil {
			log.Printf("error: %v\n", err)
			errors = append(errors, err)
			continue
		}
		loaded = append(loaded, l)
	}

	if len(loaded) == 0 {
		http.Error(
			rw,
			fmt.Sprintf("No bottleneck loaded: %v", joinErrors(errors)),
			http.StatusInternalServerError,
		)
		return
	}

	n := runtime.NumCPU() / 2
	if n == 0 {
		n = 1
	}

	jobCh := make(chan availJob)
	calcCh := make(chan availCalculation, n)
	done := make(chan struct{})

	var wg sync.WaitGroup

	go func() {
		defer close(done)
		for calc := range calcCh {
			calc.result.add(calc)
		}
	}()

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for job := range jobCh {
				bn := job.bn
				res := job.result
				ldc := bn.measurements.classify(
					from, to,
					bn.ldc,
					(*availMeasurement).getValue,
				)

				breaks := bn.measurements.classify(
					from, to,
					bn.breaks,
					bn.access,
				)
				calcCh <- availCalculation{
					result: res,
					breaks: breaks,
					ldc:    ldc,
				}
			}
		}()
	}

	var results []*availResult

	interval := intervals[mode](from, to)

	var breaks []float64

	if useDepth {
		breaks = depthbreaks
	} else {
		breaks = widthbreaks
	}

	for pfrom, pto, label := interval(); label != ""; pfrom, pto, label = interval() {

		res := &availResult{
			label:  label,
			from:   pfrom,
			to:     pto,
			ldc:    make([]time.Duration, 2),
			breaks: make([]time.Duration, len(breaks)+1),
		}
		results = append(results, res)

		for _, bn := range loaded {
			jobCh <- availJob{bn: bn, result: res}
		}
	}

	close(jobCh)
	wg.Wait()
	close(calcCh)
	<-done

	rw.Header().Add("Content-Type", "text/csv")

	out := csv.NewWriter(rw)

	// label, lnwl, classes
	record := make([]string, 1+2+len(breaks)+1)
	record[0] = "# time"
	record[1] = "# < LDC [%%]"
	record[2] = "# >= LDC [%%]"
	for i, v := range breaks {
		if useDepth && useWidth {
			if i == 0 {
				record[3] = "# < break_1 [%%]"
			}
			record[i+4] = fmt.Sprintf("# >= break_%d [%%]", i+1)
		} else {
			if i == 0 {
				record[3] = fmt.Sprintf("# < %.3f [%%]", v)
			}
			record[i+4] = fmt.Sprintf("# >= %.3f [%%]", v)
		}
	}

	if err := out.Write(record); err != nil {
		// Too late for HTTP status message.
		log.Printf("error: %v\n", err)
		return
	}

	for _, r := range results {
		duration := r.to.Sub(r.from) * time.Duration(len(loaded))

		ldcPercents := durationsToPercentage(duration, r.ldc)
		breaksPercents := durationsToPercentage(duration, r.breaks)

		record[0] = r.label

		for i, v := range ldcPercents {
			record[2+i] = fmt.Sprintf("%.3f", v)
		}

		for i, v := range breaksPercents {
			record[3+i] = fmt.Sprintf("%.3f", v)
		}

		if err := out.Write(record); err != nil {
			// Too late for HTTP status message.
			log.Printf("error: %v\n", err)
			return
		}
	}

	out.Flush()
	if err := out.Error(); err != nil {
		// Too late for HTTP status message.
		log.Printf("error: %v\n", err)
	}
}