Mercurial > gemma
comparison pkg/controllers/gauges.go @ 2809:216bc6394911
Optimized longterm waterlevel statistics.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Tue, 26 Mar 2019 12:45:50 +0100 |
parents | 5b6de0bde6b6 |
children | 6f435a9558f2 |
comparison
equal
deleted
inserted
replaced
2808:5b6de0bde6b6 | 2809:216bc6394911 |
---|---|
17 "database/sql" | 17 "database/sql" |
18 "encoding/csv" | 18 "encoding/csv" |
19 "fmt" | 19 "fmt" |
20 "log" | 20 "log" |
21 "net/http" | 21 "net/http" |
22 "runtime" | |
23 "sort" | 22 "sort" |
24 "strconv" | 23 "strconv" |
25 "sync" | 24 "strings" |
26 "time" | 25 "time" |
27 | 26 |
28 "github.com/gorilla/mux" | 27 "github.com/gorilla/mux" |
29 "gonum.org/v1/gonum/stat" | 28 "gonum.org/v1/gonum/stat" |
30 | 29 |
66 FROM waterway.gauge_measurements | 65 FROM waterway.gauge_measurements |
67 WHERE | 66 WHERE |
68 ` | 67 ` |
69 | 68 |
70 selectAllWaterlevelsMeasuredSQL = ` | 69 selectAllWaterlevelsMeasuredSQL = ` |
71 SELECT | 70 WITH g AS ( |
72 measure_date, | 71 SELECT ( |
73 water_level | |
74 FROM waterway.gauge_measurements | |
75 WHERE | |
76 NOT predicted | |
77 AND fk_gauge_id = ( | |
78 $1::char(2), | 72 $1::char(2), |
79 $2::char(3), | 73 $2::char(3), |
80 $3::char(5), | 74 $3::char(5), |
81 $4::char(5), | 75 $4::char(5), |
82 $5::int | 76 $5::int)::isrs loc |
83 ) | 77 ) |
78 SELECT | |
79 extract(day from measure_date)::varchar || ':' || | |
80 extract(month from measure_date)::varchar AS day_month, | |
81 percentile_disc(0.25) within group (order by water_level) AS q25, | |
82 percentile_disc(0.5) within group (order by water_level) AS median, | |
83 percentile_disc(0.75) within group (order by water_level) AS q75, | |
84 avg(water_level) AS mean, | |
85 min(water_level) AS min, | |
86 max(water_level) AS max | |
87 FROM waterway.gauge_measurements, g | |
88 WHERE fk_gauge_id = g.loc AND NOT predicted | |
89 GROUP BY extract(day from measure_date)::varchar || ':' || | |
90 extract(month from measure_date)::varchar; | |
84 ` | 91 ` |
85 selectYearWaterlevelsMeasuredSQL = ` | 92 selectYearWaterlevelsMeasuredSQL = ` |
86 SELECT | 93 SELECT |
87 measure_date, | 94 measure_date, |
88 water_level | 95 water_level |
97 $5::int | 104 $5::int |
98 ) | 105 ) |
99 AND measure_date BETWEEN $6 AND $7 | 106 AND measure_date BETWEEN $6 AND $7 |
100 ORDER BY measure_date | 107 ORDER BY measure_date |
101 ` | 108 ` |
102 | |
103 selectWaterlevelsMeasuredSQL = ` | 109 selectWaterlevelsMeasuredSQL = ` |
104 SELECT | 110 SELECT |
105 measure_date, | 111 measure_date, |
106 water_level | 112 water_level |
107 FROM waterway.gauge_measurements | 113 FROM waterway.gauge_measurements |
262 http.StatusInternalServerError) | 268 http.StatusInternalServerError) |
263 return | 269 return |
264 } | 270 } |
265 defer rows.Close() | 271 defer rows.Close() |
266 | 272 |
267 type dayKey struct { | 273 type result struct { |
268 month byte | 274 day int |
269 day byte | 275 month int |
270 } | 276 q25 float64 |
271 | 277 median float64 |
272 entries := make(map[dayKey][]float64) | 278 q75 float64 |
279 mean float64 | |
280 min float64 | |
281 max float64 | |
282 } | |
283 | |
284 results := make([]result, 0, 366) | |
273 | 285 |
274 start := time.Now() | 286 start := time.Now() |
275 | 287 |
276 for rows.Next() { | 288 for rows.Next() { |
277 var value float64 | 289 var r result |
278 var date time.Time | 290 var dayMonth string |
279 if err := rows.Scan(&date, &value); err != nil { | 291 if err := rows.Scan( |
292 &dayMonth, | |
293 &r.q25, | |
294 &r.median, | |
295 &r.q75, | |
296 &r.mean, | |
297 &r.min, | |
298 &r.max, | |
299 ); err != nil { | |
280 http.Error( | 300 http.Error( |
281 rw, fmt.Sprintf("error: %v", err), | 301 rw, fmt.Sprintf("error: %v", err), |
282 http.StatusInternalServerError) | 302 http.StatusInternalServerError) |
283 } | 303 } |
284 date = date.UTC() | 304 parts := strings.SplitN(dayMonth, ":", 2) |
285 key := dayKey{ | 305 r.day, _ = strconv.Atoi(parts[0]) |
286 month: byte(date.Month()), | 306 r.month, _ = strconv.Atoi(parts[1]) |
287 day: byte(date.Day()), | 307 results = append(results, r) |
288 } | |
289 entries[key] = append(entries[key], value) | |
290 } | 308 } |
291 | 309 |
292 if err := rows.Err(); err != nil { | 310 if err := rows.Err(); err != nil { |
293 http.Error( | 311 http.Error( |
294 rw, fmt.Sprintf("error: %v", err), | 312 rw, fmt.Sprintf("error: %v", err), |
296 return | 314 return |
297 } | 315 } |
298 | 316 |
299 log.Printf("info: loading entries took %s\n", time.Since(start)) | 317 log.Printf("info: loading entries took %s\n", time.Since(start)) |
300 | 318 |
301 log.Printf("info: days found: %d\n", len(entries)) | 319 log.Printf("info: days found: %d\n", len(results)) |
302 | |
303 stats := time.Now() | |
304 | |
305 type result struct { | |
306 key dayKey | |
307 values []float64 | |
308 min float64 | |
309 max float64 | |
310 mean float64 | |
311 median float64 | |
312 q25 float64 | |
313 q75 float64 | |
314 } | |
315 | |
316 results := make([]result, len(entries)) | |
317 | |
318 jobs := make(chan int) | |
319 | |
320 var wg sync.WaitGroup | |
321 | |
322 for i, n := 0, runtime.NumCPU(); i < n; i++ { | |
323 wg.Add(1) | |
324 go func() { | |
325 defer wg.Done() | |
326 for job := range jobs { | |
327 r := &results[job] | |
328 if len(r.values) == 0 { | |
329 continue | |
330 } | |
331 sort.Float64s(r.values) | |
332 r.min = r.values[0] | |
333 r.max = r.values[len(r.values)-1] | |
334 r.median = r.values[len(r.values)/2] | |
335 r.mean = stat.Mean(r.values, nil) | |
336 r.q25 = stat.Quantile(0.25, stat.Empirical, r.values, nil) | |
337 r.q75 = stat.Quantile(0.75, stat.Empirical, r.values, nil) | |
338 } | |
339 }() | |
340 } | |
341 | |
342 var i int | |
343 for k, v := range entries { | |
344 results[i].key = k | |
345 results[i].values = v | |
346 jobs <- i | |
347 i++ | |
348 } | |
349 close(jobs) | |
350 wg.Wait() | |
351 | 320 |
352 sort.Slice(results, func(i, j int) bool { | 321 sort.Slice(results, func(i, j int) bool { |
353 ki, kj := results[i].key, results[j].key | 322 if d := int(results[i].month) - int(results[j].month); d != 0 { |
354 if d := int(ki.month) - int(kj.month); d != 0 { | |
355 return d < 0 | 323 return d < 0 |
356 } | 324 } |
357 return ki.day < kj.day | 325 return results[i].day < results[j].day |
358 }) | 326 }) |
359 | |
360 log.Printf("info: calculating stats took %s\n", time.Since(stats)) | |
361 | 327 |
362 rw.Header().Add("Content-Type", "text/csv") | 328 rw.Header().Add("Content-Type", "text/csv") |
363 | 329 |
364 out := csv.NewWriter(rw) | 330 out := csv.NewWriter(rw) |
365 | 331 |
379 return | 345 return |
380 } | 346 } |
381 | 347 |
382 for i := range results { | 348 for i := range results { |
383 r := &results[i] | 349 r := &results[i] |
384 record[0] = fmt.Sprintf("%02d-%02d", r.key.day, r.key.month) | 350 record[0] = fmt.Sprintf("%02d-%02d", r.day, r.month) |
385 record[1] = float64format(r.min) | 351 record[1] = float64format(r.min) |
386 record[2] = float64format(r.max) | 352 record[2] = float64format(r.max) |
387 record[3] = float64format(r.mean) | 353 record[3] = float64format(r.mean) |
388 record[4] = float64format(r.median) | 354 record[4] = float64format(r.median) |
389 record[5] = float64format(r.q25) | 355 record[5] = float64format(r.q25) |