view pkg/controllers/bottlenecks.go @ 4343:63c25eb9c07c

FA: be optimistic about missing data. According to clarification, missing data has to be interpreted as the best case; this is because the services do not provide data for bottlenecks which are not considered a limiting factor on the waterway at a given time.
author Sascha Wilde <wilde@intevation.de>
date Fri, 06 Sep 2019 17:22:42 +0200
parents f543f9d4a0b5
children 6ac94171a994
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
//  * Sascha Wilde <wilde@intevation.de>

package controllers

import (
	"context"
	"database/sql"
	"encoding/csv"
	"fmt"
	"log"
	"math"
	"net/http"
	"sort"
	"strconv"
	"strings"
	"time"

	"github.com/gorilla/mux"

	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/middleware"
)

const (
	// selectLimitingSQL fetches the "limiting" column of the currently
	// valid bottleneck with the given objnam ($1).
	selectLimitingSQL = `
SELECT limiting FROM waterway.bottlenecks bn
  WHERE bn.validity @> current_timestamp AND objnam = $1
`

	// selectAvailableDepthSQL fetches measured effective fairway
	// availability rows (measure_date, available depth, available width,
	// water level) of the currently valid bottleneck $1 for level of
	// service $2. Besides all rows with measure_date in [$3, $4] it
	// returns the latest row before $3 and the earliest row after $4, so
	// callers can interpolate up to the interval borders. The combined
	// result is ordered by measure_date.
	//
	// NOTE(review): rows are accepted when only one of depth or width is
	// non-NULL, but loadDepthValues scans both into int16 fields, which
	// database/sql rejects for NULL values.
	selectAvailableDepthSQL = `
WITH data AS (
  SELECT
    efa.measure_date,
    efa.available_depth_value,
    efa.available_width_value,
    efa.water_level_value
  FROM waterway.effective_fairway_availability efa
  JOIN waterway.fairway_availability fa
    ON efa.fairway_availability_id = fa.id
  JOIN waterway.bottlenecks bn
    ON fa.bottleneck_id = bn.bottleneck_id
  WHERE
    bn.validity @> current_timestamp AND
    bn.objnam = $1 AND
    efa.level_of_service = $2 AND
    efa.measure_type = 'Measured' AND
    (efa.available_depth_value IS NOT NULL OR
     efa.available_width_value IS NOT NULL) AND
    efa.water_level_value IS NOT NULL
),
before AS (
  SELECT * FROM data WHERE measure_date < $3
  ORDER BY measure_date DESC LIMIT 1
),
inside AS (
  SELECT * FROM data WHERE measure_date BETWEEN $3 AND $4
),
after AS (
  SELECT * FROM data WHERE measure_date > $4
  ORDER BY measure_date LIMIT 1
)
SELECT * FROM before
UNION ALL
SELECT * FROM inside
UNION ALL
SELECT * FROM after
ORDER BY measure_date
`

	// selectGaugeLDCSQL fetches the currently valid reference water
	// level value(s) whose depth_reference starts with "LDC" for the
	// gauge of the currently valid bottleneck $1.
	selectGaugeLDCSQL = `
SELECT
  grwl.value
FROM waterway.gauges_reference_water_levels grwl
  JOIN waterway.bottlenecks bns
    ON grwl.location = bns.gauge_location
      AND grwl.validity @> current_timestamp
WHERE bns.validity @> current_timestamp
  AND bns.objnam = $1
  AND grwl.depth_reference like 'LDC%'
`
)

// availMeasurement is one row of measured fairway availability as
// loaded by loadDepthValues.
type availMeasurement struct {
	when  time.Time // time of the measurement (UTC after loading)
	depth int16     // available depth value
	width int16     // available width value
	value int16     // water level value
}

// availMeasurements is a series of measurements; classify expects it to
// be ordered by when.
type availMeasurements []availMeasurement

// afdRefs holds the default available fairway depth class breaks, used
// by the handlers whenever a request carries no "breaks" form value.
var afdRefs = []float64{230, 250}

// getDepth returns the available depth of m as float64.
func (m *availMeasurement) getDepth() float64 { return float64(m.depth) }

// getValue returns the water level of m as float64.
func (m *availMeasurement) getValue() float64 { return float64(m.value) }

// getWidth returns the available width of m as float64.
func (m *availMeasurement) getWidth() float64 { return float64(m.width) }

// limitingFactor maps the bottleneck's limiting factor ("depth" or
// "width") to the matching measurement accessor. Any other value is
// logged as a warning and falls back to the depth accessor.
func limitingFactor(limiting string) func(*availMeasurement) float64 {
	if limiting == "width" {
		return (*availMeasurement).getWidth
	}
	if limiting != "depth" {
		log.Printf("warn: unknown limitation '%s'. default to 'depth'\n", limiting)
	}
	return (*availMeasurement).getDepth
}

// optimisticPadClassification credits the part of [from, to] that is not
// covered by the durations in classified to the last class and returns
// classified, which is modified in place. An empty classified is
// returned unchanged.
//
// According to clarification, times with no data have to be treated as
// the best case, which by convention is the highest class created by
// classify(). That is because at times where bottlenecks are not a
// limiting factor on the waterway, the services don't provide any data
// for the bottleneck in question.
//
// FIXME: A potential improvement could be to intersect the time ranges
// with the time ranges where bottlenecks were "active" (this _might_ be
// derivable from the validity periods in the bottleneck data). So it
// _might_ be possible to detect actual missing data (BN valid, but no
// data from FA service). Anyway, this is left out for now, as many
// clarifications regarding the base assumptions would be needed and the
// results still might be unreliable.
func optimisticPadClassification(
	from, to time.Time,
	classified []time.Duration,
) []time.Duration {
	// There is no class to pad into; indexing the last element would panic.
	if len(classified) == 0 {
		return classified
	}

	var actualDuration time.Duration
	for _, d := range classified {
		actualDuration += d
	}

	delta := to.Sub(from) - actualDuration
	if delta > 0 {
		log.Printf("info: time interval: (%v - %v)\n", from, to)
		log.Printf("info: found only data for %.2f hours, padding by %.2f hours\n",
			actualDuration.Hours(), delta.Hours())
		classified[len(classified)-1] += delta
	}

	return classified
}

// classify distributes the time span [from, to] over the value classes
// delimited by breaks. It returns how long the value selected by access
// stayed in each class.
//
// The result has len(breaks)+1 entries:
//   - result[0] covers values below breaks[0];
//   - result[i] covers values between breaks[i-1] and breaks[i];
//   - the last entry covers values above the last break.
//
// If breaks is empty, an empty slice is returned.
//
// Assumptions (not checked here):
//   - measurements are sorted by when (sort.Search below relies on it);
//   - breaks are sorted in ascending order.
//
// Between two consecutive measurements the value is interpolated with
// common.InterpolateTime. Any part of [from, to] that is not covered by
// measurement pairs is credited to the last class by
// optimisticPadClassification.
//
// NOTE(review): when both measurements of a pair are equal and lie
// exactly on a break, the time goes to the lower class. The first j with
// classes[j] <= m1 <= classes[j+1] wins, although the CSV headers label
// the upper class ">= break". Values more than 9999 below the first or
// above the last break fall outside the sentinels and are not counted,
// so that time ends up padded into the last class.
func (measurements availMeasurements) classify(
	from, to time.Time,
	breaks []float64,
	access func(*availMeasurement) float64,
) []time.Duration {

	if len(breaks) == 0 {
		return []time.Duration{}
	}

	result := make([]time.Duration, len(breaks)+1)
	// classes are the class limits: the breaks framed by two sentinels.
	classes := make([]float64, len(breaks)+2)
	// values[j] receives the time belonging to classes[j] on the
	// interpolated segment of the current measurement pair.
	values := make([]time.Time, len(classes))

	// Add sentinels
	classes[0] = breaks[0] - 9999
	classes[len(classes)-1] = breaks[len(breaks)-1] + 9999
	for i := range breaks {
		classes[i+1] = breaks[i]
	}

	idx := sort.Search(len(measurements), func(i int) bool {
		// All values before from can be ignored.
		return !measurements[i].when.Before(from)
	})

	// No measurement at or after from: there is nothing to interpolate,
	// so the whole interval is padded.
	if idx >= len(measurements) {
		return optimisticPadClassification(from, to, result)
	}

	// Be safe for interpolation: also keep the last measurement before
	// from, so the pair spanning from can be interpolated.
	if idx > 0 {
		idx--
	}

	measurements = measurements[idx:]

	// Walk all pairs of consecutive measurements.
	for i := 0; i < len(measurements)-1; i++ {
		p1 := &measurements[i]
		p2 := &measurements[i+1]

		// This pair and all following ones start after the interval.
		if p1.when.After(to) {
			return optimisticPadClassification(from, to, result)
		}

		// This pair ends before the interval.
		if p2.when.Before(from) {
			continue
		}

		// The part of the pair's time span that lies inside [from, to].
		lo, hi := maxTime(p1.when, from), minTime(p2.when, to)

		m1, m2 := access(p1), access(p2)
		if m1 == m2 { // The whole interval is in only one class.
			for j := 0; j < len(classes)-1; j++ {
				if classes[j] <= m1 && m1 <= classes[j+1] {
					result[j] += hi.Sub(lo)
					break
				}
			}
			continue
		}

		// NOTE(review): f presumably maps a value to the time at which the
		// interpolated line through (p1.when, m1) and (p2.when, m2)
		// reaches it. Confirm against common.InterpolateTime.
		f := common.InterpolateTime(
			p1.when, m1,
			p2.when, m2,
		)

		for j, c := range classes {
			values[j] = f(c)
		}

		// Count the time between the crossings of classes[j] and
		// classes[j+1], clipped to [lo, hi]. orderTime covers falling
		// values, where the crossing times come in reverse order.
		for j := 0; j < len(values)-1; j++ {
			start, end := orderTime(values[j], values[j+1])

			if start.After(hi) || end.Before(lo) {
				continue
			}

			start, end = maxTime(start, lo), minTime(end, hi)
			result[j] += end.Sub(start)
		}
	}

	return optimisticPadClassification(from, to, result)
}

// orderTime returns its two arguments in chronological order. If neither
// is before the other, b is returned first.
func orderTime(a, b time.Time) (time.Time, time.Time) {
	if !a.Before(b) {
		return b, a
	}
	return a, b
}

// minTime returns the earlier of a and b; for equal instants it returns b.
func minTime(a, b time.Time) time.Time {
	if !a.Before(b) {
		return b
	}
	return a
}

// maxTime returns the later of a and b; for equal instants it returns b.
func maxTime(a, b time.Time) time.Time {
	if !a.After(b) {
		return b
	}
	return a
}

// durationsToPercentage expresses each per-class duration in classes as a
// percentage of duration.
//
// A non-positive duration yields all zeros. Dividing by it would
// otherwise produce NaN or ±Inf in the report; this happens, for
// example, for periods that the handlers clamp to the current time.
func durationsToPercentage(duration time.Duration, classes []time.Duration) []float64 {
	percents := make([]float64, len(classes))
	if duration <= 0 {
		return percents
	}
	scale := 100 / duration.Seconds()
	for i, v := range classes {
		percents[i] = v.Seconds() * scale
	}
	return percents
}

// parseFormTime reads the time stored in the form value field of req,
// parsed with common.ParseTime. If the value is empty or missing, def is
// used instead. The result is always converted to UTC.
//
// On a parse error it answers with 400 Bad Request on rw and reports
// false; the caller must then stop handling the request.
func parseFormTime(
	rw http.ResponseWriter,
	req *http.Request,
	field string,
	def time.Time,
) (time.Time, bool) {
	raw := req.FormValue(field)
	if raw == "" {
		return def.UTC(), true
	}
	parsed, err := common.ParseTime(raw)
	if err == nil {
		return parsed.UTC(), true
	}
	msg := fmt.Sprintf("Invalid format for '%s': %v.", field, err)
	http.Error(rw, msg, http.StatusBadRequest)
	return time.Time{}, false
}

// parseFormInt reads the integer stored in the form value field of req.
// If the value is empty or missing, def is used instead.
//
// If the value is not a valid integer, it answers with 400 Bad Request on
// rw and reports false; the caller must then stop handling the request.
func parseFormInt(
	rw http.ResponseWriter,
	req *http.Request,
	field string,
	def int,
) (int, bool) {
	raw := req.FormValue(field)
	if raw == "" {
		return def, true
	}
	n, err := strconv.Atoi(raw)
	if err == nil {
		return n, true
	}
	msg := fmt.Sprintf("Invalid format for '%s': %v.", field, err)
	http.Error(rw, msg, http.StatusBadRequest)
	return 0, false
}

// intervalMode maps the case-insensitive mode name to an index into
// intervals: "quarterly" is 1 and "yearly" is 2. "monthly" and any
// unknown or empty name map to 0.
func intervalMode(mode string) int {
	switch strings.ToLower(mode) {
	case "yearly":
		return 2
	case "quarterly":
		return 1
	default: // "monthly" and everything unrecognised
		return 0
	}
}

// loadDepthValues loads the measured availability values of bottleneck
// for level of service los. The result covers [from, to] plus the
// nearest measurement on either side (see selectAvailableDepthSQL), is
// ordered by measurement time, and has all times converted to UTC.
//
// NOTE(review): the query admits rows where either the depth or the width
// is NULL, but both are scanned into int16 fields. database/sql refuses
// to scan NULL into a plain int16, so such a row makes the whole load
// fail. Confirm how missing depth/width values should be treated.
func loadDepthValues(
	ctx context.Context,
	conn *sql.Conn,
	bottleneck string,
	los int,
	from, to time.Time,
) (availMeasurements, error) {
	rows, err := conn.QueryContext(
		ctx, selectAvailableDepthSQL, bottleneck, los, from, to)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var loaded availMeasurements
	for rows.Next() {
		var row availMeasurement
		if err := rows.Scan(&row.when, &row.depth, &row.width, &row.value); err != nil {
			return nil, err
		}
		row.when = row.when.UTC()
		loaded = append(loaded, row)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return loaded, nil
}

// loadLDCReferenceValue looks up the LDC reference water level of the
// gauge belonging to bottleneck (see selectGaugeLDCSQL). It returns a
// one-element slice, or nil without an error when no such value exists.
func loadLDCReferenceValue(
	ctx context.Context,
	conn *sql.Conn,
	bottleneck string,
) ([]float64, error) {
	var ldc float64
	row := conn.QueryRowContext(ctx, selectGaugeLDCSQL, bottleneck)
	if err := row.Scan(&ldc); err != nil {
		if err == sql.ErrNoRows {
			return nil, nil
		}
		return nil, err
	}
	return []float64{ldc}, nil
}

// breaksToReferenceValue parses a comma separated list of class breaks
// (e.g. "230, 250") taken from the request into an ascending list of
// unique values. The result may be empty.
//
// Entries that are not valid numbers are ignored. So are NaN and ±Inf:
// strconv.ParseFloat accepts them, but they cannot be ordered or
// deduplicated sensibly and are useless as class limits.
func breaksToReferenceValue(breaks string) []float64 {
	var values []float64
	for _, part := range strings.Split(breaks, ",") {
		v, err := strconv.ParseFloat(strings.TrimSpace(part), 64)
		if err != nil || math.IsNaN(v) || math.IsInf(v, 0) {
			continue
		}
		values = append(values, v)
	}

	sort.Float64s(values)

	// Drop duplicates in a single pass; equal values are adjacent after
	// sorting, and the compacted slice reuses the backing array.
	unique := values[:0]
	for _, v := range values {
		if len(unique) == 0 || v != unique[len(unique)-1] {
			unique = append(unique, v)
		}
	}
	return unique
}

// bottleneckAvailabilty writes a CSV availability report for the
// bottleneck named by the "objnam" route variable.
//
// Form values:
//   - mode: "monthly" (default), "quarterly" or "yearly" periods;
//   - from, to: report interval (default: the year up to now), swapped if
//     given in reverse order;
//   - los: level of service (default 1);
//   - breaks: comma separated depth class limits (default afdRefs).
//
// For every period the report lists two groups of percentages:
//   - the share of time the water level was below, or at or above, the
//     LDC reference value of the bottleneck's gauge;
//   - the share of time the available depth spent in each class
//     delimited by breaks.
//
// Periods are not evaluated beyond the current time, and time without
// measurements counts as the best class (see optimisticPadClassification).
func bottleneckAvailabilty(rw http.ResponseWriter, req *http.Request) {

	mode := intervalMode(req.FormValue("mode"))
	bn := mux.Vars(req)["objnam"]

	if bn == "" {
		http.Error(
			rw,
			"Missing objnam of bottleneck",
			http.StatusBadRequest,
		)
		return
	}

	from, ok := parseFormTime(rw, req, "from", time.Now().AddDate(-1, 0, 0))
	if !ok {
		return
	}

	to, ok := parseFormTime(rw, req, "to", from.AddDate(1, 0, 0))
	if !ok {
		return
	}

	if to.Before(from) {
		to, from = from, to
	}

	los, ok := parseFormInt(rw, req, "los", 1)
	if !ok {
		return
	}

	conn := middleware.GetDBConn(req)
	ctx := req.Context()

	ldcRefs, err := loadLDCReferenceValue(ctx, conn, bn)
	if err != nil {
		http.Error(
			rw,
			fmt.Sprintf("Internal server error: %v", err),
			http.StatusInternalServerError,
		)
		return
	}

	if len(ldcRefs) == 0 {
		http.Error(
			rw,
			"No gauge reference values found for bottleneck",
			http.StatusNotFound,
		)
		return
	}

	var breaks []float64
	if b := req.FormValue("breaks"); b != "" {
		breaks = breaksToReferenceValue(b)
	} else {
		breaks = afdRefs
	}

	log.Printf("info: time interval: (%v - %v)\n", from, to)

	ms, err := loadDepthValues(ctx, conn, bn, los, from, to)
	if err != nil {
		// Report the failure instead of answering with an empty 200.
		http.Error(
			rw,
			fmt.Sprintf("Internal server error: %v", err),
			http.StatusInternalServerError,
		)
		return
	}

	if len(ms) == 0 {
		http.Error(
			rw,
			"No available fairway depth values found",
			http.StatusNotFound,
		)
		return
	}

	rw.Header().Set("Content-Type", "text/csv")

	out := csv.NewWriter(rw)

	// label, LDC classes, depth classes. All data columns are percentages.
	record := make([]string, 1+2+len(breaks)+1)
	record[0] = "#time"
	record[1] = fmt.Sprintf("# < LDC (%.1f) [%%]", ldcRefs[0])
	record[2] = fmt.Sprintf("# >= LDC (%.1f) [%%]", ldcRefs[0])
	for i, v := range breaks {
		if i == 0 {
			record[3] = fmt.Sprintf("#d < %.1f [%%]", v)
		}
		record[i+4] = fmt.Sprintf("#d >= %.1f [%%]", v)
	}

	if err := out.Write(record); err != nil {
		// Too late for HTTP status message.
		log.Printf("error: %v\n", err)
		return
	}

	interval := intervals[mode](from, to)

	now := time.Now()
	for pfrom, pto, label := interval(); label != ""; pfrom, pto, label = interval() {
		// Don't interpolate for the future
		if now.Sub(pto) < 0 {
			pto = now
		}

		lnwl := ms.classify(
			pfrom, pto,
			ldcRefs,
			(*availMeasurement).getValue,
		)

		afd := ms.classify(
			pfrom, pto,
			breaks,
			(*availMeasurement).getDepth,
		)

		duration := pto.Sub(pfrom)
		lnwlPercents := durationsToPercentage(duration, lnwl)
		afdPercents := durationsToPercentage(duration, afd)

		record[0] = label
		for i, v := range lnwlPercents {
			record[1+i] = fmt.Sprintf("%.3f", v)
		}
		for i, v := range afdPercents {
			record[3+i] = fmt.Sprintf("%.3f", v)
		}

		if err := out.Write(record); err != nil {
			// Too late for HTTP status message.
			log.Printf("error: %v\n", err)
			return
		}
	}

	out.Flush()
	if err := out.Error(); err != nil {
		// Too late for HTTP status message.
		log.Printf("error: %v\n", err)
	}
}

// bottleneckAvailableFairwayDepth writes a CSV report of hours per class
// for the bottleneck named by the "objnam" route variable.
//
// Form values:
//   - mode: "monthly" (default), "quarterly" or "yearly" periods;
//   - from, to: report interval (default: the year up to now), swapped if
//     given in reverse order;
//   - los: level of service (default 1);
//   - breaks: comma separated class limits (default afdRefs).
//
// For every period the report lists two groups of hours:
//   - hours the water level was below, or at or above, the LDC reference
//     value of the bottleneck's gauge;
//   - hours the bottleneck's limiting factor (available depth or width,
//     according to its "limiting" column) spent in each class delimited
//     by breaks.
//
// Periods are not evaluated beyond the current time, and time without
// measurements counts as the best class (see optimisticPadClassification).
func bottleneckAvailableFairwayDepth(rw http.ResponseWriter, req *http.Request) {

	mode := intervalMode(req.FormValue("mode"))

	bn := mux.Vars(req)["objnam"]
	if bn == "" {
		http.Error(
			rw, "Missing objnam of bottleneck",
			http.StatusBadRequest)
		return
	}

	from, ok := parseFormTime(rw, req, "from", time.Now().AddDate(-1, 0, 0))
	if !ok {
		return
	}

	to, ok := parseFormTime(rw, req, "to", from.AddDate(1, 0, 0))
	if !ok {
		return
	}

	if to.Before(from) {
		to, from = from, to
	}

	los, ok := parseFormInt(rw, req, "los", 1)
	if !ok {
		return
	}

	conn := middleware.GetDBConn(req)
	ctx := req.Context()

	var limiting string
	err := conn.QueryRowContext(ctx, selectLimitingSQL, bn).Scan(&limiting)
	switch {
	case err == sql.ErrNoRows:
		http.Error(
			rw, fmt.Sprintf("Unknown limitation for %s.", bn),
			http.StatusNotFound)
		return
	case err != nil:
		http.Error(
			rw, fmt.Sprintf("DB error: %v.", err),
			http.StatusInternalServerError)
		return
	}

	access := limitingFactor(limiting)

	log.Printf("info: time interval: (%v - %v)\n", from, to)

	// load the measurements
	ms, err := loadDepthValues(ctx, conn, bn, los, from, to)
	if err != nil {
		http.Error(
			rw, fmt.Sprintf("Loading measurements failed: %v.", err),
			http.StatusInternalServerError)
		return
	}

	ldcRefs, err := loadLDCReferenceValue(ctx, conn, bn)
	if err != nil {
		http.Error(
			rw, fmt.Sprintf("Loading LDC failed: %v.", err),
			http.StatusInternalServerError)
		return
	}
	if len(ldcRefs) == 0 {
		http.Error(rw, "No LDC found", http.StatusNotFound)
		return
	}

	var breaks []float64
	if b := req.FormValue("breaks"); b != "" {
		breaks = breaksToReferenceValue(b)
	} else {
		breaks = afdRefs
	}

	rw.Header().Set("Content-Type", "text/csv")

	out := csv.NewWriter(rw)

	// label, ldc, classes
	record := make([]string, 1+2+len(breaks)+1)
	record[0] = "#time"
	record[1] = fmt.Sprintf("# < LDC (%.1f) [h]", ldcRefs[0])
	record[2] = fmt.Sprintf("# >= LDC (%.1f) [h]", ldcRefs[0])
	for i, v := range breaks {
		if i == 0 {
			record[3] = fmt.Sprintf("# < %.1f [h]", v)
		}
		record[i+4] = fmt.Sprintf("# >= %.1f [h]", v)
	}

	if err := out.Write(record); err != nil {
		// Too late for HTTP status message.
		log.Printf("error: %v\n", err)
		return
	}

	log.Printf("info: measurements: %d\n", len(ms))
	if len(ms) > 1 {
		log.Printf("info: first: %v\n", ms[0].when)
		log.Printf("info: last: %v\n", ms[len(ms)-1].when)
		log.Printf("info: interval: %.2f [h]\n", ms[len(ms)-1].when.Sub(ms[0].when).Hours())
	}

	interval := intervals[mode](from, to)

	now := time.Now()
	for pfrom, pto, label := interval(); label != ""; pfrom, pto, label = interval() {
		// Don't interpolate for the future
		if now.Sub(pto) < 0 {
			pto = now
		}

		// The LDC is a reference water level of the gauge, so it has to be
		// compared with the measured water level, not with the limiting
		// depth/width (same as in bottleneckAvailabilty).
		ldc := ms.classify(
			pfrom, pto,
			ldcRefs,
			(*availMeasurement).getValue,
		)

		ranges := ms.classify(
			pfrom, pto,
			breaks,
			access,
		)

		record[0] = label
		for i, v := range ldc {
			record[i+1] = fmt.Sprintf("%.3f", v.Hours())
		}

		for i, d := range ranges {
			record[3+i] = fmt.Sprintf("%.3f", d.Hours())
		}

		if err := out.Write(record); err != nil {
			// Too late for HTTP status message.
			log.Printf("error: %v\n", err)
			return
		}
	}

	out.Flush()
	if err := out.Error(); err != nil {
		// Too late for HTTP status message.
		log.Printf("error: %v\n", err)
	}
}

// intervals holds the period iterators indexed by the result of
// intervalMode: 0 monthly, 1 quarterly, 2 yearly.
var intervals = []func(time.Time, time.Time) func() (time.Time, time.Time, string){
	monthly,
	quarterly,
	yearly,
}

// monthly returns an iterator over consecutive one-month periods starting
// at from. Each call yields:
//   - the period start;
//   - the period end, one nanosecond before the next period's start;
//   - a label of the form "MM-YYYY".
//
// Once a period would start after to, it yields an empty label.
func monthly(from, to time.Time) func() (time.Time, time.Time, string) {
	return monthStepPeriods(from, to, 1, func(t time.Time) string {
		return fmt.Sprintf("%02d-%d", t.Month(), t.Year())
	})
}

// quarterly is like monthly but steps three months at a time and labels
// each period "Qn-YYYY" after the quarter its start falls into.
func quarterly(from, to time.Time) func() (time.Time, time.Time, string) {
	return monthStepPeriods(from, to, 3, func(t time.Time) string {
		return fmt.Sprintf("Q%d-%d", (int(t.Month())-1)/3+1, t.Year())
	})
}

// yearly is like monthly but steps twelve months at a time and labels
// each period with the year of its start.
func yearly(from, to time.Time) func() (time.Time, time.Time, string) {
	return monthStepPeriods(from, to, 12, func(t time.Time) string {
		return fmt.Sprintf("%d", t.Year())
	})
}

// monthStepPeriods returns an iterator over consecutive periods of the
// given number of months. The i-th period starts at from shifted by
// i*months months.
//
// Every start is derived from from itself (instead of chaining AddDate
// calls) and computed with addMonthsClamped. Chained AddDate would
// normalize an overflowing day into the following month: Jan 31 plus one
// month becomes Mar 3, so no February period would be reported and all
// later periods would drift to the 3rd.
func monthStepPeriods(
	from, to time.Time,
	months int,
	label func(time.Time) string,
) func() (time.Time, time.Time, string) {
	step := 0
	return func() (time.Time, time.Time, string) {
		start := addMonthsClamped(from, step*months)
		if start.After(to) {
			return time.Time{}, time.Time{}, ""
		}
		step++
		end := addMonthsClamped(from, step*months).Add(-time.Nanosecond)
		return start, end, label(start)
	}
}

// addMonthsClamped adds n months to t, keeping the time of day and the
// location. Unlike t.AddDate(0, n, 0), it clamps the day of month to the
// last day of the target month instead of overflowing into the next one.
func addMonthsClamped(t time.Time, n int) time.Time {
	year, month, day := t.Date()
	first := time.Date(year, month+time.Month(n), 1,
		t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), t.Location())
	if last := first.AddDate(0, 1, -1).Day(); day > last {
		day = last
	}
	return first.AddDate(0, 0, day-1)
}