view pkg/controllers/bottlenecks.go @ 4700:a0c320d89682

FA diagram: take latest available LDC. As we have to take currently not active bottlenecks into account: don't use the current LDC (which might not be available), but the latest available one.
author Sascha Wilde <wilde@intevation.de>
date Wed, 16 Oct 2019 16:50:36 +0200
parents e357730c090a
children ef21c1464843
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
//  * Sascha Wilde <wilde@intevation.de>

package controllers

import (
	"context"
	"database/sql"
	"encoding/csv"
	"fmt"
	"log"
	"net/http"
	"sort"
	"strconv"
	"strings"
	"time"

	"github.com/gorilla/mux"

	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/middleware"
)

const (
	selectLimitingSQL = `
SELECT limiting FROM waterway.bottlenecks bn
  WHERE bn.validity @> current_timestamp AND objnam = $1
`

	selectAvailableDepthSQL = `
WITH data AS (
  SELECT
    efa.measure_date,
    efa.available_depth_value,
    efa.available_width_value,
    efa.water_level_value
  FROM waterway.effective_fairway_availability efa
  JOIN waterway.fairway_availability fa
    ON efa.fairway_availability_id = fa.id
  JOIN waterway.bottlenecks bn
    ON fa.bottleneck_id = bn.bottleneck_id
  WHERE
    bn.validity @> current_timestamp AND
    bn.objnam = $1 AND
    efa.level_of_service = $2 AND
    efa.measure_type = 'Measured' AND
    (efa.available_depth_value IS NOT NULL OR
     efa.available_width_value IS NOT NULL) AND
    efa.water_level_value IS NOT NULL
),
before AS (
  SELECT * FROM data WHERE measure_date < $3
  ORDER BY measure_date DESC LIMIT 1
),
inside AS (
  SELECT * FROM data WHERE measure_date BETWEEN $3 AND $4
),
after AS (
  SELECT * FROM data WHERE measure_date > $4
  ORDER BY measure_date LIMIT 1
)
SELECT * FROM before
UNION ALL
SELECT * FROM inside
UNION ALL
SELECT * FROM after
ORDER BY measure_date
`

	selectGaugeLDCSQL = `
SELECT
  grwl.value
FROM waterway.gauges_reference_water_levels grwl
  JOIN waterway.bottlenecks bns
    ON grwl.location = bns.gauge_location
      AND grwl.validity @> COALESCE(upper(bns.validity), current_timestamp)
WHERE lower(bns.validity) = (SELECT max(lower(validity))
                             FROM waterway.bottlenecks WHERE objnam = $1)
  AND bns.objnam = $1
  AND grwl.depth_reference like 'LDC%'
`
)

type (
	availMeasurement struct {
		when  time.Time
		depth int16
		width int16
		value int16
	}

	availMeasurements []availMeasurement
)

// afdRefs are the typical available fairway depth reference values.
var afdRefs = []float64{
	230,
	250,
}

func (measurement *availMeasurement) getDepth() float64 {
	return float64(measurement.depth)
}

func (measurement *availMeasurement) getValue() float64 {
	return float64(measurement.value)
}

func (measurement *availMeasurement) getWidth() float64 {
	return float64(measurement.width)
}

func limitingFactor(limiting string) func(*availMeasurement) float64 {
	switch limiting {
	case "depth":
		return (*availMeasurement).getDepth
	case "width":
		return (*availMeasurement).getWidth
	default:
		log.Printf("warn: unknown limitation '%s'. default to 'depth'\n", limiting)
		return (*availMeasurement).getDepth
	}
}

// According to clarification, it has to be assumed, that at times
// with no data, the best case (which by convention is the highest
// class created by classify()) should be assumed.  That is due to the
// fact, that at times where bottlenecks are not a limiting factor on
// the waterway, services don't provide any data for the bottleneck in
// question.
//
// FIXME: A potential improvement could be to intersect the time
// ranges with the time ranges where bottlenecks were "active" (this
// _might_ be derivable from the validity periods in the bottleneck
// data.  So it _might_ be possible do detect actual missing data (BN
// valid, but no data from FA service).  Anyway, this is left out for
// now, as many clarification regarding the base assumtions would be
// needed and the results still might be unrelyable.
func optimisticPadClassification(
	from, to time.Time,
	classified []time.Duration,
) []time.Duration {

	var actualDuration time.Duration
	for _, v := range classified {
		actualDuration += v
	}

	// If the actual duration is smaller than the length
	// of the classifaction interval extend the
	// time spend in the highest class by the difference.
	if delta := to.Sub(from) - actualDuration; delta > 0 {
		log.Printf("info: time interval: (%v - %v)\n", from, to)
		log.Printf("info: found only data for %.2f hours, padding by %.2f hours\n",
			actualDuration.Hours(), delta.Hours())
		classified[len(classified)-1] += delta
	}

	return classified
}

func (measurements availMeasurements) classify(
	from, to time.Time,
	breaks []float64,
	access func(*availMeasurement) float64,
) []time.Duration {

	if len(breaks) == 0 {
		return []time.Duration{}
	}

	result := make([]time.Duration, len(breaks)+1)
	classes := make([]float64, len(breaks)+2)
	values := make([]time.Time, len(classes))

	// Add sentinels
	classes[0] = breaks[0] - 9999
	classes[len(classes)-1] = breaks[len(breaks)-1] + 9999
	for i := range breaks {
		classes[i+1] = breaks[i]
	}

	idx := sort.Search(len(measurements), func(i int) bool {
		// All values before from can be ignored.
		return !measurements[i].when.Before(from)
	})

	if idx >= len(measurements) {
		return optimisticPadClassification(from, to, result)
	}

	// Be safe for interpolation.
	if idx > 0 {
		idx--
	}

	measurements = measurements[idx:]

	for i := 0; i < len(measurements)-1; i++ {
		p1 := &measurements[i]
		p2 := &measurements[i+1]

		if p1.when.After(to) {
			return optimisticPadClassification(from, to, result)
		}

		if p2.when.Before(from) {
			continue
		}

		lo, hi := maxTime(p1.when, from), minTime(p2.when, to)

		m1, m2 := access(p1), access(p2)
		if m1 == m2 { // The whole interval is in only one class.
			for j := 0; j < len(classes)-1; j++ {
				if classes[j] <= m1 && m1 <= classes[j+1] {
					result[j] += hi.Sub(lo)
					break
				}
			}
			continue
		}

		f := common.InterpolateTime(
			p1.when, m1,
			p2.when, m2,
		)

		for j, c := range classes {
			values[j] = f(c)
		}

		for j := 0; j < len(values)-1; j++ {
			start, end := orderTime(values[j], values[j+1])

			if start.After(hi) || end.Before(lo) {
				continue
			}

			start, end = maxTime(start, lo), minTime(end, hi)
			result[j] += end.Sub(start)
		}
	}

	return optimisticPadClassification(from, to, result)
}

func orderTime(a, b time.Time) (time.Time, time.Time) {
	if a.Before(b) {
		return a, b
	}
	return b, a
}

func minTime(a, b time.Time) time.Time {
	if a.Before(b) {
		return a
	}
	return b
}

func maxTime(a, b time.Time) time.Time {
	if a.After(b) {
		return a
	}
	return b
}

func durationsToPercentage(duration time.Duration, classes []time.Duration) []float64 {
	percents := make([]float64, len(classes))
	total := 100 / duration.Seconds()
	for i, v := range classes {
		percents[i] = v.Seconds() * total
	}
	return percents
}

func parseFormTime(
	rw http.ResponseWriter,
	req *http.Request,
	field string,
	def time.Time,
) (time.Time, bool) {
	f := req.FormValue(field)
	if f == "" {
		return def.UTC(), true
	}
	v, err := common.ParseTime(f)
	if err != nil {
		http.Error(
			rw, fmt.Sprintf("Invalid format for '%s': %v.", field, err),
			http.StatusBadRequest,
		)
		return time.Time{}, false
	}
	return v.UTC(), true
}

func parseFormInt(
	rw http.ResponseWriter,
	req *http.Request,
	field string,
	def int,
) (int, bool) {
	f := req.FormValue(field)
	if f == "" {
		return def, true
	}
	v, err := strconv.Atoi(f)
	if err != nil {
		http.Error(
			rw, fmt.Sprintf("Invalid format for '%s': %v.", field, err),
			http.StatusBadRequest,
		)
		return 0, false
	}
	return v, true
}

func intervalMode(mode string) int {
	switch strings.ToLower(mode) {
	case "monthly":
		return 0
	case "quarterly":
		return 1
	case "yearly":
		return 2
	default:
		return 0
	}
}

func loadDepthValues(
	ctx context.Context,
	conn *sql.Conn,
	bottleneck string,
	los int,
	from, to time.Time,
) (availMeasurements, error) {

	rows, err := conn.QueryContext(
		ctx, selectAvailableDepthSQL, bottleneck, los, from, to)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var ms availMeasurements

	for rows.Next() {
		var m availMeasurement
		if err := rows.Scan(
			&m.when,
			&m.depth,
			&m.width,
			&m.value,
		); err != nil {
			return nil, err
		}
		m.when = m.when.UTC()
		ms = append(ms, m)
	}

	if err := rows.Err(); err != nil {
		return nil, err
	}

	return ms, nil
}

func loadLDCReferenceValue(
	ctx context.Context,
	conn *sql.Conn,
	bottleneck string,
) ([]float64, error) {
	var value float64
	err := conn.QueryRowContext(ctx, selectGaugeLDCSQL, bottleneck).Scan(&value)
	switch {
	case err == sql.ErrNoRows:
		return nil, nil
	case err != nil:
		return nil, err
	}
	return []float64{value}, nil
}

func breaksToReferenceValue(breaks string) []float64 {
	parts := strings.Split(breaks, ",")
	var values []float64

	for _, part := range parts {
		part = strings.TrimSpace(part)
		if v, err := strconv.ParseFloat(part, 64); err == nil {
			values = append(values, v)
		}
	}

	sort.Float64s(values)

	// dedup
	for i := 1; i < len(values); {
		if values[i-1] == values[i] {
			copy(values[i:], values[i+1:])
			values = values[:len(values)-1]
		} else {
			i++
		}
	}
	return values
}

func bottleneckAvailabilty(rw http.ResponseWriter, req *http.Request) {

	mode := intervalMode(req.FormValue("mode"))
	bn := mux.Vars(req)["objnam"]

	if bn == "" {
		http.Error(
			rw,
			"Missing objnam of bottleneck",
			http.StatusBadRequest,
		)
		return
	}

	from, ok := parseFormTime(rw, req, "from", time.Now().AddDate(-1, 0, 0))
	if !ok {
		return
	}

	to, ok := parseFormTime(rw, req, "to", from.AddDate(1, 0, 0))
	if !ok {
		return
	}

	if to.Before(from) {
		to, from = from, to
	}

	los, ok := parseFormInt(rw, req, "los", 1)
	if !ok {
		return
	}

	conn := middleware.GetDBConn(req)
	ctx := req.Context()

	ldcRefs, err := loadLDCReferenceValue(ctx, conn, bn)
	if err != nil {
		http.Error(
			rw,
			fmt.Sprintf("Internal server error: %v", err),
			http.StatusInternalServerError,
		)
		return
	}

	if len(ldcRefs) == 0 {
		http.Error(
			rw,
			"No gauge reference values found for bottleneck",
			http.StatusNotFound,
		)
		return
	}

	var breaks []float64
	if b := req.FormValue("breaks"); b != "" {
		breaks = breaksToReferenceValue(b)
	} else {
		breaks = afdRefs
	}

	log.Printf("info: time interval: (%v - %v)\n", from, to)

	var ms availMeasurements
	if ms, err = loadDepthValues(ctx, conn, bn, los, from, to); err != nil {
		return
	}

	if len(ms) == 0 {
		http.Error(
			rw,
			"No available fairway depth values found",
			http.StatusNotFound,
		)
		return
	}

	rw.Header().Add("Content-Type", "text/csv")

	out := csv.NewWriter(rw)

	record := make([]string, 1+2+len(breaks)+1)
	record[0] = "#time"
	record[1] = fmt.Sprintf("# < LDC (%.1f) [%%]", ldcRefs[0])
	record[2] = fmt.Sprintf("# >= LDC (%.1f) [%%]", ldcRefs[0])
	for i, v := range breaks {
		if i == 0 {
			record[3] = fmt.Sprintf("#d < %.1f [%%]", v)
		}
		record[i+4] = fmt.Sprintf("#d >= %.1f [%%]", v)
	}

	if err := out.Write(record); err != nil {
		// Too late for HTTP status message.
		log.Printf("error: %v\n", err)
		return
	}

	interval := intervals[mode](from, to)

	now := time.Now()
	for pfrom, pto, label := interval(); label != ""; pfrom, pto, label = interval() {
		// Don't interpolate for the future
		if now.Sub(pto) < 0 {
			pto = now
		}

		lnwl := ms.classify(
			pfrom, pto,
			ldcRefs,
			(*availMeasurement).getValue,
		)

		afd := ms.classify(
			pfrom, pto,
			breaks,
			(*availMeasurement).getDepth,
		)

		duration := pto.Sub(pfrom)
		lnwlPercents := durationsToPercentage(duration, lnwl)
		afdPercents := durationsToPercentage(duration, afd)

		record[0] = label
		for i, v := range lnwlPercents {
			record[1+i] = fmt.Sprintf("%.3f", v)
		}
		for i, v := range afdPercents {
			record[3+i] = fmt.Sprintf("%.3f", v)
		}

		if err := out.Write(record); err != nil {
			// Too late for HTTP status message.
			log.Printf("error: %v\n", err)
			return
		}
	}

	out.Flush()
	if err := out.Error(); err != nil {
		// Too late for HTTP status message.
		log.Printf("error: %v\n", err)
	}
}

func bottleneckAvailableFairwayDepth(rw http.ResponseWriter, req *http.Request) {

	mode := intervalMode(req.FormValue("mode"))

	bn := mux.Vars(req)["objnam"]
	if bn == "" {
		http.Error(
			rw, "Missing objnam of bottleneck",
			http.StatusBadRequest)
		return
	}

	from, ok := parseFormTime(rw, req, "from", time.Now().AddDate(-1, 0, 0))
	if !ok {
		return
	}

	to, ok := parseFormTime(rw, req, "to", from.AddDate(1, 0, 0))
	if !ok {
		return
	}

	if to.Before(from) {
		to, from = from, to
	}

	los, ok := parseFormInt(rw, req, "los", 1)
	if !ok {
		return
	}

	conn := middleware.GetDBConn(req)
	ctx := req.Context()

	var limiting string
	err := conn.QueryRowContext(ctx, selectLimitingSQL, bn).Scan(&limiting)
	switch {
	case err == sql.ErrNoRows:
		http.Error(
			rw, fmt.Sprintf("Unknown limitation for %s.", bn),
			http.StatusNotFound)
		return
	case err != nil:
		http.Error(
			rw, fmt.Sprintf("DB error: %v.", err),
			http.StatusInternalServerError)
		return
	}

	access := limitingFactor(limiting)

	log.Printf("info: time interval: (%v - %v)\n", from, to)

	// load the measurements
	ms, err := loadDepthValues(ctx, conn, bn, los, from, to)
	if err != nil {
		http.Error(
			rw, fmt.Sprintf("Loading measurements failed: %v.", err),
			http.StatusInternalServerError)
		return
	}

	ldcRefs, err := loadLDCReferenceValue(ctx, conn, bn)
	if err != nil {
		http.Error(
			rw, fmt.Sprintf("Loading LDC failed: %v.", err),
			http.StatusInternalServerError)
		return
	}
	if len(ldcRefs) == 0 {
		http.Error(rw, "No LDC found", http.StatusNotFound)
		return
	}

	var breaks []float64
	if b := req.FormValue("breaks"); b != "" {
		breaks = breaksToReferenceValue(b)
	} else {
		breaks = afdRefs
	}

	rw.Header().Add("Content-Type", "text/csv")

	out := csv.NewWriter(rw)

	// label, ldc, classes
	record := make([]string, 1+2+len(breaks)+1)
	record[0] = "#time"
	record[1] = fmt.Sprintf("# < LDC (%.1f) [d]", ldcRefs[0])
	record[2] = fmt.Sprintf("# >= LDC (%.1f) [d]", ldcRefs[0])
	for i, v := range breaks {
		if i == 0 {
			record[3] = fmt.Sprintf("# < %.1f [d]", v)
		}
		record[i+4] = fmt.Sprintf("# >= %.1f [d]", v)
	}

	if err := out.Write(record); err != nil {
		// Too late for HTTP status message.
		log.Printf("error: %v\n", err)
		return
	}

	//log.Println(len(ms))
	//for i := range ms {
	//	log.Println(ms[i].when, ms[i].depth)
	//}

	log.Printf("info: measurements: %d\n", len(ms))
	if len(ms) > 1 {
		log.Printf("info: first: %v\n", ms[0].when)
		log.Printf("info: last: %v\n", ms[len(ms)-1].when)
		log.Printf("info: interval: %.2f [h]\n", ms[len(ms)-1].when.Sub(ms[0].when).Hours())
	}

	interval := intervals[mode](from, to)

	now := time.Now()
	for pfrom, pto, label := interval(); label != ""; pfrom, pto, label = interval() {
		// Don't interpolate for the future
		if now.Sub(pto) < 0 {
			pto = now
		}

		ldc := ms.classify(
			pfrom, pto,
			ldcRefs,
			access,
		)

		ranges := ms.classify(
			pfrom, pto,
			breaks,
			access,
		)

		// Round to full days
		ldcRounded := common.RoundToFullDays(ldc)
		rangesRounded := common.RoundToFullDays(ranges)

		record[0] = label
		for i, v := range ldcRounded {
			record[i+1] = fmt.Sprintf("%d", v)
		}

		for i, d := range rangesRounded {
			record[3+i] = fmt.Sprintf("%d", d)
		}

		if err := out.Write(record); err != nil {
			// Too late for HTTP status message.
			log.Printf("error: %v\n", err)
			return
		}
	}

	out.Flush()
	if err := out.Error(); err != nil {
		// Too late for HTTP status message.
		log.Printf("error: %v\n", err)
	}
}

var intervals = []func(time.Time, time.Time) func() (time.Time, time.Time, string){
	monthly,
	quarterly,
	yearly,
}

func monthly(from, to time.Time) func() (time.Time, time.Time, string) {
	pfrom := from
	return func() (time.Time, time.Time, string) {
		if pfrom.After(to) {
			return time.Time{}, time.Time{}, ""
		}
		f := pfrom
		pfrom = pfrom.AddDate(0, 1, 0)
		label := fmt.Sprintf("%02d-%d", f.Month(), f.Year())
		return f, f.AddDate(0, 1, 0).Add(-time.Nanosecond), label
	}
}

func quarterly(from, to time.Time) func() (time.Time, time.Time, string) {
	pfrom := from
	return func() (time.Time, time.Time, string) {
		if pfrom.After(to) {
			return time.Time{}, time.Time{}, ""
		}
		f := pfrom
		pfrom = pfrom.AddDate(0, 3, 0)
		label := fmt.Sprintf("Q%d-%d", (int(f.Month())-1)/3+1, f.Year())
		return f, f.AddDate(0, 3, 0).Add(-time.Nanosecond), label
	}
}

func yearly(from, to time.Time) func() (time.Time, time.Time, string) {
	pfrom := from
	return func() (time.Time, time.Time, string) {
		if pfrom.After(to) {
			return time.Time{}, time.Time{}, ""
		}
		f := pfrom
		pfrom = pfrom.AddDate(1, 0, 0)
		label := fmt.Sprintf("%d", f.Year())
		return f, f.AddDate(1, 0, 0).Add(-time.Nanosecond), label
	}
}