Mercurial > gemma
changeset 2800:db1052bc162a
Added GET /data/longterm-waterlevels/{gauge}
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Mon, 25 Mar 2019 17:31:56 +0100 |
parents | e19fac818aab |
children | 054f5d61452d |
files | pkg/controllers/gauges.go pkg/controllers/routes.go |
diffstat | 2 files changed, 192 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- a/pkg/controllers/gauges.go Mon Mar 25 16:16:58 2019 +0100 +++ b/pkg/controllers/gauges.go Mon Mar 25 17:31:56 2019 +0100 @@ -19,8 +19,10 @@ "fmt" "log" "net/http" + "runtime" "sort" "strconv" + "sync" "time" "github.com/gorilla/mux" @@ -64,6 +66,22 @@ FROM waterway.gauge_measurements WHERE ` + + selectAllWaterlevelsMeasuredSQL = ` +SELECT + measure_date, + water_level +FROM waterway.gauge_measurements +WHERE + NOT predicted + AND fk_gauge_id = ( + $1::char(2), + $2::char(3), + $3::char(5), + $4::char(5), + $5::int + ) +` selectWaterlevelsMeasuredSQL = ` SELECT measure_date, @@ -103,6 +121,177 @@ return "f" } +func longtermWaterlevels(rw http.ResponseWriter, req *http.Request) { + + gauge := mux.Vars(req)["gauge"] + + isrs, err := models.IsrsFromString(gauge) + if err != nil { + http.Error( + rw, fmt.Sprintf("error: Invalid ISRS code: %v", err), + http.StatusBadRequest) + return + } + + conn := middleware.GetDBConn(req) + + ctx := req.Context() + + rows, err := conn.QueryContext( + ctx, + selectAllWaterlevelsMeasuredSQL, + isrs.CountryCode, + isrs.LoCode, + isrs.FairwaySection, + isrs.Orc, + isrs.Hectometre, + ) + if err != nil { + http.Error( + rw, fmt.Sprintf("error: %v", err), + http.StatusInternalServerError) + return + } + defer rows.Close() + + type dayKey struct { + month byte + day byte + } + + entries := make(map[dayKey][]float64) + + start := time.Now() + + for rows.Next() { + var value float64 + var date time.Time + if err := rows.Scan(&date, &value); err != nil { + http.Error( + rw, fmt.Sprintf("error: %v", err), + http.StatusInternalServerError) + } + key := dayKey{ + month: byte(date.Month()), + day: byte(date.Day()), + } + entries[key] = append(entries[key], value) + } + + if err := rows.Err(); err != nil { + http.Error( + rw, fmt.Sprintf("error: %v", err), + http.StatusInternalServerError) + return + } + + log.Printf("info: loading entries took %s\n", time.Since(start)) + + log.Printf("info: days found: %d\n", len(entries)) + + stats := time.Now() + + type result struct { + key dayKey + values []float64 + min float64 + max float64 + mean float64 + median float64 + q25 float64 + q75 float64 + } + + results := make([]result, len(entries)) + + jobs := make(chan int) + + var wg sync.WaitGroup + + for i, n := 0, runtime.NumCPU(); i < n; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for job := range jobs { + r := &results[job] + if len(r.values) == 0 { + continue + } + sort.Float64s(r.values) + r.min = r.values[0] + r.max = r.values[len(r.values)-1] + r.median = r.values[len(r.values)/2] + r.mean = stat.Mean(r.values, nil) + r.q25 = stat.Quantile(0.25, stat.Empirical, r.values, nil) + r.q75 = stat.Quantile(0.75, stat.Empirical, r.values, nil) + } + }() + } + + var i int + for k, v := range entries { + results[i].key = k + results[i].values = v + jobs <- i + i++ + } + close(jobs) + wg.Wait() + + sort.Slice(results, func(i, j int) bool { + ki, kj := results[i].key, results[j].key + if d := int(ki.month) - int(kj.month); d != 0 { + return d < 0 + } + return ki.day < kj.day + }) + + log.Printf("info: calculating stats took %s\n", time.Since(stats)) + + rw.Header().Add("Content-Type", "text/csv") + + out := csv.NewWriter(rw) + + record := []string{ + "#date", + "#min", + "#max", + "#mean", + "#median", + "#q25", + "#q75", + } + + if err := out.Write(record); err != nil { + log.Printf("error: %v\n", err) + // Too late for an HTTP error code. + return + } + + for i := range results { + r := &results[i] + record[0] = fmt.Sprintf("%02d-%02d", r.key.day, r.key.month) + record[1] = float64format(r.min) + record[2] = float64format(r.max) + record[3] = float64format(r.mean) + record[4] = float64format(r.median) + record[5] = float64format(r.q25) + record[6] = float64format(r.q75) + if err := out.Write(record); err != nil { + log.Printf("error: %v\n", err) + // Too late for an HTTP error code. + return + } + } + + out.Flush() + if err := out.Error(); err != nil { + log.Printf("error: %v", err) + // Too late for an HTTP error code. + return + } +} + func averageWaterlevels(rw http.ResponseWriter, req *http.Request) { gauge := mux.Vars(req)["gauge"]
--- a/pkg/controllers/routes.go Mon Mar 25 16:16:58 2019 +0100 +++ b/pkg/controllers/routes.go Mon Mar 25 17:31:56 2019 +0100 @@ -305,6 +305,9 @@ api.Handle("/data/average-waterlevels/{gauge}", any( middleware.DBConn(http.HandlerFunc(averageWaterlevels)))).Methods(http.MethodGet) + api.Handle("/data/longterm-waterlevels/{gauge}", any( + middleware.DBConn(http.HandlerFunc(longtermWaterlevels)))).Methods(http.MethodGet) + api.Handle("/data/nash-sutcliffe/{gauge}", any(&JSONHandler{ Handle: nashSutcliffe, })).Methods(http.MethodGet)