view pkg/controllers/fwa.go @ 5591:0011f50cf216 surveysperbottleneckid

Removed no longer used alternative api for surveys/ endpoint. As bottlenecks in the summary for SR imports are now identified by their id and no longer by the (not guarantied to be unique!) name, there is no longer the need to request survey data by the name+date tuple (which isn't reliable anyway). So the workaround was now reversed.
author Sascha Wilde <wilde@sha-bang.de>
date Wed, 06 Apr 2022 13:30:29 +0200
parents 5f47eeea988d
children
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018, 2019, 2020 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package controllers

import (
	"context"
	"database/sql"
	"encoding/csv"
	"fmt"
	"net/http"
	"sort"
	"strconv"
	"strings"
	"time"

	"github.com/gorilla/mux"

	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/log"
	"gemma.intevation.de/gemma/pkg/middleware"
)

const (
	selectBottlenecksLimitingSQL = `
SELECT
  lower(validity),
  upper(validity),
  limiting
FROM
  waterway.bottlenecks
WHERE
  bottleneck_id = $1 AND
  validity && tstzrange($2, $3)`

	selectSymbolBottlenecksSQL = `
SELECT
  distinct(b.bottleneck_id)
FROM
  %s s, waterway.bottlenecks b
WHERE
  ST_Intersects(b.area, s.area)
  AND s.name = $1
  AND b.validity && tstzrange($2, $3)`

	selectLDCsSQL = `
SELECT
  lower(grwl.validity),
  upper(grwl.validity),
  grwl.value
FROM
  waterway.gauges_reference_water_levels grwl
  JOIN waterway.bottlenecks bns
    ON grwl.location = bns.gauge_location
WHERE
  grwl.depth_reference like 'LDC%'
  AND bns.bottleneck_id = $1
  AND grwl.validity && tstzrange($2, $3)`

	selectMeasurementsSQL = `
WITH data AS (
  SELECT
    efa.measure_date,
    efa.available_depth_value,
    efa.available_width_value,
    efa.water_level_value
  FROM waterway.effective_fairway_availability efa
  JOIN waterway.fairway_availability fa
    ON efa.fairway_availability_id = fa.id
  JOIN waterway.bottlenecks bn
    ON fa.bottleneck_id = bn.bottleneck_id
  WHERE
    bn.validity @> efa.measure_date AND
    bn.bottleneck_id = $1 AND
    efa.level_of_service = $2 AND
    efa.measure_type = 'Measured' AND
    (efa.available_depth_value IS NOT NULL OR
     efa.available_width_value IS NOT NULL) AND
    efa.water_level_value IS NOT NULL
),
before AS (
  SELECT * FROM data WHERE measure_date < $3
  ORDER BY measure_date DESC LIMIT 1
),
inside AS (
  SELECT * FROM data WHERE measure_date BETWEEN $3 AND $4
),
after AS (
  SELECT * FROM data WHERE measure_date > $4
  ORDER BY measure_date LIMIT 1
)
SELECT * FROM before
UNION ALL
SELECT * FROM inside
UNION ALL
SELECT * FROM after
ORDER BY measure_date`
)

type (
	timeRange struct {
		lower time.Time
		upper time.Time
	}

	ldc struct {
		timeRange
		value []float64
	}

	ldcs []*ldc

	limitingFactor int

	limitingValidity struct {
		timeRange
		limiting limitingFactor
		ldcs     ldcs
	}

	limitingValidities []limitingValidity

	availMeasurement struct {
		when  time.Time
		depth int16
		width int16
		value int16
	}

	availMeasurements []availMeasurement

	bottleneck struct {
		id           string
		validities   limitingValidities
		measurements availMeasurements
	}

	bottlenecks []bottleneck

	fwaMode int
)

const (
	fwaMonthly fwaMode = iota
	fwaQuarterly
	fwaYearly
)

const (
	limitingDepth limitingFactor = iota
	limitingWidth
)

var limitingAccess = [...]func(*availMeasurement) float64{
	limitingDepth: (*availMeasurement).getDepth,
	limitingWidth: (*availMeasurement).getWidth,
}

// afdRefs are the typical available fairway depth reference values.
var afdRefs = []float64{
	230,
	250,
}

func (ls ldcs) find(from, to time.Time) *ldc {
	for _, l := range ls {
		if l.intersects(from, to) {
			return l
		}
	}
	return nil
}

func fairwayAvailability(rw http.ResponseWriter, req *http.Request) {

	from, to, ok := parseFromTo(rw, req)
	if !ok {
		return
	}

	vars := mux.Vars(req)
	name := vars["name"]
	if name == "" {
		http.Error(rw, "missing 'name' parameter.", http.StatusBadRequest)
		return
	}

	los, ok := parseFormInt(rw, req, "los", 3)
	if !ok {
		return
	}

	ctx := req.Context()
	conn := middleware.GetDBConn(req)

	var bns bottlenecks
	var err error

	switch vars["kind"] {
	case "bottleneck":
		bns = bottlenecks{{id: name}}
	case "stretch":
		bns, err = loadSymbolBottlenecks(ctx, conn, "users.stretches", name, from, to)
	case "section":
		bns, err = loadSymbolBottlenecks(ctx, conn, "waterway.sections", name, from, to)
	default:
		http.Error(rw, "Invalid kind type.", http.StatusBadRequest)
		return
	}

	if err != nil {
		log.Errorf("%v\n", err)
		http.Error(rw, "cannot extract bottlenecks", http.StatusBadRequest)
		return
	}

	// If there are no bottlenecks there is nothing to do.
	if len(bns) == 0 {
		http.Error(rw, "No bottlenecks found.", http.StatusNotFound)
		return
	}

	// load validities and limiting factors
	for i := range bns {
		if err := bns[i].loadLimitingValidities(ctx, conn, from, to); err != nil {
			log.Errorf("%v\n", err)
			http.Error(rw, "cannot load validities", http.StatusInternalServerError)
			return
		}
		// load LCDs
		if err := bns[i].loadLDCs(ctx, conn, from, to); err != nil {
			log.Errorf("%v\n", err)
			http.Error(rw, "cannot load LDCs", http.StatusInternalServerError)
			return
		}
		// load values
		if err := bns[i].loadValues(ctx, conn, from, to, los); err != nil {
			log.Errorf("%v\n", err)
			http.Error(rw, "cannot load values", http.StatusInternalServerError)
			return
		}
	}

	// separate breaks for depth and width
	breaks, ok := parseBreaks(rw, req, "breaks", afdRefs)
	if !ok {
		return
	}
	depthBreaks, ok := parseBreaks(rw, req, "depthbreaks", breaks)
	if !ok {
		return
	}
	widthBreaks, ok := parseBreaks(rw, req, "widthbreaks", breaks)
	if !ok {
		return
	}

	chooseBreaks := [...][]float64{
		limitingDepth: depthBreaks,
		limitingWidth: widthBreaks,
	}

	useDepth := bns.hasLimiting(limitingDepth, from, to)
	useWidth := bns.hasLimiting(limitingWidth, from, to)

	if useDepth && useWidth && len(widthBreaks) != len(depthBreaks) {
		http.Error(
			rw,
			fmt.Sprintf("class breaks lengths differ: %d != %d",
				len(widthBreaks), len(depthBreaks)),
			http.StatusBadRequest,
		)
		return
	}

	availability := vars["type"] == "availability"

	var record []string
	if !availability {
		// in days
		record = makeHeader(useDepth && useWidth, 1, breaks, 'd')
	} else {
		// percentage
		record = makeHeader(useDepth && useWidth, 3, breaks, '%')
	}

	rw.Header().Add("Content-Type", "text/csv")

	out := csv.NewWriter(rw)

	if err := out.Write(record); err != nil {
		// Too late for HTTP status message.
		log.Errorf("%v\n", err)
		return
	}

	for i := range record[1:] {
		record[i+1] = "0"
	}

	// For every day on every bottleneck we need to find out if this day is valid.
	validities := make([]func(time.Time, time.Time) *limitingValidity, len(bns))
	for i := range bns {
		validities[i] = bns[i].validities.find()
	}

	// Mode reflects if we use monthly, quarterly od yearly intervals.
	mode := parseFWAMode(req.FormValue("mode"))

	label, finish := interval(mode, from)

	var (
		totalDays, overLDCDays int
		missingLDCs            = make([]int, len(validities))
		counters               = make([]int, len(breaks)+1)
	)

	var current, next time.Time

	write := func() error {
		record[0] = label(current)

		if !availability {
			record[1] = strconv.Itoa(totalDays - overLDCDays)
			record[2] = strconv.Itoa(overLDCDays)
			for i, c := range counters {
				record[3+i] = strconv.Itoa(c)
			}
		} else {
			overPerc := float64(overLDCDays) * 100 / float64(totalDays)
			record[1] = fmt.Sprintf("%.3f", 100-overPerc)
			record[2] = fmt.Sprintf("%.3f", overPerc)
			for i, c := range counters {
				perc := float64(c) * 100 / float64(totalDays)
				record[3+i] = fmt.Sprintf("%.3f", perc)
			}
		}

		return out.Write(record)
	}

	// Stop yesterday
	end := common.MinTime(common.Dusk(time.Now()).Add(-time.Nanosecond), to)

	// We step through the time in steps of one day.
	for current = from; current.Before(end); {

		next = current.AddDate(0, 0, 1)

		// Assume that a bottleneck is over LDC.
		overLDC := true
		lowest := len(counters) - 1

		var hasValid bool

		// check all bottlenecks
		for i, validity := range validities {

			// Check if bottleneck is available for this day.
			vs := validity(current, next)
			if vs == nil {
				continue
			}

			// Let's see if we have a LDC for this day.
			ldc := vs.ldcs.find(current, next)
			if ldc == nil {
				missingLDCs[i]++
				continue
			}

			hasValid = true

			if overLDC { // If its already not shipable we need no further tests.
				result := bns[i].measurements.classify(
					current, next,
					ldc.value,
					(*availMeasurement).getValue)

				if (result[0] != 0 || result[1] != 0) && result[1] < 12*time.Hour {
					overLDC = false
				}
			}

			classes := bns[i].measurements.classify(
				current, next,
				chooseBreaks[vs.limiting],
				limitingAccess[vs.limiting])

			if min := minClass(classes, 12*time.Hour); min < lowest {
				lowest = min
			}
		}

		if hasValid {
			if overLDC {
				overLDCDays++
			}
			counters[lowest]++
		} else { // assume that all is in best conditions
			overLDCDays++
			counters[len(counters)-1]++
		}

		totalDays++

		if finish(next) {
			if err := write(); err != nil {
				// Too late for HTTP status message.
				log.Errorf("%v\n", err)
				return
			}

			// Reset counters
			overLDCDays, totalDays = 0, 0
			for i := range counters {
				counters[i] = 0
			}
		}

		current = next
	}

	// Write rest if last period was not finished.
	if totalDays > 0 {
		if err := write(); err != nil {
			// Too late for HTTP status message.
			log.Errorf("%v\n", err)
			return
		}
	}

	for i, days := range missingLDCs {
		if missingLDCs[i] > 0 {
			log.Warnf("warn: Missing LDCs for %s on %d days.\n",
				bns[i].id, days)
		}
	}

	out.Flush()
	if err := out.Error(); err != nil {
		// Too late for HTTP status message.
		log.Errorf("%v\n", err)
	}
}

func minClass(classes []time.Duration, threshold time.Duration) int {
	var sum time.Duration
	for i, v := range classes {
		if sum += v; sum >= threshold {
			return i
		}
	}
	return len(classes) - 1
}

func parseFromTo(
	rw http.ResponseWriter,
	req *http.Request,
) (time.Time, time.Time, bool) {
	from, ok := parseFormTime(rw, req, "from", time.Now().AddDate(-1, 0, 0))
	if !ok {
		return time.Time{}, time.Time{}, false
	}

	to, ok := parseFormTime(rw, req, "to", from.AddDate(1, 0, 0))
	if !ok {
		return time.Time{}, time.Time{}, false
	}

	from, to = common.OrderTime(from, to)
	// Operate on daily basis so go to full days.
	return common.Dusk(from), common.Dawn(to), true
}

func parseFWAMode(mode string) fwaMode {
	switch strings.ToLower(mode) {
	case "monthly":
		return fwaMonthly
	case "quarterly":
		return fwaQuarterly
	case "yearly":
		return fwaYearly
	default:
		return fwaMonthly
	}
}

func breaksToReferenceValue(breaks string) ([]float64, error) {
	parts := strings.Split(breaks, ",")
	var values []float64

	for _, part := range parts {
		part = strings.TrimSpace(part)
		v, err := strconv.ParseFloat(part, 64)
		if err != nil {
			return nil, err
		}
		values = append(values, v)
	}

	return common.DedupFloat64s(values), nil
}

func parseBreaks(
	rw http.ResponseWriter, req *http.Request,
	parameter string,
	defaults []float64,
) ([]float64, bool) {

	breaks := strings.TrimSpace(req.FormValue(parameter))
	if breaks == "" {
		return defaults, true
	}

	defaults, err := breaksToReferenceValue(breaks)
	if err != nil {
		msg := fmt.Sprintf("Parameter '%s' is invalid: %s.", parameter, err)
		log.Errorf("%s\n", msg)
		http.Error(rw, msg, http.StatusBadRequest)
		return nil, false
	}

	return defaults, true
}

func (tr *timeRange) intersects(from, to time.Time) bool {
	return !(to.Before(tr.lower) || from.After(tr.upper))
}

func (tr *timeRange) toUTC() {
	tr.lower = tr.lower.UTC()
	tr.upper = tr.upper.UTC()
}

func (lvs limitingValidities) find() func(from, to time.Time) *limitingValidity {

	var last *limitingValidity

	return func(from, to time.Time) *limitingValidity {
		if last != nil && last.intersects(from, to) {
			return last
		}
		for i := range lvs {
			if lv := &lvs[i]; lv.intersects(from, to) {
				last = lv
				return lv
			}
		}
		return nil
	}
}

func (lvs limitingValidities) hasLimiting(limiting limitingFactor, from, to time.Time) bool {
	for i := range lvs {
		if lvs[i].limiting == limiting && lvs[i].intersects(from, to) {
			return true
		}
	}
	return false
}

func (bns bottlenecks) hasLimiting(limiting limitingFactor, from, to time.Time) bool {
	for i := range bns {
		if bns[i].validities.hasLimiting(limiting, from, to) {
			return true
		}
	}
	return false
}

func parseLimitingFactor(limiting string) limitingFactor {
	switch limiting {
	case "depth":
		return limitingDepth
	case "width":
		return limitingWidth
	default:
		log.Warnf("unknown limitation '%s'. default to 'depth'\n", limiting)
		return limitingDepth
	}
}

func loadLimitingValidities(
	ctx context.Context,
	conn *sql.Conn,
	bottleneckID string,
	from, to time.Time,
) (limitingValidities, error) {

	var lvs limitingValidities

	rows, err := conn.QueryContext(
		ctx,
		selectBottlenecksLimitingSQL,
		bottleneckID,
		from, to)

	if err != nil {
		return nil, err
	}
	defer rows.Close()

	for rows.Next() {
		var (
			lv       limitingValidity
			limiting string
			upper    sql.NullTime
		)
		if err := rows.Scan(
			&lv.lower,
			&upper,
			&limiting,
		); err != nil {
			return nil, err
		}
		if upper.Valid {
			lv.upper = upper.Time
		} else {
			lv.upper = to.Add(24 * time.Hour)
		}
		lv.toUTC()
		lv.limiting = parseLimitingFactor(limiting)
		lvs = append(lvs, lv)
	}

	return lvs, rows.Err()
}

func loadSymbolBottlenecks(
	ctx context.Context,
	conn *sql.Conn,
	what, name string,
	from, to time.Time,
) (bottlenecks, error) {

	rows, err := conn.QueryContext(
		ctx,
		fmt.Sprintf(selectSymbolBottlenecksSQL, what),
		name,
		from, to)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var bns bottlenecks

	for rows.Next() {
		var b bottleneck
		if err := rows.Scan(&b.id); err != nil {
			return nil, err
		}
		bns = append(bns, b)
	}

	return bns, rows.Err()
}

func (bn *bottleneck) loadLimitingValidities(
	ctx context.Context,
	conn *sql.Conn,
	from, to time.Time,
) error {
	vs, err := loadLimitingValidities(
		ctx,
		conn,
		bn.id,
		from, to)
	if err == nil {
		bn.validities = vs
	}
	return err
}

func (bn *bottleneck) loadLDCs(
	ctx context.Context,
	conn *sql.Conn,
	from, to time.Time,
) error {
	rows, err := conn.QueryContext(
		ctx, selectLDCsSQL,
		bn.id,
		from, to)
	if err != nil {
		return err
	}
	defer rows.Close()
	for rows.Next() {
		l := ldc{value: []float64{0}}
		var upper sql.NullTime
		if err := rows.Scan(&l.lower, &upper, &l.value[0]); err != nil {
			return err
		}
		if upper.Valid {
			l.upper = upper.Time
		} else {
			l.upper = to.Add(24 * time.Hour)
		}
		l.toUTC()
		for i := range bn.validities {
			vs := &bn.validities[i]

			if vs.intersects(l.lower, l.upper) {
				vs.ldcs = append(vs.ldcs, &l)
			}
		}
	}
	return rows.Err()
}

func (bn *bottleneck) loadValues(
	ctx context.Context,
	conn *sql.Conn,
	from, to time.Time,
	los int,
) error {
	rows, err := conn.QueryContext(
		ctx, selectMeasurementsSQL,
		bn.id,
		los,
		from, to)
	if err != nil {
		return err
	}
	defer rows.Close()

	var ms availMeasurements

	for rows.Next() {
		var m availMeasurement
		if err := rows.Scan(
			&m.when,
			&m.depth,
			&m.width,
			&m.value,
		); err != nil {
			return err
		}
		m.when = m.when.UTC()
		ms = append(ms, m)
	}
	if err := rows.Err(); err != nil {
		return err
	}
	bn.measurements = ms
	return nil
}

func (measurement *availMeasurement) getDepth() float64 {
	return float64(measurement.depth)
}

func (measurement *availMeasurement) getValue() float64 {
	return float64(measurement.value)
}

func (measurement *availMeasurement) getWidth() float64 {
	return float64(measurement.width)
}

func (measurements availMeasurements) classify(
	from, to time.Time,
	breaks []float64,
	access func(*availMeasurement) float64,
) []time.Duration {

	if len(breaks) == 0 {
		return []time.Duration{}
	}

	result := make([]time.Duration, len(breaks)+1)
	classes := make([]float64, len(breaks)+2)
	values := make([]time.Time, len(classes))

	// Add sentinels
	classes[0] = breaks[0] - 9999
	classes[len(classes)-1] = breaks[len(breaks)-1] + 9999
	for i := range breaks {
		classes[i+1] = breaks[i]
	}

	idx := sort.Search(len(measurements), func(i int) bool {
		// All values before from can be ignored.
		return !measurements[i].when.Before(from)
	})

	if idx >= len(measurements) {
		return result
	}

	// Be safe for interpolation.
	if idx > 0 {
		idx--
	}

	measurements = measurements[idx:]

	for i := 0; i < len(measurements)-1; i++ {
		p1 := &measurements[i]
		p2 := &measurements[i+1]

		if p1.when.After(to) {
			return result
		}

		if p2.when.Before(from) {
			continue
		}

		// TODO: Discuss if we want somethinh like this.
		if false && p2.when.Sub(p1.when).Hours() > 1.5 {
			// Don't interpolate ranges bigger then one and a half hour
			continue
		}

		lo, hi := common.MaxTime(p1.when, from), common.MinTime(p2.when, to)

		m1, m2 := access(p1), access(p2)
		if m1 == m2 { // The whole interval is in only one class.
			for j := 0; j < len(classes)-1; j++ {
				if classes[j] <= m1 && m1 <= classes[j+1] {
					result[j] += hi.Sub(lo)
					break
				}
			}
			continue
		}

		f := common.InterpolateTime(
			p1.when, m1,
			p2.when, m2,
		)

		for j, c := range classes {
			values[j] = f(c)
		}

		for j := 0; j < len(values)-1; j++ {
			start, end := common.OrderTime(values[j], values[j+1])

			if start.After(hi) || end.Before(lo) {
				continue
			}

			start, end = common.MaxTime(start, lo), common.MinTime(end, hi)
			result[j] += end.Sub(start)
		}
	}

	return result
}

func interval(mode fwaMode, t time.Time) (
	label func(time.Time) string,
	finish func(time.Time) bool,
) {
	switch mode {
	case fwaMonthly:
		label, finish = monthLabel, otherMonth(t)
	case fwaQuarterly:
		label, finish = quarterLabel, otherQuarter(t)
	case fwaYearly:
		label, finish = yearLabel, otherYear(t)
	default:
		panic("Unknown mode")
	}
	return
}

func monthLabel(t time.Time) string {
	return fmt.Sprintf("%02d-%d", t.Month(), t.Year())
}

func quarterLabel(t time.Time) string {
	return fmt.Sprintf("Q%d-%d", (int(t.Month())-1)/3+1, t.Year())
}

func yearLabel(t time.Time) string {
	return strconv.Itoa(t.Year())
}

func otherMonth(t time.Time) func(time.Time) bool {
	return func(x time.Time) bool {
		flag := t.Day() == x.Day()
		if flag {
			t = x
		}
		return flag
	}
}

func otherQuarter(t time.Time) func(time.Time) bool {
	return func(x time.Time) bool {
		flag := (t.Month()-1)/3 != (x.Month()-1)/3
		if flag {
			t = x
		}
		return flag
	}
}

func otherYear(t time.Time) func(time.Time) bool {
	return func(x time.Time) bool {
		flag := t.Year() != x.Year()
		if flag {
			t = x
		}
		return flag
	}
}

func makeHeader(flag bool, prec int, breaks []float64, unit rune) []string {
	record := make([]string, 1+2+len(breaks)+1)
	record[0] = "# time"
	record[1] = fmt.Sprintf("# < LDC [%c]", unit)
	record[2] = fmt.Sprintf("# >= LDC [%c]", unit)
	for i, v := range breaks {
		if flag {
			if i == 0 {
				record[3] = fmt.Sprintf("# < break_1 [%c]", unit)
			}
			record[i+4] = fmt.Sprintf("# >= break_%d [%c]", i+1, unit)
		} else {
			if i == 0 {
				record[3] = fmt.Sprintf("# < %.*f [%c]", prec, v, unit)
			}
			record[i+4] = fmt.Sprintf("# >= %.*f [%c]", prec, v, unit)
		}
	}
	return record
}