comparison pkg/controllers/gauges.go @ 2800:db1052bc162a

Added GET /data/longterm-waterlevels/{gauge}
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Mon, 25 Mar 2019 17:31:56 +0100
parents 2806821cfd63
children 46a9a7c1281f
comparison
equal deleted inserted replaced
2799:e19fac818aab 2800:db1052bc162a
17 "database/sql" 17 "database/sql"
18 "encoding/csv" 18 "encoding/csv"
19 "fmt" 19 "fmt"
20 "log" 20 "log"
21 "net/http" 21 "net/http"
22 "runtime"
22 "sort" 23 "sort"
23 "strconv" 24 "strconv"
25 "sync"
24 "time" 26 "time"
25 27
26 "github.com/gorilla/mux" 28 "github.com/gorilla/mux"
27 "gonum.org/v1/gonum/stat" 29 "gonum.org/v1/gonum/stat"
28 30
62 value_max, 64 value_max,
63 predicted 65 predicted
64 FROM waterway.gauge_measurements 66 FROM waterway.gauge_measurements
65 WHERE 67 WHERE
66 ` 68 `
69
70 selectAllWaterlevelsMeasuredSQL = `
71 SELECT
72 measure_date,
73 water_level
74 FROM waterway.gauge_measurements
75 WHERE
76 NOT predicted
77 AND fk_gauge_id = (
78 $1::char(2),
79 $2::char(3),
80 $3::char(5),
81 $4::char(5),
82 $5::int
83 )
84 `
67 selectWaterlevelsMeasuredSQL = ` 85 selectWaterlevelsMeasuredSQL = `
68 SELECT 86 SELECT
69 measure_date, 87 measure_date,
70 water_level 88 water_level
71 FROM waterway.gauge_measurements 89 FROM waterway.gauge_measurements
101 return "t" 119 return "t"
102 } 120 }
103 return "f" 121 return "f"
104 } 122 }
105 123
124 func longtermWaterlevels(rw http.ResponseWriter, req *http.Request) {
125
126 gauge := mux.Vars(req)["gauge"]
127
128 isrs, err := models.IsrsFromString(gauge)
129 if err != nil {
130 http.Error(
131 rw, fmt.Sprintf("error: Invalid ISRS code: %v", err),
132 http.StatusBadRequest)
133 return
134 }
135
136 conn := middleware.GetDBConn(req)
137
138 ctx := req.Context()
139
140 rows, err := conn.QueryContext(
141 ctx,
142 selectAllWaterlevelsMeasuredSQL,
143 isrs.CountryCode,
144 isrs.LoCode,
145 isrs.FairwaySection,
146 isrs.Orc,
147 isrs.Hectometre,
148 )
149 if err != nil {
150 http.Error(
151 rw, fmt.Sprintf("error: %v", err),
152 http.StatusInternalServerError)
153 return
154 }
155 defer rows.Close()
156
157 type dayKey struct {
158 month byte
159 day byte
160 }
161
162 entries := make(map[dayKey][]float64)
163
164 start := time.Now()
165
166 for rows.Next() {
167 var value float64
168 var date time.Time
169 if err := rows.Scan(&date, &value); err != nil {
170 http.Error(
171 rw, fmt.Sprintf("error: %v", err),
172 http.StatusInternalServerError)
173 }
174 key := dayKey{
175 month: byte(date.Month()),
176 day: byte(date.Day()),
177 }
178 entries[key] = append(entries[key], value)
179 }
180
181 if err := rows.Err(); err != nil {
182 http.Error(
183 rw, fmt.Sprintf("error: %v", err),
184 http.StatusInternalServerError)
185 return
186 }
187
188 log.Printf("info: loading entries took %s\n", time.Since(start))
189
190 log.Printf("info: days found: %d\n", len(entries))
191
192 stats := time.Now()
193
194 type result struct {
195 key dayKey
196 values []float64
197 min float64
198 max float64
199 mean float64
200 median float64
201 q25 float64
202 q75 float64
203 }
204
205 results := make([]result, len(entries))
206
207 jobs := make(chan int)
208
209 var wg sync.WaitGroup
210
211 for i, n := 0, runtime.NumCPU(); i < n; i++ {
212 wg.Add(1)
213 go func() {
214 defer wg.Done()
215 for job := range jobs {
216 r := &results[job]
217 if len(r.values) == 0 {
218 continue
219 }
220 sort.Float64s(r.values)
221 r.min = r.values[0]
222 r.max = r.values[len(r.values)-1]
223 r.median = r.values[len(r.values)/2]
224 r.mean = stat.Mean(r.values, nil)
225 r.q25 = stat.Quantile(0.25, stat.Empirical, r.values, nil)
226 r.q75 = stat.Quantile(0.75, stat.Empirical, r.values, nil)
227 }
228 }()
229 }
230
231 var i int
232 for k, v := range entries {
233 results[i].key = k
234 results[i].values = v
235 jobs <- i
236 i++
237 }
238 close(jobs)
239 wg.Wait()
240
241 sort.Slice(results, func(i, j int) bool {
242 ki, kj := results[i].key, results[j].key
243 if d := int(ki.month) - int(kj.month); d != 0 {
244 return d < 0
245 }
246 return ki.day < kj.day
247 })
248
249 log.Printf("info: calculating stats took %s\n", time.Since(stats))
250
251 rw.Header().Add("Content-Type", "text/csv")
252
253 out := csv.NewWriter(rw)
254
255 record := []string{
256 "#date",
257 "#min",
258 "#max",
259 "#mean",
260 "#median",
261 "#q25",
262 "#q75",
263 }
264
265 if err := out.Write(record); err != nil {
266 log.Printf("error: %v\n", err)
267 // Too late for an HTTP error code.
268 return
269 }
270
271 for i := range results {
272 r := &results[i]
273 record[0] = fmt.Sprintf("%02d-%02d", r.key.day, r.key.month)
274 record[1] = float64format(r.min)
275 record[2] = float64format(r.max)
276 record[3] = float64format(r.mean)
277 record[4] = float64format(r.median)
278 record[5] = float64format(r.q25)
279 record[6] = float64format(r.q75)
280 if err := out.Write(record); err != nil {
281 log.Printf("error: %v\n", err)
282 // Too late for an HTTP error code.
283 return
284 }
285 }
286
287 out.Flush()
288 if err := out.Error(); err != nil {
289 log.Printf("error: %v", err)
290 // Too late for an HTTP error code.
291 return
292 }
293 }
294
106 func averageWaterlevels(rw http.ResponseWriter, req *http.Request) { 295 func averageWaterlevels(rw http.ResponseWriter, req *http.Request) {
107 gauge := mux.Vars(req)["gauge"] 296 gauge := mux.Vars(req)["gauge"]
108 297
109 isrs, err := models.IsrsFromString(gauge) 298 isrs, err := models.IsrsFromString(gauge)
110 if err != nil { 299 if err != nil {