Mercurial > gemma
view pkg/controllers/gauges.go @ 2826:f7ae108c2838
longterm waterlevels statistics: Generate a comment line with the interval of years.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Wed, 27 Mar 2019 13:43:41 +0100 |
parents | ff58440ce809 |
children | b5555005f51e |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2019 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> package controllers import ( "database/sql" "encoding/csv" "fmt" "log" "net/http" "sort" "strconv" "strings" "time" "github.com/gorilla/mux" "github.com/jackc/pgx/pgtype" "gonum.org/v1/gonum/stat" "gemma.intevation.de/gemma/pkg/common" "gemma.intevation.de/gemma/pkg/middleware" "gemma.intevation.de/gemma/pkg/models" ) const ( selectPredictedObserveredSQL = ` SELECT a.measure_date AS measure_date, a.water_level AS predicted, b.water_level AS observed FROM waterway.gauge_measurements a JOIN waterway.gauge_measurements b ON a.fk_gauge_id = b.fk_gauge_id AND a.measure_date = b.measure_date AND a.predicted AND NOT b.predicted WHERE a.fk_gauge_id = ( $1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int ) AND a.measure_date BETWEEN $6::timestamp AND $6::timestamp - '72hours'::interval ORDER BY a.measure_date ` selectWaterlevelsSQL = ` SELECT measure_date, water_level, value_min, value_max, predicted FROM waterway.gauge_measurements WHERE ` selectAllWaterlevelsMeasuredRangeSQL = ` SELECT min(measure_date), max(measure_date) FROM waterway.gauge_measurements WHERE fk_gauge_id = ( $1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int )::isrs AND NOT predicted AND staging_done ` selectAllWaterlevelsMeasuredSQL = ` SELECT extract(day from measure_date)::varchar || ':' || extract(month from measure_date)::varchar AS day_month, percentile_disc(0.25) within group (order by water_level) AS q25, percentile_disc(0.5) within group (order by water_level) AS median, percentile_disc(0.75) within group (order by water_level) AS q75, avg(water_level) AS mean, min(water_level) AS min, max(water_level) AS max FROM waterway.gauge_measurements WHERE fk_gauge_id = ( $1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int )::isrs AND NOT predicted AND staging_done GROUP BY extract(day from measure_date)::varchar || ':' || extract(month from measure_date)::varchar; ` selectYearWaterlevelsMeasuredSQL = ` SELECT measure_date, water_level FROM waterway.gauge_measurements WHERE NOT predicted AND staging_done AND fk_gauge_id = ( $1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int ) AND measure_date BETWEEN $6 AND $7 ORDER BY measure_date ` ) func float64format(v float64) string { return strconv.FormatFloat(v, 'f', -1, 64) } func nullFloat64format(v sql.NullFloat64) string { if v.Valid { return float64format(v.Float64) } return "" } func boolFormat(b bool) string { if b { return "t" } return "f" } func yearWaterlevels(rw http.ResponseWriter, req *http.Request) { gauge := mux.Vars(req)["gauge"] isrs, err := models.IsrsFromString(gauge) if err != nil { http.Error( rw, fmt.Sprintf("error: Invalid ISRS code: %v", err), http.StatusBadRequest) return } year, _ := strconv.Atoi(mux.Vars(req)["year"]) conn := middleware.GetDBConn(req) ctx := req.Context() begin := time.Date(year, time.January, 1, 0, 0, 0, 0, time.UTC) end := time.Date(year+1, time.January, 1, 0, 0, 0, 0, time.UTC).Add(-time.Microsecond) log.Printf("info: begin %s\n", begin) log.Printf("info: end %s\n", end) rows, err := conn.QueryContext( ctx, selectYearWaterlevelsMeasuredSQL, isrs.CountryCode, isrs.LoCode, isrs.FairwaySection, isrs.Orc, isrs.Hectometre, begin, end, ) if err != nil { http.Error( rw, fmt.Sprintf("error: %v", err), http.StatusInternalServerError) return } defer rows.Close() var values []float64 lastDay, lastMonth := -1, -1 write := func() error { var err error if len(values) > 0 { mean := stat.Mean(values, nil) _, err = fmt.Fprintf( rw, "%02d-%02d,%s\n", lastDay, lastMonth, float64format(mean)) values = values[:0] } return err } for rows.Next() { var when time.Time var value float64 if err := rows.Scan(&when, &value); err != nil { log.Printf("error: %v", err) // Too late for an HTTP error code. return } when = when.UTC() day, month := when.Day(), int(when.Month()) if day != lastDay || month != lastMonth { if err := write(); err != nil { log.Printf("error: %v", err) // Too late for an HTTP error code. return } lastDay, lastMonth = day, month } values = append(values, value) } if err := rows.Err(); err != nil { log.Printf("error: %v", err) // Too late for an HTTP error code. return } if err := write(); err != nil { log.Printf("error: %v", err) // Too late for an HTTP error code. } } func longtermWaterlevels(rw http.ResponseWriter, req *http.Request) { gauge := mux.Vars(req)["gauge"] isrs, err := models.IsrsFromString(gauge) if err != nil { http.Error( rw, fmt.Sprintf("error: Invalid ISRS code: %v", err), http.StatusBadRequest) return } conn := middleware.GetDBConn(req) ctx := req.Context() var begin, end pgtype.Timestamp err = conn.QueryRowContext( ctx, selectAllWaterlevelsMeasuredRangeSQL, isrs.CountryCode, isrs.LoCode, isrs.FairwaySection, isrs.Orc, isrs.Hectometre, ).Scan(&begin, &end) switch { case err == sql.ErrNoRows || begin.Status != pgtype.Present || end.Status != pgtype.Present: http.NotFound(rw, req) return case err != nil: http.Error( rw, fmt.Sprintf("error: %v", err), http.StatusInternalServerError) return } rows, err := conn.QueryContext( ctx, selectAllWaterlevelsMeasuredSQL, isrs.CountryCode, isrs.LoCode, isrs.FairwaySection, isrs.Orc, isrs.Hectometre, ) if err != nil { http.Error( rw, fmt.Sprintf("error: %v", err), http.StatusInternalServerError) return } defer rows.Close() type result struct { day int month int q25 float64 median float64 q75 float64 mean float64 min float64 max float64 } results := make([]result, 0, 366) start := time.Now() for rows.Next() { var r result var dayMonth string if err := rows.Scan( &dayMonth, &r.q25, &r.median, &r.q75, &r.mean, &r.min, &r.max, ); err != nil { http.Error( rw, fmt.Sprintf("error: %v", err), http.StatusInternalServerError) } parts := strings.SplitN(dayMonth, ":", 2) r.day, _ = strconv.Atoi(parts[0]) r.month, _ = strconv.Atoi(parts[1]) results = append(results, r) } if err := rows.Err(); err != nil { http.Error( rw, fmt.Sprintf("error: %v", err), http.StatusInternalServerError) return } log.Printf("info: loading entries took %s\n", time.Since(start)) log.Printf("info: days found: %d\n", len(results)) sort.Slice(results, func(i, j int) bool { if d := int(results[i].month) - int(results[j].month); d != 0 { return d < 0 } return results[i].day < results[j].day }) rw.Header().Add("Content-Type", "text/csv") out := csv.NewWriter(rw) record := []string{ fmt.Sprintf("#interval: %d-%d", begin.Time.UTC().Year(), end.Time.UTC().Year()), "", "", "", "", "", "", } if err := out.Write(record); err != nil { log.Printf("error: %v\n", err) // Too late for an HTTP error code. return } record = []string{ "#date", "#min", "#max", "#mean", "#median", "#q25", "#q75", } if err := out.Write(record); err != nil { log.Printf("error: %v\n", err) // Too late for an HTTP error code. return } for i := range results { r := &results[i] record[0] = fmt.Sprintf("%02d-%02d", r.day, r.month) record[1] = float64format(r.min) record[2] = float64format(r.max) record[3] = float64format(r.mean) record[4] = float64format(r.median) record[5] = float64format(r.q25) record[6] = float64format(r.q75) if err := out.Write(record); err != nil { log.Printf("error: %v\n", err) // Too late for an HTTP error code. return } } out.Flush() if err := out.Error(); err != nil { log.Printf("error: %v", err) // Too late for an HTTP error code. return } } func nashSutcliffe( _ interface{}, req *http.Request, conn *sql.Conn, ) (jr JSONResult, err error) { gauge := mux.Vars(req)["gauge"] var isrs *models.Isrs if isrs, err = models.IsrsFromString(gauge); err != nil { err = JSONError{ Code: http.StatusBadRequest, Message: fmt.Sprintf("error: Invalid ISRS code: %v", err), } return } var when time.Time if w := req.FormValue("when"); w != "" { if when, err = time.Parse(models.ImportTimeFormat, w); err != nil { err = JSONError{ Code: http.StatusBadRequest, Message: fmt.Sprintf("error: wrong time format: %v", err), } return } } else { when = time.Now() } ctx := req.Context() var rows *sql.Rows if rows, err = conn.QueryContext( ctx, selectPredictedObserveredSQL, isrs.CountryCode, isrs.LoCode, isrs.FairwaySection, isrs.Orc, isrs.Hectometre, when, ); err != nil { return } defer rows.Close() var measurements []common.NSMeasurement for rows.Next() { var m common.NSMeasurement if err = rows.Scan( &m.When, &m.Predicted, &m.Observed, ); err != nil { return } measurements = append(measurements, m) } if err = rows.Err(); err != nil { return } type coeff struct { Value float64 `json:"value"` Samples int `json:"samples"` Hours int `json:"hours"` } type coeffs struct { When models.ImportTime `json:"when"` Coeffs []coeff `json:"coeffs"` } cs := make([]coeff, 3) for i := range cs { cs[i].Hours = (i + 1) * 24 cs[i].Value, cs[i].Samples = common.NashSutcliffe( measurements, when, when.Add(time.Duration(-cs[i].Hours)*time.Hour), ) } jr = JSONResult{ Result: &coeffs{ When: models.ImportTime{when}, Coeffs: cs, }, } return } func waterlevels(rw http.ResponseWriter, req *http.Request) { gauge := mux.Vars(req)["gauge"] isrs, err := models.IsrsFromString(gauge) if err != nil { http.Error( rw, fmt.Sprintf("error: Invalid ISRS code: %v", err), http.StatusBadRequest) return } var fb filterBuilder fb.stmt.WriteString(selectWaterlevelsSQL) fb.cond( " fk_gauge_id = ($%d::char(2), $%d::char(3), $%d::char(5), $%d::char(5), $%d::int) ", isrs.CountryCode, isrs.LoCode, isrs.FairwaySection, isrs.Orc, isrs.Hectometre, ) if from := req.FormValue("from"); from != "" { fromTime, err := time.Parse(models.ImportTimeFormat, from) if err != nil { http.Error( rw, fmt.Sprintf("error: Invalid from time: %v", err), http.StatusBadRequest) return } fb.cond("measure_date >= $%d", fromTime) } if to := req.FormValue("to"); to != "" { toTime, err := time.Parse(models.ImportTimeFormat, to) if err != nil { http.Error( rw, fmt.Sprintf("error: Invalid from time: %v", err), http.StatusBadRequest) return } fb.cond("measure_date <= $%d", toTime) } conn := middleware.GetDBConn(req) ctx := req.Context() rows, err := conn.QueryContext(ctx, fb.stmt.String(), fb.args...) if err != nil { http.Error( rw, fmt.Sprintf("error: %v", err), http.StatusInternalServerError) return } defer rows.Close() rw.Header().Add("Content-Type", "text/csv") out := csv.NewWriter(rw) record := []string{ "#date", "#water_level", "#value_min", "#value_max", "#predicted", } if err := out.Write(record); err != nil { log.Printf("error: %v", err) // Too late for an HTTP error code. return } for rows.Next() { var ( measureDate time.Time waterlevel float64 valueMin sql.NullFloat64 valueMax sql.NullFloat64 predicted bool ) if err := rows.Scan( &measureDate, &waterlevel, &valueMin, &valueMax, &predicted, ); err != nil { log.Printf("error: %v\n", err) // Too late for an HTTP error code. return } record[0] = measureDate.Format(models.ImportTimeFormat) record[1] = float64format(waterlevel) record[2] = nullFloat64format(valueMin) record[3] = nullFloat64format(valueMax) record[4] = boolFormat(predicted) if err := out.Write(record); err != nil { log.Printf("error: %v", err) // Too late for an HTTP error code. return } } if err := rows.Err(); err != nil { log.Printf("error: %v", err) // Too late for an HTTP error code. return } out.Flush() if err := out.Error(); err != nil { log.Printf("error: %v", err) // Too late for an HTTP error code. return } }