view pkg/controllers/stretches.go @ 4795:fe838fc3ca69

FA: changed aggregation, so that worst classes are determining the result. Previously aggregation was done by calculating the mean, but that was not the expected behavior as the overall availability of the section can't be better than the availability of the worst included bottleneck.
author Sascha Wilde <wilde@intevation.de>
date Fri, 25 Oct 2019 17:11:37 +0200
parents ef21c1464843
children 199c49f967d2
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
//  * Sascha Wilde <wilde@intevation.de>

package controllers

import (
	"context"
	"database/sql"
	"encoding/csv"
	"fmt"
	"log"
	"net/http"
	"runtime"
	"strings"
	"sync"
	"time"

	"github.com/gorilla/mux"

	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/middleware"
)

// The following requests take _all_ bottlenecks into account, not only the
// currently valid ones.  This is necessary, as we are doing reports on
// arbitrary time ranges, and bottlenecks that are not valid at the moment
// might have been valid in the selected time range.
//
// FIXME: the better solution would be to limit the bottlenecks to those with:
//   b.validity && REQUESTED_TIME_RANGE

// selectSectionBottlenecks selects the distinct (objnam, limiting) pairs
// of all bottlenecks whose area intersects the area of the waterway
// section with the name given as $1.
const selectSectionBottlenecks = `
SELECT
  distinct(b.objnam),
  b.limiting
FROM waterway.sections s, waterway.bottlenecks b
WHERE ST_Intersects(b.area, s.area)
  AND s.name = $1`

// selectStretchBottlenecks is the same query as selectSectionBottlenecks
// but matches against the user defined stretches in users.stretches.
const selectStretchBottlenecks = `
SELECT
  distinct(b.objnam),
  b.limiting
FROM users.stretches s, waterway.bottlenecks b
WHERE ST_Intersects(b.area, s.area)
  AND s.name = $1`

// stretchBottleneck is one row of the bottleneck queries: the name of a
// bottleneck and the kind of its limitation.
type stretchBottleneck struct {
	name     string
	limiting string
}

// stretchBottlenecks is a list of bottlenecks belonging to one stretch
// or section.
type stretchBottlenecks []stretchBottleneck

// fullStretchBottleneck is a stretchBottleneck together with everything
// needed to classify its measurements.
type fullStretchBottleneck struct {
	*stretchBottleneck
	// measurements are the values loaded for the requested time range.
	measurements availMeasurements
	// ldc holds the LDC reference value(s) of the bottleneck.
	ldc []float64
	// breaks are the class breaks matching the limitation
	// (depth or width) of the bottleneck.
	breaks []float64
	// access extracts the value of a measurement which is classified
	// against breaks.
	access func(*availMeasurement) float64
}

// contains reports whether at least one bottleneck in bns has the
// given limiting factor.
func (bns stretchBottlenecks) contains(limiting string) bool {
	for _, bn := range bns {
		if bn.limiting == limiting {
			return true
		}
	}
	return false
}

// maxDuration returns the larger of the two durations a and b.
func maxDuration(a time.Duration, b time.Duration) time.Duration {
	if a > b {
		return a
	}
	return b
}

// sumClassesTo returns the sum of breaks[0] up to and including
// breaks[to]. A negative to yields 0. A to beyond the last index is
// clamped to it, so the result is then the sum of all elements.
func sumClassesTo(breaks []time.Duration, to int) time.Duration {
	if to >= len(breaks) {
		to = len(breaks) - 1
	}
	var result time.Duration
	for i := 0; i <= to; i++ {
		result += breaks[i]
	}
	return result
}

// aggregateClasses merges the class durations new of one bottleneck
// into the already aggregated class durations agg and returns the
// result as a freshly allocated slice; neither input is modified.
//
// The classes are ordered from worst to best. For every class but the
// last one the cumulative duration up to that class is the maximum of
// the cumulative durations of new and agg, so the worst input
// determines the result. The last class is chosen in a way that the
// sum over all returned classes equals the sum over all classes in new.
//
// The result has the length of the longer input; classes missing in
// agg count as zero. If new is empty a copy of agg is returned.
func aggregateClasses(
	new []time.Duration,
	agg []time.Duration,
) []time.Duration {
	size := len(agg)
	if len(new) > size {
		size = len(new)
	}
	newAgg := make([]time.Duration, size)

	if len(new) == 0 {
		// Nothing to merge in: the aggregate stays as it is.
		copy(newAgg, agg)
		return newAgg
	}

	last := len(new) - 1

	// Running prefix sums avoid re-summing the slices for every class.
	var newSum, oldSum, aggSum time.Duration
	for i := 0; i < last; i++ {
		newSum += new[i]
		if i < len(agg) {
			oldSum += agg[i]
		}
		newAgg[i] = maxDuration(newSum, oldSum) - aggSum
		aggSum += newAgg[i]
	}

	// adjust highest class so the sum of all classes in agg
	// matches the original sum of all classes in new.
	newAgg[last] = sumClassesTo(new, last) - aggSum
	return newAgg
}

// loadFullStretchBottleneck loads the measurements and the LDC reference
// values of the bottleneck bn and bundles them with the class breaks and
// the value accessor matching the limitation of the bottleneck.
//
// Bottlenecks limited by "width" are classified by widthbreaks, those
// limited by "depth" by depthbreaks. An unknown limitation is logged
// and treated like "depth". An error is returned if loading fails or
// if no LDC value exists for the bottleneck.
func loadFullStretchBottleneck(
	ctx context.Context,
	conn *sql.Conn,
	bn *stretchBottleneck,
	los int,
	from, to time.Time,
	depthbreaks, widthbreaks []float64,
) (*fullStretchBottleneck, error) {
	measurements, err := loadDepthValues(ctx, conn, bn.name, los, from, to)
	if err != nil {
		return nil, err
	}

	ldc, err := loadLDCReferenceValue(ctx, conn, bn.name)
	switch {
	case err != nil:
		return nil, err
	case len(ldc) == 0:
		return nil, fmt.Errorf("No LDC found for bottleneck: %s", bn.name)
	}

	// Start out with the depth settings; they are also the fallback
	// for unknown limitations.
	full := &fullStretchBottleneck{
		stretchBottleneck: bn,
		measurements:      measurements,
		ldc:               ldc,
		breaks:            depthbreaks,
		access:            (*availMeasurement).getDepth,
	}

	switch bn.limiting {
	case "depth":
		// Already set up.
	case "width":
		full.breaks = widthbreaks
		full.access = (*availMeasurement).getWidth
	default:
		log.Printf(
			"warn: unknown limitation '%s'. default to 'depth'.\n",
			bn.limiting)
	}

	return full, nil
}

// loadStretchBottlenecks queries the bottlenecks intersecting the area
// of the stretch (stretch == true) or of the section (stretch == false)
// with the given name. It returns a nil list if the query yields no rows.
func loadStretchBottlenecks(
	ctx context.Context,
	conn *sql.Conn,
	stretch bool,
	name string,
) (stretchBottlenecks, error) {
	query := selectSectionBottlenecks
	if stretch {
		query = selectStretchBottlenecks
	}

	rows, err := conn.QueryContext(ctx, query, name)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var found stretchBottlenecks
	for rows.Next() {
		var row stretchBottleneck
		if err := rows.Scan(&row.name, &row.limiting); err != nil {
			return nil, err
		}
		found = append(found, row)
	}

	// Report errors which terminated the iteration early.
	if err := rows.Err(); err != nil {
		return nil, err
	}

	return found, nil
}

// stretchAvailableFairwayDepth is an HTTP handler which writes a CSV
// report for the stretch or section given by the route variables
// "kind" and "name".
//
// For every interval generated by the requested interval mode the
// measurements of all bottlenecks found for the stretch/section are
// classified against their LDC values and against the depth or width
// class breaks. The classes of the individual bottlenecks are merged
// with aggregateClasses. Each interval becomes one CSV row with the
// durations rounded by common.RoundToFullDays.
//
// Form values read from the request: "mode", "from", "to", "los",
// "depthbreaks" and "widthbreaks".
func stretchAvailableFairwayDepth(rw http.ResponseWriter, req *http.Request) {

	vars := mux.Vars(req)
	stretch := vars["kind"] == "stretch"
	name := vars["name"]
	mode := intervalMode(req.FormValue("mode"))

	// Reject requests without a name the same way stretchAvailabilty does.
	if name == "" {
		http.Error(
			rw,
			fmt.Sprintf("Missing %s name", vars["kind"]),
			http.StatusBadRequest,
		)
		return
	}

	depthbreaks, widthbreaks := afdRefs, afdRefs

	from, ok := parseFormTime(rw, req, "from", time.Now().AddDate(-1, 0, 0))
	if !ok {
		return
	}

	to, ok := parseFormTime(rw, req, "to", from.AddDate(1, 0, 0))
	if !ok {
		return
	}

	// Make sure that from is not after to.
	if to.Before(from) {
		to, from = from, to
	}

	los, ok := parseFormInt(rw, req, "los", 1)
	if !ok {
		return
	}

	conn := middleware.GetDBConn(req)
	ctx := req.Context()

	bns, err := loadStretchBottlenecks(ctx, conn, stretch, name)
	if err != nil {
		http.Error(
			rw, fmt.Sprintf("DB error: %v.", err),
			http.StatusInternalServerError)
		return
	}

	if len(bns) == 0 {
		http.Error(rw, "No bottlenecks found.", http.StatusNotFound)
		return
	}

	if b := req.FormValue("depthbreaks"); b != "" {
		depthbreaks = breaksToReferenceValue(b)
	}

	if b := req.FormValue("widthbreaks"); b != "" {
		widthbreaks = breaksToReferenceValue(b)
	}

	useDepth, useWidth := bns.contains("depth"), bns.contains("width")

	// Depth and width classes end up in the same CSV columns, so both
	// sets of breaks need the same number of classes if they are mixed.
	if useDepth && useWidth && len(widthbreaks) != len(depthbreaks) {
		http.Error(
			rw,
			fmt.Sprintf("class breaks lengths differ: %d != %d",
				len(widthbreaks), len(depthbreaks)),
			http.StatusBadRequest,
		)
		return
	}

	log.Printf("info: time interval: (%v - %v)\n", from, to)

	var loaded []*fullStretchBottleneck
	var errors []error

	// Bottlenecks which fail to load are logged and skipped; the
	// request only fails if none could be loaded at all.
	for i := range bns {
		l, err := loadFullStretchBottleneck(
			ctx,
			conn,
			&bns[i],
			los,
			from, to,
			depthbreaks, widthbreaks,
		)
		if err != nil {
			log.Printf("error: %v\n", err)
			errors = append(errors, err)
			continue
		}
		loaded = append(loaded, l)
	}

	if len(loaded) == 0 {
		http.Error(
			rw,
			fmt.Sprintf("No bottleneck loaded: %v", joinErrors(errors)),
			http.StatusInternalServerError,
		)
		return
	}

	// Use half of the CPUs as workers, but at least one.
	n := runtime.NumCPU() / 2
	if n == 0 {
		n = 1
	}

	type result struct {
		label  string
		from   time.Time
		to     time.Time
		ldc    []time.Duration
		breaks []time.Duration
	}

	jobCh := make(chan *result)

	var wg sync.WaitGroup

	// Each worker owns the result it received from jobCh until it
	// fetches the next one, so the results need no further locking.
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for res := range jobCh {

				var ldc, breaks []time.Duration

				// Don't interpolate for the future
				if now := time.Now(); res.to.After(now) {
					res.to = now
				}

				for _, bn := range loaded {
					l := bn.measurements.classify(
						res.from, res.to,
						bn.ldc,
						(*availMeasurement).getValue,
					)
					b := bn.measurements.classify(
						res.from, res.to,
						bn.breaks,
						bn.access,
					)

					if ldc == nil {
						ldc, breaks = l, b
					} else {
						ldc = aggregateClasses(l, ldc)
						breaks = aggregateClasses(b, breaks)
					}
				}

				res.ldc = ldc
				res.breaks = breaks
			}
		}()
	}

	var results []*result

	interval := intervals[mode](from, to)

	var breaks []float64

	if useDepth {
		breaks = depthbreaks
	} else {
		breaks = widthbreaks
	}

	// The interval generator signals its end with an empty label.
	for pfrom, pto, label := interval(); label != ""; pfrom, pto, label = interval() {

		res := &result{
			label: label,
			from:  pfrom,
			to:    pto,
		}
		results = append(results, res)
		jobCh <- res
	}

	// No more jobs: let the workers drain the channel and finish.
	close(jobCh)
	wg.Wait()

	rw.Header().Add("Content-Type", "text/csv")

	out := csv.NewWriter(rw)

	// label, lnwl, classes
	record := make([]string, 1+2+len(breaks)+1)
	record[0] = "# time"
	record[1] = "# < LDC [d]"
	record[2] = "# >= LDC [d]"
	for i, v := range breaks {
		if useDepth && useWidth {
			// Mixed depth and width breaks have no common value
			// to show, so the columns are only numbered.
			if i == 0 {
				record[3] = "# < break_1 [d]"
			}
			record[i+4] = fmt.Sprintf("# >= break_%d [d]", i+1)
		} else {
			if i == 0 {
				record[3] = fmt.Sprintf("# < %.1f [d]", v)
			}
			record[i+4] = fmt.Sprintf("# >= %.1f [d]", v)
		}
	}

	if err := out.Write(record); err != nil {
		// Too late for HTTP status message.
		log.Printf("error: %v\n", err)
		return
	}

	// The record is reused for the data rows: preset all value cells.
	empty := fmt.Sprintf("%.3f", 0.0)
	for i := range record[1:] {
		record[i+1] = empty
	}

	for _, r := range results {
		// Round to full days
		ldcRounded := common.RoundToFullDays(r.ldc)
		rangesRounded := common.RoundToFullDays(r.breaks)

		record[0] = r.label
		for i, v := range ldcRounded {
			record[1+i] = fmt.Sprintf("%d", v)
		}

		for i, d := range rangesRounded {
			record[3+i] = fmt.Sprintf("%d", d)
		}

		if err := out.Write(record); err != nil {
			// Too late for HTTP status message.
			log.Printf("error: %v\n", err)
			return
		}
	}

	out.Flush()
	if err := out.Error(); err != nil {
		// Too late for HTTP status message.
		log.Printf("error: %v\n", err)
	}
}

// joinErrors concatenates the messages of the given errors into one
// string. A ", " is written in front of a message whenever something
// has already been written. No errors result in the empty string.
func joinErrors(errors []error) string {
	var joined strings.Builder
	for i := range errors {
		msg := errors[i].Error()
		if joined.Len() != 0 {
			joined.WriteString(", ")
		}
		joined.WriteString(msg)
	}
	return joined.String()
}

// stretchAvailabilty is an HTTP handler which writes a CSV report
// for the stretch or section given by the route variables "kind" and
// "name".
//
// For every interval generated by the requested interval mode the
// measurements of all bottlenecks found for the stretch/section are
// classified against their LDC values and against the depth or width
// class breaks. The classes of the individual bottlenecks are merged
// with aggregateClasses. Each interval becomes one CSV row; the
// durations are converted with durationsToPercentage relative to the
// length of the interval.
//
// Form values read from the request: "mode", "from", "to", "los",
// "depthbreaks" and "widthbreaks".
func stretchAvailabilty(rw http.ResponseWriter, req *http.Request) {

	vars := mux.Vars(req)
	stretch := vars["kind"] == "stretch"
	name := vars["name"]
	mode := intervalMode(req.FormValue("mode"))

	if name == "" {
		http.Error(
			rw,
			fmt.Sprintf("Missing %s name", vars["kind"]),
			http.StatusBadRequest,
		)
		return
	}

	from, ok := parseFormTime(rw, req, "from", time.Now().AddDate(-1, 0, 0))
	if !ok {
		return
	}

	to, ok := parseFormTime(rw, req, "to", from.AddDate(1, 0, 0))
	if !ok {
		return
	}

	// Make sure that from is not after to.
	if to.Before(from) {
		to, from = from, to
	}

	los, ok := parseFormInt(rw, req, "los", 1)
	if !ok {
		return
	}

	depthbreaks, widthbreaks := afdRefs, afdRefs

	if b := req.FormValue("depthbreaks"); b != "" {
		depthbreaks = breaksToReferenceValue(b)
	}

	if b := req.FormValue("widthbreaks"); b != "" {
		widthbreaks = breaksToReferenceValue(b)
	}

	conn := middleware.GetDBConn(req)
	ctx := req.Context()

	bns, err := loadStretchBottlenecks(ctx, conn, stretch, name)
	if err != nil {
		http.Error(
			rw, fmt.Sprintf("DB error: %v.", err),
			http.StatusInternalServerError)
		return
	}

	if len(bns) == 0 {
		http.Error(
			rw,
			"No bottlenecks found.",
			http.StatusNotFound,
		)
		return
	}

	useDepth, useWidth := bns.contains("depth"), bns.contains("width")

	// Depth and width classes end up in the same CSV columns, so both
	// sets of breaks need the same number of classes if they are mixed.
	if useDepth && useWidth && len(widthbreaks) != len(depthbreaks) {
		http.Error(
			rw,
			fmt.Sprintf("class breaks lengths differ: %d != %d",
				len(widthbreaks), len(depthbreaks)),
			http.StatusBadRequest,
		)
		return
	}

	log.Printf("info: time interval: (%v - %v)\n", from, to)

	var loaded []*fullStretchBottleneck
	var errors []error

	// Bottlenecks which fail to load are logged and skipped; the
	// request only fails if none could be loaded at all.
	for i := range bns {
		l, err := loadFullStretchBottleneck(
			ctx,
			conn,
			&bns[i],
			los,
			from, to,
			depthbreaks, widthbreaks,
		)
		if err != nil {
			log.Printf("error: %v\n", err)
			errors = append(errors, err)
			continue
		}
		loaded = append(loaded, l)
	}

	if len(loaded) == 0 {
		http.Error(
			rw,
			fmt.Sprintf("No bottleneck loaded: %v", joinErrors(errors)),
			http.StatusInternalServerError,
		)
		return
	}

	// Use half of the CPUs as workers, but at least one.
	n := runtime.NumCPU() / 2
	if n == 0 {
		n = 1
	}

	type result struct {
		label  string
		from   time.Time
		to     time.Time
		ldc    []float64
		breaks []float64
	}

	jobCh := make(chan *result)

	var wg sync.WaitGroup

	// Each worker owns the result it received from jobCh until it
	// fetches the next one, so the results need no further locking.
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for res := range jobCh {
				var ldc, breaks []time.Duration

				// Don't interpolate for the future
				if now := time.Now(); res.to.After(now) {
					res.to = now
				}

				for _, bn := range loaded {
					l := bn.measurements.classify(
						res.from, res.to,
						bn.ldc,
						(*availMeasurement).getValue,
					)

					b := bn.measurements.classify(
						res.from, res.to,
						bn.breaks,
						bn.access,
					)

					if ldc == nil {
						ldc, breaks = l, b
					} else {
						ldc = aggregateClasses(l, ldc)
						breaks = aggregateClasses(b, breaks)
					}
				}

				// The percentages relate to the (possibly clamped)
				// length of the interval.
				duration := res.to.Sub(res.from)

				res.ldc = durationsToPercentage(duration, ldc)
				res.breaks = durationsToPercentage(duration, breaks)
			}
		}()
	}

	var results []*result

	interval := intervals[mode](from, to)

	var breaks []float64

	if useDepth {
		breaks = depthbreaks
	} else {
		breaks = widthbreaks
	}

	// The interval generator signals its end with an empty label.
	for pfrom, pto, label := interval(); label != ""; pfrom, pto, label = interval() {

		res := &result{
			label: label,
			from:  pfrom,
			to:    pto,
		}
		results = append(results, res)

		jobCh <- res
	}

	// No more jobs: let the workers drain the channel and finish.
	close(jobCh)
	wg.Wait()

	rw.Header().Add("Content-Type", "text/csv")

	out := csv.NewWriter(rw)

	// label, lnwl, classes
	//
	// Plain string literals are not run through fmt, so they carry a
	// single '%'; only the Sprintf format strings need the "%%" escape.
	record := make([]string, 1+2+len(breaks)+1)
	record[0] = "# time"
	record[1] = "# < LDC [%]"
	record[2] = "# >= LDC [%]"
	for i, v := range breaks {
		if useDepth && useWidth {
			// Mixed depth and width breaks have no common value
			// to show, so the columns are only numbered.
			if i == 0 {
				record[3] = "# < break_1 [%]"
			}
			record[i+4] = fmt.Sprintf("# >= break_%d [%%]", i+1)
		} else {
			if i == 0 {
				record[3] = fmt.Sprintf("# < %.3f [%%]", v)
			}
			record[i+4] = fmt.Sprintf("# >= %.3f [%%]", v)
		}
	}

	if err := out.Write(record); err != nil {
		// Too late for HTTP status message.
		log.Printf("error: %v\n", err)
		return
	}

	// The record is reused for the data rows: preset all value cells.
	empty := fmt.Sprintf("%.3f", 0.0)
	for i := range record[1:] {
		record[i+1] = empty
	}

	for _, res := range results {
		record[0] = res.label

		for i, v := range res.ldc {
			record[1+i] = fmt.Sprintf("%.3f", v)
		}

		for i, v := range res.breaks {
			record[3+i] = fmt.Sprintf("%.3f", v)
		}

		if err := out.Write(record); err != nil {
			// Too late for HTTP status message.
			log.Printf("error: %v\n", err)
			return
		}
	}

	out.Flush()
	if err := out.Error(); err != nil {
		// Too late for HTTP status message.
		log.Printf("error: %v\n", err)
	}
}