view pkg/controllers/bottlenecks.go @ 4860:de7aa9230837

Limit interpolation of time ranges for Available Fairway Depths to 1.5h. This is necessary to prevent interpolation into time ranges where intentionally no data is provided. (When bottlenecks are "inactive" no data is provided and gemma shall assume perfect conditions for that time)
author Sascha Wilde <wilde@intevation.de>
date Wed, 04 Dec 2019 15:28:22 +0100
parents 125cac3c977d
children 2ede9ac26ef2
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
//  * Sascha Wilde <wilde@intevation.de>

package controllers

import (
	"context"
	"database/sql"
	"encoding/csv"
	"fmt"
	"log"
	"net/http"
	"sort"
	"strconv"
	"strings"
	"time"

	"github.com/gorilla/mux"

	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/middleware"
)

const (
	selectLimitingSQL = `
SELECT limiting FROM waterway.bottlenecks bn
  WHERE bn.validity @> current_timestamp AND objnam = $1
`

	selectAvailableDepthSQL = `
WITH data AS (
  SELECT
    efa.measure_date,
    efa.available_depth_value,
    efa.available_width_value,
    efa.water_level_value
  FROM waterway.effective_fairway_availability efa
  JOIN waterway.fairway_availability fa
    ON efa.fairway_availability_id = fa.id
  JOIN waterway.bottlenecks bn
    ON fa.bottleneck_id = bn.bottleneck_id
  WHERE
    bn.validity @> current_timestamp AND
    bn.objnam = $1 AND
    efa.level_of_service = $2 AND
    efa.measure_type = 'Measured' AND
    (efa.available_depth_value IS NOT NULL OR
     efa.available_width_value IS NOT NULL) AND
    efa.water_level_value IS NOT NULL
),
before AS (
  SELECT * FROM data WHERE measure_date < $3
  ORDER BY measure_date DESC LIMIT 1
),
inside AS (
  SELECT * FROM data WHERE measure_date BETWEEN $3 AND $4
),
after AS (
  SELECT * FROM data WHERE measure_date > $4
  ORDER BY measure_date LIMIT 1
)
SELECT * FROM before
UNION ALL
SELECT * FROM inside
UNION ALL
SELECT * FROM after
ORDER BY measure_date
`

	selectGaugeLDCSQL = `
SELECT
  grwl.value
FROM waterway.gauges_reference_water_levels grwl
  JOIN waterway.bottlenecks bns
    ON grwl.location = bns.gauge_location
      AND grwl.validity @> COALESCE(upper(bns.validity), current_timestamp)
WHERE lower(bns.validity) = (SELECT max(lower(validity))
                             FROM waterway.bottlenecks WHERE objnam = $1)
  AND bns.objnam = $1
  AND grwl.depth_reference like 'LDC%'
`
)

type (
	availMeasurement struct {
		when  time.Time
		depth int16
		width int16
		value int16
	}

	availMeasurements []availMeasurement
)

// afdRefs are the typical available fairway depth reference values.
var afdRefs = []float64{
	230,
	250,
}

func (measurement *availMeasurement) getDepth() float64 {
	return float64(measurement.depth)
}

func (measurement *availMeasurement) getValue() float64 {
	return float64(measurement.value)
}

func (measurement *availMeasurement) getWidth() float64 {
	return float64(measurement.width)
}

func limitingFactor(limiting string) func(*availMeasurement) float64 {
	switch limiting {
	case "depth":
		return (*availMeasurement).getDepth
	case "width":
		return (*availMeasurement).getWidth
	default:
		log.Printf("warn: unknown limitation '%s'. default to 'depth'\n", limiting)
		return (*availMeasurement).getDepth
	}
}

// According to clarification, it has to be assumed, that at times
// with no data, the best case (which by convention is the highest
// class created by classify()) should be assumed.  That is due to the
// fact, that at times where bottlenecks are not a limiting factor on
// the waterway, services don't provide any data for the bottleneck in
// question.
//
// FIXME: A potential improvement could be to intersect the time
// ranges with the time ranges where bottlenecks were "active" (this
// _might_ be derivable from the validity periods in the bottleneck
// data.  So it _might_ be possible do detect actual missing data (BN
// valid, but no data from FA service).  Anyway, this is left out for
// now, as many clarification regarding the base assumtions would be
// needed and the results still might be unrelyable.
func optimisticPadClassification(
	from, to time.Time,
	classified []time.Duration,
) []time.Duration {

	var actualDuration time.Duration
	for _, v := range classified {
		actualDuration += v
	}

	// If the actual duration is smaller than the length
	// of the classifaction interval extend the
	// time spend in the highest class by the difference.
	if delta := to.Sub(from) - actualDuration; delta > 0 {
		log.Printf("info: time interval: (%v - %v)\n", from, to)
		log.Printf("info: found only data for %.2f hours, padding by %.2f hours\n",
			actualDuration.Hours(), delta.Hours())
		classified[len(classified)-1] += delta
	}

	return classified
}

func (measurements availMeasurements) classify(
	from, to time.Time,
	breaks []float64,
	access func(*availMeasurement) float64,
) []time.Duration {

	if len(breaks) == 0 {
		return []time.Duration{}
	}

	result := make([]time.Duration, len(breaks)+1)
	classes := make([]float64, len(breaks)+2)
	values := make([]time.Time, len(classes))

	// Add sentinels
	classes[0] = breaks[0] - 9999
	classes[len(classes)-1] = breaks[len(breaks)-1] + 9999
	for i := range breaks {
		classes[i+1] = breaks[i]
	}

	idx := sort.Search(len(measurements), func(i int) bool {
		// All values before from can be ignored.
		return !measurements[i].when.Before(from)
	})

	if idx >= len(measurements) {
		return optimisticPadClassification(from, to, result)
	}

	// Be safe for interpolation.
	if idx > 0 {
		idx--
	}

	measurements = measurements[idx:]

	for i := 0; i < len(measurements)-1; i++ {
		p1 := &measurements[i]
		p2 := &measurements[i+1]

		if p1.when.After(to) {
			return optimisticPadClassification(from, to, result)
		}

		if p2.when.Before(from) {
			continue
		}

		if p2.when.Sub(p1.when).Hours() > 1.5 {
			// Don't interpolate ranges bigger then one hour
			continue
		}

		lo, hi := maxTime(p1.when, from), minTime(p2.when, to)

		m1, m2 := access(p1), access(p2)
		if m1 == m2 { // The whole interval is in only one class.
			for j := 0; j < len(classes)-1; j++ {
				if classes[j] <= m1 && m1 <= classes[j+1] {
					result[j] += hi.Sub(lo)
					break
				}
			}
			continue
		}

		f := common.InterpolateTime(
			p1.when, m1,
			p2.when, m2,
		)

		for j, c := range classes {
			values[j] = f(c)
		}

		for j := 0; j < len(values)-1; j++ {
			start, end := orderTime(values[j], values[j+1])

			if start.After(hi) || end.Before(lo) {
				continue
			}

			start, end = maxTime(start, lo), minTime(end, hi)
			result[j] += end.Sub(start)
		}
	}

	return optimisticPadClassification(from, to, result)
}

func orderTime(a, b time.Time) (time.Time, time.Time) {
	if a.Before(b) {
		return a, b
	}
	return b, a
}

func minTime(a, b time.Time) time.Time {
	if a.Before(b) {
		return a
	}
	return b
}

func maxTime(a, b time.Time) time.Time {
	if a.After(b) {
		return a
	}
	return b
}

func durationsToPercentage(duration time.Duration, classes []time.Duration) []float64 {
	percents := make([]float64, len(classes))
	total := 100 / duration.Seconds()
	for i, v := range classes {
		percents[i] = v.Seconds() * total
	}
	return percents
}

func parseFormTime(
	rw http.ResponseWriter,
	req *http.Request,
	field string,
	def time.Time,
) (time.Time, bool) {
	f := req.FormValue(field)
	if f == "" {
		return def.UTC(), true
	}
	v, err := common.ParseTime(f)
	if err != nil {
		http.Error(
			rw, fmt.Sprintf("Invalid format for '%s'.", field),
			http.StatusBadRequest,
		)
		return time.Time{}, false
	}
	return v.UTC(), true
}

func parseFormInt(
	rw http.ResponseWriter,
	req *http.Request,
	field string,
	def int,
) (int, bool) {
	f := req.FormValue(field)
	if f == "" {
		return def, true
	}
	v, err := strconv.Atoi(f)
	if err != nil {
		http.Error(
			rw, fmt.Sprintf("Invalid format for '%s'.", field),
			http.StatusBadRequest,
		)
		return 0, false
	}
	return v, true
}

func intervalMode(mode string) int {
	switch strings.ToLower(mode) {
	case "monthly":
		return 0
	case "quarterly":
		return 1
	case "yearly":
		return 2
	default:
		return 0
	}
}

func loadDepthValues(
	ctx context.Context,
	conn *sql.Conn,
	bottleneck string,
	los int,
	from, to time.Time,
) (availMeasurements, error) {

	rows, err := conn.QueryContext(
		ctx, selectAvailableDepthSQL, bottleneck, los, from, to)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var ms availMeasurements

	for rows.Next() {
		var m availMeasurement
		if err := rows.Scan(
			&m.when,
			&m.depth,
			&m.width,
			&m.value,
		); err != nil {
			return nil, err
		}
		m.when = m.when.UTC()
		ms = append(ms, m)
	}

	if err := rows.Err(); err != nil {
		return nil, err
	}

	return ms, nil
}

func loadLDCReferenceValue(
	ctx context.Context,
	conn *sql.Conn,
	bottleneck string,
) ([]float64, error) {
	var value float64
	err := conn.QueryRowContext(ctx, selectGaugeLDCSQL, bottleneck).Scan(&value)
	switch {
	case err == sql.ErrNoRows:
		return nil, nil
	case err != nil:
		return nil, err
	}
	return []float64{value}, nil
}

func breaksToReferenceValue(breaks string) []float64 {
	parts := strings.Split(breaks, ",")
	var values []float64

	for _, part := range parts {
		part = strings.TrimSpace(part)
		if v, err := strconv.ParseFloat(part, 64); err == nil {
			values = append(values, v)
		}
	}

	sort.Float64s(values)

	// dedup
	for i := 1; i < len(values); {
		if values[i-1] == values[i] {
			copy(values[i:], values[i+1:])
			values = values[:len(values)-1]
		} else {
			i++
		}
	}
	return values
}

func bottleneckAvailabilty(rw http.ResponseWriter, req *http.Request) {

	mode := intervalMode(req.FormValue("mode"))
	bn := mux.Vars(req)["objnam"]

	if bn == "" {
		http.Error(
			rw,
			"Missing objnam of bottleneck",
			http.StatusBadRequest,
		)
		return
	}

	from, ok := parseFormTime(rw, req, "from", time.Now().AddDate(-1, 0, 0))
	if !ok {
		return
	}

	to, ok := parseFormTime(rw, req, "to", from.AddDate(1, 0, 0))
	if !ok {
		return
	}

	if to.Before(from) {
		to, from = from, to
	}

	los, ok := parseFormInt(rw, req, "los", 1)
	if !ok {
		return
	}

	conn := middleware.GetDBConn(req)
	ctx := req.Context()

	var limiting string
	err := conn.QueryRowContext(ctx, selectLimitingSQL, bn).Scan(&limiting)
	switch {
	case err == sql.ErrNoRows:
		http.Error(
			rw, fmt.Sprintf("Unknown limitation for %s.", bn),
			http.StatusNotFound)
		return
	case err != nil:
		http.Error(
			rw, fmt.Sprintf("DB error: %v.", err),
			http.StatusInternalServerError)
		return
	}

	access := limitingFactor(limiting)

	ldcRefs, err := loadLDCReferenceValue(ctx, conn, bn)
	if err != nil {
		http.Error(
			rw,
			fmt.Sprintf("Internal server error: %v", err),
			http.StatusInternalServerError,
		)
		return
	}

	if len(ldcRefs) == 0 {
		http.Error(
			rw,
			"No gauge reference values found for bottleneck",
			http.StatusNotFound,
		)
		return
	}

	var breaks []float64
	if b := req.FormValue("breaks"); b != "" {
		breaks = breaksToReferenceValue(b)
	} else {
		breaks = afdRefs
	}

	log.Printf("info: time interval: (%v - %v)\n", from, to)

	var ms availMeasurements
	if ms, err = loadDepthValues(ctx, conn, bn, los, from, to); err != nil {
		return
	}

	if len(ms) == 0 {
		http.Error(
			rw,
			"No available fairway depth values found",
			http.StatusNotFound,
		)
		return
	}

	rw.Header().Add("Content-Type", "text/csv")

	out := csv.NewWriter(rw)

	record := make([]string, 1+2+len(breaks)+1)
	record[0] = "#time"
	record[1] = fmt.Sprintf("# < LDC (%.1f) [%%]", ldcRefs[0])
	record[2] = fmt.Sprintf("# >= LDC (%.1f) [%%]", ldcRefs[0])
	for i, v := range breaks {
		if i == 0 {
			record[3] = fmt.Sprintf("#d < %.1f [%%]", v)
		}
		record[i+4] = fmt.Sprintf("#d >= %.1f [%%]", v)
	}

	if err := out.Write(record); err != nil {
		// Too late for HTTP status message.
		log.Printf("error: %v\n", err)
		return
	}

	interval := intervals[mode](from, to)

	now := time.Now()
	for pfrom, pto, label := interval(); label != ""; pfrom, pto, label = interval() {
		// Don't interpolate for the future
		if now.Sub(pto) < 0 {
			pto = now
		}

		lnwl := ms.classify(
			pfrom, pto,
			ldcRefs,
			(*availMeasurement).getValue,
		)

		afd := ms.classify(
			pfrom, pto,
			breaks,
			access,
		)

		duration := pto.Sub(pfrom)
		lnwlPercents := durationsToPercentage(duration, lnwl)
		afdPercents := durationsToPercentage(duration, afd)

		record[0] = label
		for i, v := range lnwlPercents {
			record[1+i] = fmt.Sprintf("%.3f", v)
		}
		for i, v := range afdPercents {
			record[3+i] = fmt.Sprintf("%.3f", v)
		}

		if err := out.Write(record); err != nil {
			// Too late for HTTP status message.
			log.Printf("error: %v\n", err)
			return
		}
	}

	out.Flush()
	if err := out.Error(); err != nil {
		// Too late for HTTP status message.
		log.Printf("error: %v\n", err)
	}
}

func bottleneckAvailableFairwayDepth(rw http.ResponseWriter, req *http.Request) {

	mode := intervalMode(req.FormValue("mode"))

	bn := mux.Vars(req)["objnam"]
	if bn == "" {
		http.Error(
			rw, "Missing objnam of bottleneck",
			http.StatusBadRequest)
		return
	}

	from, ok := parseFormTime(rw, req, "from", time.Now().AddDate(-1, 0, 0))
	if !ok {
		return
	}

	to, ok := parseFormTime(rw, req, "to", from.AddDate(1, 0, 0))
	if !ok {
		return
	}

	if to.Before(from) {
		to, from = from, to
	}

	los, ok := parseFormInt(rw, req, "los", 1)
	if !ok {
		return
	}

	conn := middleware.GetDBConn(req)
	ctx := req.Context()

	var limiting string
	err := conn.QueryRowContext(ctx, selectLimitingSQL, bn).Scan(&limiting)
	switch {
	case err == sql.ErrNoRows:
		http.Error(
			rw, fmt.Sprintf("Unknown limitation for %s.", bn),
			http.StatusNotFound)
		return
	case err != nil:
		http.Error(
			rw, fmt.Sprintf("DB error: %v.", err),
			http.StatusInternalServerError)
		return
	}

	access := limitingFactor(limiting)

	log.Printf("info: time interval: (%v - %v)\n", from, to)

	// load the measurements
	ms, err := loadDepthValues(ctx, conn, bn, los, from, to)
	if err != nil {
		http.Error(
			rw, fmt.Sprintf("Loading measurements failed: %v.", err),
			http.StatusInternalServerError)
		return
	}

	ldcRefs, err := loadLDCReferenceValue(ctx, conn, bn)
	if err != nil {
		http.Error(
			rw, fmt.Sprintf("Loading LDC failed: %v.", err),
			http.StatusInternalServerError)
		return
	}
	if len(ldcRefs) == 0 {
		http.Error(rw, "No LDC found", http.StatusNotFound)
		return
	}

	var breaks []float64
	if b := req.FormValue("breaks"); b != "" {
		breaks = breaksToReferenceValue(b)
	} else {
		breaks = afdRefs
	}

	rw.Header().Add("Content-Type", "text/csv")

	out := csv.NewWriter(rw)

	// label, ldc, classes
	record := make([]string, 1+2+len(breaks)+1)
	record[0] = "#time"
	record[1] = fmt.Sprintf("# < LDC (%.1f) [d]", ldcRefs[0])
	record[2] = fmt.Sprintf("# >= LDC (%.1f) [d]", ldcRefs[0])
	for i, v := range breaks {
		if i == 0 {
			record[3] = fmt.Sprintf("# < %.1f [d]", v)
		}
		record[i+4] = fmt.Sprintf("# >= %.1f [d]", v)
	}

	if err := out.Write(record); err != nil {
		// Too late for HTTP status message.
		log.Printf("error: %v\n", err)
		return
	}

	//log.Println(len(ms))
	//for i := range ms {
	//	log.Println(ms[i].when, ms[i].depth)
	//}

	log.Printf("info: measurements: %d\n", len(ms))
	if len(ms) > 1 {
		log.Printf("info: first: %v\n", ms[0].when)
		log.Printf("info: last: %v\n", ms[len(ms)-1].when)
		log.Printf("info: interval: %.2f [h]\n", ms[len(ms)-1].when.Sub(ms[0].when).Hours())
	}

	interval := intervals[mode](from, to)

	now := time.Now()
	for pfrom, pto, label := interval(); label != ""; pfrom, pto, label = interval() {
		// Don't interpolate for the future
		if now.Sub(pto) < 0 {
			pto = now
		}

		ldc := ms.classify(
			pfrom, pto,
			ldcRefs,
			(*availMeasurement).getValue,
		)

		ranges := ms.classify(
			pfrom, pto,
			breaks,
			access,
		)

		// Round to full days
		ldcRounded := common.RoundToFullDays(ldc)
		rangesRounded := common.RoundToFullDays(ranges)

		record[0] = label
		for i, v := range ldcRounded {
			record[i+1] = fmt.Sprintf("%d", v)
		}

		for i, d := range rangesRounded {
			record[3+i] = fmt.Sprintf("%d", d)
		}

		if err := out.Write(record); err != nil {
			// Too late for HTTP status message.
			log.Printf("error: %v\n", err)
			return
		}
	}

	out.Flush()
	if err := out.Error(); err != nil {
		// Too late for HTTP status message.
		log.Printf("error: %v\n", err)
	}
}

var intervals = []func(time.Time, time.Time) func() (time.Time, time.Time, string){
	monthly,
	quarterly,
	yearly,
}

func monthly(from, to time.Time) func() (time.Time, time.Time, string) {
	pfrom := from
	return func() (time.Time, time.Time, string) {
		if pfrom.After(to) {
			return time.Time{}, time.Time{}, ""
		}
		f := pfrom
		pfrom = pfrom.AddDate(0, 1, 0)
		label := fmt.Sprintf("%02d-%d", f.Month(), f.Year())
		return f, f.AddDate(0, 1, 0).Add(-time.Nanosecond), label
	}
}

func quarterly(from, to time.Time) func() (time.Time, time.Time, string) {
	pfrom := from
	return func() (time.Time, time.Time, string) {
		if pfrom.After(to) {
			return time.Time{}, time.Time{}, ""
		}
		f := pfrom
		pfrom = pfrom.AddDate(0, 3, 0)
		label := fmt.Sprintf("Q%d-%d", (int(f.Month())-1)/3+1, f.Year())
		return f, f.AddDate(0, 3, 0).Add(-time.Nanosecond), label
	}
}

func yearly(from, to time.Time) func() (time.Time, time.Time, string) {
	pfrom := from
	return func() (time.Time, time.Time, string) {
		if pfrom.After(to) {
			return time.Time{}, time.Time{}, ""
		}
		f := pfrom
		pfrom = pfrom.AddDate(1, 0, 0)
		label := fmt.Sprintf("%d", f.Year())
		return f, f.AddDate(1, 0, 0).Add(-time.Nanosecond), label
	}
}