view pkg/controllers/stretches.go @ 4343:63c25eb9c07c

FA: be optimistic about missing data. According to clarification, missing data has to be interpreted as the best case; that is because the services do not provide data for bottlenecks which are not considered a limiting factor on the waterway at a given time.
author Sascha Wilde <wilde@intevation.de>
date Fri, 06 Sep 2019 17:22:42 +0200
parents 0f467a839fe2
children 97312d7954ba
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package controllers

import (
	"context"
	"database/sql"
	"encoding/csv"
	"fmt"
	"log"
	"net/http"
	"runtime"
	"strings"
	"sync"
	"time"

	"github.com/gorilla/mux"

	"gemma.intevation.de/gemma/pkg/middleware"
)

// SQL statements used by loadStretchBottlenecks. Both take the name of
// the section/stretch as their only parameter ($1) and return the
// objnam and limiting columns of the bottlenecks involved.
const (
	// selectSectionBottlenecks selects the bottlenecks whose validity
	// range contains the current timestamp and whose area intersects
	// the area of the section named $1.
	selectSectionBottlenecks = `
SELECT
  distinct(b.objnam),
  b.limiting
FROM waterway.sections s, waterway.bottlenecks b
WHERE b.validity @> current_timestamp
  AND ST_Intersects(b.area, s.area)
  AND s.name = $1`

	// selectStretchBottlenecks is the same selection as
	// selectSectionBottlenecks, but matched against waterway.stretches
	// instead of waterway.sections.
	selectStretchBottlenecks = `
SELECT
  distinct(b.objnam),
  b.limiting
FROM waterway.stretches s, waterway.bottlenecks b
WHERE b.validity @> current_timestamp
  AND ST_Intersects(b.area, s.area)
  AND s.name = $1`
)

type (
	// stretchBottleneck is one row of the section/stretch bottleneck
	// queries as scanned by loadStretchBottlenecks.
	stretchBottleneck struct {
		// name is the objnam column of the bottleneck.
		name     string
		// limiting is the limiting column of the bottleneck. The code
		// in this file distinguishes "width" and "depth"; any other
		// value is handled like "depth" by loadFullStretchBottleneck.
		limiting string
	}

	// stretchBottlenecks is a list of bottlenecks belonging to one
	// stretch or section.
	stretchBottlenecks []stretchBottleneck

	// fullStretchBottleneck is a stretchBottleneck together with the
	// data needed to classify it, as assembled by
	// loadFullStretchBottleneck.
	fullStretchBottleneck struct {
		*stretchBottleneck
		// measurements are the values returned by loadDepthValues.
		measurements availMeasurements
		// ldc holds the values returned by loadLDCReferenceValue.
		ldc          []float64
		// breaks are the class breaks matching the limiting factor
		// (width breaks for "width", depth breaks otherwise).
		breaks       []float64
		// access extracts the value to classify from a measurement
		// (getWidth for "width", getDepth otherwise).
		access       func(*availMeasurement) float64
	}
)

// contains reports whether at least one bottleneck in bns has the
// given limiting factor. An empty or nil list never contains anything.
func (bns stretchBottlenecks) contains(limiting string) bool {
	found := false
	// Stop scanning as soon as the first match was seen.
	for idx := 0; idx < len(bns) && !found; idx++ {
		found = bns[idx].limiting == limiting
	}
	return found
}

// loadFullStretchBottleneck loads everything needed to classify the
// bottleneck bn: its measurements (loadDepthValues, for los and the
// time range from/to) and its reference values (loadLDCReferenceValue).
//
// The class breaks and the measurement accessor are chosen by
// bn.limiting: "width" uses widthbreaks and getWidth, "depth" uses
// depthbreaks and getDepth. Any other value is logged as a warning and
// treated like "depth".
//
// The first error of either loader is returned unchanged.
func loadFullStretchBottleneck(
	ctx context.Context,
	conn *sql.Conn,
	bn *stretchBottleneck,
	los int,
	from, to time.Time,
	depthbreaks, widthbreaks []float64,
) (*fullStretchBottleneck, error) {
	values, err := loadDepthValues(ctx, conn, bn.name, los, from, to)
	if err != nil {
		return nil, err
	}
	reference, err := loadLDCReferenceValue(ctx, conn, bn.name)
	if err != nil {
		return nil, err
	}

	// Start out with the depth configuration, which also serves as the
	// fallback for unknown limiting factors.
	full := &fullStretchBottleneck{
		stretchBottleneck: bn,
		measurements:      values,
		ldc:               reference,
		breaks:            depthbreaks,
		access:            (*availMeasurement).getDepth,
	}

	switch bn.limiting {
	case "depth":
		// Already configured above.
	case "width":
		full.breaks = widthbreaks
		full.access = (*availMeasurement).getWidth
	default:
		log.Printf(
			"warn: unknown limitation '%s'. default to 'depth'.\n",
			bn.limiting)
	}

	return full, nil
}

// loadStretchBottlenecks loads the name and limiting factor of the
// bottlenecks belonging to the stretch (stretch == true) or section
// (stretch == false) called name.
//
// It returns a nil list without an error if the query yields no rows.
// Query, scan and iteration errors are returned unchanged.
func loadStretchBottlenecks(
	ctx context.Context,
	conn *sql.Conn,
	stretch bool,
	name string,
) (stretchBottlenecks, error) {
	// Deliberately not called "sql": that would shadow the imported
	// database/sql package for the rest of the function.
	query := selectSectionBottlenecks
	if stretch {
		query = selectStretchBottlenecks
	}

	rows, err := conn.QueryContext(ctx, query, name)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var bns stretchBottlenecks

	for rows.Next() {
		var bn stretchBottleneck
		if err := rows.Scan(
			&bn.name,
			&bn.limiting,
		); err != nil {
			return nil, err
		}
		bns = append(bns, bn)
	}

	// Next() returning false may hide an iteration error.
	if err := rows.Err(); err != nil {
		return nil, err
	}

	return bns, nil
}

// stretchAvailableFairwayDepth is the HTTP handler which writes a CSV
// document telling, per time interval, how many hours the bottlenecks
// of a stretch or section were classified below/above their LDC
// reference values and into each class break. The hours are averaged
// over all successfully loaded bottlenecks.
//
// Route variables (gorilla/mux):
//   - "kind": "stretch" selects a stretch, any other value a section.
//   - "name": name of the stretch or section.
//
// Form values:
//   - "mode": interval mode used to split the time range into rows.
//   - "from", "to": time range (fallbacks handed to parseFormTime:
//     one year ago and from plus one year). Swapped if reversed.
//   - "los": integer handed to loadDepthValues (fallback 1).
//   - "depthbreaks", "widthbreaks": class breaks replacing afdRefs.
//
// Error responses: 500 on a DB error or if no bottleneck could be
// loaded, 404 if no bottleneck was found, 400 if depth and width breaks
// are both needed but differ in length.
func stretchAvailableFairwayDepth(rw http.ResponseWriter, req *http.Request) {
	vars := mux.Vars(req)
	stretch := vars["kind"] == "stretch"
	name := vars["name"]
	mode := intervalMode(req.FormValue("mode"))

	depthbreaks, widthbreaks := afdRefs, afdRefs

	from, ok := parseFormTime(rw, req, "from", time.Now().AddDate(-1, 0, 0))
	if !ok {
		return
	}

	to, ok := parseFormTime(rw, req, "to", from.AddDate(1, 0, 0))
	if !ok {
		return
	}

	if to.Before(from) {
		to, from = from, to
	}

	los, ok := parseFormInt(rw, req, "los", 1)
	if !ok {
		return
	}

	conn := middleware.GetDBConn(req)
	ctx := req.Context()

	bns, err := loadStretchBottlenecks(ctx, conn, stretch, name)
	if err != nil {
		http.Error(
			rw, fmt.Sprintf("DB error: %v.", err),
			http.StatusInternalServerError)
		return
	}

	if len(bns) == 0 {
		http.Error(rw, "No bottlenecks found.", http.StatusNotFound)
		return
	}

	if b := req.FormValue("depthbreaks"); b != "" {
		depthbreaks = breaksToReferenceValue(b)
	}

	if b := req.FormValue("widthbreaks"); b != "" {
		widthbreaks = breaksToReferenceValue(b)
	}

	useDepth, useWidth := bns.contains("depth"), bns.contains("width")

	// The per-class sums below add depth and width classifications
	// index by index, so both break lists need the same length.
	// NOTE(review): bottlenecks with an unknown limiting factor fall
	// back to the depth breaks without being covered by this check.
	if useDepth && useWidth && len(widthbreaks) != len(depthbreaks) {
		http.Error(
			rw,
			fmt.Sprintf("class breaks lengths differ: %d != %d",
				len(widthbreaks), len(depthbreaks)),
			http.StatusBadRequest,
		)
		return
	}

	log.Printf("info: time interval: (%v - %v)\n", from, to)

	var loaded []*fullStretchBottleneck
	var errors []error

	// Bottlenecks which fail to load are skipped; the request only
	// fails if none of them could be loaded.
	for i := range bns {
		l, err := loadFullStretchBottleneck(
			ctx,
			conn,
			&bns[i],
			los,
			from, to,
			depthbreaks, widthbreaks,
		)
		if err != nil {
			log.Printf("error: %v\n", err)
			errors = append(errors, err)
			continue
		}
		loaded = append(loaded, l)
	}

	if len(loaded) == 0 {
		http.Error(
			rw,
			fmt.Sprintf("No bottleneck loaded: %v", joinErrors(errors)),
			http.StatusInternalServerError,
		)
		return
	}

	// Use half of the CPUs, but at least one worker.
	n := runtime.NumCPU() / 2
	if n == 0 {
		n = 1
	}

	// result is one CSV row: an interval plus the summed durations.
	type result struct {
		label  string
		from   time.Time
		to     time.Time
		ldc    []time.Duration
		breaks []time.Duration
	}

	jobCh := make(chan *result)

	var wg sync.WaitGroup

	// Each worker fills in the results it receives. A result is only
	// touched by the worker which received it and is read by this
	// goroutine after wg.Wait().
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for res := range jobCh {
				// Don't interpolate for the future. This does not
				// depend on the bottleneck, so do it once per result.
				if now := time.Now(); res.to.After(now) {
					res.to = now
				}

				var ldc, breaks []time.Duration

				for _, bn := range loaded {
					l := bn.measurements.classify(
						res.from, res.to,
						bn.ldc,
						bn.access,
					)
					b := bn.measurements.classify(
						res.from, res.to,
						bn.breaks,
						bn.access,
					)

					// The first classification becomes the
					// accumulator, the others are added to it.
					if ldc == nil {
						ldc, breaks = l, b
					} else {
						for i, v := range l {
							ldc[i] += v
						}
						for i, v := range b {
							breaks[i] += v
						}
					}
				}

				res.ldc = ldc
				res.breaks = breaks
			}
		}()
	}

	var results []*result

	interval := intervals[mode](from, to)

	// Breaks used for sizing and labelling the CSV columns.
	var breaks []float64

	if useDepth {
		breaks = depthbreaks
	} else {
		breaks = widthbreaks
	}

	// The interval generator signals its end with an empty label.
	for pfrom, pto, label := interval(); label != ""; pfrom, pto, label = interval() {

		res := &result{
			label: label,
			from:  pfrom,
			to:    pto,
		}
		results = append(results, res)
		jobCh <- res
	}

	close(jobCh)
	wg.Wait()

	rw.Header().Add("Content-Type", "text/csv")

	out := csv.NewWriter(rw)

	// label, lnwl, classes
	record := make([]string, 1+2+len(breaks)+1)
	record[0] = "# time"
	record[1] = "# < LDC [h]"
	record[2] = "# >= LDC [h]"
	for i, v := range breaks {
		if useDepth && useWidth {
			// Depth and width breaks are mixed in one column, so
			// the columns are numbered instead of showing a value.
			if i == 0 {
				record[3] = "# < break_1 [h]"
			}
			record[i+4] = fmt.Sprintf("# >= break_%d [h]", i+1)
		} else {
			if i == 0 {
				record[3] = fmt.Sprintf("# < %.1f [h]", v)
			}
			record[i+4] = fmt.Sprintf("# >= %.1f [h]", v)
		}
	}

	if err := out.Write(record); err != nil {
		// Too late for HTTP status message.
		log.Printf("error: %v\n", err)
		return
	}

	// Normalize so that it looks as if we had only one bottleneck.
	scale := 1 / float64(len(loaded))

	// Reuse the header record for the data rows; columns not written
	// by a result keep the zero value.
	empty := fmt.Sprintf("%.3f", 0.0)
	for i := range record[1:] {
		record[i+1] = empty
	}

	for _, r := range results {
		record[0] = r.label
		for i, v := range r.ldc {
			record[1+i] = fmt.Sprintf("%.3f", v.Hours()*scale)
		}

		for i, d := range r.breaks {
			record[3+i] = fmt.Sprintf("%.3f", d.Hours()*scale)
		}

		if err := out.Write(record); err != nil {
			// Too late for HTTP status message.
			log.Printf("error: %v\n", err)
			return
		}
	}

	out.Flush()
	if err := out.Error(); err != nil {
		// Too late for HTTP status message.
		log.Printf("error: %v\n", err)
	}
}

// joinErrors concatenates the messages of the given errors into one
// string, separated by ", ". The separator is only written once the
// result is non-empty, so leading empty messages produce no separator.
// An empty list yields the empty string.
func joinErrors(errors []error) string {
	var joined strings.Builder
	for idx := 0; idx < len(errors); idx++ {
		if joined.Len() != 0 {
			joined.WriteString(", ")
		}
		joined.WriteString(errors[idx].Error())
	}
	return joined.String()
}

// stretchAvailabilty is the HTTP handler which writes a CSV document
// telling, per time interval, which percentage of the time the
// bottlenecks of a stretch or section were classified below/above
// their LDC reference values and into each class break. The
// percentages relate to the interval length multiplied by the number
// of successfully loaded bottlenecks.
//
// Route variables (gorilla/mux):
//   - "kind": "stretch" selects a stretch, any other value a section.
//   - "name": name of the stretch or section; must not be empty.
//
// Form values:
//   - "mode": interval mode used to split the time range into rows.
//   - "from", "to": time range (fallbacks handed to parseFormTime:
//     one year ago and from plus one year). Swapped if reversed.
//   - "los": integer handed to loadDepthValues (fallback 1).
//   - "depthbreaks", "widthbreaks": class breaks replacing afdRefs.
//
// Error responses: 400 if the name is missing or if depth and width
// breaks are both needed but differ in length, 500 on a DB error or if
// no bottleneck could be loaded, 404 if no bottleneck was found.
func stretchAvailabilty(rw http.ResponseWriter, req *http.Request) {
	vars := mux.Vars(req)
	stretch := vars["kind"] == "stretch"
	name := vars["name"]
	mode := intervalMode(req.FormValue("mode"))

	if name == "" {
		http.Error(
			rw,
			fmt.Sprintf("Missing %s name", vars["kind"]),
			http.StatusBadRequest,
		)
		return
	}

	from, ok := parseFormTime(rw, req, "from", time.Now().AddDate(-1, 0, 0))
	if !ok {
		return
	}

	to, ok := parseFormTime(rw, req, "to", from.AddDate(1, 0, 0))
	if !ok {
		return
	}

	if to.Before(from) {
		to, from = from, to
	}

	los, ok := parseFormInt(rw, req, "los", 1)
	if !ok {
		return
	}

	depthbreaks, widthbreaks := afdRefs, afdRefs

	if b := req.FormValue("depthbreaks"); b != "" {
		depthbreaks = breaksToReferenceValue(b)
	}

	if b := req.FormValue("widthbreaks"); b != "" {
		widthbreaks = breaksToReferenceValue(b)
	}

	conn := middleware.GetDBConn(req)
	ctx := req.Context()

	bns, err := loadStretchBottlenecks(ctx, conn, stretch, name)
	if err != nil {
		http.Error(
			rw, fmt.Sprintf("DB error: %v.", err),
			http.StatusInternalServerError)
		return
	}

	if len(bns) == 0 {
		http.Error(
			rw,
			"No bottlenecks found.",
			http.StatusNotFound,
		)
		return
	}

	useDepth, useWidth := bns.contains("depth"), bns.contains("width")

	// The per-class sums below add depth and width classifications
	// index by index, so both break lists need the same length.
	// NOTE(review): bottlenecks with an unknown limiting factor fall
	// back to the depth breaks without being covered by this check.
	if useDepth && useWidth && len(widthbreaks) != len(depthbreaks) {
		http.Error(
			rw,
			fmt.Sprintf("class breaks lengths differ: %d != %d",
				len(widthbreaks), len(depthbreaks)),
			http.StatusBadRequest,
		)
		return
	}

	log.Printf("info: time interval: (%v - %v)\n", from, to)

	var loaded []*fullStretchBottleneck
	var errors []error

	// Bottlenecks which fail to load are skipped; the request only
	// fails if none of them could be loaded.
	for i := range bns {
		l, err := loadFullStretchBottleneck(
			ctx,
			conn,
			&bns[i],
			los,
			from, to,
			depthbreaks, widthbreaks,
		)
		if err != nil {
			log.Printf("error: %v\n", err)
			errors = append(errors, err)
			continue
		}
		loaded = append(loaded, l)
	}

	if len(loaded) == 0 {
		http.Error(
			rw,
			fmt.Sprintf("No bottleneck loaded: %v", joinErrors(errors)),
			http.StatusInternalServerError,
		)
		return
	}

	// Use half of the CPUs, but at least one worker.
	n := runtime.NumCPU() / 2
	if n == 0 {
		n = 1
	}

	// result is one CSV row: an interval plus the percentages.
	type result struct {
		label  string
		from   time.Time
		to     time.Time
		ldc    []float64
		breaks []float64
	}

	jobCh := make(chan *result)

	var wg sync.WaitGroup

	// Each worker fills in the results it receives. A result is only
	// touched by the worker which received it and is read by this
	// goroutine after wg.Wait().
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for res := range jobCh {
				// Don't interpolate for the future. This does not
				// depend on the bottleneck, so do it once per result.
				if now := time.Now(); res.to.After(now) {
					res.to = now
				}

				var ldc, breaks []time.Duration

				for _, bn := range loaded {
					// NOTE(review): the LDC classification uses
					// getValue here, whereas
					// stretchAvailableFairwayDepth uses bn.access;
					// confirm that this difference is intended.
					l := bn.measurements.classify(
						res.from, res.to,
						bn.ldc,
						(*availMeasurement).getValue,
					)

					b := bn.measurements.classify(
						res.from, res.to,
						bn.breaks,
						bn.access,
					)

					// The first classification becomes the
					// accumulator, the others are added to it.
					if ldc == nil {
						ldc, breaks = l, b
					} else {
						for i, v := range l {
							ldc[i] += v
						}
						for i, v := range b {
							breaks[i] += v
						}
					}
				}

				// The sums cover all loaded bottlenecks, so the
				// reference duration is scaled by their number.
				duration := res.to.Sub(res.from) * time.Duration(len(loaded))

				res.ldc = durationsToPercentage(duration, ldc)
				res.breaks = durationsToPercentage(duration, breaks)
			}
		}()
	}

	var results []*result

	interval := intervals[mode](from, to)

	// Breaks used for sizing and labelling the CSV columns.
	var breaks []float64

	if useDepth {
		breaks = depthbreaks
	} else {
		breaks = widthbreaks
	}

	// The interval generator signals its end with an empty label.
	for pfrom, pto, label := interval(); label != ""; pfrom, pto, label = interval() {

		res := &result{
			label: label,
			from:  pfrom,
			to:    pto,
		}
		results = append(results, res)

		jobCh <- res
	}

	close(jobCh)
	wg.Wait()

	rw.Header().Add("Content-Type", "text/csv")

	out := csv.NewWriter(rw)

	// label, lnwl, classes
	//
	// Plain string literals carry a single '%'; only the strings going
	// through fmt.Sprintf need the escaped "%%".
	record := make([]string, 1+2+len(breaks)+1)
	record[0] = "# time"
	record[1] = "# < LDC [%]"
	record[2] = "# >= LDC [%]"
	for i, v := range breaks {
		if useDepth && useWidth {
			// Depth and width breaks are mixed in one column, so
			// the columns are numbered instead of showing a value.
			if i == 0 {
				record[3] = "# < break_1 [%]"
			}
			record[i+4] = fmt.Sprintf("# >= break_%d [%%]", i+1)
		} else {
			if i == 0 {
				record[3] = fmt.Sprintf("# < %.3f [%%]", v)
			}
			record[i+4] = fmt.Sprintf("# >= %.3f [%%]", v)
		}
	}

	if err := out.Write(record); err != nil {
		// Too late for HTTP status message.
		log.Printf("error: %v\n", err)
		return
	}

	// Reuse the header record for the data rows; columns not written
	// by a result keep the zero value.
	empty := fmt.Sprintf("%.3f", 0.0)
	for i := range record[1:] {
		record[i+1] = empty
	}

	for _, res := range results {
		record[0] = res.label

		for i, v := range res.ldc {
			record[1+i] = fmt.Sprintf("%.3f", v)
		}

		for i, v := range res.breaks {
			record[3+i] = fmt.Sprintf("%.3f", v)
		}

		if err := out.Write(record); err != nil {
			// Too late for HTTP status message.
			log.Printf("error: %v\n", err)
			return
		}
	}

	out.Flush()
	if err := out.Error(); err != nil {
		// Too late for HTTP status message.
		log.Printf("error: %v\n", err)
	}
}