Mercurial > gemma
view pkg/controllers/gauges.go @ 5591:0011f50cf216 surveysperbottleneckid
Removed no longer used alternative api for surveys/ endpoint.
As bottlenecks in the summary for SR imports are now identified by
their id and no longer by the (not guaranteed to be unique!) name,
there is no longer the need to request survey data by the name+date
tuple (which isn't reliable anyway). So the workaround has now been
reverted.
author | Sascha Wilde <wilde@sha-bang.de> |
---|---|
date | Wed, 06 Apr 2022 13:30:29 +0200 |
parents | 5f47eeea988d |
children | 6270951dda28 |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2019 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> package controllers import ( "context" "database/sql" "encoding/csv" "fmt" "math" "net/http" "sort" "strconv" "strings" "time" "github.com/gorilla/mux" "github.com/jackc/pgx/pgtype" "gonum.org/v1/gonum/stat" "gemma.intevation.de/gemma/pkg/common" "gemma.intevation.de/gemma/pkg/log" "gemma.intevation.de/gemma/pkg/models" mw "gemma.intevation.de/gemma/pkg/middleware" ) const ( selectPredictedObserveredSQL = ` SELECT measure_date, date_issue, predicted, water_level FROM ( SELECT location, measure_date, date_issue, false AS predicted, water_level FROM waterway.gauge_measurements UNION ALL SELECT location, measure_date, date_issue, true AS predicted, water_level FROM waterway.gauge_predictions ) AS gmp WHERE location = ($1, $2, $3, $4, $5)::isrs AND measure_date BETWEEN $6::timestamptz - '72hours'::interval AND $6::timestamptz ORDER BY measure_date, date_issue ` selectWaterlevelsSQL = ` SELECT measure_date, water_level, value_min, value_max, predicted FROM ( SELECT location, measure_date, date_issue, water_level, NULL AS value_min, NULL AS value_max, false AS predicted FROM waterway.gauge_measurements UNION ALL SELECT location, measure_date, date_issue, water_level, lower(conf_interval) AS value_min, upper(conf_interval) AS value_max, true AS predicted FROM waterway.gauge_predictions ) AS gmp WHERE ` selectAllWaterlevelsMeasuredRangeSQL = ` SELECT min(measure_date), max(measure_date) FROM waterway.gauge_measurements WHERE location = ( $1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int )::isrs AND staging_done ` 
selectAllWaterlevelsMeasuredSQL = ` SELECT extract(day from measure_date)::varchar || ':' || extract(month from measure_date)::varchar AS day_month, percentile_disc(0.25) within group (order by water_level) AS q25, percentile_disc(0.5) within group (order by water_level) AS median, percentile_disc(0.75) within group (order by water_level) AS q75, avg(water_level) AS mean, min(water_level) AS min, max(water_level) AS max FROM waterway.gauge_measurements WHERE location = ( $1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int )::isrs AND staging_done GROUP BY extract(day from measure_date)::varchar || ':' || extract(month from measure_date)::varchar; ` selectYearWaterlevelsMeasuredSQL = ` SELECT measure_date, water_level FROM waterway.gauge_measurements WHERE location = ( $1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int )::isrs AND staging_done AND measure_date BETWEEN $6 AND $7 ORDER BY measure_date ` ) func float64format(v float64) string { return strconv.FormatFloat(v, 'f', -1, 64) } func nullFloat64format(v sql.NullFloat64) string { if v.Valid { return float64format(v.Float64) } return "" } func boolFormat(b bool) string { if b { return "t" } return "f" } func yearWaterlevels(rw http.ResponseWriter, req *http.Request) { gauge := mux.Vars(req)["gauge"] isrs, err := models.IsrsFromString(gauge) if err != nil { http.Error( rw, fmt.Sprintf("error: Invalid ISRS code: %v", err), http.StatusBadRequest) return } year, _ := strconv.Atoi(mux.Vars(req)["year"]) conn := mw.GetDBConn(req) ctx := req.Context() begin := time.Date(year, time.January, 1, 0, 0, 0, 0, time.UTC) end := time.Date(year+1, time.January, 1, 0, 0, 0, 0, time.UTC).Add(-time.Microsecond) log.Infof("begin %s\n", begin) log.Infof("end %s\n", end) rows, err := conn.QueryContext( ctx, selectYearWaterlevelsMeasuredSQL, isrs.CountryCode, isrs.LoCode, isrs.FairwaySection, isrs.Orc, isrs.Hectometre, begin, end, ) if err != nil { http.Error( rw, fmt.Sprintf("error: %v", err), 
http.StatusInternalServerError) return } defer rows.Close() var values []float64 lastDay, lastMonth := -1, -1 write := func() error { var err error if len(values) > 0 { mean := stat.Mean(values, nil) _, err = fmt.Fprintf( rw, "%02d-%02d,%s\n", lastDay, lastMonth, float64format(mean)) values = values[:0] } return err } for rows.Next() { var when time.Time var value float64 if err := rows.Scan(&when, &value); err != nil { log.Errorf("%v", err) // Too late for an HTTP error code. return } when = when.UTC() day, month := when.Day(), int(when.Month()) if day != lastDay || month != lastMonth { if err := write(); err != nil { log.Errorf("%v", err) // Too late for an HTTP error code. return } lastDay, lastMonth = day, month } values = append(values, value) } if err := rows.Err(); err != nil { log.Errorf("%v", err) // Too late for an HTTP error code. return } if err := write(); err != nil { log.Errorf("%v", err) // Too late for an HTTP error code. } } func longtermWaterlevels(rw http.ResponseWriter, req *http.Request) { gauge := mux.Vars(req)["gauge"] isrs, err := models.IsrsFromString(gauge) if err != nil { http.Error( rw, fmt.Sprintf("error: Invalid ISRS code: %v", err), http.StatusBadRequest) return } conn := mw.GetDBConn(req) ctx := req.Context() var begin, end pgtype.Timestamp err = conn.QueryRowContext( ctx, selectAllWaterlevelsMeasuredRangeSQL, isrs.CountryCode, isrs.LoCode, isrs.FairwaySection, isrs.Orc, isrs.Hectometre, ).Scan(&begin, &end) switch { case err == sql.ErrNoRows || begin.Status != pgtype.Present || end.Status != pgtype.Present: http.NotFound(rw, req) return case err != nil: http.Error( rw, fmt.Sprintf("error: %v", err), http.StatusInternalServerError) return } rows, err := conn.QueryContext( ctx, selectAllWaterlevelsMeasuredSQL, isrs.CountryCode, isrs.LoCode, isrs.FairwaySection, isrs.Orc, isrs.Hectometre, ) if err != nil { http.Error( rw, fmt.Sprintf("error: %v", err), http.StatusInternalServerError) return } defer rows.Close() type result struct { 
day int month int q25 float64 median float64 q75 float64 mean float64 min float64 max float64 } results := make([]result, 0, 366) start := time.Now() for rows.Next() { var r result var dayMonth string if err := rows.Scan( &dayMonth, &r.q25, &r.median, &r.q75, &r.mean, &r.min, &r.max, ); err != nil { http.Error( rw, fmt.Sprintf("error: %v", err), http.StatusInternalServerError) } parts := strings.SplitN(dayMonth, ":", 2) r.day, _ = strconv.Atoi(parts[0]) r.month, _ = strconv.Atoi(parts[1]) results = append(results, r) } if err := rows.Err(); err != nil { http.Error( rw, fmt.Sprintf("error: %v", err), http.StatusInternalServerError) return } log.Infof("loading entries took %s\n", time.Since(start)) log.Infof("days found: %d\n", len(results)) sort.Slice(results, func(i, j int) bool { if d := results[i].month - results[j].month; d != 0 { return d < 0 } return results[i].day < results[j].day }) rw.Header().Add("Content-Type", "text/csv") out := csv.NewWriter(rw) record := []string{ fmt.Sprintf("#interval: %d-%d", begin.Time.UTC().Year(), end.Time.UTC().Year()), "", "", "", "", "", "", } if err := out.Write(record); err != nil { log.Errorf("%v\n", err) // Too late for an HTTP error code. return } record = []string{ "#date", "#min", "#max", "#mean", "#median", "#q25", "#q75", } if err := out.Write(record); err != nil { log.Errorf("%v\n", err) // Too late for an HTTP error code. return } for i := range results { r := &results[i] record[0] = fmt.Sprintf("%02d-%02d", r.day, r.month) record[1] = float64format(r.min) record[2] = float64format(r.max) record[3] = float64format(r.mean) record[4] = float64format(r.median) record[5] = float64format(r.q25) record[6] = float64format(r.q75) if err := out.Write(record); err != nil { log.Errorf("%v\n", err) // Too late for an HTTP error code. return } } out.Flush() if err := out.Error(); err != nil { log.Errorf("%v", err) // Too late for an HTTP error code. 
return } } func parseISRS(code string) (*models.Isrs, error) { isrs, err := models.IsrsFromString(code) if err != nil { return nil, mw.JSONError{ Code: http.StatusBadRequest, Message: fmt.Sprintf("error: Invalid ISRS code: %v", err), } } return isrs, nil } type observedPredictedValues struct { when time.Time observed float64 predicted common.TimedValues } func loadNashSutcliffeData( ctx context.Context, conn *sql.Conn, gauge *models.Isrs, when time.Time, ) ([]observedPredictedValues, error) { var rows *sql.Rows var err error if rows, err = conn.QueryContext( ctx, selectPredictedObserveredSQL, gauge.CountryCode, gauge.LoCode, gauge.FairwaySection, gauge.Orc, gauge.Hectometre, when, ); err != nil { return nil, err } defer rows.Close() acceptedDeltas := []time.Duration{ -time.Hour * 24, -time.Hour * 48, -time.Hour * 72, } isAccepted := func(observed, predicted time.Time) bool { for _, delta := range acceptedDeltas { t := observed.Add(delta) d := predicted.Sub(t) if -10*time.Millisecond < d && d < 10*time.Millisecond { return true } } return false } var ( hasCurrent bool current observedPredictedValues values []observedPredictedValues ) for rows.Next() { var ( measureDate time.Time issueDate time.Time predicted bool value float64 ) if err := rows.Scan( &measureDate, &issueDate, &predicted, &value, ); err != nil { return nil, err } measureDate = measureDate.UTC() issueDate = issueDate.UTC() if hasCurrent { if !current.when.Equal(measureDate) { if !math.IsNaN(current.observed) && len(current.predicted) > 0 { values = append(values, current) } current = observedPredictedValues{ observed: math.NaN(), when: measureDate, } } } else { hasCurrent = true current = observedPredictedValues{ observed: math.NaN(), when: measureDate, } } if predicted { if isAccepted(measureDate, issueDate) { current.predicted = append( current.predicted, common.TimedValue{When: issueDate, Value: value}, ) } } else { current.observed = value } } if err := rows.Err(); err != nil { return nil, err } if 
hasCurrent && !math.IsNaN(current.observed) && len(current.predicted) > 0 { values = append(values, current) } // for i := range values { // log.Debugf("%v %f %d\n", values[i].when, values[i].observed, len(values[i].predicted)) // if len(values[i].predicted) > 0 { // for j := range values[i].predicted { // log.Debugf("\t%v %f\n", values[i].predicted[j].When, values[i].predicted[j].Value) // } // } // } return values, nil } func nashSutcliffe(req *http.Request) (jr mw.JSONResult, err error) { gauge := mux.Vars(req)["gauge"] var isrs *models.Isrs if isrs, err = parseISRS(gauge); err != nil { return } var when time.Time if w := req.FormValue("when"); w != "" { if when, err = common.ParseTime(w); err != nil { err = mw.JSONError{ Code: http.StatusBadRequest, Message: fmt.Sprintf("error: wrong time format: %v", err), } return } } else { when = time.Now() } when = when.UTC() ctx := req.Context() var values []observedPredictedValues if values, err = loadNashSutcliffeData(ctx, mw.JSONConn(req), isrs, when); err != nil { return } log.Infof("found %d value(s) for Nash Sutcliffe.\n", len(values)) type coeff struct { Value float64 `json:"value"` Samples int `json:"samples"` Hours int `json:"hours"` } type coeffs struct { When models.ImportTime `json:"when"` Coeffs []coeff `json:"coeffs"` } var predicted, observed []float64 cs := make([]coeff, 3) for i := range cs { cs[i].Hours = (i + 1) * 24 delta := -time.Duration(cs[i].Hours) * time.Hour for j := range values { when := values[j].when.Add(delta) if p, ok := values[j].predicted.Find(when); ok { predicted = append(predicted, p) observed = append(observed, values[j].observed) } } cs[i].Value = sanitizeFloat64(common.NashSutcliffe(predicted, observed)) cs[i].Samples = len(predicted) predicted = predicted[:0] observed = observed[:0] } jr = mw.JSONResult{ Result: &coeffs{ When: models.ImportTime{Time: when}, Coeffs: cs, }, } return } func sanitizeFloat64(x float64) float64 { switch { case math.IsNaN(x): return 0 case math.IsInf(x, 
+1): return math.MaxFloat64 case math.IsInf(x, -1): return -math.MaxFloat64 } return x } func waterlevels(rw http.ResponseWriter, req *http.Request) { gauge := mux.Vars(req)["gauge"] isrs, err := models.IsrsFromString(gauge) if err != nil { http.Error( rw, fmt.Sprintf("error: Invalid ISRS code: %v", err), http.StatusBadRequest) return } // TODO: FIXME The filter is not correct for predictions!? filters := filterAnd{ buildFilterTerm( "location = ($%d::char(2), $%d::char(3), $%d::char(5), $%d::char(5), $%d::int)", isrs.CountryCode, isrs.LoCode, isrs.FairwaySection, isrs.Orc, isrs.Hectometre, ), &filterOr{ &filterNot{&filterTerm{format: "predicted"}}, buildFilterTerm( `date_issue = ( SELECT max(date_issue) FROM waterway.gauge_predictions gm WHERE location = ($%d::char(2), $%d::char(3), $%d::char(5), $%d::char(5), $%d::int))`, isrs.CountryCode, isrs.LoCode, isrs.FairwaySection, isrs.Orc, isrs.Hectometre, ), }, } if from := req.FormValue("from"); from != "" { fromTime, err := common.ParseTime(from) if err != nil { http.Error( rw, fmt.Sprintf("error: Invalid from time: %v", err), http.StatusBadRequest) return } filters = append(filters, buildFilterTerm("measure_date >= $%d", fromTime)) } if to := req.FormValue("to"); to != "" { toTime, err := common.ParseTime(to) if err != nil { http.Error( rw, fmt.Sprintf("error: Invalid from time: %v", err), http.StatusBadRequest) return } filters = append(filters, buildFilterTerm("measure_date <= $%d", toTime)) } var stmt strings.Builder var args []interface{} stmt.WriteString(selectWaterlevelsSQL) filters.serialize(&stmt, &args) conn := mw.GetDBConn(req) ctx := req.Context() rows, err := conn.QueryContext(ctx, stmt.String(), args...) 
if err != nil { http.Error( rw, fmt.Sprintf("error: %v", err), http.StatusInternalServerError) return } defer rows.Close() rw.Header().Add("Content-Type", "text/csv") out := csv.NewWriter(rw) record := []string{ "#date", "#water_level", "#value_min", "#value_max", "#predicted", } if err := out.Write(record); err != nil { log.Errorf("%v", err) // Too late for an HTTP error code. return } for rows.Next() { var ( measureDate time.Time waterlevel float64 valueMin sql.NullFloat64 valueMax sql.NullFloat64 predicted bool ) if err := rows.Scan( &measureDate, &waterlevel, &valueMin, &valueMax, &predicted, ); err != nil { log.Errorf("%v\n", err) // Too late for an HTTP error code. return } record[0] = measureDate.Format(common.TimeFormat) record[1] = float64format(waterlevel) record[2] = nullFloat64format(valueMin) record[3] = nullFloat64format(valueMax) record[4] = boolFormat(predicted) if err := out.Write(record); err != nil { log.Errorf("%v", err) // Too late for an HTTP error code. return } } if err := rows.Err(); err != nil { log.Errorf("%v", err) // Too late for an HTTP error code. return } out.Flush() if err := out.Error(); err != nil { log.Errorf("%v", err) // Too late for an HTTP error code. return } }