view pkg/controllers/bottlenecks.go @ 3678:8f58851927c0

client: make layer factory only return new layer config for individual maps instead of each time it is invoked. The purpose of the factory was to support multiple maps with individual layers. But returning a new config each time it is invoked leads to bugs that rely on the layer's state. Now this factory reuses the same objects it created before, per map.
author Markus Kottlaender <markus@intevation.de>
date Mon, 17 Jun 2019 17:31:35 +0200
parents db87f34805fb
children 268348a58c9c
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package controllers

import (
	"context"
	"database/sql"
	"encoding/csv"
	"fmt"
	"log"
	"net/http"
	"sort"
	"strconv"
	"strings"
	"time"

	"github.com/gorilla/mux"

	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/middleware"
)

const (
	// selectLimitingSQL fetches the "limiting" column of the currently
	// valid bottleneck with the given objnam ($1).
	// The table needs the alias bn because the WHERE clause qualifies
	// its columns with it; without the alias PostgreSQL rejects the
	// statement with a missing FROM-clause entry error.
	selectLimitingSQL = `
SELECT limiting FROM waterway.bottlenecks bn
  WHERE bn.validity @> current_timestamp AND bn.objnam = $1
`

	// selectAvailableDepthSQL loads the measured effective fairway
	// availability rows of the currently valid bottleneck $1 for the
	// level of service $2. It returns all rows with a measure_date
	// between $3 and $4 plus the nearest single row before $3 and the
	// nearest single row after $4, ordered by measure_date.
	// Columns: measure_date, available_depth_value,
	// available_width_value, water_level_value.
	selectAvailableDepthSQL = `
WITH data AS (
  SELECT
    efa.measure_date,
    efa.available_depth_value,
    efa.available_width_value,
    efa.water_level_value
  FROM waterway.effective_fairway_availability efa
  JOIN waterway.fairway_availability fa
    ON efa.fairway_availability_id = fa.id
  JOIN waterway.bottlenecks bn
    ON fa.bottleneck_id = bn.id
  WHERE
    bn.validity @> current_timestamp AND
    bn.objnam = $1 AND
    efa.level_of_service = $2 AND
    efa.measure_type = 'Measured' AND
    (efa.available_depth_value IS NOT NULL OR
     efa.available_width_value IS NOT NULL) AND
    efa.water_level_value IS NOT NULL
),
before AS (
  SELECT * FROM data WHERE measure_date < $3
  ORDER BY measure_date DESC LIMIT 1
),
inside AS (
  SELECT * FROM data WHERE measure_date BETWEEN $3 AND $4
),
after AS (
  SELECT * FROM data WHERE measure_date > $4
  ORDER BY measure_date LIMIT 1
)
SELECT * FROM before
UNION ALL
SELECT * FROM inside
UNION ALL
SELECT * FROM after
ORDER BY measure_date
`

	// selectGaugeLDCSQL fetches the reference water level values whose
	// depth_reference starts with 'LDC' from the gauge that is attached
	// to the currently valid bottleneck with objnam $1.
	selectGaugeLDCSQL = `
SELECT
  grwl.value
FROM waterway.gauges_reference_water_levels grwl
  JOIN waterway.bottlenecks bns
    ON grwl.location = bns.gauge_location
      AND grwl.validity = bns.gauge_validity
WHERE bns.validity @> current_timestamp
  AND bns.objnam = $1
  AND grwl.depth_reference like 'LDC%'
`
)

// availMeasurement is one row of the fairway availability query as
// scanned by loadDepthValues.
type availMeasurement struct {
	// when is the measure_date, converted to UTC by loadDepthValues.
	when time.Time
	// depth holds the available_depth_value column.
	depth int16
	// width holds the available_width_value column.
	width int16
	// value holds the water_level_value column.
	value int16
}

// availMeasurements is a list of measurements. classify expects it to
// be ordered ascending by when.
type availMeasurements []availMeasurement

// afdRefs holds the default class breaks for the available fairway
// depth. The handlers fall back to them when the request carries no
// "breaks" parameter.
var afdRefs = []float64{230, 250}

// getDepth returns the depth of the measurement as a float64.
func (m *availMeasurement) getDepth() float64 {
	return float64(m.depth)
}

// getValue returns the water level value of the measurement as a float64.
func (m *availMeasurement) getValue() float64 {
	return float64(m.value)
}

// getWidth returns the width of the measurement as a float64.
func (m *availMeasurement) getWidth() float64 {
	return float64(m.width)
}

// limitingFactor maps the limiting kind of a bottleneck to the accessor
// that extracts the matching quantity from a measurement.
// "width" selects the width, "depth" the depth. Any other input is
// logged as a warning and treated like "depth".
func limitingFactor(limiting string) func(*availMeasurement) float64 {
	if limiting == "width" {
		return (*availMeasurement).getWidth
	}
	if limiting != "depth" {
		log.Printf("warn: unknown limitation '%s'. default to 'depth'\n", limiting)
	}
	return (*availMeasurement).getDepth
}

// classify accumulates, for the time window [from, to], how long the
// quantity selected by access stayed inside each class delimited by
// breaks.
//
// The result has len(breaks)+1 entries: result[0] is the time spent
// below breaks[0], result[i] the time between breaks[i-1] and breaks[i]
// and the last entry the time above the last break. With no breaks an
// empty slice is returned.
//
// The receiver has to be sorted ascending by when because the start
// position is located with a binary search. breaks are expected to be
// sorted ascending as the first and last element are used to build the
// outer sentinel classes.
func (measurements availMeasurements) classify(
	from, to time.Time,
	breaks []float64,
	access func(*availMeasurement) float64,
) []time.Duration {

	if len(breaks) == 0 {
		return []time.Duration{}
	}

	result := make([]time.Duration, len(breaks)+1)
	// classes are the breaks enclosed by a lower and an upper sentinel.
	classes := make([]float64, len(breaks)+2)
	// values receives, per segment, one point in time for each class
	// boundary.
	values := make([]time.Time, len(classes))

	// Add sentinels
	// They sit 9999 below the first and 9999 above the last break.
	// Constant segments with a value outside of this range match no
	// class and are not counted.
	classes[0] = breaks[0] - 9999
	classes[len(classes)-1] = breaks[len(breaks)-1] + 9999
	for i := range breaks {
		classes[i+1] = breaks[i]
	}

	idx := sort.Search(len(measurements), func(i int) bool {
		// All values before from can be ignored.
		return !measurements[i].when.Before(from)
	})

	// No measurement at or after from: nothing to classify.
	if idx >= len(measurements) {
		return result
	}

	// Be safe for interpolation.
	// Step back one measurement so the segment that crosses from is
	// taken into account, too.
	if idx > 0 {
		idx--
	}

	measurements = measurements[idx:]

	// Walk over all segments formed by consecutive measurements.
	for i := 0; i < len(measurements)-1; i++ {
		p1 := &measurements[i]
		p2 := &measurements[i+1]

		// The segment starts after the window: done.
		if p1.when.After(to) {
			return result
		}

		// The segment ends before the window: skip it.
		if p2.when.Before(from) {
			continue
		}

		// Clip the segment to the window.
		lo, hi := maxTime(p1.when, from), minTime(p2.when, to)

		m1, m2 := access(p1), access(p2)
		if m1 == m2 { // The whole interval is in only one class.
			// The first matching class wins, so a value that is equal
			// to a break is counted in the class below that break.
			for j := 0; j < len(classes)-1; j++ {
				if classes[j] <= m1 && m1 <= classes[j+1] {
					result[j] += hi.Sub(lo)
					break
				}
			}
			continue
		}

		// NOTE(review): common.InterpolateTime presumably returns a
		// function that maps a value to the point in time at which the
		// line through (p1.when, m1) and (p2.when, m2) reaches it -
		// confirm against pkg/common.
		f := common.InterpolateTime(
			p1.when, m1,
			p2.when, m2,
		)

		for j, c := range classes {
			values[j] = f(c)
		}

		// For each class intersect the time span between its two
		// boundaries with the clipped segment.
		for j := 0; j < len(values)-1; j++ {
			// The boundary times are in reverse order when the value
			// is falling.
			start, end := orderTime(values[j], values[j+1])

			if start.After(hi) || end.Before(lo) {
				continue
			}

			start, end = maxTime(start, lo), minTime(end, hi)
			result[j] += end.Sub(start)
		}
	}

	return result
}

// orderTime returns its two arguments in chronological order.
// If a is not strictly before b the arguments are swapped, so for
// equal instants b is returned first.
func orderTime(a, b time.Time) (time.Time, time.Time) {
	if !a.Before(b) {
		a, b = b, a
	}
	return a, b
}

// minTime returns the earlier of the two times.
// For equal instants b is returned.
func minTime(a, b time.Time) time.Time {
	earliest := b
	if a.Before(b) {
		earliest = a
	}
	return earliest
}

// maxTime returns the later of the two times.
// For equal instants b is returned.
func maxTime(a, b time.Time) time.Time {
	latest := b
	if a.After(b) {
		latest = a
	}
	return latest
}

// durationsToPercentage expresses every entry of classes as a
// percentage of duration. The result has the same length as classes.
func durationsToPercentage(duration time.Duration, classes []time.Duration) []float64 {
	// Percent per second of the reference duration.
	scale := 100 / duration.Seconds()

	out := make([]float64, 0, len(classes))
	for _, class := range classes {
		out = append(out, class.Seconds()*scale)
	}
	return out
}

// parseFormTime reads the form value field from req and parses it with
// the layout common.TimeFormat.
// An empty or missing value yields def. The returned time is always
// converted to UTC. If the value cannot be parsed a 400 response is
// written to rw and the second return value is false.
func parseFormTime(
	rw http.ResponseWriter,
	req *http.Request,
	field string,
	def time.Time,
) (time.Time, bool) {
	raw := req.FormValue(field)
	if raw == "" {
		return def.UTC(), true
	}

	parsed, err := time.Parse(common.TimeFormat, raw)
	if err == nil {
		return parsed.UTC(), true
	}

	msg := fmt.Sprintf("Invalid format for '%s': %v.", field, err)
	http.Error(rw, msg, http.StatusBadRequest)
	return time.Time{}, false
}

// parseFormInt reads the form value field from req and parses it as a
// decimal integer.
// An empty or missing value yields def. If the value cannot be parsed
// a 400 response is written to rw and the second return value is false.
func parseFormInt(
	rw http.ResponseWriter,
	req *http.Request,
	field string,
	def int,
) (int, bool) {
	raw := req.FormValue(field)
	if raw == "" {
		return def, true
	}

	n, err := strconv.Atoi(raw)
	if err == nil {
		return n, true
	}

	msg := fmt.Sprintf("Invalid format for '%s': %v.", field, err)
	http.Error(rw, msg, http.StatusBadRequest)
	return 0, false
}

// intervalMode translates the case-insensitive mode name into the index
// of the matching generator in intervals: 1 for "quarterly", 2 for
// "yearly" and 0 (monthly) for "monthly" and every other input.
func intervalMode(mode string) int {
	switch strings.ToLower(mode) {
	case "quarterly":
		return 1
	case "yearly":
		return 2
	}
	return 0
}

// loadDepthValues runs selectAvailableDepthSQL for the given bottleneck
// objnam, level of service and time range and returns the scanned
// measurements in the order delivered by the database. The time of
// every measurement is converted to UTC.
// Any query, scan or iteration error is returned with a nil result.
func loadDepthValues(
	ctx context.Context,
	conn *sql.Conn,
	bottleneck string,
	los int,
	from, to time.Time,
) (availMeasurements, error) {

	rows, err := conn.QueryContext(
		ctx, selectAvailableDepthSQL,
		bottleneck, los, from, to,
	)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var result availMeasurements

	for rows.Next() {
		var row availMeasurement
		// NOTE(review): the statement only guarantees that depth OR
		// width is not NULL. Scanning a NULL into an int16 fails, so a
		// row with just one of them presumably aborts the whole load -
		// confirm whether such rows exist.
		err := rows.Scan(&row.when, &row.depth, &row.width, &row.value)
		if err != nil {
			return nil, err
		}
		row.when = row.when.UTC()
		result = append(result, row)
	}

	if err := rows.Err(); err != nil {
		return nil, err
	}

	return result, nil
}

// loadLDCReferenceValue runs selectGaugeLDCSQL for the bottleneck with
// the given objnam and returns the value of the first result row
// wrapped in a one element slice.
// When the query yields no row it returns nil without an error.
func loadLDCReferenceValue(
	ctx context.Context,
	conn *sql.Conn,
	bottleneck string,
) ([]float64, error) {
	var ldc float64

	row := conn.QueryRowContext(ctx, selectGaugeLDCSQL, bottleneck)
	if err := row.Scan(&ldc); err != nil {
		if err == sql.ErrNoRows {
			return nil, nil
		}
		return nil, err
	}

	return []float64{ldc}, nil
}

// breaksToReferenceValue parses a comma separated list of numbers into
// a sorted slice without duplicates.
// Parts that are not valid numbers are skipped silently. NaN and
// infinite values are skipped, too: strconv.ParseFloat accepts
// spellings like "NaN" or "Inf" without an error, but such values are
// not usable as class boundaries.
// If no part is usable the result is nil.
func breaksToReferenceValue(breaks string) []float64 {
	var values []float64

	for _, part := range strings.Split(breaks, ",") {
		v, err := strconv.ParseFloat(strings.TrimSpace(part), 64)
		// v-v is zero for finite numbers only; for NaN and the
		// infinities it is NaN.
		if err != nil || v-v != 0 {
			continue
		}
		values = append(values, v)
	}

	if len(values) < 2 {
		return values
	}

	sort.Float64s(values)

	// Remove duplicates in place. The values are sorted, so equal
	// values are neighbours.
	n := 1
	for _, v := range values[1:] {
		if v != values[n-1] {
			values[n] = v
			n++
		}
	}
	return values[:n]
}

// bottleneckAvailabilty is an HTTP handler that writes a CSV with the
// availability of a bottleneck per time interval.
//
// The bottleneck is taken from the route variable "objnam". Optional
// form values:
//   - mode:   "monthly" (default), "quarterly" or "yearly"
//   - from:   start of the time range, defaults to one year ago
//   - to:     end of the time range, defaults to from plus one year
//   - los:    level of service, defaults to 1
//   - breaks: comma separated depth class breaks, defaults to afdRefs
//
// Each data row holds the label of the interval followed by the
// percentage of the interval in which the water level was below / at
// or above the LDC reference value and the percentages in which the
// available depth was within each of the depth classes.
//
// Invalid parameters are answered with 400, a missing LDC reference
// value or missing measurements with 404 and database errors with 500.
func bottleneckAvailabilty(rw http.ResponseWriter, req *http.Request) {

	mode := intervalMode(req.FormValue("mode"))
	bn := mux.Vars(req)["objnam"]

	if bn == "" {
		http.Error(
			rw,
			"Missing objnam of bottleneck",
			http.StatusBadRequest,
		)
		return
	}

	from, ok := parseFormTime(rw, req, "from", time.Now().AddDate(-1, 0, 0))
	if !ok {
		return
	}

	to, ok := parseFormTime(rw, req, "to", from.AddDate(1, 0, 0))
	if !ok {
		return
	}

	// Tolerate swapped range limits.
	if to.Before(from) {
		to, from = from, to
	}

	los, ok := parseFormInt(rw, req, "los", 1)
	if !ok {
		return
	}

	conn := middleware.GetDBConn(req)
	ctx := req.Context()

	ldcRefs, err := loadLDCReferenceValue(ctx, conn, bn)
	if err != nil {
		http.Error(
			rw,
			fmt.Sprintf("Internal server error: %v", err),
			http.StatusInternalServerError,
		)
		return
	}

	if len(ldcRefs) == 0 {
		http.Error(
			rw,
			"No gauge reference values found for bottleneck",
			http.StatusNotFound,
		)
		return
	}

	var breaks []float64
	if b := req.FormValue("breaks"); b != "" {
		breaks = breaksToReferenceValue(b)
	} else {
		breaks = afdRefs
	}

	log.Printf("info: time interval: (%v - %v)\n", from, to)

	ms, err := loadDepthValues(ctx, conn, bn, los, from, to)
	if err != nil {
		// Report the failure instead of silently answering with an
		// empty 200 response.
		http.Error(
			rw,
			fmt.Sprintf("Loading measurements failed: %v.", err),
			http.StatusInternalServerError,
		)
		return
	}

	if len(ms) == 0 {
		http.Error(
			rw,
			"No available fairway depth values found",
			http.StatusNotFound,
		)
		return
	}

	rw.Header().Add("Content-Type", "text/csv")

	out := csv.NewWriter(rw)

	// label, the two LDC classes, len(breaks)+1 depth classes
	record := make([]string, 1+2+len(breaks)+1)
	record[0] = "#time"
	// All value columns of this handler are percentages, so the LDC
	// columns are labelled with [%], too.
	record[1] = fmt.Sprintf("# < LDC (%.1f) [%%]", ldcRefs[0])
	record[2] = fmt.Sprintf("# >= LDC (%.1f) [%%]", ldcRefs[0])
	for i, v := range breaks {
		if i == 0 {
			record[3] = fmt.Sprintf("#d < %.1f [%%]", v)
		}
		record[i+4] = fmt.Sprintf("#d >= %.1f [%%]", v)
	}

	if err := out.Write(record); err != nil {
		// Too late for HTTP status message.
		log.Printf("error: %v\n", err)
		return
	}

	interval := intervals[mode](from, to)

	// The generator signals its end with an empty label.
	for pfrom, pto, label := interval(); label != ""; pfrom, pto, label = interval() {
		lnwl := ms.classify(
			pfrom, pto,
			ldcRefs,
			(*availMeasurement).getValue,
		)

		afd := ms.classify(
			pfrom, pto,
			breaks,
			(*availMeasurement).getDepth,
		)

		duration := pto.Sub(pfrom)
		lnwlPercents := durationsToPercentage(duration, lnwl)
		afdPercents := durationsToPercentage(duration, afd)

		record[0] = label
		for i, v := range lnwlPercents {
			record[1+i] = fmt.Sprintf("%.3f", v)
		}
		for i, v := range afdPercents {
			record[3+i] = fmt.Sprintf("%.3f", v)
		}

		if err := out.Write(record); err != nil {
			// Too late for HTTP status message.
			log.Printf("error: %v\n", err)
			return
		}
	}

	out.Flush()
	if err := out.Error(); err != nil {
		// Too late for HTTP status message.
		log.Printf("error: %v\n", err)
	}
}

// bottleneckAvailableFairwayDepth is an HTTP handler that writes a CSV
// with the number of hours per time interval a bottleneck spent in
// each class.
//
// The bottleneck is taken from the route variable "objnam". Optional
// form values:
//   - mode:   "monthly" (default), "quarterly" or "yearly"
//   - from:   start of the time range, defaults to one year ago
//   - to:     end of the time range, defaults to from plus one year
//   - los:    level of service, defaults to 1
//   - breaks: comma separated class breaks, defaults to afdRefs
//
// Each data row holds the label of the interval followed by the hours
// in which the water level was below / at or above the LDC reference
// value and the hours in which the limiting quantity of the bottleneck
// (depth or width) was within each of the classes given by breaks.
//
// Invalid parameters are answered with 400, an unknown bottleneck or a
// missing LDC reference value with 404 and database errors with 500.
func bottleneckAvailableFairwayDepth(rw http.ResponseWriter, req *http.Request) {

	mode := intervalMode(req.FormValue("mode"))

	bn := mux.Vars(req)["objnam"]
	if bn == "" {
		http.Error(
			rw, "Missing objnam of bottleneck",
			http.StatusBadRequest)
		return
	}

	from, ok := parseFormTime(rw, req, "from", time.Now().AddDate(-1, 0, 0))
	if !ok {
		return
	}

	to, ok := parseFormTime(rw, req, "to", from.AddDate(1, 0, 0))
	if !ok {
		return
	}

	// Tolerate swapped range limits.
	if to.Before(from) {
		to, from = from, to
	}

	los, ok := parseFormInt(rw, req, "los", 1)
	if !ok {
		return
	}

	conn := middleware.GetDBConn(req)
	ctx := req.Context()

	var limiting string
	err := conn.QueryRowContext(ctx, selectLimitingSQL, bn).Scan(&limiting)
	switch {
	case err == sql.ErrNoRows:
		http.Error(
			rw, fmt.Sprintf("Unknown limitation for %s.", bn),
			http.StatusNotFound)
		return
	case err != nil:
		http.Error(
			rw, fmt.Sprintf("DB error: %v.", err),
			http.StatusInternalServerError)
		return
	}

	// Accessor for the quantity (depth or width) that limits this
	// bottleneck.
	access := limitingFactor(limiting)

	log.Printf("info: time interval: (%v - %v)\n", from, to)

	// load the measurements
	ms, err := loadDepthValues(ctx, conn, bn, los, from, to)
	if err != nil {
		http.Error(
			rw, fmt.Sprintf("Loading measurements failed: %v.", err),
			http.StatusInternalServerError)
		return
	}

	ldcRefs, err := loadLDCReferenceValue(ctx, conn, bn)
	if err != nil {
		http.Error(
			rw, fmt.Sprintf("Loading LDC failed: %v.", err),
			http.StatusInternalServerError)
		return
	}
	if len(ldcRefs) == 0 {
		http.Error(rw, "No LDC found", http.StatusNotFound)
		return
	}

	var breaks []float64
	if b := req.FormValue("breaks"); b != "" {
		breaks = breaksToReferenceValue(b)
	} else {
		breaks = afdRefs
	}

	rw.Header().Add("Content-Type", "text/csv")

	out := csv.NewWriter(rw)

	// label, ldc, classes
	record := make([]string, 1+2+len(breaks)+1)
	record[0] = "#time"
	record[1] = fmt.Sprintf("# < LDC (%.1f) [h]", ldcRefs[0])
	record[2] = fmt.Sprintf("# >= LDC (%.1f) [h]", ldcRefs[0])
	for i, v := range breaks {
		if i == 0 {
			record[3] = fmt.Sprintf("# < %.1f [h]", v)
		}
		record[i+4] = fmt.Sprintf("# >= %.1f [h]", v)
	}

	if err := out.Write(record); err != nil {
		// Too late for HTTP status message.
		log.Printf("error: %v\n", err)
		return
	}

	log.Printf("info: measurements: %d\n", len(ms))
	if len(ms) > 1 {
		log.Printf("info: first: %v\n", ms[0].when)
		log.Printf("info: last: %v\n", ms[len(ms)-1].when)
		log.Printf("info: interval: %.2f [h]\n", ms[len(ms)-1].when.Sub(ms[0].when).Hours())
	}

	interval := intervals[mode](from, to)

	// The generator signals its end with an empty label.
	for pfrom, pto, label := interval(); label != ""; pfrom, pto, label = interval() {

		// The LDC is a reference water level of the gauge. It has to
		// be compared with the measured water level and not with the
		// limiting depth or width.
		ldc := ms.classify(
			pfrom, pto,
			ldcRefs,
			(*availMeasurement).getValue,
		)

		ranges := ms.classify(
			pfrom, pto,
			breaks,
			access,
		)

		record[0] = label
		for i, v := range ldc {
			record[i+1] = fmt.Sprintf("%.3f", v.Hours())
		}

		for i, d := range ranges {
			record[3+i] = fmt.Sprintf("%.3f", d.Hours())
		}

		if err := out.Write(record); err != nil {
			// Too late for HTTP status message.
			log.Printf("error: %v\n", err)
			return
		}
	}

	out.Flush()
	if err := out.Error(); err != nil {
		// Too late for HTTP status message.
		log.Printf("error: %v\n", err)
	}
}

// intervals holds the interval generator factories. The indices are the
// values returned by intervalMode.
var intervals = []func(time.Time, time.Time) func() (time.Time, time.Time, string){
	0: monthly,
	1: quarterly,
	2: yearly,
}

// monthly returns a generator of consecutive one month periods that
// starts at from. Each call yields the start of the period, its end
// (one nanosecond before the start of the next period) and a label of
// the form "MM-YYYY" built from the period start.
// Once a period would start after to, zero times and an empty label
// are returned.
func monthly(from, to time.Time) func() (time.Time, time.Time, string) {
	cursor := from
	return func() (time.Time, time.Time, string) {
		start := cursor
		if start.After(to) {
			return time.Time{}, time.Time{}, ""
		}
		cursor = start.AddDate(0, 1, 0)
		end := cursor.Add(-time.Nanosecond)
		return start, end, fmt.Sprintf("%02d-%d", start.Month(), start.Year())
	}
}

// quarterly returns a generator of consecutive three month periods that
// starts at from. Each call yields the start of the period, its end
// (one nanosecond before the start of the next period) and a label of
// the form "Qn-YYYY" where n is the calendar quarter of the period
// start.
// Once a period would start after to, zero times and an empty label
// are returned.
func quarterly(from, to time.Time) func() (time.Time, time.Time, string) {
	cursor := from
	return func() (time.Time, time.Time, string) {
		start := cursor
		if start.After(to) {
			return time.Time{}, time.Time{}, ""
		}
		cursor = start.AddDate(0, 3, 0)
		end := cursor.Add(-time.Nanosecond)
		quarter := (int(start.Month())-1)/3 + 1
		return start, end, fmt.Sprintf("Q%d-%d", quarter, start.Year())
	}
}

// yearly returns a generator of consecutive one year periods that
// starts at from. Each call yields the start of the period, its end
// (one nanosecond before the start of the next period) and the year of
// the period start as label.
// Once a period would start after to, zero times and an empty label
// are returned.
func yearly(from, to time.Time) func() (time.Time, time.Time, string) {
	cursor := from
	return func() (time.Time, time.Time, string) {
		start := cursor
		if start.After(to) {
			return time.Time{}, time.Time{}, ""
		}
		cursor = start.AddDate(1, 0, 0)
		end := cursor.Add(-time.Nanosecond)
		return start, end, strconv.Itoa(start.Year())
	}
}