view pkg/controllers/fwa.go @ 5250:13e1767b63a1 new-fwa

Reverted: 8c44b518141a, which didn't fix anything.
author Sascha Wilde <wilde@intevation.de>
date Tue, 12 May 2020 23:03:21 +0200
parents 8c44b518141a
children 1fce0fd81f46
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018, 2019, 2020 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package controllers

import (
	"context"
	"database/sql"
	"encoding/csv"
	"fmt"
	"log"
	"net/http"
	"sort"
	"strconv"
	"strings"
	"time"

	"github.com/gorilla/mux"

	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/middleware"
)

const (
	selectBottlenecksLimitingSQL = `
SELECT
  lower(validity),
  upper(validity),
  limiting
FROM
  waterway.bottlenecks
WHERE
  bottleneck_id = $1 AND
  validity && tstzrange($2, $3)`

	selectSymbolBottlenecksSQL = `
SELECT
  distinct(b.bottleneck_id)
FROM
  %s s, waterway.bottlenecks b
WHERE
  ST_Intersects(b.area, s.area)
  AND s.name = $1
  AND b.validity && tstzrange($2, $3)`

	selectLDCsSQL = `
SELECT
  lower(grwl.validity),
  upper(grwl.validity),
  grwl.value
FROM
  waterway.gauges_reference_water_levels grwl
  JOIN waterway.bottlenecks bns
    ON grwl.location = bns.gauge_location
WHERE
  grwl.depth_reference like 'LDC%'
  AND bns.bottleneck_id = $1
  AND grwl.validity && tstzrange($2, $3)`

	selectMeasurementsSQL = `
WITH data AS (
  SELECT
    efa.measure_date,
    efa.available_depth_value,
    efa.available_width_value,
    efa.water_level_value
  FROM waterway.effective_fairway_availability efa
  JOIN waterway.fairway_availability fa
    ON efa.fairway_availability_id = fa.id
  JOIN waterway.bottlenecks bn
    ON fa.bottleneck_id = bn.bottleneck_id
  WHERE
    bn.bottleneck_id = $1 AND
    efa.level_of_service = $2 AND
    efa.measure_type = 'Measured' AND
    (efa.available_depth_value IS NOT NULL OR
     efa.available_width_value IS NOT NULL) AND
    efa.water_level_value IS NOT NULL
),
before AS (
  SELECT * FROM data WHERE measure_date < $3
  ORDER BY measure_date DESC LIMIT 1
),
inside AS (
  SELECT * FROM data WHERE measure_date BETWEEN $3 AND $4
),
after AS (
  SELECT * FROM data WHERE measure_date > $4
  ORDER BY measure_date LIMIT 1
)
SELECT * FROM before
UNION ALL
SELECT * FROM inside
UNION ALL
SELECT * FROM after
ORDER BY measure_date`
)

type (
	timeRange struct {
		lower time.Time
		upper time.Time
	}

	ldc struct {
		timeRange
		value []float64
	}

	ldcs []*ldc

	limitingFactor int

	limitingValidity struct {
		timeRange
		limiting limitingFactor
		ldcs     ldcs
	}

	limitingValidities []limitingValidity

	availMeasurement struct {
		when  time.Time
		depth int16
		width int16
		value int16
	}

	availMeasurements []availMeasurement

	bottleneck struct {
		id           string
		validities   limitingValidities
		measurements availMeasurements
	}

	bottlenecks []bottleneck

	fwaMode int
)

const (
	fwaMonthly fwaMode = iota
	fwaQuarterly
	fwaYearly
)

const (
	limitingDepth limitingFactor = iota
	limitingWidth
)

var limitingAccess = [...]func(*availMeasurement) float64{
	limitingDepth: (*availMeasurement).getDepth,
	limitingWidth: (*availMeasurement).getWidth,
}

// afdRefs are the typical available fairway depth reference values.
var afdRefs = []float64{
	230,
	250,
}

func (ls ldcs) find(from, to time.Time) *ldc {
	for _, l := range ls {
		if l.intersects(from, to) {
			return l
		}
	}
	return nil
}

func fairwayAvailability(rw http.ResponseWriter, req *http.Request) {

	from, to, ok := parseFromTo(rw, req)
	if !ok {
		return
	}

	vars := mux.Vars(req)
	name := vars["name"]
	if name == "" {
		http.Error(rw, "missing 'name' parameter.", http.StatusBadRequest)
		return
	}

	los, ok := parseFormInt(rw, req, "los", 3)
	if !ok {
		return
	}

	ctx := req.Context()
	conn := middleware.GetDBConn(req)

	// Function to extract the bottleneck_id's from the query.
	var extract func(context.Context, *sql.Conn, string, time.Time, time.Time) (bottlenecks, error)

	switch vars["kind"] {
	case "bottleneck":
		extract = extractBottleneck
	case "stretch":
		extract = extractStretch
	case "section":
		extract = extractSection
	default:
		http.Error(rw, "Invalid kind type.", http.StatusBadRequest)
		return
	}

	bottlenecks, err := extract(ctx, conn, name, from, to)
	if err != nil {
		log.Printf("error: %v\n", err)
		http.Error(rw, "cannot extract bottlenecks", http.StatusBadRequest)
		return
	}

	// If there are no bottlenecks there is nothing to do.
	if len(bottlenecks) == 0 {
		http.Error(rw, "No bottlenecks found.", http.StatusNotFound)
		return
	}

	// load validities and limiting factors
	for i := range bottlenecks {
		if err := bottlenecks[i].loadLimitingValidities(ctx, conn, from, to); err != nil {
			log.Printf("error: %v\n", err)
			http.Error(rw, "cannot load validities", http.StatusInternalServerError)
			return
		}
		// load LCDs
		if err := bottlenecks[i].loadLDCs(ctx, conn, from, to); err != nil {
			log.Printf("error: %v\n", err)
			http.Error(rw, "cannot load LDCs", http.StatusInternalServerError)
			return
		}
		// load values
		if err := bottlenecks[i].loadValues(ctx, conn, from, to, los); err != nil {
			log.Printf("error: %v\n", err)
			http.Error(rw, "cannot load values", http.StatusInternalServerError)
			return
		}
	}

	// separate breaks for depth and width
	var (
		breaks       = parseBreaks(req.FormValue("breaks"), afdRefs)
		depthBreaks  = parseBreaks(req.FormValue("depthbreaks"), breaks)
		widthBreaks  = parseBreaks(req.FormValue("widthbreaks"), breaks)
		chooseBreaks = [...][]float64{
			limitingDepth: depthBreaks,
			limitingWidth: widthBreaks,
		}

		useDepth = bottlenecks.hasLimiting(limitingDepth, from, to)
		useWidth = bottlenecks.hasLimiting(limitingWidth, from, to)
	)

	if useDepth && useWidth && len(widthBreaks) != len(depthBreaks) {
		http.Error(
			rw,
			fmt.Sprintf("class breaks lengths differ: %d != %d",
				len(widthBreaks), len(depthBreaks)),
			http.StatusBadRequest,
		)
		return
	}

	availability := vars["type"] == "availability"

	var record []string
	if !availability {
		// in days
		record = makeHeader(useDepth && useWidth, 1, breaks, 'd')
	} else {
		// percentage
		record = makeHeader(useDepth && useWidth, 3, breaks, '%')
	}

	rw.Header().Add("Content-Type", "text/csv")

	out := csv.NewWriter(rw)

	if err := out.Write(record); err != nil {
		// Too late for HTTP status message.
		log.Printf("error: %v\n", err)
		return
	}

	for i := range record[1:] {
		record[i+1] = "0"
	}

	// For every day on every bottleneck we need to find out if this day is valid.
	validities := make([]func(time.Time, time.Time) *limitingValidity, len(bottlenecks))
	for i := range bottlenecks {
		validities[i] = bottlenecks[i].validities.find()
	}

	// Mode reflects if we use monthly, quarterly od yearly intervals.
	mode := parseFWAMode(req.FormValue("mode"))

	label, finish := interval(mode, from)

	var (
		totalDays, overLDCDays int
		missingLDCs            = make([]int, len(validities))
		counters               = make([]int, len(breaks)+1)
	)

	var current, next time.Time

	write := func() error {
		record[0] = label(current)

		if !availability {
			record[1] = strconv.Itoa(totalDays - overLDCDays)
			record[2] = strconv.Itoa(overLDCDays)
			for i, c := range counters {
				record[3+i] = strconv.Itoa(c)
			}
		} else {
			overPerc := float64(overLDCDays) * 100 / float64(totalDays)
			record[1] = fmt.Sprintf("%.3f", 100-overPerc)
			record[2] = fmt.Sprintf("%.3f", overPerc)
			for i, c := range counters {
				perc := float64(c) * 100 / float64(totalDays)
				record[3+i] = fmt.Sprintf("%.3f", perc)
			}
		}

		return out.Write(record)
	}

	// We step through the time in steps of one day.
	for current = from; current.Before(to); {

		next = current.AddDate(0, 0, 1)

		// Assume that a bottleneck is over LDC.
		overLDC := true
		highest := -1

		var hasValid bool

		// check all bottlenecks
		for i, validity := range validities {

			// Check if bottleneck is available for this day.
			vs := validity(current, next)
			if vs == nil {
				continue
			}

			// Let's see if we have a LDC for this day.
			ldc := vs.ldcs.find(current, next)
			if ldc == nil {
				missingLDCs[i]++
				continue
			}

			hasValid = true

			if overLDC { // If its already not shipable we need no further tests.
				result := bottlenecks[i].measurements.classify(
					current, next,
					ldc.value,
					(*availMeasurement).getValue)

				if result[1] < 12*time.Hour {
					overLDC = false
				}
			}

			if min := minClass(bottlenecks[i].measurements.classify(
				current, next,
				chooseBreaks[vs.limiting],
				limitingAccess[vs.limiting]),
				12*time.Hour,
			); min > highest {
				highest = min
			}
		}

		if hasValid {
			if overLDC {
				overLDCDays++
			}
			if highest > -1 {
				counters[highest]++
			}
		} else { // assume that all is in best conditions
			overLDCDays++
			counters[len(counters)-1]++
		}

		totalDays++

		if finish(next) {
			if err := write(); err != nil {
				// Too late for HTTP status message.
				log.Printf("error: %v\n", err)
				return
			}

			// Reset counters
			overLDCDays, totalDays = 0, 0
			for i := range counters {
				counters[i] = 0
			}
		}

		current = next
	}

	// Write rest if last period was not finished.
	if totalDays > 0 {
		if err := write(); err != nil {
			// Too late for HTTP status message.
			log.Printf("error: %v\n", err)
			return
		}
	}

	// TODO: Log missing LDCs

	out.Flush()
	if err := out.Error(); err != nil {
		// Too late for HTTP status message.
		log.Printf("error: %v\n", err)
	}
}

func minClass(classes []time.Duration, threshold time.Duration) int {
	var sum time.Duration
	for i, v := range classes {
		if sum += v; sum >= threshold {
			return i
		}
	}
	return -1
}

func dusk(t time.Time) time.Time {
	return time.Date(
		t.Year(),
		t.Month(),
		t.Day(),
		0, 0, 0, 0,
		t.Location())
}

func dawn(t time.Time) time.Time {
	return time.Date(
		t.Year(),
		t.Month(),
		t.Day(),
		23, 59, 59, 999999999,
		t.Location())
}

func parseFromTo(
	rw http.ResponseWriter,
	req *http.Request,
) (time.Time, time.Time, bool) {
	from, ok := parseFormTime(rw, req, "from", time.Now().AddDate(-1, 0, 0))
	if !ok {
		return time.Time{}, time.Time{}, false
	}

	to, ok := parseFormTime(rw, req, "to", from.AddDate(1, 0, 0))
	if !ok {
		return time.Time{}, time.Time{}, false
	}

	from, to = common.OrderTime(from, to)
	// Operate on daily basis so go to full days.
	return dusk(from), dawn(to), true
}

func parseFWAMode(mode string) fwaMode {
	switch strings.ToLower(mode) {
	case "monthly":
		return fwaMonthly
	case "quarterly":
		return fwaQuarterly
	case "yearly":
		return fwaYearly
	default:
		return fwaMonthly
	}
}

func breaksToReferenceValue(breaks string) []float64 {
	parts := strings.Split(breaks, ",")
	var values []float64

	for _, part := range parts {
		part = strings.TrimSpace(part)
		if v, err := strconv.ParseFloat(part, 64); err == nil {
			values = append(values, v)
		}
	}

	return common.DedupFloat64s(values)
}

func parseBreaks(breaks string, defaults []float64) []float64 {
	if breaks != "" {
		return breaksToReferenceValue(breaks)
	}
	return defaults
}

func (tr *timeRange) intersects(from, to time.Time) bool {
	return !(to.Before(tr.lower) || from.After(tr.upper))
}

func (tr *timeRange) toUTC() {
	tr.lower = tr.lower.UTC()
	tr.upper = tr.upper.UTC()
}

func (lvs limitingValidities) find() func(from, to time.Time) *limitingValidity {

	var last *limitingValidity

	return func(from, to time.Time) *limitingValidity {
		if last != nil && last.intersects(from, to) {
			return last
		}
		for i := range lvs {
			if lv := &lvs[i]; lv.intersects(from, to) {
				last = lv
				return lv
			}
		}
		return nil
	}
}

func (lvs limitingValidities) hasLimiting(limiting limitingFactor, from, to time.Time) bool {
	for i := range lvs {
		if lvs[i].limiting == limiting && lvs[i].intersects(from, to) {
			return true
		}
	}
	return false
}

func (bns bottlenecks) hasLimiting(limiting limitingFactor, from, to time.Time) bool {
	for i := range bns {
		if bns[i].validities.hasLimiting(limiting, from, to) {
			return true
		}
	}
	return false
}

func parseLimitingFactor(limiting string) limitingFactor {
	switch limiting {
	case "depth":
		return limitingDepth
	case "width":
		return limitingWidth
	default:
		log.Printf("warn: unknown limitation '%s'. default to 'depth'\n", limiting)
		return limitingDepth
	}
}

func loadLimitingValidities(
	ctx context.Context,
	conn *sql.Conn,
	bottleneckID string,
	from, to time.Time,
) (limitingValidities, error) {

	var lvs limitingValidities

	rows, err := conn.QueryContext(
		ctx,
		selectBottlenecksLimitingSQL,
		bottleneckID,
		from, to)

	if err != nil {
		return nil, err
	}
	defer rows.Close()

	for rows.Next() {
		var (
			lv       limitingValidity
			limiting string
			upper    sql.NullTime
		)
		if err := rows.Scan(
			&lv.lower,
			&upper,
			&limiting,
		); err != nil {
			return nil, err
		}
		if upper.Valid {
			lv.upper = upper.Time
		} else {
			lv.upper = to.Add(24 * time.Hour)
		}
		lv.toUTC()
		lv.limiting = parseLimitingFactor(limiting)
		lvs = append(lvs, lv)
	}

	return lvs, rows.Err()
}

func loadSymbolBottlenecksFromTo(
	ctx context.Context,
	conn *sql.Conn,
	what, name string,
	from, to time.Time,
) (bottlenecks, error) {

	rows, err := conn.QueryContext(
		ctx,
		fmt.Sprintf(selectSymbolBottlenecksSQL, what),
		name,
		from, to)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var bns bottlenecks

	for rows.Next() {
		var b bottleneck
		if err := rows.Scan(&b.id); err != nil {
			return nil, err
		}
		bns = append(bns, b)
	}

	return bns, rows.Err()
}

func extractBottleneck(
	_ context.Context,
	_ *sql.Conn,
	name string,
	_, _ time.Time,
) (bottlenecks, error) {
	return bottlenecks{{id: name}}, nil
}

func extractStretch(
	ctx context.Context,
	conn *sql.Conn,
	name string,
	from, to time.Time,
) (bottlenecks, error) {
	return loadSymbolBottlenecksFromTo(
		ctx,
		conn,
		"users.stretches", name,
		from, to)
}

func extractSection(
	ctx context.Context,
	conn *sql.Conn,
	name string,
	from, to time.Time,
) (bottlenecks, error) {
	return loadSymbolBottlenecksFromTo(
		ctx,
		conn,
		"waterway.sections", name,
		from, to)
}

func (bn *bottleneck) loadLimitingValidities(
	ctx context.Context,
	conn *sql.Conn,
	from, to time.Time,
) error {
	vs, err := loadLimitingValidities(
		ctx,
		conn,
		bn.id,
		from, to)
	if err == nil {
		bn.validities = vs
	}
	return err
}

func (bn *bottleneck) loadLDCs(
	ctx context.Context,
	conn *sql.Conn,
	from, to time.Time,
) error {
	rows, err := conn.QueryContext(
		ctx, selectLDCsSQL,
		bn.id,
		from, to)
	if err != nil {
		return err
	}
	defer rows.Close()
	for rows.Next() {
		l := ldc{value: []float64{0}}
		var upper sql.NullTime
		if err := rows.Scan(&l.lower, &upper, &l.value[0]); err != nil {
			return err
		}
		if upper.Valid {
			l.upper = upper.Time
		} else {
			l.upper = to.Add(24 * time.Hour)
		}
		l.toUTC()
		for i := range bn.validities {
			vs := &bn.validities[i]

			if vs.intersects(l.lower, l.upper) {
				vs.ldcs = append(vs.ldcs, &l)
			}
		}
	}
	return rows.Err()
}

func (bn *bottleneck) loadValues(
	ctx context.Context,
	conn *sql.Conn,
	from, to time.Time,
	los int,
) error {
	rows, err := conn.QueryContext(
		ctx, selectMeasurementsSQL,
		bn.id,
		los,
		from, to)
	if err != nil {
		return err
	}
	defer rows.Close()

	var ms availMeasurements

	for rows.Next() {
		var m availMeasurement
		if err := rows.Scan(
			&m.when,
			&m.depth,
			&m.width,
			&m.value,
		); err != nil {
			return err
		}
		m.when = m.when.UTC()
		ms = append(ms, m)
	}
	if err := rows.Err(); err != nil {
		return err
	}
	bn.measurements = ms
	return nil
}

func (measurement *availMeasurement) getDepth() float64 {
	return float64(measurement.depth)
}

func (measurement *availMeasurement) getValue() float64 {
	return float64(measurement.value)
}

func (measurement *availMeasurement) getWidth() float64 {
	return float64(measurement.width)
}

func (measurements availMeasurements) classify(
	from, to time.Time,
	breaks []float64,
	access func(*availMeasurement) float64,
) []time.Duration {

	if len(breaks) == 0 {
		return []time.Duration{}
	}

	result := make([]time.Duration, len(breaks)+1)
	classes := make([]float64, len(breaks)+2)
	values := make([]time.Time, len(classes))

	// Add sentinels
	classes[0] = breaks[0] - 9999
	classes[len(classes)-1] = breaks[len(breaks)-1] + 9999
	for i := range breaks {
		classes[i+1] = breaks[i]
	}

	idx := sort.Search(len(measurements), func(i int) bool {
		// All values before from can be ignored.
		return !measurements[i].when.Before(from)
	})

	if idx >= len(measurements) {
		return result
	}

	// Be safe for interpolation.
	if idx > 0 {
		idx--
	}

	measurements = measurements[idx:]

	for i := 0; i < len(measurements)-1; i++ {
		p1 := &measurements[i]
		p2 := &measurements[i+1]

		if p1.when.After(to) {
			return result
		}

		if p2.when.Before(from) {
			continue
		}

		// TODO: Discuss if we want somethinh like this.
		if false && p2.when.Sub(p1.when).Hours() > 1.5 {
			// Don't interpolate ranges bigger then one and a half hour
			continue
		}

		lo, hi := common.MaxTime(p1.when, from), common.MinTime(p2.when, to)

		m1, m2 := access(p1), access(p2)
		if m1 == m2 { // The whole interval is in only one class.
			for j := 0; j < len(classes)-1; j++ {
				if classes[j] <= m1 && m1 <= classes[j+1] {
					result[j] += hi.Sub(lo)
					break
				}
			}
			continue
		}

		f := common.InterpolateTime(
			p1.when, m1,
			p2.when, m2,
		)

		for j, c := range classes {
			values[j] = f(c)
		}

		for j := 0; j < len(values)-1; j++ {
			start, end := common.OrderTime(values[j], values[j+1])

			if start.After(hi) || end.Before(lo) {
				continue
			}

			start, end = common.MaxTime(start, lo), common.MinTime(end, hi)
			result[j] += end.Sub(start)
		}
	}

	return result
}

func interval(mode fwaMode, t time.Time) (
	label func(time.Time) string,
	finish func(time.Time) bool,
) {
	switch mode {
	case fwaMonthly:
		label, finish = monthLabel, otherMonth(t)
	case fwaQuarterly:
		label, finish = quarterLabel, otherQuarter(t)
	case fwaYearly:
		label, finish = yearLabel, otherYear(t)
	default:
		panic("Unknown mode")
	}
	return
}

func monthLabel(t time.Time) string {
	return fmt.Sprintf("%02d-%d", t.Month(), t.Year())
}

func quarterLabel(t time.Time) string {
	return fmt.Sprintf("Q%d-%d", (int(t.Month())-1)/3+1, t.Year())
}

func yearLabel(t time.Time) string {
	return strconv.Itoa(t.Year())
}

func otherMonth(t time.Time) func(time.Time) bool {
	return func(x time.Time) bool {
		flag := t.Day() == x.Day()
		if flag {
			t = x
		}
		return flag
	}
}

func otherQuarter(t time.Time) func(time.Time) bool {
	return func(x time.Time) bool {
		flag := (t.Month()-1)/3 != (x.Month()-1)/3
		if flag {
			t = x
		}
		return flag
	}
}

func otherYear(t time.Time) func(time.Time) bool {
	return func(x time.Time) bool {
		flag := t.Year() != x.Year()
		if flag {
			t = x
		}
		return flag
	}
}

func makeHeader(flag bool, prec int, breaks []float64, unit rune) []string {
	record := make([]string, 1+2+len(breaks)+1)
	record[0] = "# time"
	record[1] = fmt.Sprintf("# < LDC [%c]", unit)
	record[2] = fmt.Sprintf("# >= LDC [%c]", unit)
	for i, v := range breaks {
		if flag {
			if i == 0 {
				record[3] = fmt.Sprintf("# < break_1 [%c]", unit)
			}
			record[i+4] = fmt.Sprintf("# >= break_%d [%c]", i+1, unit)
		} else {
			if i == 0 {
				record[3] = fmt.Sprintf("# < %.*f [%c]", prec, v, unit)
			}
			record[i+4] = fmt.Sprintf("# >= %.*f [%c]", prec, v, unit)
		}
	}
	return record
}