Mercurial > gemma
view pkg/controllers/gauges.go @ 3676:f107e82b64ae
import_configuration: enhance schedule with country of user
author | Thomas Junk <thomas.junk@intevation.de> |
---|---|
date | Mon, 17 Jun 2019 17:12:00 +0200 |
parents | ec6163c6687d |
children | 18777f6df3ef |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2019 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> package controllers import ( "context" "database/sql" "encoding/csv" "fmt" "log" "math" "net/http" "sort" "strconv" "strings" "time" "github.com/gorilla/mux" "github.com/jackc/pgx/pgtype" "gonum.org/v1/gonum/stat" "gemma.intevation.de/gemma/pkg/common" "gemma.intevation.de/gemma/pkg/middleware" "gemma.intevation.de/gemma/pkg/models" ) const ( selectPredictedObserveredSQL = ` SELECT measure_date, date_issue, predicted, water_level FROM ( SELECT location, measure_date, date_issue, false AS predicted, water_level FROM waterway.gauge_measurements UNION ALL SELECT location, measure_date, date_issue, true AS predicted, water_level FROM waterway.gauge_predictions ) AS gmp WHERE location = ( $1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int ) AND measure_date BETWEEN $6::timestamp - '72hours'::interval AND $6::timestamp ORDER BY measure_date, date_issue ` selectWaterlevelsSQL = ` SELECT measure_date, water_level, value_min, value_max, predicted FROM ( SELECT location, measure_date, date_issue, water_level, NULL AS value_min, NULL AS value_max, false AS predicted FROM waterway.gauge_measurements UNION ALL SELECT location, measure_date, date_issue, water_level, lower(conf_interval) AS value_min, upper(conf_interval) AS value_max, true AS predicted FROM waterway.gauge_predictions ) AS gmp WHERE ` selectAllWaterlevelsMeasuredRangeSQL = ` SELECT min(measure_date), max(measure_date) FROM waterway.gauge_measurements WHERE location = ( $1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int )::isrs AND staging_done ` selectAllWaterlevelsMeasuredSQL = ` SELECT extract(day from measure_date)::varchar || ':' || extract(month from measure_date)::varchar AS day_month, percentile_disc(0.25) within group (order by water_level) AS q25, percentile_disc(0.5) within group (order by water_level) AS median, percentile_disc(0.75) within group (order by water_level) AS q75, avg(water_level) AS mean, min(water_level) AS min, max(water_level) AS max FROM waterway.gauge_measurements WHERE location = ( $1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int )::isrs AND staging_done GROUP BY extract(day from measure_date)::varchar || ':' || extract(month from measure_date)::varchar; ` selectYearWaterlevelsMeasuredSQL = ` SELECT measure_date, water_level FROM waterway.gauge_measurements WHERE location = ( $1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int )::isrs AND staging_done AND measure_date BETWEEN $6 AND $7 ORDER BY measure_date ` ) func float64format(v float64) string { return strconv.FormatFloat(v, 'f', -1, 64) } func nullFloat64format(v sql.NullFloat64) string { if v.Valid { return float64format(v.Float64) } return "" } func boolFormat(b bool) string { if b { return "t" } return "f" } func yearWaterlevels(rw http.ResponseWriter, req *http.Request) { gauge := mux.Vars(req)["gauge"] isrs, err := models.IsrsFromString(gauge) if err != nil { http.Error( rw, fmt.Sprintf("error: Invalid ISRS code: %v", err), http.StatusBadRequest) return } year, _ := strconv.Atoi(mux.Vars(req)["year"]) conn := middleware.GetDBConn(req) ctx := req.Context() begin := time.Date(year, time.January, 1, 0, 0, 0, 0, time.UTC) end := time.Date(year+1, time.January, 1, 0, 0, 0, 0, time.UTC).Add(-time.Microsecond) log.Printf("info: begin %s\n", begin) log.Printf("info: end %s\n", end) rows, err := conn.QueryContext( ctx, selectYearWaterlevelsMeasuredSQL, isrs.CountryCode, isrs.LoCode, isrs.FairwaySection, isrs.Orc, isrs.Hectometre, begin, end, ) if err != nil { http.Error( rw, fmt.Sprintf("error: %v", err), http.StatusInternalServerError) return } defer rows.Close() var values []float64 lastDay, lastMonth := -1, -1 write := func() error { var err error if len(values) > 0 { mean := stat.Mean(values, nil) _, err = fmt.Fprintf( rw, "%02d-%02d,%s\n", lastDay, lastMonth, float64format(mean)) values = values[:0] } return err } for rows.Next() { var when time.Time var value float64 if err := rows.Scan(&when, &value); err != nil { log.Printf("error: %v", err) // Too late for an HTTP error code. return } when = when.UTC() day, month := when.Day(), int(when.Month()) if day != lastDay || month != lastMonth { if err := write(); err != nil { log.Printf("error: %v", err) // Too late for an HTTP error code. return } lastDay, lastMonth = day, month } values = append(values, value) } if err := rows.Err(); err != nil { log.Printf("error: %v", err) // Too late for an HTTP error code. return } if err := write(); err != nil { log.Printf("error: %v", err) // Too late for an HTTP error code. } } func longtermWaterlevels(rw http.ResponseWriter, req *http.Request) { gauge := mux.Vars(req)["gauge"] isrs, err := models.IsrsFromString(gauge) if err != nil { http.Error( rw, fmt.Sprintf("error: Invalid ISRS code: %v", err), http.StatusBadRequest) return } conn := middleware.GetDBConn(req) ctx := req.Context() var begin, end pgtype.Timestamp err = conn.QueryRowContext( ctx, selectAllWaterlevelsMeasuredRangeSQL, isrs.CountryCode, isrs.LoCode, isrs.FairwaySection, isrs.Orc, isrs.Hectometre, ).Scan(&begin, &end) switch { case err == sql.ErrNoRows || begin.Status != pgtype.Present || end.Status != pgtype.Present: http.NotFound(rw, req) return case err != nil: http.Error( rw, fmt.Sprintf("error: %v", err), http.StatusInternalServerError) return } rows, err := conn.QueryContext( ctx, selectAllWaterlevelsMeasuredSQL, isrs.CountryCode, isrs.LoCode, isrs.FairwaySection, isrs.Orc, isrs.Hectometre, ) if err != nil { http.Error( rw, fmt.Sprintf("error: %v", err), http.StatusInternalServerError) return } defer rows.Close() type result struct { day int month int q25 float64 median float64 q75 float64 mean float64 min float64 max float64 } results := make([]result, 0, 366) start := time.Now() for rows.Next() { var r result var dayMonth string if err := rows.Scan( &dayMonth, &r.q25, &r.median, &r.q75, &r.mean, &r.min, &r.max, ); err != nil { http.Error( rw, fmt.Sprintf("error: %v", err), http.StatusInternalServerError) } parts := strings.SplitN(dayMonth, ":", 2) r.day, _ = strconv.Atoi(parts[0]) r.month, _ = strconv.Atoi(parts[1]) results = append(results, r) } if err := rows.Err(); err != nil { http.Error( rw, fmt.Sprintf("error: %v", err), http.StatusInternalServerError) return } log.Printf("info: loading entries took %s\n", time.Since(start)) log.Printf("info: days found: %d\n", len(results)) sort.Slice(results, func(i, j int) bool { if d := results[i].month - results[j].month; d != 0 { return d < 0 } return results[i].day < results[j].day }) rw.Header().Add("Content-Type", "text/csv") out := csv.NewWriter(rw) record := []string{ fmt.Sprintf("#interval: %d-%d", begin.Time.UTC().Year(), end.Time.UTC().Year()), "", "", "", "", "", "", } if err := out.Write(record); err != nil { log.Printf("error: %v\n", err) // Too late for an HTTP error code. return } record = []string{ "#date", "#min", "#max", "#mean", "#median", "#q25", "#q75", } if err := out.Write(record); err != nil { log.Printf("error: %v\n", err) // Too late for an HTTP error code. return } for i := range results { r := &results[i] record[0] = fmt.Sprintf("%02d-%02d", r.day, r.month) record[1] = float64format(r.min) record[2] = float64format(r.max) record[3] = float64format(r.mean) record[4] = float64format(r.median) record[5] = float64format(r.q25) record[6] = float64format(r.q75) if err := out.Write(record); err != nil { log.Printf("error: %v\n", err) // Too late for an HTTP error code. return } } out.Flush() if err := out.Error(); err != nil { log.Printf("error: %v", err) // Too late for an HTTP error code. return } } func parseISRS(code string) (*models.Isrs, error) { isrs, err := models.IsrsFromString(code) if err != nil { return nil, JSONError{ Code: http.StatusBadRequest, Message: fmt.Sprintf("error: Invalid ISRS code: %v", err), } } return isrs, nil } type observedPredictedValues struct { when time.Time observed float64 predicted common.TimedValues } func loadNashSutcliffeData( ctx context.Context, conn *sql.Conn, gauge *models.Isrs, when time.Time, ) ([]observedPredictedValues, error) { var rows *sql.Rows var err error if rows, err = conn.QueryContext( ctx, selectPredictedObserveredSQL, gauge.CountryCode, gauge.LoCode, gauge.FairwaySection, gauge.Orc, gauge.Hectometre, when, ); err != nil { return nil, err } defer rows.Close() acceptedDeltas := []time.Duration{ -time.Hour * 24, -time.Hour * 48, -time.Hour * 72, } isAccepted := func(observed, predicted time.Time) bool { for _, delta := range acceptedDeltas { t := observed.Add(delta) d := predicted.Sub(t) if -10*time.Millisecond < d && d < 10*time.Millisecond { return true } } return false } var ( hasCurrent bool current observedPredictedValues values []observedPredictedValues ) for rows.Next() { var ( measureDate time.Time issueDate time.Time predicted bool value float64 ) if err := rows.Scan( &measureDate, &issueDate, &predicted, &value, ); err != nil { return nil, err } measureDate = measureDate.UTC() issueDate = issueDate.UTC() if hasCurrent { if !current.when.Equal(measureDate) { if !math.IsNaN(current.observed) && len(current.predicted) > 0 { values = append(values, current) } current = observedPredictedValues{ observed: math.NaN(), when: measureDate, } } } else { hasCurrent = true current = observedPredictedValues{ observed: math.NaN(), when: measureDate, } } if predicted { if isAccepted(measureDate, issueDate) { current.predicted = append( current.predicted, common.TimedValue{When: issueDate, Value: value}, ) } } else { current.observed = value } } if err := rows.Err(); err != nil { return nil, err } if hasCurrent && !math.IsNaN(current.observed) && len(current.predicted) > 0 { values = append(values, current) } // for i := range values { // log.Printf("%v %f %d\n", values[i].when, values[i].observed, len(values[i].predicted)) // if len(values[i].predicted) > 0 { // for j := range values[i].predicted { // log.Printf("\t%v %f\n", values[i].predicted[j].When, values[i].predicted[j].Value) // } // } // } return values, nil } func nashSutcliffe( _ interface{}, req *http.Request, conn *sql.Conn, ) (jr JSONResult, err error) { gauge := mux.Vars(req)["gauge"] var isrs *models.Isrs if isrs, err = parseISRS(gauge); err != nil { return } var when time.Time if w := req.FormValue("when"); w != "" { if when, err = time.Parse(models.ImportTimeFormat, w); err != nil { err = JSONError{ Code: http.StatusBadRequest, Message: fmt.Sprintf("error: wrong time format: %v", err), } return } } else { when = time.Now() } when = when.UTC() ctx := req.Context() var values []observedPredictedValues if values, err = loadNashSutcliffeData(ctx, conn, isrs, when); err != nil { return } log.Printf("info: found %d value(s) for Nash Sutcliffe.\n", len(values)) type coeff struct { Value float64 `json:"value"` Samples int `json:"samples"` Hours int `json:"hours"` } type coeffs struct { When models.ImportTime `json:"when"` Coeffs []coeff `json:"coeffs"` } var predicted, observed []float64 cs := make([]coeff, 3) for i := range cs { cs[i].Hours = (i + 1) * 24 delta := -time.Duration(cs[i].Hours) * time.Hour for j := range values { when := values[j].when.Add(delta) if p, ok := values[j].predicted.Find(when); ok { predicted = append(predicted, p) observed = append(observed, values[j].observed) } } cs[i].Value = common.NashSutcliffe(predicted, observed) cs[i].Samples = len(predicted) predicted = predicted[:0] observed = observed[:0] } jr = JSONResult{ Result: &coeffs{ When: models.ImportTime{Time: when}, Coeffs: cs, }, } return } func waterlevels(rw http.ResponseWriter, req *http.Request) { gauge := mux.Vars(req)["gauge"] isrs, err := models.IsrsFromString(gauge) if err != nil { http.Error( rw, fmt.Sprintf("error: Invalid ISRS code: %v", err), http.StatusBadRequest) return } filters := filterAnd{ buildFilterTerm( "location = ($%d::char(2), $%d::char(3), $%d::char(5), $%d::char(5), $%d::int)", isrs.CountryCode, isrs.LoCode, isrs.FairwaySection, isrs.Orc, isrs.Hectometre, ), &filterOr{ &filterNot{&filterTerm{format: "predicted"}}, buildFilterTerm( `date_issue = ( SELECT max(date_issue) FROM waterway.gauge_measurements gm WHERE location = ($%d::char(2), $%d::char(3), $%d::char(5), $%d::char(5), $%d::int))`, isrs.CountryCode, isrs.LoCode, isrs.FairwaySection, isrs.Orc, isrs.Hectometre, ), }, } if from := req.FormValue("from"); from != "" { fromTime, err := time.Parse(models.ImportTimeFormat, from) if err != nil { http.Error( rw, fmt.Sprintf("error: Invalid from time: %v", err), http.StatusBadRequest) return } filters = append(filters, buildFilterTerm("measure_date >= $%d", fromTime)) } if to := req.FormValue("to"); to != "" { toTime, err := time.Parse(models.ImportTimeFormat, to) if err != nil { http.Error( rw, fmt.Sprintf("error: Invalid from time: %v", err), http.StatusBadRequest) return } filters = append(filters, buildFilterTerm("measure_date <= $%d", toTime)) } var stmt strings.Builder var args []interface{} stmt.WriteString(selectWaterlevelsSQL) filters.serialize(&stmt, &args) conn := middleware.GetDBConn(req) ctx := req.Context() rows, err := conn.QueryContext(ctx, stmt.String(), args...) if err != nil { http.Error( rw, fmt.Sprintf("error: %v", err), http.StatusInternalServerError) return } defer rows.Close() rw.Header().Add("Content-Type", "text/csv") out := csv.NewWriter(rw) record := []string{ "#date", "#water_level", "#value_min", "#value_max", "#predicted", } if err := out.Write(record); err != nil { log.Printf("error: %v", err) // Too late for an HTTP error code. return } for rows.Next() { var ( measureDate time.Time waterlevel float64 valueMin sql.NullFloat64 valueMax sql.NullFloat64 predicted bool ) if err := rows.Scan( &measureDate, &waterlevel, &valueMin, &valueMax, &predicted, ); err != nil { log.Printf("error: %v\n", err) // Too late for an HTTP error code. return } record[0] = measureDate.Format(models.ImportTimeFormat) record[1] = float64format(waterlevel) record[2] = nullFloat64format(valueMin) record[3] = nullFloat64format(valueMax) record[4] = boolFormat(predicted) if err := out.Write(record); err != nil { log.Printf("error: %v", err) // Too late for an HTTP error code. return } } if err := rows.Err(); err != nil { log.Printf("error: %v", err) // Too late for an HTTP error code. return } out.Flush() if err := out.Error(); err != nil { log.Printf("error: %v", err) // Too late for an HTTP error code. return } }