view pkg/controllers/gauges.go @ 5591:0011f50cf216 surveysperbottleneckid

Removed no longer used alternative api for surveys/ endpoint. As bottlenecks in the summary for SR imports are now identified by their id and no longer by the (not guaranteed to be unique!) name, there is no longer the need to request survey data by the name+date tuple (which isn't reliable anyway). So the workaround was now reversed.
author Sascha Wilde <wilde@sha-bang.de>
date Wed, 06 Apr 2022 13:30:29 +0200
parents 5f47eeea988d
children 6270951dda28
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package controllers

import (
	"context"
	"database/sql"
	"encoding/csv"
	"fmt"
	"math"
	"net/http"
	"sort"
	"strconv"
	"strings"
	"time"

	"github.com/gorilla/mux"
	"github.com/jackc/pgx/pgtype"
	"gonum.org/v1/gonum/stat"

	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/log"
	"gemma.intevation.de/gemma/pkg/models"

	mw "gemma.intevation.de/gemma/pkg/middleware"
)

// SQL statements used by the gauge water level handlers in this file.
// In all statements the gauge location is passed as the five components
// of an ISRS code in the placeholders $1 to $5.
const (
	// selectPredictedObserveredSQL loads measured (predicted = false) and
	// predicted (predicted = true) water levels of one gauge whose
	// measure_date lies within the 72 hours up to and including the
	// timestamp $6. Rows are ordered by measure_date, then date_issue.
	selectPredictedObserveredSQL = `
SELECT
  measure_date,
  date_issue,
  predicted,
  water_level
FROM (
  SELECT
    location,
    measure_date,
    date_issue,
    false AS predicted,
    water_level
  FROM waterway.gauge_measurements
  UNION ALL
  SELECT
    location,
    measure_date,
    date_issue,
    true AS predicted,
    water_level
  FROM waterway.gauge_predictions
) AS gmp
WHERE location = ($1, $2, $3, $4, $5)::isrs
  AND measure_date BETWEEN
    $6::timestamptz - '72hours'::interval AND $6::timestamptz
ORDER BY measure_date, date_issue
`
	// selectWaterlevelsSQL is only the head of a statement: it ends with
	// an open WHERE and the filter condition has to be appended by the
	// caller. It selects measured and predicted water levels; value_min
	// and value_max are NULL for measurements and the bounds of
	// conf_interval for predictions.
	selectWaterlevelsSQL = `
SELECT
  measure_date,
  water_level,
  value_min,
  value_max,
  predicted
FROM (
  SELECT
    location,
    measure_date,
    date_issue,
    water_level,
    NULL AS value_min,
    NULL AS value_max,
    false AS predicted
  FROM waterway.gauge_measurements
  UNION ALL
  SELECT
    location,
    measure_date,
    date_issue,
    water_level,
    lower(conf_interval) AS value_min,
    upper(conf_interval) AS value_max,
    true AS predicted
  FROM waterway.gauge_predictions
) AS gmp
WHERE
`

	// selectAllWaterlevelsMeasuredRangeSQL fetches the earliest and the
	// latest measure_date of all measurements of one gauge which are
	// flagged staging_done.
	selectAllWaterlevelsMeasuredRangeSQL = `
SELECT
  min(measure_date),
  max(measure_date)
FROM waterway.gauge_measurements
WHERE
  location = (
    $1::char(2),
    $2::char(3),
    $3::char(5),
    $4::char(5),
    $5::int
   )::isrs
   AND staging_done
`

	// selectAllWaterlevelsMeasuredSQL aggregates all staging_done
	// measurements of one gauge per calendar day. The grouping key
	// day_month has the form "<day>:<month>"; for each key the quartiles,
	// mean, minimum and maximum of the water level are returned.
	// The result is not ordered.
	selectAllWaterlevelsMeasuredSQL = `
SELECT
    extract(day from measure_date)::varchar || ':' ||
    extract(month from measure_date)::varchar AS day_month,
  percentile_disc(0.25) within group (order by water_level) AS q25,
  percentile_disc(0.5)  within group (order by water_level) AS median,
  percentile_disc(0.75) within group (order by water_level) AS q75,
  avg(water_level) AS mean,
  min(water_level) AS min,
  max(water_level) AS max
FROM waterway.gauge_measurements
WHERE
  location = (
    $1::char(2),
    $2::char(3),
    $3::char(5),
    $4::char(5),
    $5::int
   )::isrs
   AND staging_done
GROUP BY extract(day from measure_date)::varchar || ':' ||
         extract(month from measure_date)::varchar;
`
	// selectYearWaterlevelsMeasuredSQL loads the staging_done
	// measurements of one gauge with a measure_date between $6 and $7
	// (both inclusive), ordered by measure_date.
	selectYearWaterlevelsMeasuredSQL = `
SELECT
  measure_date,
  water_level
FROM waterway.gauge_measurements
WHERE
  location = (
    $1::char(2),
    $2::char(3),
    $3::char(5),
    $4::char(5),
    $5::int
   )::isrs
  AND staging_done
  AND measure_date BETWEEN $6 AND $7
ORDER BY measure_date
`
)

// float64format renders v in plain decimal notation (never with an
// exponent), using the fewest digits needed to represent the float64
// value uniquely.
func float64format(v float64) string {
	var scratch [32]byte
	digits := strconv.AppendFloat(scratch[:0], v, 'f', -1, 64)
	return string(digits)
}

// nullFloat64format formats a nullable float for CSV output: SQL NULL
// becomes the empty string, everything else is formatted with
// float64format.
func nullFloat64format(v sql.NullFloat64) string {
	if !v.Valid {
		return ""
	}
	return float64format(v.Float64)
}

// boolFormat encodes a boolean as a single letter: "t" for true and
// "f" for false.
func boolFormat(b bool) string {
	switch b {
	case true:
		return "t"
	default:
		return "f"
	}
}

// yearWaterlevels is an HTTP handler which writes the daily mean of the
// measured water levels of a gauge for one calendar year (UTC).
//
// The route variables "gauge" (ISRS code) and "year" select the data.
// An invalid ISRS code or a year that is not an integer is answered
// with 400 Bad Request, a failing database query with
// 500 Internal Server Error.
//
// The response body consists of one line per day in the form
// "DD-MM,<mean>". Errors that occur after the first line was written
// can only be logged, as the status code has already been sent.
func yearWaterlevels(rw http.ResponseWriter, req *http.Request) {

	vars := mux.Vars(req)

	isrs, err := models.IsrsFromString(vars["gauge"])
	if err != nil {
		http.Error(
			rw, fmt.Sprintf("error: Invalid ISRS code: %v", err),
			http.StatusBadRequest)
		return
	}

	// Do not silently fall back to year 0 if the year is malformed.
	year, err := strconv.Atoi(vars["year"])
	if err != nil {
		http.Error(
			rw, fmt.Sprintf("error: Invalid year: %v", err),
			http.StatusBadRequest)
		return
	}

	conn := mw.GetDBConn(req)

	ctx := req.Context()

	// The query uses an inclusive BETWEEN, so the end is the last
	// microsecond before the next year starts.
	begin := time.Date(year, time.January, 1, 0, 0, 0, 0, time.UTC)
	end := time.Date(year+1, time.January, 1, 0, 0, 0, 0, time.UTC).Add(-time.Microsecond)

	log.Infof("begin %s\n", begin)
	log.Infof("end   %s\n", end)

	rows, err := conn.QueryContext(
		ctx,
		selectYearWaterlevelsMeasuredSQL,
		isrs.CountryCode,
		isrs.LoCode,
		isrs.FairwaySection,
		isrs.Orc,
		isrs.Hectometre,
		begin,
		end,
	)
	if err != nil {
		http.Error(
			rw, fmt.Sprintf("error: %v", err),
			http.StatusInternalServerError)
		return
	}
	defer rows.Close()

	// values collects the water levels of the day currently processed.
	var values []float64

	lastDay, lastMonth := -1, -1

	// write emits the mean of the collected values for the current day
	// and resets the collection. It does nothing if no values are
	// pending.
	write := func() error {
		var err error
		if len(values) > 0 {
			mean := stat.Mean(values, nil)
			_, err = fmt.Fprintf(
				rw, "%02d-%02d,%s\n", lastDay, lastMonth,
				float64format(mean))
			values = values[:0]
		}
		return err
	}

	// The rows arrive ordered by measure date, so a change of the
	// day/month pair marks the end of a day.
	for rows.Next() {
		var when time.Time
		var value float64
		if err := rows.Scan(&when, &value); err != nil {
			log.Errorf("%v", err)
			// Too late for an HTTP error code.
			return
		}
		when = when.UTC()
		day, month := when.Day(), int(when.Month())
		if day != lastDay || month != lastMonth {
			if err := write(); err != nil {
				log.Errorf("%v", err)
				// Too late for an HTTP error code.
				return
			}
			lastDay, lastMonth = day, month
		}
		values = append(values, value)
	}

	if err := rows.Err(); err != nil {
		log.Errorf("%v", err)
		// Too late for an HTTP error code.
		return
	}

	// Flush the values of the last day.
	if err := write(); err != nil {
		log.Errorf("%v", err)
		// Too late for an HTTP error code.
	}
}

// longtermWaterlevels is an HTTP handler which writes per-calendar-day
// statistics (min, max, mean, median and the 25%/75% quantiles) over
// all measured water levels of a gauge as CSV.
//
// The route variable "gauge" holds the ISRS code of the gauge. An
// invalid code is answered with 400 Bad Request. If there are no
// measurements for the gauge 404 Not Found is sent. Database errors
// and undecodable result rows lead to 500 Internal Server Error.
//
// The CSV starts with a "#interval: <first year>-<last year>" line and
// a column header line, followed by one line per day of the year
// ("DD-MM") sorted by month and day. Errors that occur while the CSV
// is written can only be logged, as the status code has already been
// sent.
func longtermWaterlevels(rw http.ResponseWriter, req *http.Request) {

	gauge := mux.Vars(req)["gauge"]

	isrs, err := models.IsrsFromString(gauge)
	if err != nil {
		http.Error(
			rw, fmt.Sprintf("error: Invalid ISRS code: %v", err),
			http.StatusBadRequest)
		return
	}

	conn := mw.GetDBConn(req)

	ctx := req.Context()

	var begin, end pgtype.Timestamp

	err = conn.QueryRowContext(
		ctx,
		selectAllWaterlevelsMeasuredRangeSQL,
		isrs.CountryCode,
		isrs.LoCode,
		isrs.FairwaySection,
		isrs.Orc,
		isrs.Hectometre,
	).Scan(&begin, &end)

	// The error has to be inspected before the scanned values: after a
	// failed query begin and end are not Present either, which would
	// otherwise disguise a database error as "not found".
	switch {
	case err == sql.ErrNoRows:
		http.NotFound(rw, req)
		return
	case err != nil:
		http.Error(
			rw, fmt.Sprintf("error: %v", err),
			http.StatusInternalServerError)
		return
	case begin.Status != pgtype.Present || end.Status != pgtype.Present:
		// No usable time range, i.e. no measurements for this gauge.
		http.NotFound(rw, req)
		return
	}

	rows, err := conn.QueryContext(
		ctx,
		selectAllWaterlevelsMeasuredSQL,
		isrs.CountryCode,
		isrs.LoCode,
		isrs.FairwaySection,
		isrs.Orc,
		isrs.Hectometre,
	)
	if err != nil {
		http.Error(
			rw, fmt.Sprintf("error: %v", err),
			http.StatusInternalServerError)
		return
	}
	defer rows.Close()

	type result struct {
		day    int
		month  int
		q25    float64
		median float64
		q75    float64
		mean   float64
		min    float64
		max    float64
	}

	// At most one entry per day of a (leap) year is expected.
	results := make([]result, 0, 366)

	start := time.Now()

	// All rows are collected before anything is written to the
	// response, so failures in this loop can still be reported with a
	// proper HTTP status code.
	for rows.Next() {
		var r result
		var dayMonth string
		if err := rows.Scan(
			&dayMonth,
			&r.q25,
			&r.median,
			&r.q75,
			&r.mean,
			&r.min,
			&r.max,
		); err != nil {
			http.Error(
				rw, fmt.Sprintf("error: %v", err),
				http.StatusInternalServerError)
			return
		}

		// The key has the form "<day>:<month>".
		parts := strings.SplitN(dayMonth, ":", 2)
		if len(parts) != 2 {
			http.Error(
				rw, fmt.Sprintf("error: Invalid day/month key: %q", dayMonth),
				http.StatusInternalServerError)
			return
		}
		day, dayErr := strconv.Atoi(parts[0])
		month, monthErr := strconv.Atoi(parts[1])
		if dayErr != nil || monthErr != nil {
			http.Error(
				rw, fmt.Sprintf("error: Invalid day/month key: %q", dayMonth),
				http.StatusInternalServerError)
			return
		}
		r.day, r.month = day, month
		results = append(results, r)
	}

	if err := rows.Err(); err != nil {
		http.Error(
			rw, fmt.Sprintf("error: %v", err),
			http.StatusInternalServerError)
		return
	}

	log.Infof("loading entries took %s\n", time.Since(start))

	log.Infof("days found: %d\n", len(results))

	// The database delivers the groups unordered: sort by month, then
	// by day.
	sort.Slice(results, func(i, j int) bool {
		if d := results[i].month - results[j].month; d != 0 {
			return d < 0
		}
		return results[i].day < results[j].day
	})

	rw.Header().Add("Content-Type", "text/csv")

	out := csv.NewWriter(rw)

	// First line: the years covered by the measurements. It is padded
	// with empty fields to the same column count as the data lines.
	record := []string{
		fmt.Sprintf("#interval: %d-%d",
			begin.Time.UTC().Year(),
			end.Time.UTC().Year()),
		"",
		"",
		"",
		"",
		"",
		"",
	}

	if err := out.Write(record); err != nil {
		log.Errorf("%v\n", err)
		// Too late for an HTTP error code.
		return
	}

	record = []string{
		"#date",
		"#min",
		"#max",
		"#mean",
		"#median",
		"#q25",
		"#q75",
	}

	if err := out.Write(record); err != nil {
		log.Errorf("%v\n", err)
		// Too late for an HTTP error code.
		return
	}

	// The header record is reused as buffer for the data lines.
	for i := range results {
		r := &results[i]
		record[0] = fmt.Sprintf("%02d-%02d", r.day, r.month)
		record[1] = float64format(r.min)
		record[2] = float64format(r.max)
		record[3] = float64format(r.mean)
		record[4] = float64format(r.median)
		record[5] = float64format(r.q25)
		record[6] = float64format(r.q75)
		if err := out.Write(record); err != nil {
			log.Errorf("%v\n", err)
			// Too late for an HTTP error code.
			return
		}
	}

	out.Flush()
	if err := out.Error(); err != nil {
		log.Errorf("%v", err)
		// Too late for an HTTP error code.
		return
	}
}

// parseISRS parses an ISRS code given as string. If the code cannot be
// parsed, the returned error is a mw.JSONError carrying the HTTP status
// 400 Bad Request.
func parseISRS(code string) (*models.Isrs, error) {
	parsed, parseErr := models.IsrsFromString(code)
	if parseErr == nil {
		return parsed, nil
	}
	badRequest := mw.JSONError{
		Code:    http.StatusBadRequest,
		Message: fmt.Sprintf("error: Invalid ISRS code: %v", parseErr),
	}
	return nil, badRequest
}

// observedPredictedValues bundles the observed water level of a gauge
// at one measure date with the predictions that were issued for this
// date. Values of this type are produced by loadNashSutcliffeData.
type observedPredictedValues struct {
	// when is the measure date; loadNashSutcliffeData stores it in UTC.
	when time.Time
	// observed is the measured water level. loadNashSutcliffeData
	// initializes it with NaN until a measurement row was seen.
	observed float64
	// predicted holds the accepted predictions for the measure date,
	// each stored with its issue date as time stamp.
	predicted common.TimedValues
}

// loadNashSutcliffeData loads the observed and predicted water levels
// of a gauge from the 72 hours up to when and groups them by measure
// date.
//
// A prediction is only kept if it was issued 24, 48 or 72 hours (with a
// tolerance of less than 10 milliseconds) before its measure date.
// Only measure dates which have an observed value and at least one
// kept prediction are part of the result. All time stamps in the
// result are in UTC.
func loadNashSutcliffeData(
	ctx context.Context,
	conn *sql.Conn,
	gauge *models.Isrs,
	when time.Time,
) ([]observedPredictedValues, error) {

	rows, err := conn.QueryContext(
		ctx,
		selectPredictedObserveredSQL,
		gauge.CountryCode,
		gauge.LoCode,
		gauge.FairwaySection,
		gauge.Orc,
		gauge.Hectometre,
		when,
	)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	const tolerance = 10 * time.Millisecond

	// leadTimeMatches reports whether issued lies one, two or three
	// days before measured, give or take the tolerance.
	leadTimeMatches := func(measured, issued time.Time) bool {
		for days := 1; days <= 3; days++ {
			expected := measured.Add(-time.Duration(days) * 24 * time.Hour)
			off := issued.Sub(expected)
			if -tolerance < off && off < tolerance {
				return true
			}
		}
		return false
	}

	// usable reports whether a group has both an observation and
	// predictions to compare it with.
	usable := func(g *observedPredictedValues) bool {
		return !math.IsNaN(g.observed) && len(g.predicted) > 0
	}

	var (
		result  []observedPredictedValues
		group   observedPredictedValues
		started bool
	)

	// The rows are ordered by measure date, so all rows of a group
	// arrive consecutively.
	for rows.Next() {
		var (
			measured     time.Time
			issued       time.Time
			isPrediction bool
			level        float64
		)
		if err := rows.Scan(&measured, &issued, &isPrediction, &level); err != nil {
			return nil, err
		}
		measured, issued = measured.UTC(), issued.UTC()

		// A new measure date closes the running group and opens a
		// fresh one.
		if !started || !group.when.Equal(measured) {
			if started && usable(&group) {
				result = append(result, group)
			}
			started = true
			group = observedPredictedValues{
				when:     measured,
				observed: math.NaN(),
			}
		}

		switch {
		case !isPrediction:
			group.observed = level
		case leadTimeMatches(measured, issued):
			group.predicted = append(
				group.predicted,
				common.TimedValue{When: issued, Value: level},
			)
		}
	}

	if err := rows.Err(); err != nil {
		return nil, err
	}

	// Do not forget the last group.
	if started && usable(&group) {
		result = append(result, group)
	}

	return result, nil
}

// nashSutcliffe is a JSON handler which calculates the Nash Sutcliffe
// coefficients of the predictions of a gauge for the lead times of
// 24, 48 and 72 hours.
//
// The gauge is taken from the route variable "gauge". The optional
// form value "when" sets the reference time; it defaults to the
// current time. An invalid gauge or time is reported as a
// mw.JSONError with status 400 Bad Request.
//
// The result contains the reference time in UTC and for each lead time
// the sanitized coefficient together with the number of samples it is
// based on.
func nashSutcliffe(req *http.Request) (jr mw.JSONResult, err error) {

	isrs, err := parseISRS(mux.Vars(req)["gauge"])
	if err != nil {
		return jr, err
	}

	var when time.Time
	switch w := req.FormValue("when"); w {
	case "":
		when = time.Now()
	default:
		parsed, parseErr := common.ParseTime(w)
		if parseErr != nil {
			return jr, mw.JSONError{
				Code:    http.StatusBadRequest,
				Message: fmt.Sprintf("error: wrong time format: %v", parseErr),
			}
		}
		when = parsed
	}
	when = when.UTC()

	ctx := req.Context()

	values, err := loadNashSutcliffeData(ctx, mw.JSONConn(req), isrs, when)
	if err != nil {
		return jr, err
	}

	log.Infof("found %d value(s) for Nash Sutcliffe.\n", len(values))

	type coeff struct {
		Value   float64 `json:"value"`
		Samples int     `json:"samples"`
		Hours   int     `json:"hours"`
	}

	type coeffs struct {
		When   models.ImportTime `json:"when"`
		Coeffs []coeff           `json:"coeffs"`
	}

	// One coefficient per lead time: 24, 48 and 72 hours.
	cs := make([]coeff, 3)

	// The sample buffers are shared by all lead times and emptied
	// before each round.
	var predicted, observed []float64

	for i := range cs {
		c := &cs[i]
		c.Hours = 24 * (i + 1)
		lead := time.Duration(c.Hours) * time.Hour

		predicted, observed = predicted[:0], observed[:0]

		// Pair each observation with the prediction issued exactly
		// one lead time before it, if there is one.
		for j := range values {
			v := &values[j]
			if p, ok := v.predicted.Find(v.when.Add(-lead)); ok {
				predicted = append(predicted, p)
				observed = append(observed, v.observed)
			}
		}

		c.Value = sanitizeFloat64(common.NashSutcliffe(predicted, observed))
		c.Samples = len(predicted)
	}

	jr = mw.JSONResult{
		Result: &coeffs{
			When:   models.ImportTime{Time: when},
			Coeffs: cs,
		},
	}
	return jr, nil
}

// sanitizeFloat64 maps the special floating point values to finite
// ones: NaN becomes 0 and an infinity becomes the largest finite
// float64 of the same sign. All other values are returned unchanged.
func sanitizeFloat64(x float64) float64 {
	if math.IsNaN(x) {
		return 0
	}
	if math.IsInf(x, 0) {
		return math.Copysign(math.MaxFloat64, x)
	}
	return x
}

// waterlevels is an HTTP handler which writes the measured and the
// most recently issued predicted water levels of a gauge as CSV.
//
// The route variable "gauge" holds the ISRS code of the gauge. The
// optional form values "from" and "to" restrict the measure dates to
// an inclusive time range. An invalid ISRS code or time is answered
// with 400 Bad Request, a failing database query with
// 500 Internal Server Error.
//
// The CSV has the columns date, water level, lower and upper bound
// (empty if NULL) and a "t"/"f" flag telling whether the value is
// predicted. Errors that occur while the CSV is written can only be
// logged, as the status code has already been sent.
func waterlevels(rw http.ResponseWriter, req *http.Request) {
	gauge := mux.Vars(req)["gauge"]

	isrs, err := models.IsrsFromString(gauge)
	if err != nil {
		http.Error(
			rw, fmt.Sprintf("error: Invalid ISRS code: %v", err),
			http.StatusBadRequest)
		return
	}

	// Rows have to belong to the gauge and are either measurements or
	// predictions with the latest issue date for this gauge.
	// TODO: FIXME The filter is not correct for predictions!?
	filters := filterAnd{
		buildFilterTerm(
			"location = ($%d::char(2), $%d::char(3), $%d::char(5), $%d::char(5), $%d::int)",
			isrs.CountryCode,
			isrs.LoCode,
			isrs.FairwaySection,
			isrs.Orc,
			isrs.Hectometre,
		),
		&filterOr{
			&filterNot{&filterTerm{format: "predicted"}},
			buildFilterTerm(
				`date_issue = (
                 SELECT max(date_issue)
                 FROM waterway.gauge_predictions gm
                 WHERE location = ($%d::char(2), $%d::char(3), $%d::char(5), $%d::char(5), $%d::int))`,
				isrs.CountryCode,
				isrs.LoCode,
				isrs.FairwaySection,
				isrs.Orc,
				isrs.Hectometre,
			),
		},
	}

	if from := req.FormValue("from"); from != "" {
		fromTime, err := common.ParseTime(from)
		if err != nil {
			http.Error(
				rw, fmt.Sprintf("error: Invalid from time: %v", err),
				http.StatusBadRequest)
			return
		}
		filters = append(filters, buildFilterTerm("measure_date >= $%d", fromTime))
	}

	if to := req.FormValue("to"); to != "" {
		toTime, err := common.ParseTime(to)
		if err != nil {
			// Name the parameter which is actually wrong.
			http.Error(
				rw, fmt.Sprintf("error: Invalid to time: %v", err),
				http.StatusBadRequest)
			return
		}
		filters = append(filters, buildFilterTerm("measure_date <= $%d", toTime))
	}

	var stmt strings.Builder
	var args []interface{}

	// selectWaterlevelsSQL ends with an open WHERE; the serialized
	// filters complete the statement and fill the argument list.
	stmt.WriteString(selectWaterlevelsSQL)
	filters.serialize(&stmt, &args)

	conn := mw.GetDBConn(req)

	ctx := req.Context()

	rows, err := conn.QueryContext(ctx, stmt.String(), args...)
	if err != nil {
		http.Error(
			rw, fmt.Sprintf("error: %v", err),
			http.StatusInternalServerError)
		return
	}
	defer rows.Close()

	rw.Header().Add("Content-Type", "text/csv")

	out := csv.NewWriter(rw)

	record := []string{
		"#date",
		"#water_level",
		"#value_min",
		"#value_max",
		"#predicted",
	}

	if err := out.Write(record); err != nil {
		log.Errorf("%v", err)
		// Too late for an HTTP error code.
		return
	}

	// The header record is reused as buffer for the data lines.
	for rows.Next() {
		var (
			measureDate time.Time
			waterlevel  float64
			valueMin    sql.NullFloat64
			valueMax    sql.NullFloat64
			predicted   bool
		)
		if err := rows.Scan(
			&measureDate,
			&waterlevel,
			&valueMin,
			&valueMax,
			&predicted,
		); err != nil {
			log.Errorf("%v\n", err)
			// Too late for an HTTP error code.
			return
		}
		record[0] = measureDate.Format(common.TimeFormat)
		record[1] = float64format(waterlevel)
		record[2] = nullFloat64format(valueMin)
		record[3] = nullFloat64format(valueMax)
		record[4] = boolFormat(predicted)

		if err := out.Write(record); err != nil {
			log.Errorf("%v", err)
			// Too late for an HTTP error code.
			return
		}
	}

	if err := rows.Err(); err != nil {
		log.Errorf("%v", err)
		// Too late for an HTTP error code.
		return
	}

	out.Flush()
	if err := out.Error(); err != nil {
		log.Errorf("%v", err)
		// Too late for an HTTP error code.
		return
	}
}