comparison pkg/controllers/gauges.go @ 2809:216bc6394911

Optimized long-term waterlevel statistics.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Tue, 26 Mar 2019 12:45:50 +0100
parents 5b6de0bde6b6
children 6f435a9558f2
comparison
equal deleted inserted replaced
2808:5b6de0bde6b6 2809:216bc6394911
17 "database/sql" 17 "database/sql"
18 "encoding/csv" 18 "encoding/csv"
19 "fmt" 19 "fmt"
20 "log" 20 "log"
21 "net/http" 21 "net/http"
22 "runtime"
23 "sort" 22 "sort"
24 "strconv" 23 "strconv"
25 "sync" 24 "strings"
26 "time" 25 "time"
27 26
28 "github.com/gorilla/mux" 27 "github.com/gorilla/mux"
29 "gonum.org/v1/gonum/stat" 28 "gonum.org/v1/gonum/stat"
30 29
66 FROM waterway.gauge_measurements 65 FROM waterway.gauge_measurements
67 WHERE 66 WHERE
68 ` 67 `
69 68
70 selectAllWaterlevelsMeasuredSQL = ` 69 selectAllWaterlevelsMeasuredSQL = `
71 SELECT 70 WITH g AS (
72 measure_date, 71 SELECT (
73 water_level
74 FROM waterway.gauge_measurements
75 WHERE
76 NOT predicted
77 AND fk_gauge_id = (
78 $1::char(2), 72 $1::char(2),
79 $2::char(3), 73 $2::char(3),
80 $3::char(5), 74 $3::char(5),
81 $4::char(5), 75 $4::char(5),
82 $5::int 76 $5::int)::isrs loc
83 ) 77 )
78 SELECT
79 extract(day from measure_date)::varchar || ':' ||
80 extract(month from measure_date)::varchar AS day_month,
81 percentile_disc(0.25) within group (order by water_level) AS q25,
82 percentile_disc(0.5) within group (order by water_level) AS median,
83 percentile_disc(0.75) within group (order by water_level) AS q75,
84 avg(water_level) AS mean,
85 min(water_level) AS min,
86 max(water_level) AS max
87 FROM waterway.gauge_measurements, g
88 WHERE fk_gauge_id = g.loc AND NOT predicted
89 GROUP BY extract(day from measure_date)::varchar || ':' ||
90 extract(month from measure_date)::varchar;
84 ` 91 `
85 selectYearWaterlevelsMeasuredSQL = ` 92 selectYearWaterlevelsMeasuredSQL = `
86 SELECT 93 SELECT
87 measure_date, 94 measure_date,
88 water_level 95 water_level
97 $5::int 104 $5::int
98 ) 105 )
99 AND measure_date BETWEEN $6 AND $7 106 AND measure_date BETWEEN $6 AND $7
100 ORDER BY measure_date 107 ORDER BY measure_date
101 ` 108 `
102
103 selectWaterlevelsMeasuredSQL = ` 109 selectWaterlevelsMeasuredSQL = `
104 SELECT 110 SELECT
105 measure_date, 111 measure_date,
106 water_level 112 water_level
107 FROM waterway.gauge_measurements 113 FROM waterway.gauge_measurements
262 http.StatusInternalServerError) 268 http.StatusInternalServerError)
263 return 269 return
264 } 270 }
265 defer rows.Close() 271 defer rows.Close()
266 272
267 type dayKey struct { 273 type result struct {
268 month byte 274 day int
269 day byte 275 month int
270 } 276 q25 float64
271 277 median float64
272 entries := make(map[dayKey][]float64) 278 q75 float64
279 mean float64
280 min float64
281 max float64
282 }
283
284 results := make([]result, 0, 366)
273 285
274 start := time.Now() 286 start := time.Now()
275 287
276 for rows.Next() { 288 for rows.Next() {
277 var value float64 289 var r result
278 var date time.Time 290 var dayMonth string
279 if err := rows.Scan(&date, &value); err != nil { 291 if err := rows.Scan(
292 &dayMonth,
293 &r.q25,
294 &r.median,
295 &r.q75,
296 &r.mean,
297 &r.min,
298 &r.max,
299 ); err != nil {
280 http.Error( 300 http.Error(
281 rw, fmt.Sprintf("error: %v", err), 301 rw, fmt.Sprintf("error: %v", err),
282 http.StatusInternalServerError) 302 http.StatusInternalServerError)
283 } 303 }
284 date = date.UTC() 304 parts := strings.SplitN(dayMonth, ":", 2)
285 key := dayKey{ 305 r.day, _ = strconv.Atoi(parts[0])
286 month: byte(date.Month()), 306 r.month, _ = strconv.Atoi(parts[1])
287 day: byte(date.Day()), 307 results = append(results, r)
288 }
289 entries[key] = append(entries[key], value)
290 } 308 }
291 309
292 if err := rows.Err(); err != nil { 310 if err := rows.Err(); err != nil {
293 http.Error( 311 http.Error(
294 rw, fmt.Sprintf("error: %v", err), 312 rw, fmt.Sprintf("error: %v", err),
296 return 314 return
297 } 315 }
298 316
299 log.Printf("info: loading entries took %s\n", time.Since(start)) 317 log.Printf("info: loading entries took %s\n", time.Since(start))
300 318
301 log.Printf("info: days found: %d\n", len(entries)) 319 log.Printf("info: days found: %d\n", len(results))
302
303 stats := time.Now()
304
305 type result struct {
306 key dayKey
307 values []float64
308 min float64
309 max float64
310 mean float64
311 median float64
312 q25 float64
313 q75 float64
314 }
315
316 results := make([]result, len(entries))
317
318 jobs := make(chan int)
319
320 var wg sync.WaitGroup
321
322 for i, n := 0, runtime.NumCPU(); i < n; i++ {
323 wg.Add(1)
324 go func() {
325 defer wg.Done()
326 for job := range jobs {
327 r := &results[job]
328 if len(r.values) == 0 {
329 continue
330 }
331 sort.Float64s(r.values)
332 r.min = r.values[0]
333 r.max = r.values[len(r.values)-1]
334 r.median = r.values[len(r.values)/2]
335 r.mean = stat.Mean(r.values, nil)
336 r.q25 = stat.Quantile(0.25, stat.Empirical, r.values, nil)
337 r.q75 = stat.Quantile(0.75, stat.Empirical, r.values, nil)
338 }
339 }()
340 }
341
342 var i int
343 for k, v := range entries {
344 results[i].key = k
345 results[i].values = v
346 jobs <- i
347 i++
348 }
349 close(jobs)
350 wg.Wait()
351 320
352 sort.Slice(results, func(i, j int) bool { 321 sort.Slice(results, func(i, j int) bool {
353 ki, kj := results[i].key, results[j].key 322 if d := int(results[i].month) - int(results[j].month); d != 0 {
354 if d := int(ki.month) - int(kj.month); d != 0 {
355 return d < 0 323 return d < 0
356 } 324 }
357 return ki.day < kj.day 325 return results[i].day < results[j].day
358 }) 326 })
359
360 log.Printf("info: calculating stats took %s\n", time.Since(stats))
361 327
362 rw.Header().Add("Content-Type", "text/csv") 328 rw.Header().Add("Content-Type", "text/csv")
363 329
364 out := csv.NewWriter(rw) 330 out := csv.NewWriter(rw)
365 331
379 return 345 return
380 } 346 }
381 347
382 for i := range results { 348 for i := range results {
383 r := &results[i] 349 r := &results[i]
384 record[0] = fmt.Sprintf("%02d-%02d", r.key.day, r.key.month) 350 record[0] = fmt.Sprintf("%02d-%02d", r.day, r.month)
385 record[1] = float64format(r.min) 351 record[1] = float64format(r.min)
386 record[2] = float64format(r.max) 352 record[2] = float64format(r.max)
387 record[3] = float64format(r.mean) 353 record[3] = float64format(r.mean)
388 record[4] = float64format(r.median) 354 record[4] = float64format(r.median)
389 record[5] = float64format(r.q25) 355 record[5] = float64format(r.q25)