view pkg/controllers/gauges.go @ 5490:5f47eeea988d logging

Use own logging package.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Mon, 20 Sep 2021 17:45:39 +0200
parents 3b3cf2083730
children 6270951dda28
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package controllers

import (
	"context"
	"database/sql"
	"encoding/csv"
	"fmt"
	"math"
	"net/http"
	"sort"
	"strconv"
	"strings"
	"time"

	"github.com/gorilla/mux"
	"github.com/jackc/pgx/pgtype"
	"gonum.org/v1/gonum/stat"

	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/log"
	"gemma.intevation.de/gemma/pkg/models"

	mw "gemma.intevation.de/gemma/pkg/middleware"
)

const (
	// selectPredictedObserveredSQL fetches measured and predicted water
	// levels of one gauge in a single result set. Parameters $1-$5 are the
	// components of the ISRS location, $6 is the reference time; only rows
	// with a measure_date within the 72 hours up to $6 are returned.
	// The "predicted" column is false for rows from gauge_measurements and
	// true for rows from gauge_predictions. Rows are ordered by
	// measure_date, then date_issue.
	selectPredictedObserveredSQL = `
SELECT
  measure_date,
  date_issue,
  predicted,
  water_level
FROM (
  SELECT
    location,
    measure_date,
    date_issue,
    false AS predicted,
    water_level
  FROM waterway.gauge_measurements
  UNION ALL
  SELECT
    location,
    measure_date,
    date_issue,
    true AS predicted,
    water_level
  FROM waterway.gauge_predictions
) AS gmp
WHERE location = ($1, $2, $3, $4, $5)::isrs
  AND measure_date BETWEEN
    $6::timestamptz - '72hours'::interval AND $6::timestamptz
ORDER BY measure_date, date_issue
`
	// selectWaterlevelsSQL is the head of a query over measured and
	// predicted water levels. It deliberately ends with an open WHERE:
	// the caller has to append the filter condition before executing it.
	// value_min and value_max are NULL for measurements and the lower
	// and upper bound of conf_interval for predictions.
	selectWaterlevelsSQL = `
SELECT
  measure_date,
  water_level,
  value_min,
  value_max,
  predicted
FROM (
  SELECT
    location,
    measure_date,
    date_issue,
    water_level,
    NULL AS value_min,
    NULL AS value_max,
    false AS predicted
  FROM waterway.gauge_measurements
  UNION ALL
  SELECT
    location,
    measure_date,
    date_issue,
    water_level,
    lower(conf_interval) AS value_min,
    upper(conf_interval) AS value_max,
    true AS predicted
  FROM waterway.gauge_predictions
) AS gmp
WHERE
`

	// selectAllWaterlevelsMeasuredRangeSQL yields the earliest and latest
	// measure_date of the measurements with staging_done set for the gauge
	// given by the ISRS components $1-$5. As an aggregate without GROUP BY
	// both columns are NULL if no measurement matches.
	selectAllWaterlevelsMeasuredRangeSQL = `
SELECT
  min(measure_date),
  max(measure_date)
FROM waterway.gauge_measurements
WHERE
  location = (
    $1::char(2),
    $2::char(3),
    $3::char(5),
    $4::char(5),
    $5::int
   )::isrs
   AND staging_done
`

	// selectAllWaterlevelsMeasuredSQL aggregates the measurements with
	// staging_done set of the gauge given by the ISRS components $1-$5
	// per calendar day. The grouping key is returned as the string
	// "day:month"; for each key the 25%, 50% and 75% percentiles, the
	// mean, the minimum and the maximum water level are returned.
	// The result is not ordered.
	selectAllWaterlevelsMeasuredSQL = `
SELECT
    extract(day from measure_date)::varchar || ':' ||
    extract(month from measure_date)::varchar AS day_month,
  percentile_disc(0.25) within group (order by water_level) AS q25,
  percentile_disc(0.5)  within group (order by water_level) AS median,
  percentile_disc(0.75) within group (order by water_level) AS q75,
  avg(water_level) AS mean,
  min(water_level) AS min,
  max(water_level) AS max
FROM waterway.gauge_measurements
WHERE
  location = (
    $1::char(2),
    $2::char(3),
    $3::char(5),
    $4::char(5),
    $5::int
   )::isrs
   AND staging_done
GROUP BY extract(day from measure_date)::varchar || ':' ||
         extract(month from measure_date)::varchar;
`
	// selectYearWaterlevelsMeasuredSQL fetches the measurements with
	// staging_done set of the gauge given by the ISRS components $1-$5
	// whose measure_date lies between $6 and $7 (both inclusive),
	// ordered by measure_date.
	selectYearWaterlevelsMeasuredSQL = `
SELECT
  measure_date,
  water_level
FROM waterway.gauge_measurements
WHERE
  location = (
    $1::char(2),
    $2::char(3),
    $3::char(5),
    $4::char(5),
    $5::int
   )::isrs
  AND staging_done
  AND measure_date BETWEEN $6 AND $7
ORDER BY measure_date
`
)

// float64format renders v in plain decimal notation (no exponent)
// using the smallest number of digits that still round-trips to
// the same float64.
func float64format(v float64) string {
	const (
		notation  = 'f' // decimal point, never an exponent
		precision = -1  // shortest exact representation
		bitSize   = 64
	)
	return strconv.FormatFloat(v, notation, precision, bitSize)
}

// nullFloat64format formats a nullable float for CSV output.
// A NULL value becomes the empty string, everything else is
// rendered by float64format.
func nullFloat64format(v sql.NullFloat64) string {
	if !v.Valid {
		return ""
	}
	return float64format(v.Float64)
}

// boolFormat maps a boolean to its one-letter representation:
// "t" for true and "f" for false.
func boolFormat(b bool) string {
	switch b {
	case true:
		return "t"
	default:
		return "f"
	}
}

// yearWaterlevels is an HTTP handler that writes the daily mean water
// levels of one gauge for one calendar year.
//
// The gauge (an ISRS code) and the year are taken from the route
// variables "gauge" and "year". An invalid ISRS code or a year that is
// not an integer is answered with 400, a failing query with 500.
// The response body consists of one line "DD-MM,mean" per day (UTC)
// for which measurements exist, in chronological order.
func yearWaterlevels(rw http.ResponseWriter, req *http.Request) {

	vars := mux.Vars(req)

	isrs, err := models.IsrsFromString(vars["gauge"])
	if err != nil {
		http.Error(
			rw, fmt.Sprintf("error: Invalid ISRS code: %v", err),
			http.StatusBadRequest)
		return
	}

	// Do not silently fall back to year 0 if the year is not a number.
	year, err := strconv.Atoi(vars["year"])
	if err != nil {
		http.Error(
			rw, fmt.Sprintf("error: Invalid year: %v", err),
			http.StatusBadRequest)
		return
	}

	conn := mw.GetDBConn(req)

	ctx := req.Context()

	// The query uses an inclusive BETWEEN, so the end is the last
	// microsecond of the requested year.
	begin := time.Date(year, time.January, 1, 0, 0, 0, 0, time.UTC)
	end := time.Date(year+1, time.January, 1, 0, 0, 0, 0, time.UTC).Add(-time.Microsecond)

	log.Infof("begin %s\n", begin)
	log.Infof("end   %s\n", end)

	rows, err := conn.QueryContext(
		ctx,
		selectYearWaterlevelsMeasuredSQL,
		isrs.CountryCode,
		isrs.LoCode,
		isrs.FairwaySection,
		isrs.Orc,
		isrs.Hectometre,
		begin,
		end,
	)
	if err != nil {
		http.Error(
			rw, fmt.Sprintf("error: %v", err),
			http.StatusInternalServerError)
		return
	}
	defer rows.Close()

	// values collects the measurements of the day currently processed.
	var values []float64

	lastDay, lastMonth := -1, -1

	// write emits the mean of the collected day (if any) and resets
	// the collection for the next day.
	write := func() error {
		var err error
		if len(values) > 0 {
			mean := stat.Mean(values, nil)
			_, err = fmt.Fprintf(
				rw, "%02d-%02d,%s\n", lastDay, lastMonth,
				float64format(mean))
			values = values[:0]
		}
		return err
	}

	// The rows arrive ordered by date, so a change of day or month
	// means the previous day is complete.
	for rows.Next() {
		var when time.Time
		var value float64
		if err := rows.Scan(&when, &value); err != nil {
			log.Errorf("%v", err)
			// Too late for an HTTP error code.
			return
		}
		when = when.UTC()
		day, month := when.Day(), int(when.Month())
		if day != lastDay || month != lastMonth {
			if err := write(); err != nil {
				log.Errorf("%v", err)
				// Too late for an HTTP error code.
				return
			}
			lastDay, lastMonth = day, month
		}
		values = append(values, value)
	}

	if err := rows.Err(); err != nil {
		log.Errorf("%v", err)
		// Too late for an HTTP error code.
		return
	}

	// Flush the last day.
	if err := write(); err != nil {
		log.Errorf("%v", err)
		// Too late for an HTTP error code.
	}
}

// longtermWaterlevels is an HTTP handler that writes a CSV document with
// per-calendar-day statistics over all measurements of one gauge.
//
// The gauge (an ISRS code) is taken from the route variable "gauge".
// An invalid code is answered with 400, a gauge without measurements
// with 404 and database problems with 500. The CSV starts with a line
// naming the covered interval of years, followed by a column header and
// one line per day ("DD-MM", min, max, mean, median, q25, q75) ordered
// by month and day.
func longtermWaterlevels(rw http.ResponseWriter, req *http.Request) {

	gauge := mux.Vars(req)["gauge"]

	isrs, err := models.IsrsFromString(gauge)
	if err != nil {
		http.Error(
			rw, fmt.Sprintf("error: Invalid ISRS code: %v", err),
			http.StatusBadRequest)
		return
	}

	conn := mw.GetDBConn(req)

	ctx := req.Context()

	var begin, end pgtype.Timestamp

	err = conn.QueryRowContext(
		ctx,
		selectAllWaterlevelsMeasuredRangeSQL,
		isrs.CountryCode,
		isrs.LoCode,
		isrs.FairwaySection,
		isrs.Orc,
		isrs.Hectometre,
	).Scan(&begin, &end)

	// The error has to be inspected before the timestamps: after a
	// failed scan they are not present either, which must not turn a
	// database error into a "not found".
	switch {
	case err == sql.ErrNoRows:
		http.NotFound(rw, req)
		return
	case err != nil:
		http.Error(
			rw, fmt.Sprintf("error: %v", err),
			http.StatusInternalServerError)
		return
	case begin.Status != pgtype.Present || end.Status != pgtype.Present:
		// min/max over no rows are NULL: there are no measurements.
		http.NotFound(rw, req)
		return
	}

	rows, err := conn.QueryContext(
		ctx,
		selectAllWaterlevelsMeasuredSQL,
		isrs.CountryCode,
		isrs.LoCode,
		isrs.FairwaySection,
		isrs.Orc,
		isrs.Hectometre,
	)
	if err != nil {
		http.Error(
			rw, fmt.Sprintf("error: %v", err),
			http.StatusInternalServerError)
		return
	}
	defer rows.Close()

	type result struct {
		day    int
		month  int
		q25    float64
		median float64
		q75    float64
		mean   float64
		min    float64
		max    float64
	}

	// At most one entry per calendar day including February 29th.
	results := make([]result, 0, 366)

	start := time.Now()

	for rows.Next() {
		var r result
		var dayMonth string
		if err := rows.Scan(
			&dayMonth,
			&r.q25,
			&r.median,
			&r.q75,
			&r.mean,
			&r.min,
			&r.max,
		); err != nil {
			http.Error(
				rw, fmt.Sprintf("error: %v", err),
				http.StatusInternalServerError)
			return
		}

		// The grouping key has the form "day:month".
		parts := strings.SplitN(dayMonth, ":", 2)
		if len(parts) != 2 {
			http.Error(
				rw, fmt.Sprintf("error: Invalid day/month key: %q", dayMonth),
				http.StatusInternalServerError)
			return
		}
		var convErr error
		if r.day, convErr = strconv.Atoi(parts[0]); convErr == nil {
			r.month, convErr = strconv.Atoi(parts[1])
		}
		if convErr != nil {
			http.Error(
				rw, fmt.Sprintf("error: Invalid day/month key: %v", convErr),
				http.StatusInternalServerError)
			return
		}
		results = append(results, r)
	}

	if err := rows.Err(); err != nil {
		http.Error(
			rw, fmt.Sprintf("error: %v", err),
			http.StatusInternalServerError)
		return
	}

	log.Infof("loading entries took %s\n", time.Since(start))

	log.Infof("days found: %d\n", len(results))

	// The query result is unordered: sort by month first, then by day.
	sort.Slice(results, func(i, j int) bool {
		if d := results[i].month - results[j].month; d != 0 {
			return d < 0
		}
		return results[i].day < results[j].day
	})

	rw.Header().Add("Content-Type", "text/csv")

	out := csv.NewWriter(rw)

	// First line: the years covered by the measurements, padded to
	// the number of columns of the data lines.
	record := []string{
		fmt.Sprintf("#interval: %d-%d",
			begin.Time.UTC().Year(),
			end.Time.UTC().Year()),
		"",
		"",
		"",
		"",
		"",
		"",
	}

	if err := out.Write(record); err != nil {
		log.Errorf("%v\n", err)
		// Too late for an HTTP error code.
		return
	}

	record = []string{
		"#date",
		"#min",
		"#max",
		"#mean",
		"#median",
		"#q25",
		"#q75",
	}

	if err := out.Write(record); err != nil {
		log.Errorf("%v\n", err)
		// Too late for an HTTP error code.
		return
	}

	// The header record is re-used as buffer for the data lines.
	for i := range results {
		r := &results[i]
		record[0] = fmt.Sprintf("%02d-%02d", r.day, r.month)
		record[1] = float64format(r.min)
		record[2] = float64format(r.max)
		record[3] = float64format(r.mean)
		record[4] = float64format(r.median)
		record[5] = float64format(r.q25)
		record[6] = float64format(r.q75)
		if err := out.Write(record); err != nil {
			log.Errorf("%v\n", err)
			// Too late for an HTTP error code.
			return
		}
	}

	out.Flush()
	if err := out.Error(); err != nil {
		log.Errorf("%v", err)
		// Too late for an HTTP error code.
		return
	}
}

// parseISRS converts the textual ISRS code into its model
// representation. A code that cannot be parsed is reported as a
// JSON error carrying the HTTP status "bad request".
func parseISRS(code string) (*models.Isrs, error) {
	isrs, err := models.IsrsFromString(code)
	if err == nil {
		return isrs, nil
	}
	message := fmt.Sprintf("error: Invalid ISRS code: %v", err)
	return nil, mw.JSONError{
		Code:    http.StatusBadRequest,
		Message: message,
	}
}

// observedPredictedValues bundles the observed water level at one
// measure date with the predictions that were issued for this date.
type observedPredictedValues struct {
	// when is the measure date shared by the observation
	// and the predictions.
	when time.Time
	// observed is the measured water level; it is NaN as long as
	// no measurement was found for the measure date.
	observed float64
	// predicted holds the predicted water levels for the measure
	// date, each stamped with the date the prediction was issued.
	predicted common.TimedValues
}

// loadNashSutcliffeData loads the observed and predicted water levels
// of the given gauge for the 72 hours up to when and groups them by
// measure date.
//
// A prediction is only kept if it was issued 24, 48 or 72 hours
// (with a tolerance of 10ms) before its measure date. Only measure
// dates having an observation and at least one kept prediction are
// part of the result. The groups are returned in ascending order of
// their measure date.
func loadNashSutcliffeData(
	ctx context.Context,
	conn *sql.Conn,
	gauge *models.Isrs,
	when time.Time,
) ([]observedPredictedValues, error) {

	rows, err := conn.QueryContext(
		ctx,
		selectPredictedObserveredSQL,
		gauge.CountryCode,
		gauge.LoCode,
		gauge.FairwaySection,
		gauge.Orc,
		gauge.Hectometre,
		when,
	)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	const tolerance = 10 * time.Millisecond

	leadTimes := []time.Duration{
		24 * time.Hour,
		48 * time.Hour,
		72 * time.Hour,
	}

	// issuedAtLeadTime reports whether issued lies one of the lead
	// times before measured (within the tolerance).
	issuedAtLeadTime := func(measured, issued time.Time) bool {
		for _, lead := range leadTimes {
			diff := issued.Sub(measured.Add(-lead))
			if -tolerance < diff && diff < tolerance {
				return true
			}
		}
		return false
	}

	var (
		result  []observedPredictedValues
		group   observedPredictedValues
		started bool
	)

	// flush stores the group collected so far if it is complete,
	// i.e. has an observation and at least one prediction.
	flush := func() {
		if started && !math.IsNaN(group.observed) && len(group.predicted) > 0 {
			result = append(result, group)
		}
	}

	for rows.Next() {
		var (
			measured time.Time
			issued   time.Time
			forecast bool
			level    float64
		)
		if err := rows.Scan(&measured, &issued, &forecast, &level); err != nil {
			return nil, err
		}
		measured, issued = measured.UTC(), issued.UTC()

		// Rows are ordered by measure date: a new date closes the
		// previous group and opens a fresh one.
		if !started || !group.when.Equal(measured) {
			flush()
			started = true
			group = observedPredictedValues{
				when:     measured,
				observed: math.NaN(),
			}
		}

		switch {
		case !forecast:
			group.observed = level
		case issuedAtLeadTime(measured, issued):
			group.predicted = append(
				group.predicted,
				common.TimedValue{When: issued, Value: level},
			)
		}
	}

	if err := rows.Err(); err != nil {
		return nil, err
	}

	// Do not forget the group which was collected last.
	flush()

	return result, nil
}

// nashSutcliffe is a JSON handler calculating the Nash Sutcliffe
// coefficients of the predictions of a gauge.
//
// The gauge is taken from the route variable "gauge". The optional
// form value "when" is the reference time; it defaults to the current
// time. For each of the prediction horizons 24h, 48h and 72h a
// coefficient together with the number of samples it is based on is
// returned. Invalid input is reported as a "bad request" JSON error.
func nashSutcliffe(req *http.Request) (jr mw.JSONResult, err error) {

	isrs, err := parseISRS(mux.Vars(req)["gauge"])
	if err != nil {
		return jr, err
	}

	var when time.Time
	switch w := req.FormValue("when"); w {
	case "":
		when = time.Now()
	default:
		parsed, parseErr := common.ParseTime(w)
		if parseErr != nil {
			return jr, mw.JSONError{
				Code:    http.StatusBadRequest,
				Message: fmt.Sprintf("error: wrong time format: %v", parseErr),
			}
		}
		when = parsed
	}
	when = when.UTC()

	values, err := loadNashSutcliffeData(
		req.Context(), mw.JSONConn(req), isrs, when)
	if err != nil {
		return jr, err
	}

	log.Infof("found %d value(s) for Nash Sutcliffe.\n", len(values))

	type coeff struct {
		Value   float64 `json:"value"`
		Samples int     `json:"samples"`
		Hours   int     `json:"hours"`
	}

	type coeffs struct {
		When   models.ImportTime `json:"when"`
		Coeffs []coeff           `json:"coeffs"`
	}

	// The sample buffers are shared by all horizons.
	var predicted, observed []float64

	cs := make([]coeff, 3)
	for i := range cs {
		c := &cs[i]
		c.Hours = 24 * (i + 1)
		offset := -time.Duration(c.Hours) * time.Hour

		predicted, observed = predicted[:0], observed[:0]

		// Pair each observation with the prediction issued
		// c.Hours before it, if there is one.
		for j := range values {
			v := &values[j]
			if p, ok := v.predicted.Find(v.when.Add(offset)); ok {
				predicted = append(predicted, p)
				observed = append(observed, v.observed)
			}
		}

		c.Value = sanitizeFloat64(common.NashSutcliffe(predicted, observed))
		c.Samples = len(predicted)
	}

	jr = mw.JSONResult{
		Result: &coeffs{
			When:   models.ImportTime{Time: when},
			Coeffs: cs,
		},
	}
	return jr, nil
}

// sanitizeFloat64 replaces the special floating point values by
// finite ones: NaN becomes 0, positive and negative infinity become
// the largest respectively smallest finite float64. All other values
// are returned unchanged.
func sanitizeFloat64(x float64) float64 {
	if math.IsNaN(x) {
		return 0
	}
	if math.IsInf(x, +1) {
		return math.MaxFloat64
	}
	if math.IsInf(x, -1) {
		return -math.MaxFloat64
	}
	return x
}

// waterlevels is an HTTP handler that writes the measured and the most
// recently issued predicted water levels of one gauge as CSV.
//
// The gauge (an ISRS code) is taken from the route variable "gauge".
// The optional form values "from" and "to" restrict the measure dates
// to the given inclusive range. Invalid input is answered with 400,
// a failing query with 500. Each CSV line consists of the measure
// date, the water level, the lower and upper bound of the confidence
// interval (empty for measurements) and a flag ("t"/"f") telling
// if the value is a prediction.
func waterlevels(rw http.ResponseWriter, req *http.Request) {
	gauge := mux.Vars(req)["gauge"]

	isrs, err := models.IsrsFromString(gauge)
	if err != nil {
		http.Error(
			rw, fmt.Sprintf("error: Invalid ISRS code: %v", err),
			http.StatusBadRequest)
		return
	}

	// Rows of the gauge which are either measurements or belong to
	// the predictions with the latest issue date.
	// TODO: FIXME The filter is not correct for predictions!?
	filters := filterAnd{
		buildFilterTerm(
			"location = ($%d::char(2), $%d::char(3), $%d::char(5), $%d::char(5), $%d::int)",
			isrs.CountryCode,
			isrs.LoCode,
			isrs.FairwaySection,
			isrs.Orc,
			isrs.Hectometre,
		),
		&filterOr{
			&filterNot{&filterTerm{format: "predicted"}},
			buildFilterTerm(
				`date_issue = (
                 SELECT max(date_issue)
                 FROM waterway.gauge_predictions gm
                 WHERE location = ($%d::char(2), $%d::char(3), $%d::char(5), $%d::char(5), $%d::int))`,
				isrs.CountryCode,
				isrs.LoCode,
				isrs.FairwaySection,
				isrs.Orc,
				isrs.Hectometre,
			),
		},
	}

	if from := req.FormValue("from"); from != "" {
		fromTime, err := common.ParseTime(from)
		if err != nil {
			http.Error(
				rw, fmt.Sprintf("error: Invalid from time: %v", err),
				http.StatusBadRequest)
			return
		}
		filters = append(filters, buildFilterTerm("measure_date >= $%d", fromTime))
	}

	if to := req.FormValue("to"); to != "" {
		toTime, err := common.ParseTime(to)
		if err != nil {
			// Name the parameter that actually failed to parse.
			http.Error(
				rw, fmt.Sprintf("error: Invalid to time: %v", err),
				http.StatusBadRequest)
			return
		}
		filters = append(filters, buildFilterTerm("measure_date <= $%d", toTime))
	}

	// The base statement ends with an open WHERE which is completed
	// by the serialized filters.
	var stmt strings.Builder
	var args []interface{}

	stmt.WriteString(selectWaterlevelsSQL)
	filters.serialize(&stmt, &args)

	conn := mw.GetDBConn(req)

	ctx := req.Context()

	rows, err := conn.QueryContext(ctx, stmt.String(), args...)
	if err != nil {
		http.Error(
			rw, fmt.Sprintf("error: %v", err),
			http.StatusInternalServerError)
		return
	}
	defer rows.Close()

	rw.Header().Add("Content-Type", "text/csv")

	out := csv.NewWriter(rw)

	record := []string{
		"#date",
		"#water_level",
		"#value_min",
		"#value_max",
		"#predicted",
	}

	if err := out.Write(record); err != nil {
		log.Errorf("%v", err)
		// Too late for an HTTP error code.
		return
	}

	// The header record is re-used as buffer for the data lines.
	for rows.Next() {
		var (
			measureDate time.Time
			waterlevel  float64
			valueMin    sql.NullFloat64
			valueMax    sql.NullFloat64
			predicted   bool
		)
		if err := rows.Scan(
			&measureDate,
			&waterlevel,
			&valueMin,
			&valueMax,
			&predicted,
		); err != nil {
			log.Errorf("%v\n", err)
			// Too late for an HTTP error code.
			return
		}
		record[0] = measureDate.Format(common.TimeFormat)
		record[1] = float64format(waterlevel)
		record[2] = nullFloat64format(valueMin)
		record[3] = nullFloat64format(valueMax)
		record[4] = boolFormat(predicted)

		if err := out.Write(record); err != nil {
			log.Errorf("%v", err)
			// Too late for an HTTP error code.
			return
		}
	}

	if err := rows.Err(); err != nil {
		log.Errorf("%v", err)
		// Too late for an HTTP error code.
		return
	}

	out.Flush()
	if err := out.Error(); err != nil {
		log.Errorf("%v", err)
		// Too late for an HTTP error code.
		return
	}
}