view pkg/controllers/gauges.go @ 2809:216bc6394911

Optimized longterm waterlevel statistics.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Tue, 26 Mar 2019 12:45:50 +0100
parents 5b6de0bde6b6
children 6f435a9558f2
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package controllers

import (
	"database/sql"
	"encoding/csv"
	"fmt"
	"log"
	"net/http"
	"sort"
	"strconv"
	"strings"
	"time"

	"github.com/gorilla/mux"
	"gonum.org/v1/gonum/stat"

	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/middleware"
	"gemma.intevation.de/gemma/pkg/models"
)

const (
	selectPredictedObserveredSQL = `
SELECT
  a.measure_date AS measure_date,
  a.water_level  AS predicted,
  b.water_level  AS observed
FROM waterway.gauge_measurements a JOIN waterway.gauge_measurements b
  ON a.fk_gauge_id  = b.fk_gauge_id AND
     a.measure_date = b.measure_date AND
     a.predicted AND NOT b.predicted
WHERE
  a.fk_gauge_id = (
    $1::char(2),
    $2::char(3),
    $3::char(5),
    $4::char(5),
    $5::int
  ) AND
  a.measure_date BETWEEN
    $6::timestamp AND $6::timestamp - '72hours'::interval
ORDER BY a.measure_date
`

	selectWaterlevelsSQL = `
SELECT
  measure_date,
  water_level,
  value_min,
  value_max,
  predicted
FROM waterway.gauge_measurements
WHERE
`

	selectAllWaterlevelsMeasuredSQL = `
WITH g AS (
  SELECT (
    $1::char(2),
    $2::char(3),
    $3::char(5),
    $4::char(5),
    $5::int)::isrs loc
)
SELECT
    extract(day from measure_date)::varchar || ':' ||
    extract(month from measure_date)::varchar AS day_month,
  percentile_disc(0.25) within group (order by water_level) AS q25,
  percentile_disc(0.5)  within group (order by water_level) AS median,
  percentile_disc(0.75) within group (order by water_level) AS q75,
  avg(water_level) AS mean,
  min(water_level) AS min,
  max(water_level) AS max
FROM waterway.gauge_measurements, g
WHERE fk_gauge_id = g.loc AND NOT predicted
GROUP BY extract(day from measure_date)::varchar || ':' ||
         extract(month from measure_date)::varchar;
`
	selectYearWaterlevelsMeasuredSQL = `
SELECT
  measure_date,
  water_level
FROM waterway.gauge_measurements
WHERE
  NOT predicted
  AND fk_gauge_id = (
    $1::char(2),
    $2::char(3),
    $3::char(5),
    $4::char(5),
    $5::int
  )
  AND measure_date BETWEEN $6 AND $7
ORDER BY measure_date
`
	selectWaterlevelsMeasuredSQL = `
SELECT
  measure_date,
  water_level
FROM waterway.gauge_measurements
WHERE
  NOT predicted
  AND fk_gauge_id = (
    $1::char(2),
    $2::char(3),
    $3::char(5),
    $4::char(5),
    $5::int
  ) 
  AND measure_date BETWEEN
    $6::timestamp with time zone AND
    $7::timestamp with time zone
ORDER BY measure_date
`
)

func float64format(v float64) string {
	return strconv.FormatFloat(v, 'f', -1, 64)
}

func nullFloat64format(v sql.NullFloat64) string {
	if v.Valid {
		return float64format(v.Float64)
	}
	return ""
}

func boolFormat(b bool) string {
	if b {
		return "t"
	}
	return "f"
}

func yearWaterlevels(rw http.ResponseWriter, req *http.Request) {

	gauge := mux.Vars(req)["gauge"]

	isrs, err := models.IsrsFromString(gauge)
	if err != nil {
		http.Error(
			rw, fmt.Sprintf("error: Invalid ISRS code: %v", err),
			http.StatusBadRequest)
		return
	}

	year, _ := strconv.Atoi(mux.Vars(req)["year"])

	conn := middleware.GetDBConn(req)

	ctx := req.Context()

	begin := time.Date(year, time.January, 1, 0, 0, 0, 0, time.UTC)
	end := time.Date(year+1, time.January, 1, 0, 0, 0, 0, time.UTC).Add(-time.Microsecond)

	log.Printf("info: begin %s\n", begin)
	log.Printf("info: end   %s\n", end)

	rows, err := conn.QueryContext(
		ctx,
		selectYearWaterlevelsMeasuredSQL,
		isrs.CountryCode,
		isrs.LoCode,
		isrs.FairwaySection,
		isrs.Orc,
		isrs.Hectometre,
		begin,
		end,
	)
	if err != nil {
		http.Error(
			rw, fmt.Sprintf("error: %v", err),
			http.StatusInternalServerError)
		return
	}
	defer rows.Close()

	var values []float64

	lastDay, lastMonth := -1, -1

	write := func() error {
		var err error
		if len(values) > 0 {
			mean := stat.Mean(values, nil)
			_, err = fmt.Fprintf(
				rw, "%02d-%02d,%s\n", lastDay, lastMonth,
				float64format(mean))
			values = values[:0]
		}
		return err
	}

	for rows.Next() {
		var when time.Time
		var value float64
		if err := rows.Scan(&when, &value); err != nil {
			log.Printf("error: %v", err)
			// Too late for an HTTP error code.
			return
		}
		when = when.UTC()
		day, month := when.Day(), int(when.Month())
		if day != lastDay || month != lastMonth {
			if err := write(); err != nil {
				log.Printf("error: %v", err)
				// Too late for an HTTP error code.
				return
			}
			lastDay, lastMonth = day, month
		}
		values = append(values, value)
	}

	if err := rows.Err(); err != nil {
		log.Printf("error: %v", err)
		// Too late for an HTTP error code.
		return
	}

	if err := write(); err != nil {
		log.Printf("error: %v", err)
		// Too late for an HTTP error code.
	}
}

func longtermWaterlevels(rw http.ResponseWriter, req *http.Request) {

	gauge := mux.Vars(req)["gauge"]

	isrs, err := models.IsrsFromString(gauge)
	if err != nil {
		http.Error(
			rw, fmt.Sprintf("error: Invalid ISRS code: %v", err),
			http.StatusBadRequest)
		return
	}

	conn := middleware.GetDBConn(req)

	ctx := req.Context()

	rows, err := conn.QueryContext(
		ctx,
		selectAllWaterlevelsMeasuredSQL,
		isrs.CountryCode,
		isrs.LoCode,
		isrs.FairwaySection,
		isrs.Orc,
		isrs.Hectometre,
	)
	if err != nil {
		http.Error(
			rw, fmt.Sprintf("error: %v", err),
			http.StatusInternalServerError)
		return
	}
	defer rows.Close()

	type result struct {
		day    int
		month  int
		q25    float64
		median float64
		q75    float64
		mean   float64
		min    float64
		max    float64
	}

	results := make([]result, 0, 366)

	start := time.Now()

	for rows.Next() {
		var r result
		var dayMonth string
		if err := rows.Scan(
			&dayMonth,
			&r.q25,
			&r.median,
			&r.q75,
			&r.mean,
			&r.min,
			&r.max,
		); err != nil {
			http.Error(
				rw, fmt.Sprintf("error: %v", err),
				http.StatusInternalServerError)
		}
		parts := strings.SplitN(dayMonth, ":", 2)
		r.day, _ = strconv.Atoi(parts[0])
		r.month, _ = strconv.Atoi(parts[1])
		results = append(results, r)
	}

	if err := rows.Err(); err != nil {
		http.Error(
			rw, fmt.Sprintf("error: %v", err),
			http.StatusInternalServerError)
		return
	}

	log.Printf("info: loading entries took %s\n", time.Since(start))

	log.Printf("info: days found: %d\n", len(results))

	sort.Slice(results, func(i, j int) bool {
		if d := int(results[i].month) - int(results[j].month); d != 0 {
			return d < 0
		}
		return results[i].day < results[j].day
	})

	rw.Header().Add("Content-Type", "text/csv")

	out := csv.NewWriter(rw)

	record := []string{
		"#date",
		"#min",
		"#max",
		"#mean",
		"#median",
		"#q25",
		"#q75",
	}

	if err := out.Write(record); err != nil {
		log.Printf("error: %v\n", err)
		// Too late for an HTTP error code.
		return
	}

	for i := range results {
		r := &results[i]
		record[0] = fmt.Sprintf("%02d-%02d", r.day, r.month)
		record[1] = float64format(r.min)
		record[2] = float64format(r.max)
		record[3] = float64format(r.mean)
		record[4] = float64format(r.median)
		record[5] = float64format(r.q25)
		record[6] = float64format(r.q75)
		if err := out.Write(record); err != nil {
			log.Printf("error: %v\n", err)
			// Too late for an HTTP error code.
			return
		}
	}

	out.Flush()
	if err := out.Error(); err != nil {
		log.Printf("error: %v", err)
		// Too late for an HTTP error code.
		return
	}
}

func averageWaterlevels(rw http.ResponseWriter, req *http.Request) {
	gauge := mux.Vars(req)["gauge"]

	isrs, err := models.IsrsFromString(gauge)
	if err != nil {
		http.Error(
			rw, fmt.Sprintf("error: Invalid ISRS code: %v", err),
			http.StatusBadRequest)
		return
	}

	var from, to time.Time

	if t := req.FormValue("to"); t != "" {
		var err error
		if to, err = time.ParseInLocation(common.DateFormat, t, time.UTC); err != nil {
			http.Error(
				rw, fmt.Sprintf("error: bad from date: %v", err),
				http.StatusBadRequest)
			return
		}
	} else {
		y, m, d := time.Now().Date()
		to = time.Date(y, m, d, 0, 0, 0, 0, time.UTC)
	}

	if f := req.FormValue("from"); f != "" {
		var err error
		if from, err = time.ParseInLocation(common.DateFormat, f, time.UTC); err != nil {
			http.Error(
				rw, fmt.Sprintf("error: bad from date: %v", err),
				http.StatusBadRequest)
			return
		}
	} else {
		from = to.AddDate(-1, 0, 0)
	}

	to = to.AddDate(0, 0, 1).Add(-time.Nanosecond)

	if to.Before(from) {
		from, to = to, from
	}

	conn := middleware.GetDBConn(req)

	ctx := req.Context()

	rows, err := conn.QueryContext(
		ctx,
		selectWaterlevelsMeasuredSQL,
		isrs.CountryCode,
		isrs.LoCode,
		isrs.FairwaySection,
		isrs.Orc,
		isrs.Hectometre,
		from, to,
	)
	if err != nil {
		http.Error(
			rw, fmt.Sprintf("error: %v", err),
			http.StatusInternalServerError)
		return
	}
	defer rows.Close()

	rw.Header().Add("Content-Type", "text/csv")

	out := csv.NewWriter(rw)

	var last time.Time
	var values []float64

	record := []string{
		"#date",
		"#min",
		"#max",
		"#mean",
		"#median",
		"#q25",
		"#q75",
	}

	if err := out.Write(record); err != nil {
		log.Printf("error: %v\n", err)
		// Too late for an HTTP error code.
		return
	}

	write := func() error {
		if len(values) > 0 {
			sort.Float64s(values)
			// date
			record[0] = last.Format(common.DateFormat)
			// min
			record[1] = float64format(values[0])
			// max
			record[2] = float64format(values[len(values)-1])
			// mean
			record[3] = float64format(stat.Mean(values, nil))
			// median
			record[4] = float64format(values[len(values)/2])
			// Q25
			record[5] = float64format(
				stat.Quantile(0.25, stat.Empirical, values, nil))
			// Q75
			record[6] = float64format(
				stat.Quantile(0.75, stat.Empirical, values, nil))

			err := out.Write(record)
			values = values[:0]
			return err
		}
		return nil
	}

	for rows.Next() {
		var (
			date  time.Time
			value float64
		)
		if err := rows.Scan(&date, &value); err != nil {
			log.Printf("error: %v\n", err)
			// Too late for an HTTP error code.
			return
		}
		oy, om, od := last.Date()
		ny, nm, nd := date.Date()
		if oy != ny || om != nm || od != nd {
			if err := write(); err != nil {
				log.Printf("error: %v\n", err)
				// Too late for an HTTP error code.
				return
			}
			last = date
		} else {
			values = append(values, value)
		}
	}
	write()

	if err := rows.Err(); err != nil {
		log.Printf("error: %v", err)
		// Too late for an HTTP error code.
		return
	}

	out.Flush()
	if err := out.Error(); err != nil {
		log.Printf("error: %v", err)
		// Too late for an HTTP error code.
		return
	}

}

func nashSutcliffe(
	_ interface{},
	req *http.Request,
	conn *sql.Conn,
) (jr JSONResult, err error) {
	gauge := mux.Vars(req)["gauge"]

	var isrs *models.Isrs
	if isrs, err = models.IsrsFromString(gauge); err != nil {
		err = JSONError{
			Code:    http.StatusBadRequest,
			Message: fmt.Sprintf("error: Invalid ISRS code: %v", err),
		}
		return
	}

	var when time.Time
	if w := req.FormValue("when"); w != "" {
		if when, err = time.Parse(models.ImportTimeFormat, w); err != nil {
			err = JSONError{
				Code:    http.StatusBadRequest,
				Message: fmt.Sprintf("error: wrong time format: %v", err),
			}
			return
		}
	} else {
		when = time.Now()
	}

	ctx := req.Context()

	var rows *sql.Rows
	if rows, err = conn.QueryContext(
		ctx,
		selectPredictedObserveredSQL,
		isrs.CountryCode,
		isrs.LoCode,
		isrs.FairwaySection,
		isrs.Orc,
		isrs.Hectometre,
		when,
	); err != nil {
		return
	}
	defer rows.Close()

	var measurements []common.NSMeasurement

	for rows.Next() {
		var m common.NSMeasurement
		if err = rows.Scan(
			&m.When,
			&m.Predicted,
			&m.Observed,
		); err != nil {
			return
		}
		measurements = append(measurements, m)
	}
	if err = rows.Err(); err != nil {
		return
	}

	type coeff struct {
		Value   float64 `json:"value"`
		Samples int     `json:"samples"`
		Hours   int     `json:"hours"`
	}

	type coeffs struct {
		When   models.ImportTime `json:"when"`
		Coeffs []coeff           `json:"coeffs"`
	}

	cs := make([]coeff, 3)
	for i := range cs {
		cs[i].Hours = (i + 1) * 24
		cs[i].Value, cs[i].Samples = common.NashSutcliffe(
			measurements,
			when,
			when.Add(time.Duration(-cs[i].Hours)*time.Hour),
		)
	}

	jr = JSONResult{
		Result: &coeffs{
			When:   models.ImportTime{when},
			Coeffs: cs,
		},
	}
	return
}

func waterlevels(rw http.ResponseWriter, req *http.Request) {
	gauge := mux.Vars(req)["gauge"]

	isrs, err := models.IsrsFromString(gauge)
	if err != nil {
		http.Error(
			rw, fmt.Sprintf("error: Invalid ISRS code: %v", err),
			http.StatusBadRequest)
		return
	}

	var fb filterBuilder
	fb.stmt.WriteString(selectWaterlevelsSQL)

	fb.cond(
		" fk_gauge_id = ($%d::char(2), $%d::char(3), $%d::char(5), $%d::char(5), $%d::int) ",
		isrs.CountryCode,
		isrs.LoCode,
		isrs.FairwaySection,
		isrs.Orc,
		isrs.Hectometre,
	)

	if from := req.FormValue("from"); from != "" {
		fromTime, err := time.Parse(models.ImportTimeFormat, from)
		if err != nil {
			http.Error(
				rw, fmt.Sprintf("error: Invalid from time: %v", err),
				http.StatusBadRequest)
			return
		}
		fb.cond("measure_date >= $%d", fromTime)
	}

	if to := req.FormValue("to"); to != "" {
		toTime, err := time.Parse(models.ImportTimeFormat, to)
		if err != nil {
			http.Error(
				rw, fmt.Sprintf("error: Invalid from time: %v", err),
				http.StatusBadRequest)
			return
		}
		fb.cond("measure_date <= $%d", toTime)
	}

	conn := middleware.GetDBConn(req)

	ctx := req.Context()

	rows, err := conn.QueryContext(ctx, fb.stmt.String(), fb.args...)
	if err != nil {
		http.Error(
			rw, fmt.Sprintf("error: %v", err),
			http.StatusInternalServerError)
		return
	}
	defer rows.Close()

	rw.Header().Add("Content-Type", "text/csv")

	out := csv.NewWriter(rw)

	record := []string{
		"#date",
		"#water_level",
		"#value_min",
		"#value_max",
		"#predicted",
	}

	if err := out.Write(record); err != nil {
		log.Printf("error: %v", err)
		// Too late for an HTTP error code.
		return
	}

	for rows.Next() {
		var (
			measureDate time.Time
			waterlevel  float64
			valueMin    sql.NullFloat64
			valueMax    sql.NullFloat64
			predicted   bool
		)
		if err := rows.Scan(
			&measureDate,
			&waterlevel,
			&valueMin,
			&valueMax,
			&predicted,
		); err != nil {
			log.Printf("error: %v\n", err)
			// Too late for an HTTP error code.
			return
		}
		record[0] = measureDate.Format(models.ImportTimeFormat)
		record[1] = float64format(waterlevel)
		record[2] = nullFloat64format(valueMin)
		record[3] = nullFloat64format(valueMax)
		record[4] = boolFormat(predicted)

		if err := out.Write(record); err != nil {
			log.Printf("error: %v", err)
			// Too late for an HTTP error code.
			return
		}
	}

	if err := rows.Err(); err != nil {
		log.Printf("error: %v", err)
		// Too late for an HTTP error code.
		return
	}

	out.Flush()
	if err := out.Error(); err != nil {
		log.Printf("error: %v", err)
		// Too late for an HTTP error code.
		return
	}
}