Mercurial > gemma
view pkg/controllers/gauges.go @ 2806:c6374c520228
Use between filter to speed up SQL for fetching yearly waterlevels.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Tue, 26 Mar 2019 07:41:53 +0100 |
parents | 46a9a7c1281f |
children | 1fa57ad05e1e |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2019 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> package controllers import ( "database/sql" "encoding/csv" "fmt" "log" "net/http" "runtime" "sort" "strconv" "sync" "time" "github.com/gorilla/mux" "gonum.org/v1/gonum/stat" "gemma.intevation.de/gemma/pkg/common" "gemma.intevation.de/gemma/pkg/middleware" "gemma.intevation.de/gemma/pkg/models" ) const ( selectPredictedObserveredSQL = ` SELECT a.measure_date AS measure_date, a.water_level AS predicted, b.water_level AS observed FROM waterway.gauge_measurements a JOIN waterway.gauge_measurements b ON a.fk_gauge_id = b.fk_gauge_id AND a.measure_date = b.measure_date AND a.predicted AND NOT b.predicted WHERE a.fk_gauge_id = ( $1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int ) AND a.measure_date BETWEEN $6::timestamp AND $6::timestamp - '72hours'::interval ORDER BY a.measure_date ` selectWaterlevelsSQL = ` SELECT measure_date, water_level, value_min, value_max, predicted FROM waterway.gauge_measurements WHERE ` selectAllWaterlevelsMeasuredSQL = ` SELECT measure_date, water_level FROM waterway.gauge_measurements WHERE NOT predicted AND fk_gauge_id = ( $1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int ) ` selectYearWaterlevelsMeasuredSQL = ` SELECT measure_date, water_level FROM waterway.gauge_measurements WHERE NOT predicted AND fk_gauge_id = ( $1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int ) AND measure_date BETWEEN $6 AND $7 ORDER BY measure_date ` selectWaterlevelsMeasuredSQL = ` SELECT measure_date, water_level FROM waterway.gauge_measurements WHERE NOT predicted AND fk_gauge_id = ( $1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int ) AND measure_date BETWEEN $6::timestamp with time zone AND $7::timestamp with time zone ORDER BY measure_date ` ) func float64format(v float64) string { return strconv.FormatFloat(v, 'f', -1, 64) } func nullFloat64format(v sql.NullFloat64) string { if v.Valid { return float64format(v.Float64) } return "" } func boolFormat(b bool) string { if b { return "t" } return "f" } func yearWaterlevels(rw http.ResponseWriter, req *http.Request) { gauge := mux.Vars(req)["gauge"] isrs, err := models.IsrsFromString(gauge) if err != nil { http.Error( rw, fmt.Sprintf("error: Invalid ISRS code: %v", err), http.StatusBadRequest) return } year, _ := strconv.Atoi(mux.Vars(req)["year"]) conn := middleware.GetDBConn(req) ctx := req.Context() begin := time.Date(year, time.January, 1, 0, 0, 0, 0, time.UTC) end := time.Date(year+1, time.January, 1, 0, 0, 0, 0, time.UTC).Add(-time.Microsecond) log.Printf("info: begin %s\n", begin) log.Printf("info: end %s\n", end) rows, err := conn.QueryContext( ctx, selectYearWaterlevelsMeasuredSQL, isrs.CountryCode, isrs.LoCode, isrs.FairwaySection, isrs.Orc, isrs.Hectometre, begin, end, ) if err != nil { http.Error( rw, fmt.Sprintf("error: %v", err), http.StatusInternalServerError) return } defer rows.Close() var values []float64 lastDay, lastMonth := -1, -1 write := func() error { var err error if len(values) > 0 { mean := stat.Mean(values, nil) _, err = fmt.Fprintf( rw, "%02d-%02d,%s\n", lastDay, lastMonth, float64format(mean)) values = values[:0] } return err } for rows.Next() { var when time.Time var value float64 if err := rows.Scan(&when, &value); err != nil { log.Printf("error: %v", err) // Too late for an HTTP error code. return } day, month := when.Day(), int(when.Month()) if day != lastDay || month != lastMonth { if err := write(); err != nil { log.Printf("error: %v", err) // Too late for an HTTP error code. return } lastDay, lastMonth = day, month } values = append(values, value) } if err := rows.Err(); err != nil { log.Printf("error: %v", err) // Too late for an HTTP error code. return } if err := write(); err != nil { log.Printf("error: %v", err) // Too late for an HTTP error code. } } func longtermWaterlevels(rw http.ResponseWriter, req *http.Request) { gauge := mux.Vars(req)["gauge"] isrs, err := models.IsrsFromString(gauge) if err != nil { http.Error( rw, fmt.Sprintf("error: Invalid ISRS code: %v", err), http.StatusBadRequest) return } conn := middleware.GetDBConn(req) ctx := req.Context() rows, err := conn.QueryContext( ctx, selectAllWaterlevelsMeasuredSQL, isrs.CountryCode, isrs.LoCode, isrs.FairwaySection, isrs.Orc, isrs.Hectometre, ) if err != nil { http.Error( rw, fmt.Sprintf("error: %v", err), http.StatusInternalServerError) return } defer rows.Close() type dayKey struct { month byte day byte } entries := make(map[dayKey][]float64) start := time.Now() for rows.Next() { var value float64 var date time.Time if err := rows.Scan(&date, &value); err != nil { http.Error( rw, fmt.Sprintf("error: %v", err), http.StatusInternalServerError) } key := dayKey{ month: byte(date.Month()), day: byte(date.Day()), } entries[key] = append(entries[key], value) } if err := rows.Err(); err != nil { http.Error( rw, fmt.Sprintf("error: %v", err), http.StatusInternalServerError) return } log.Printf("info: loading entries took %s\n", time.Since(start)) log.Printf("info: days found: %d\n", len(entries)) stats := time.Now() type result struct { key dayKey values []float64 min float64 max float64 mean float64 median float64 q25 float64 q75 float64 } results := make([]result, len(entries)) jobs := make(chan int) var wg sync.WaitGroup for i, n := 0, runtime.NumCPU(); i < n; i++ { wg.Add(1) go func() { defer wg.Done() for job := range jobs { r := &results[job] if len(r.values) == 0 { continue } sort.Float64s(r.values) r.min = r.values[0] r.max = r.values[len(r.values)-1] r.median = r.values[len(r.values)/2] r.mean = stat.Mean(r.values, nil) r.q25 = stat.Quantile(0.25, stat.Empirical, r.values, nil) r.q75 = stat.Quantile(0.75, stat.Empirical, r.values, nil) } }() } var i int for k, v := range entries { results[i].key = k results[i].values = v jobs <- i i++ } close(jobs) wg.Wait() sort.Slice(results, func(i, j int) bool { ki, kj := results[i].key, results[j].key if d := int(ki.month) - int(kj.month); d != 0 { return d < 0 } return ki.day < kj.day }) log.Printf("info: calculating stats took %s\n", time.Since(stats)) rw.Header().Add("Content-Type", "text/csv") out := csv.NewWriter(rw) record := []string{ "#date", "#min", "#max", "#mean", "#median", "#q25", "#q75", } if err := out.Write(record); err != nil { log.Printf("error: %v\n", err) // Too late for an HTTP error code. return } for i := range results { r := &results[i] record[0] = fmt.Sprintf("%02d-%02d", r.key.day, r.key.month) record[1] = float64format(r.min) record[2] = float64format(r.max) record[3] = float64format(r.mean) record[4] = float64format(r.median) record[5] = float64format(r.q25) record[6] = float64format(r.q75) if err := out.Write(record); err != nil { log.Printf("error: %v\n", err) // Too late for an HTTP error code. return } } out.Flush() if err := out.Error(); err != nil { log.Printf("error: %v", err) // Too late for an HTTP error code. return } } func averageWaterlevels(rw http.ResponseWriter, req *http.Request) { gauge := mux.Vars(req)["gauge"] isrs, err := models.IsrsFromString(gauge) if err != nil { http.Error( rw, fmt.Sprintf("error: Invalid ISRS code: %v", err), http.StatusBadRequest) return } var from, to time.Time if t := req.FormValue("to"); t != "" { var err error if to, err = time.ParseInLocation(common.DateFormat, t, time.UTC); err != nil { http.Error( rw, fmt.Sprintf("error: bad from date: %v", err), http.StatusBadRequest) return } } else { y, m, d := time.Now().Date() to = time.Date(y, m, d, 0, 0, 0, 0, time.UTC) } if f := req.FormValue("from"); f != "" { var err error if from, err = time.ParseInLocation(common.DateFormat, f, time.UTC); err != nil { http.Error( rw, fmt.Sprintf("error: bad from date: %v", err), http.StatusBadRequest) return } } else { from = to.AddDate(-1, 0, 0) } to = to.AddDate(0, 0, 1).Add(-time.Nanosecond) if to.Before(from) { from, to = to, from } conn := middleware.GetDBConn(req) ctx := req.Context() rows, err := conn.QueryContext( ctx, selectWaterlevelsMeasuredSQL, isrs.CountryCode, isrs.LoCode, isrs.FairwaySection, isrs.Orc, isrs.Hectometre, from, to, ) if err != nil { http.Error( rw, fmt.Sprintf("error: %v", err), http.StatusInternalServerError) return } defer rows.Close() rw.Header().Add("Content-Type", "text/csv") out := csv.NewWriter(rw) var last time.Time var values []float64 record := []string{ "#date", "#min", "#max", "#mean", "#median", "#q25", "#q75", } if err := out.Write(record); err != nil { log.Printf("error: %v\n", err) // Too late for an HTTP error code. return } write := func() error { if len(values) > 0 { sort.Float64s(values) // date record[0] = last.Format(common.DateFormat) // min record[1] = float64format(values[0]) // max record[2] = float64format(values[len(values)-1]) // mean record[3] = float64format(stat.Mean(values, nil)) // median record[4] = float64format(values[len(values)/2]) // Q25 record[5] = float64format( stat.Quantile(0.25, stat.Empirical, values, nil)) // Q75 record[6] = float64format( stat.Quantile(0.75, stat.Empirical, values, nil)) err := out.Write(record) values = values[:0] return err } return nil } for rows.Next() { var ( date time.Time value float64 ) if err := rows.Scan(&date, &value); err != nil { log.Printf("error: %v\n", err) // Too late for an HTTP error code. return } oy, om, od := last.Date() ny, nm, nd := date.Date() if oy != ny || om != nm || od != nd { if err := write(); err != nil { log.Printf("error: %v\n", err) // Too late for an HTTP error code. return } last = date } else { values = append(values, value) } } write() if err := rows.Err(); err != nil { log.Printf("error: %v", err) // Too late for an HTTP error code. return } out.Flush() if err := out.Error(); err != nil { log.Printf("error: %v", err) // Too late for an HTTP error code. return } } func nashSutcliffe( _ interface{}, req *http.Request, conn *sql.Conn, ) (jr JSONResult, err error) { gauge := mux.Vars(req)["gauge"] var isrs *models.Isrs if isrs, err = models.IsrsFromString(gauge); err != nil { err = JSONError{ Code: http.StatusBadRequest, Message: fmt.Sprintf("error: Invalid ISRS code: %v", err), } return } var when time.Time if w := req.FormValue("when"); w != "" { if when, err = time.Parse(models.ImportTimeFormat, w); err != nil { err = JSONError{ Code: http.StatusBadRequest, Message: fmt.Sprintf("error: wrong time format: %v", err), } return } } else { when = time.Now() } ctx := req.Context() var rows *sql.Rows if rows, err = conn.QueryContext( ctx, selectPredictedObserveredSQL, isrs.CountryCode, isrs.LoCode, isrs.FairwaySection, isrs.Orc, isrs.Hectometre, when, ); err != nil { return } defer rows.Close() var measurements []common.NSMeasurement for rows.Next() { var m common.NSMeasurement if err = rows.Scan( &m.When, &m.Predicted, &m.Observed, ); err != nil { return } measurements = append(measurements, m) } if err = rows.Err(); err != nil { return } type coeff struct { Value float64 `json:"value"` Samples int `json:"samples"` Hours int `json:"hours"` } type coeffs struct { When models.ImportTime `json:"when"` Coeffs []coeff `json:"coeffs"` } cs := make([]coeff, 3) for i := range cs { cs[i].Hours = (i + 1) * 24 cs[i].Value, cs[i].Samples = common.NashSutcliffe( measurements, when, when.Add(time.Duration(-cs[i].Hours)*time.Hour), ) } jr = JSONResult{ Result: &coeffs{ When: models.ImportTime{when}, Coeffs: cs, }, } return } func waterlevels(rw http.ResponseWriter, req *http.Request) { gauge := mux.Vars(req)["gauge"] isrs, err := models.IsrsFromString(gauge) if err != nil { http.Error( rw, fmt.Sprintf("error: Invalid ISRS code: %v", err), http.StatusBadRequest) return } var fb filterBuilder fb.stmt.WriteString(selectWaterlevelsSQL) fb.cond( " fk_gauge_id = ($%d::char(2), $%d::char(3), $%d::char(5), $%d::char(5), $%d::int) ", isrs.CountryCode, isrs.LoCode, isrs.FairwaySection, isrs.Orc, isrs.Hectometre, ) if from := req.FormValue("from"); from != "" { fromTime, err := time.Parse(models.ImportTimeFormat, from) if err != nil { http.Error( rw, fmt.Sprintf("error: Invalid from time: %v", err), http.StatusBadRequest) return } fb.cond("measure_date >= $%d", fromTime) } if to := req.FormValue("to"); to != "" { toTime, err := time.Parse(models.ImportTimeFormat, to) if err != nil { http.Error( rw, fmt.Sprintf("error: Invalid from time: %v", err), http.StatusBadRequest) return } fb.cond("measure_date <= $%d", toTime) } conn := middleware.GetDBConn(req) ctx := req.Context() rows, err := conn.QueryContext(ctx, fb.stmt.String(), fb.args...) if err != nil { http.Error( rw, fmt.Sprintf("error: %v", err), http.StatusInternalServerError) return } defer rows.Close() rw.Header().Add("Content-Type", "text/csv") out := csv.NewWriter(rw) record := []string{ "#date", "#water_level", "#value_min", "#value_max", "#predicted", } if err := out.Write(record); err != nil { log.Printf("error: %v", err) // Too late for an HTTP error code. return } for rows.Next() { var ( measureDate time.Time waterlevel float64 valueMin sql.NullFloat64 valueMax sql.NullFloat64 predicted bool ) if err := rows.Scan( &measureDate, &waterlevel, &valueMin, &valueMax, &predicted, ); err != nil { log.Printf("error: %v\n", err) // Too late for an HTTP error code. return } record[0] = measureDate.Format(models.ImportTimeFormat) record[1] = float64format(waterlevel) record[2] = nullFloat64format(valueMin) record[3] = nullFloat64format(valueMax) record[4] = boolFormat(predicted) if err := out.Write(record); err != nil { log.Printf("error: %v", err) // Too late for an HTTP error code. return } } if err := rows.Err(); err != nil { log.Printf("error: %v", err) // Too late for an HTTP error code. return } out.Flush() if err := out.Error(); err != nil { log.Printf("error: %v", err) // Too late for an HTTP error code. return } }