view pkg/controllers/bottlenecks.go @ 3373:27aca46771b5

Waterlevel calculation: fixed labels of quarters (again).
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Wed, 22 May 2019 13:14:55 +0200
parents ea49febfbc60
children 7d8c41cc50f5
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package controllers

import (
	"context"
	"database/sql"
	"encoding/csv"
	"fmt"
	"log"
	"net/http"
	"sort"
	"strconv"
	"strings"
	"time"

	"github.com/gorilla/mux"

	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/middleware"
)

const (
	// selectAvailableDepthSQL fetches the fairway availability samples of
	// one bottleneck as (measure_date, available_depth_value,
	// water_level_value) rows ordered by measure_date.
	//
	// Parameters: $1 is the objnam of the bottleneck, $2 the level of
	// service, $3 and $4 the start and end of the requested time range.
	//
	// Only rows with measure_type 'Measured' and non-NULL depth and water
	// level are considered. Besides the rows inside [$3, $4] the query also
	// returns the single nearest row before $3 and the single nearest row
	// after $4, if they exist.
	selectAvailableDepthSQL = `
WITH data AS (
  SELECT
    efa.measure_date,
    efa.available_depth_value,
    efa.water_level_value
  FROM waterway.effective_fairway_availability efa
  JOIN waterway.fairway_availability fa
    ON efa.fairway_availability_id = fa.id
  JOIN waterway.bottlenecks bn
    ON fa.bottleneck_id = bn.id
  WHERE
    bn.objnam = $1 AND
    efa.level_of_service = $2 AND
    efa.measure_type = 'Measured' AND
    efa.available_depth_value IS NOT NULL AND
    efa.water_level_value IS NOT NULL
),
before AS (
  SELECT * FROM data WHERE measure_date < $3
  ORDER BY measure_date DESC LIMIT 1
),
inside AS (
  SELECT * FROM data WHERE measure_date BETWEEN $3 AND $4
),
after AS (
  SELECT * FROM data WHERE measure_date > $4
  ORDER BY measure_date LIMIT 1
)
SELECT * FROM before
UNION ALL
SELECT * FROM inside
UNION ALL
SELECT * FROM after
ORDER BY measure_date
`

	// selectGaugeLevelsSQL fetches the (depth_reference, value) pairs of
	// the gauge reference water levels that belong to the gauge of the
	// bottleneck with objnam $1. Only references whose name starts with
	// 'HDC', 'LDC' or 'MW' are returned.
	selectGaugeLevelsSQL = `
SELECT
  grwl.depth_reference,
  grwl.value
FROM waterway.gauges_reference_water_levels grwl
  JOIN waterway.bottlenecks bns
    ON grwl.location = bns.gauge_location
      AND grwl.validity = bns.gauge_validity
WHERE bns.objnam = $1 AND (
  grwl.depth_reference like 'HDC%' OR
  grwl.depth_reference like 'LDC%' OR
  grwl.depth_reference like 'MW%'
)
`
	// selectGaugeLDCSQL fetches only the value of the gauge reference water
	// level whose name starts with 'LDC' for the bottleneck with objnam $1.
	selectGaugeLDCSQL = `
SELECT
  grwl.value
FROM waterway.gauges_reference_water_levels grwl
  JOIN waterway.bottlenecks bns
    ON grwl.location = bns.gauge_location
      AND grwl.validity = bns.gauge_validity
WHERE bns.objnam = $1 AND grwl.depth_reference like 'LDC%'
`
)

type (
	// referenceValue is a class break as consumed by
	// availMeasurements.classify.
	referenceValue struct {
		// level is an ordinal identifying the break. For gauge reference
		// levels loadLNWLReferenceValues uses 0 = LDC, 1 = MW, 2 = HDC.
		level int
		// value is the threshold the measurements are compared against.
		value float64
	}

	// availMeasurement is a single sample as loaded by loadDepthValues.
	availMeasurement struct {
		// when is the measure_date of the sample, converted to UTC.
		// depth is scanned from available_depth_value.
		// value is scanned from water_level_value.
		when  time.Time
		depth int
		value int
	}

	// availMeasurements is a list of samples. classify walks neighbouring
	// pairs, so it presumably expects them ordered by time, as delivered
	// by selectAvailableDepthSQL.
	availMeasurements []availMeasurement
)

// afdRefs are the typical available fairway depth reference values.
// They serve as the class breaks when measurements are classified by
// their available depth; the first field is the ordinal level, the second
// the threshold.
// NOTE(review): the unit is presumably the one of available_depth_value
// (likely centimetres) -- confirm against the database schema.
var afdRefs = []referenceValue{
	{0, 200},
	{1, 230},
	{2, 250},
}

// getDepth returns the depth field of the sample converted to float64.
// Its method expression is passed to availMeasurements.classify as the
// accessor when classifying by available fairway depth.
func (m *availMeasurement) getDepth() float64 {
	depth := m.depth
	return float64(depth)
}

// getValue returns the value field (the water level) of the sample
// converted to float64. Its method expression is passed to
// availMeasurements.classify as the accessor when classifying by water level.
func (m *availMeasurement) getValue() float64 {
	level := m.value
	return float64(level)
}

// classify sums up how long the measured quantity stayed inside each class
// during the time range [from, to].
//
// breaks are the class boundaries and access selects the quantity of a
// sample that is compared against them. The result has len(breaks)+1
// entries: result[0] is the time spent below breaks[0], result[i] the time
// spent between breaks[i-1] and breaks[i], and the last entry the time spent
// above the last break. For empty breaks an empty, non-nil slice is returned.
//
// Between two neighbouring samples the quantity is interpolated over time
// using common.InterpolateTime.
//
// NOTE(review): the code does not check it, but it appears to assume that
// breaks are ascending by value and that the measurements are ordered by
// time -- confirm with the callers.
func (measurements availMeasurements) classify(
	from, to time.Time,
	breaks []referenceValue,
	access func(*availMeasurement) float64,
) []time.Duration {

	if len(breaks) == 0 {
		return []time.Duration{}
	}

	result := make([]time.Duration, len(breaks)+1)
	// classes holds the breaks framed by a lower and an upper sentinel,
	// values is scratch space for the times at which each class boundary
	// is crossed between two samples.
	classes := make([]float64, len(breaks)+2)
	values := make([]time.Time, len(classes))

	// Add sentinels
	// They close the open-ended lowest and highest class. Quantities more
	// than 9999 below the first or above the last break fall outside.
	classes[0] = breaks[0].value - 9999
	classes[len(classes)-1] = breaks[len(breaks)-1].value + 9999
	for i := range breaks {
		classes[i+1] = breaks[i].value
	}

	// Walk over all pairs of neighbouring samples.
	for i := 0; i < len(measurements)-1; i++ {
		p1 := &measurements[i]
		p2 := &measurements[i+1]

		// The segment starts after the range: nothing more to add.
		if p1.when.After(to) {
			return result
		}

		// The segment ends before the range: not relevant.
		if p2.when.Before(from) {
			continue
		}

		// Clip the segment to the requested range.
		lo, hi := maxTime(p1.when, from), minTime(p2.when, to)

		m1, m2 := access(p1), access(p2)
		if m1 == m2 { // The whole interval is in only one class.
			// The first matching class wins, so a quantity equal to a
			// break is counted for the class below that break.
			for j := 0; j < len(classes)-1; j++ {
				if classes[j] <= m1 && m1 <= classes[j+1] {
					result[j] += hi.Sub(lo)
					break
				}
			}
			continue
		}

		// f presumably maps a quantity to the point in time at which the
		// interpolation between p1 and p2 reaches it -- see
		// common.InterpolateTime for the exact contract.
		f := common.InterpolateTime(
			p1.when, m1,
			p2.when, m2,
		)

		for j, c := range classes {
			values[j] = f(c)
		}

		// Each pair of neighbouring boundary times delimits the span spent
		// in class j; only its overlap with [lo, hi] is accounted.
		for j := 0; j < len(values)-1; j++ {
			// The order depends on whether the quantity rises or falls.
			start, end := orderTime(values[j], values[j+1])

			if start.After(hi) || end.Before(lo) {
				continue
			}

			start, end = maxTime(start, lo), minTime(end, hi)
			result[j] += end.Sub(start)
		}
	}

	return result
}

// orderTime returns the two given times in chronological order, the
// earlier one first. If a is not strictly before b (this includes equal
// instants) the arguments are returned swapped as (b, a).
func orderTime(a, b time.Time) (time.Time, time.Time) {
	if !a.Before(b) {
		return b, a
	}
	return a, b
}

// minTime returns the earlier of the two given times.
// If a is not strictly before b, b is returned.
func minTime(a, b time.Time) time.Time {
	earliest := b
	if a.Before(b) {
		earliest = a
	}
	return earliest
}

// maxTime returns the later of the two given times.
// If a is not strictly after b, b is returned.
func maxTime(a, b time.Time) time.Time {
	latest := b
	if a.After(b) {
		latest = a
	}
	return latest
}

// durationsToPercentage converts the given durations into percentages of
// the length of the time range from..to.
//
// The result has the same length as classes. If the range is empty
// (from equals to) all percentages are zero: dividing by the zero length
// would yield NaN or +Inf, which encoding/json refuses to encode.
func durationsToPercentage(from, to time.Time, classes []time.Duration) []float64 {
	percents := make([]float64, len(classes))

	span := to.Sub(from).Seconds()
	if span == 0 {
		return percents
	}

	// Hoist the division out of the loop.
	scale := 100 / span
	for i, d := range classes {
		percents[i] = d.Seconds() * scale
	}
	return percents
}

// parseTime parses s in the layout common.TimeFormat and returns the
// result converted to UTC. what is the name of the request field and is
// only used in the error message. On failure the zero time and a JSONError
// with status 400 (Bad Request) are returned.
func parseTime(s, what string) (time.Time, error) {
	parsed, err := time.Parse(common.TimeFormat, s)
	if err == nil {
		return parsed.UTC(), nil
	}
	msg := fmt.Sprintf(
		"Invalid time format for '%s' field: %v", what, err)
	return time.Time{}, JSONError{
		Code:    http.StatusBadRequest,
		Message: msg,
	}
}

// parseInt parses s as a decimal integer. what is the name of the request
// field and is only used in the error message. On failure 0 and a
// JSONError with status 400 (Bad Request) are returned.
func parseInt(s, what string) (int, error) {
	n, err := strconv.Atoi(s)
	if err == nil {
		return n, nil
	}
	msg := fmt.Sprintf(
		"Invalid value for field '%s': %v", what, err)
	return 0, JSONError{
		Code:    http.StatusBadRequest,
		Message: msg,
	}
}

// loadDepthValues runs selectAvailableDepthSQL for the given bottleneck,
// level of service and time range and returns the resulting samples in the
// order delivered by the query. The time stamps are converted to UTC.
// A nil slice is returned if the query yields no rows.
func loadDepthValues(
	ctx context.Context,
	conn *sql.Conn,
	bottleneck string,
	los int,
	from, to time.Time,
) (availMeasurements, error) {

	rows, err := conn.QueryContext(
		ctx, selectAvailableDepthSQL,
		bottleneck, los, from, to)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var loaded availMeasurements

	for rows.Next() {
		var (
			when         time.Time
			depth, level int
		)
		if err = rows.Scan(&when, &depth, &level); err != nil {
			return nil, err
		}
		loaded = append(loaded, availMeasurement{
			when:  when.UTC(),
			depth: depth,
			value: level,
		})
	}

	// Report errors that ended the iteration prematurely.
	if err = rows.Err(); err != nil {
		return nil, err
	}

	return loaded, nil
}

// loadLDCReferenceValue loads the LDC reference water level of the gauge
// of the given bottleneck using selectGaugeLDCSQL.
// It returns a one-element slice (level 0) holding the value, or a nil
// slice and a nil error if no LDC is stored for the bottleneck.
func loadLDCReferenceValue(
	ctx context.Context,
	conn *sql.Conn,
	bottleneck string,
) ([]referenceValue, error) {

	var ldc float64

	row := conn.QueryRowContext(ctx, selectGaugeLDCSQL, bottleneck)
	if err := row.Scan(&ldc); err != nil {
		if err == sql.ErrNoRows {
			// A missing LDC is not an error; the caller decides.
			return nil, nil
		}
		return nil, err
	}

	log.Printf("info: LDC = %.2f\n", ldc)
	return []referenceValue{{level: 0, value: ldc}}, nil
}

// loadLNWLReferenceValues loads the LDC, MW and HDC reference water levels
// of the gauge of the given bottleneck using selectGaugeLevelsSQL.
//
// The reference names are mapped by prefix to the levels 0 (LDC), 1 (MW)
// and 2 (HDC). If several rows map to the same level the value of the last
// one wins. The result is sorted ascending by level and is nil if no rows
// were found. A row with any other prefix results in an error.
func loadLNWLReferenceValues(
	ctx context.Context,
	conn *sql.Conn,
	bottleneck string,
) ([]referenceValue, error) {
	rows, err := conn.QueryContext(ctx, selectGaugeLevelsSQL, bottleneck)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	// The index of a prefix is the level assigned to it.
	prefixes := [...]string{"LDC", "MW", "HDC"}

	var refs []referenceValue

	for rows.Next() {
		var (
			name string
			raw  int
		)
		if err = rows.Scan(&name, &raw); err != nil {
			return nil, err
		}

		level := -1
		for i, prefix := range prefixes {
			if strings.HasPrefix(name, prefix) {
				level = i
				break
			}
		}
		if level < 0 {
			return nil, fmt.Errorf("Unexpected reference level type '%s'", name)
		}

		// Look for an entry of this level that was seen before.
		known := -1
		for i := range refs {
			if refs[i].level == level {
				known = i
				break
			}
		}

		if known < 0 {
			refs = append(refs, referenceValue{
				level: level,
				value: float64(raw),
			})
			continue
		}
		refs[known].value = float64(raw)
	}

	if err = rows.Err(); err != nil {
		return nil, err
	}

	sort.Slice(refs, func(a, b int) bool {
		return refs[a].level < refs[b].level
	})

	return refs, nil
}

// bottleneckAvailabilty is the JSON handler reporting, for the bottleneck
// named by the route variable "objnam", the percentage of a time range
// spent in the classes given by the gauge reference water levels ("lnwl")
// and by the available fairway depth references afdRefs ("afd").
//
// Form values:
//   - from: start of the range; defaults to one year before now.
//   - to:   end of the range; defaults to one year after from.
//   - los:  level of service; defaults to 1.
//
// If to is before from the two are swapped. A JSONError with status 400 is
// returned for a missing objnam or unparsable form values, one with status
// 404 if no reference levels or no measurements are found. Database errors
// are passed through unchanged.
func bottleneckAvailabilty(
	_ interface{},
	req *http.Request,
	conn *sql.Conn,
) (jr JSONResult, err error) {
	objnam := mux.Vars(req)["objnam"]
	if objnam == "" {
		return jr, JSONError{
			Code:    http.StatusBadRequest,
			Message: "Missing objnam of bottleneck",
		}
	}

	var from, to time.Time

	if raw := req.FormValue("from"); raw == "" {
		from = time.Now().AddDate(-1, 0, 0).UTC()
	} else if from, err = parseTime(raw, "from"); err != nil {
		return jr, err
	}

	if raw := req.FormValue("to"); raw == "" {
		to = from.AddDate(1, 0, 0).UTC()
	} else if to, err = parseTime(raw, "to"); err != nil {
		return jr, err
	}

	// Make sure the range runs forward in time.
	if to.Before(from) {
		from, to = to, from
	}

	log.Printf("info: time interval: (%v - %v)\n", from, to)

	los := 1
	if raw := req.FormValue("los"); raw != "" {
		if los, err = parseInt(raw, "los"); err != nil {
			return jr, err
		}
	}

	ctx := req.Context()

	lnwlRefs, err := loadLNWLReferenceValues(ctx, conn, objnam)
	if err != nil {
		return jr, err
	}
	if len(lnwlRefs) == 0 {
		return jr, JSONError{
			Code:    http.StatusNotFound,
			Message: "No gauge reference values found for bottleneck",
		}
	}

	samples, err := loadDepthValues(ctx, conn, objnam, los, from, to)
	if err != nil {
		return jr, err
	}
	if len(samples) == 0 {
		return jr, JSONError{
			Code:    http.StatusNotFound,
			Message: "No available fairway depth values found",
		}
	}

	// Water levels are classified against the gauge references,
	// depths against the fixed fairway depth references.
	lnwlPercents := durationsToPercentage(from, to, samples.classify(
		from, to,
		lnwlRefs,
		(*availMeasurement).getValue,
	))
	afdPercents := durationsToPercentage(from, to, samples.classify(
		from, to,
		afdRefs,
		(*availMeasurement).getDepth,
	))

	type lnwlOutput struct {
		Level   string  `json:"level"`
		Value   float64 `json:"value"`
		Percent float64 `json:"percent"`
	}

	type afdOutput struct {
		Value   float64 `json:"value"`
		Percent float64 `json:"percent"`
	}

	type output struct {
		LNWL []lnwlOutput `json:"lnwl"`
		AFD  []afdOutput  `json:"afd"`
	}

	// The index is the level; unknown levels get an empty name.
	levelNames := [...]string{"LDC", "MW", "HDC"}

	var out output

	for i, ref := range lnwlRefs {
		var name string
		if ref.level >= 0 && ref.level < len(levelNames) {
			name = levelNames[ref.level]
		}
		out.LNWL = append(out.LNWL, lnwlOutput{
			Level:   name,
			Value:   ref.value,
			Percent: lnwlPercents[i],
		})
	}

	for i, ref := range afdRefs {
		out.AFD = append(out.AFD, afdOutput{
			Value:   ref.value,
			Percent: afdPercents[i],
		})
	}

	return JSONResult{Result: &out}, nil
}

// bottleneckAvailableFairwayDepth writes a CSV report for the bottleneck
// named by the route variable "objnam". Each data row covers one period of
// the requested time range and holds the period label, the hours the water
// level was at or above LDC and the hours the available fairway depth
// spent in each class defined by afdRefs.
//
// Form values:
//   - mode: "monthly" (default), "quarterly" or "yearly" (case-insensitive).
//   - from: start of the range; defaults to one year before now.
//   - to:   end of the range; defaults to one year after from.
//   - los:  level of service; defaults to 1.
//
// If to is before from the two are swapped. Invalid parameters are
// answered with status 400, a missing LDC with 404 and database failures
// with 500. Errors that occur once the CSV output has started can only be
// logged.
func bottleneckAvailableFairwayDepth(rw http.ResponseWriter, req *http.Request) {

	bn := mux.Vars(req)["objnam"]

	if bn == "" {
		http.Error(
			rw, "Missing objnam of bottleneck",
			http.StatusBadRequest)
		return
	}

	// mode is the index into intervals; 0 (monthly) is the default.
	var mode int
	if m := req.FormValue("mode"); m != "" {
		switch strings.ToLower(m) {
		case "monthly":
			mode = 0
		case "quarterly":
			mode = 1
		case "yearly":
			mode = 2
		default:
			http.Error(
				rw, fmt.Sprintf("Unknown 'mode' value %s.", m),
				http.StatusBadRequest)
			return
		}
	}

	var from, to time.Time

	if f := req.FormValue("from"); f != "" {
		var err error
		if from, err = time.Parse(common.TimeFormat, f); err != nil {
			http.Error(
				rw, fmt.Sprintf("Invalid format for 'from': %v.", err),
				http.StatusBadRequest)
			return
		}
	} else {
		from = time.Now().AddDate(-1, 0, 0)
	}
	from = from.UTC()

	if t := req.FormValue("to"); t != "" {
		var err error
		if to, err = time.Parse(common.TimeFormat, t); err != nil {
			http.Error(
				rw, fmt.Sprintf("Invalid format for 'to': %v.", err),
				http.StatusBadRequest)
			return
		}
	} else {
		to = from.AddDate(1, 0, 0)
	}
	to = to.UTC()

	// Make sure the range runs forward in time.
	if to.Before(from) {
		to, from = from, to
	}

	log.Printf("info: time interval: (%v - %v)\n", from, to)

	var los int
	if l := req.FormValue("los"); l != "" {
		var err error
		if los, err = strconv.Atoi(l); err != nil {
			http.Error(
				rw, fmt.Sprintf("Invalid format for 'los': %v.", err),
				http.StatusBadRequest)
			return
		}
	} else {
		los = 1
	}

	conn := middleware.GetDBConn(req)
	ctx := req.Context()

	// load the measurements
	ms, err := loadDepthValues(ctx, conn, bn, los, from, to)
	if err != nil {
		http.Error(
			rw, fmt.Sprintf("Loading measurements failed: %v.", err),
			http.StatusInternalServerError)
		return
	}

	ldcRefs, err := loadLDCReferenceValue(ctx, conn, bn)
	if err != nil {
		http.Error(
			rw, fmt.Sprintf("Loading LDC failed: %v.", err),
			http.StatusInternalServerError)
		return
	}
	// Without LDC the "# >= LDC [h]" column cannot be calculated.
	if len(ldcRefs) == 0 {
		http.Error(rw, "No LDC found", http.StatusNotFound)
		return
	}

	rw.Header().Add("Content-Type", "text/csv")

	out := csv.NewWriter(rw)

	// label, classes, lnwl
	// Columns: label, hours >= LDC, hours below the first depth reference
	// and one column of hours for each depth reference.
	record := make([]string, 1+1+len(afdRefs)+1)
	record[0] = "#label"
	record[1] = "# >= LDC [h]"
	for i, v := range afdRefs {
		if i == 0 {
			record[2] = fmt.Sprintf("# < %.2f [h]", v.value)
		}
		record[i+3] = fmt.Sprintf("# >= %.2f [h]", v.value)
	}

	if err := out.Write(record); err != nil {
		// Too late for HTTP status message.
		log.Printf("error: %v\n", err)
		return
	}

	log.Printf("info: measurements: %d\n", len(ms))
	if len(ms) > 1 {
		log.Printf("info: first: %v\n", ms[0].when)
		log.Printf("info: last: %v\n", ms[len(ms)-1].when)
		log.Printf("info: interval: %.2f [h]\n", ms[len(ms)-1].when.Sub(ms[0].when).Hours())
	}

	// interval yields the periods one by one; an empty label marks the end.
	interval := intervals[mode](from, to)

	for pfrom, pto, label := interval(); label != ""; pfrom, pto, label = interval() {

		ranges := ms.classify(
			pfrom, pto,
			afdRefs,
			(*availMeasurement).getDepth,
		)

		// LDC is a gauge reference water level, so it has to be compared
		// with the measured water level and not with the available depth.
		ldc := ms.classify(
			pfrom, pto,
			ldcRefs,
			(*availMeasurement).getValue,
		)

		record[0] = label
		// ldcRefs has exactly one break: ldc[0] is the time below LDC,
		// ldc[1] the time above it.
		record[1] = fmt.Sprintf("%.3f", ldc[1].Hours())

		for i, d := range ranges {
			record[2+i] = fmt.Sprintf("%.3f", d.Hours())
		}

		if err := out.Write(record); err != nil {
			// Too late for HTTP status message.
			log.Printf("error: %v\n", err)
			return
		}
	}

	out.Flush()
	if err := out.Error(); err != nil {
		// Too late for HTTP status message.
		log.Printf("error: %v\n", err)
	}
}

// intervals holds the constructors of the period iterators, indexed by the
// numeric mode used in bottleneckAvailableFairwayDepth:
// 0 = monthly, 1 = quarterly, 2 = yearly.
// Each constructor takes the start and end of a time range and returns a
// function that yields the start, end and label of the next period on each
// call and an empty label once the range is exhausted.
var intervals = []func(time.Time, time.Time) func() (time.Time, time.Time, string){
	monthly,
	quarterly,
	yearly,
}

// monthly returns an iterator over consecutive one-month periods.
//
// The first period starts exactly at from (not at the beginning of a
// calendar month); each further one starts a month after its predecessor.
// A call yields the start of the period, its end (one nanosecond before the
// start of the next period) and a label of the form "MM-YYYY" derived from
// the start. Periods are produced as long as their start is not after to,
// so the last one may reach beyond to. After that zero times and an empty
// label are returned.
func monthly(from, to time.Time) func() (time.Time, time.Time, string) {
	next := from
	return func() (time.Time, time.Time, string) {
		start := next
		if start.After(to) {
			return time.Time{}, time.Time{}, ""
		}
		next = start.AddDate(0, 1, 0)
		end := next.Add(-time.Nanosecond)
		return start, end, fmt.Sprintf("%02d-%d", start.Month(), start.Year())
	}
}

// quarterly returns an iterator over consecutive three-month periods.
//
// The first period starts exactly at from (not at the beginning of a
// calendar quarter); each further one starts three months after its
// predecessor. A call yields the start of the period, its end (one
// nanosecond before the start of the next period) and a label of the form
// "Qn-YYYY" where n is the calendar quarter the start falls into. Periods
// are produced as long as their start is not after to, so the last one may
// reach beyond to. After that zero times and an empty label are returned.
func quarterly(from, to time.Time) func() (time.Time, time.Time, string) {
	next := from
	return func() (time.Time, time.Time, string) {
		start := next
		if start.After(to) {
			return time.Time{}, time.Time{}, ""
		}
		next = start.AddDate(0, 3, 0)
		end := next.Add(-time.Nanosecond)
		// January..March -> 1, April..June -> 2, and so on.
		quarter := (int(start.Month())-1)/3 + 1
		return start, end, fmt.Sprintf("Q%d-%d", quarter, start.Year())
	}
}

// yearly returns an iterator over consecutive one-year periods.
//
// The first period starts exactly at from (not at the beginning of a
// calendar year); each further one starts a year after its predecessor.
// A call yields the start of the period, its end (one nanosecond before the
// start of the next period) and the year of the start as label. Periods
// are produced as long as their start is not after to, so the last one may
// reach beyond to. After that zero times and an empty label are returned.
func yearly(from, to time.Time) func() (time.Time, time.Time, string) {
	next := from
	return func() (time.Time, time.Time, string) {
		start := next
		if start.After(to) {
			return time.Time{}, time.Time{}, ""
		}
		next = start.AddDate(1, 0, 0)
		end := next.Add(-time.Nanosecond)
		return start, end, strconv.Itoa(start.Year())
	}
}