changeset 2809:216bc6394911

Optimized long-term water level statistics.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Tue, 26 Mar 2019 12:45:50 +0100
parents 5b6de0bde6b6
children 97cf32cf2562
files pkg/controllers/gauges.go
diffstat 1 files changed, 47 insertions(+), 81 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/controllers/gauges.go	Tue Mar 26 07:47:16 2019 +0100
+++ b/pkg/controllers/gauges.go	Tue Mar 26 12:45:50 2019 +0100
@@ -19,10 +19,9 @@
 	"fmt"
 	"log"
 	"net/http"
-	"runtime"
 	"sort"
 	"strconv"
-	"sync"
+	"strings"
 	"time"
 
 	"github.com/gorilla/mux"
@@ -68,19 +67,27 @@
 `
 
 	selectAllWaterlevelsMeasuredSQL = `
-SELECT
-  measure_date,
-  water_level
-FROM waterway.gauge_measurements
-WHERE
-  NOT predicted
-  AND fk_gauge_id = (
+WITH g AS (
+  SELECT (
     $1::char(2),
     $2::char(3),
     $3::char(5),
     $4::char(5),
-    $5::int
-  )
+    $5::int)::isrs loc
+)
+SELECT
+    extract(day from measure_date)::varchar || ':' ||
+    extract(month from measure_date)::varchar AS day_month,
+  percentile_disc(0.25) within group (order by water_level) AS q25,
+  percentile_disc(0.5)  within group (order by water_level) AS median,
+  percentile_disc(0.75) within group (order by water_level) AS q75,
+  avg(water_level) AS mean,
+  min(water_level) AS min,
+  max(water_level) AS max
+FROM waterway.gauge_measurements, g
+WHERE fk_gauge_id = g.loc AND NOT predicted
+GROUP BY extract(day from measure_date)::varchar || ':' ||
+         extract(month from measure_date)::varchar;
 `
 	selectYearWaterlevelsMeasuredSQL = `
 SELECT
@@ -99,7 +106,6 @@
   AND measure_date BETWEEN $6 AND $7
 ORDER BY measure_date
 `
-
 	selectWaterlevelsMeasuredSQL = `
 SELECT
   measure_date,
@@ -264,29 +270,41 @@
 	}
 	defer rows.Close()
 
-	type dayKey struct {
-		month byte
-		day   byte
+	type result struct {
+		day    int
+		month  int
+		q25    float64
+		median float64
+		q75    float64
+		mean   float64
+		min    float64
+		max    float64
 	}
 
-	entries := make(map[dayKey][]float64)
+	results := make([]result, 0, 366)
 
 	start := time.Now()
 
 	for rows.Next() {
-		var value float64
-		var date time.Time
-		if err := rows.Scan(&date, &value); err != nil {
+		var r result
+		var dayMonth string
+		if err := rows.Scan(
+			&dayMonth,
+			&r.q25,
+			&r.median,
+			&r.q75,
+			&r.mean,
+			&r.min,
+			&r.max,
+		); err != nil {
 			http.Error(
 				rw, fmt.Sprintf("error: %v", err),
 				http.StatusInternalServerError)
 		}
-		date = date.UTC()
-		key := dayKey{
-			month: byte(date.Month()),
-			day:   byte(date.Day()),
-		}
-		entries[key] = append(entries[key], value)
+		parts := strings.SplitN(dayMonth, ":", 2)
+		r.day, _ = strconv.Atoi(parts[0])
+		r.month, _ = strconv.Atoi(parts[1])
+		results = append(results, r)
 	}
 
 	if err := rows.Err(); err != nil {
@@ -298,67 +316,15 @@
 
 	log.Printf("info: loading entries took %s\n", time.Since(start))
 
-	log.Printf("info: days found: %d\n", len(entries))
-
-	stats := time.Now()
-
-	type result struct {
-		key    dayKey
-		values []float64
-		min    float64
-		max    float64
-		mean   float64
-		median float64
-		q25    float64
-		q75    float64
-	}
-
-	results := make([]result, len(entries))
-
-	jobs := make(chan int)
-
-	var wg sync.WaitGroup
-
-	for i, n := 0, runtime.NumCPU(); i < n; i++ {
-		wg.Add(1)
-		go func() {
-			defer wg.Done()
-			for job := range jobs {
-				r := &results[job]
-				if len(r.values) == 0 {
-					continue
-				}
-				sort.Float64s(r.values)
-				r.min = r.values[0]
-				r.max = r.values[len(r.values)-1]
-				r.median = r.values[len(r.values)/2]
-				r.mean = stat.Mean(r.values, nil)
-				r.q25 = stat.Quantile(0.25, stat.Empirical, r.values, nil)
-				r.q75 = stat.Quantile(0.75, stat.Empirical, r.values, nil)
-			}
-		}()
-	}
-
-	var i int
-	for k, v := range entries {
-		results[i].key = k
-		results[i].values = v
-		jobs <- i
-		i++
-	}
-	close(jobs)
-	wg.Wait()
+	log.Printf("info: days found: %d\n", len(results))
 
 	sort.Slice(results, func(i, j int) bool {
-		ki, kj := results[i].key, results[j].key
-		if d := int(ki.month) - int(kj.month); d != 0 {
+		if d := int(results[i].month) - int(results[j].month); d != 0 {
 			return d < 0
 		}
-		return ki.day < kj.day
+		return results[i].day < results[j].day
 	})
 
-	log.Printf("info: calculating stats took %s\n", time.Since(stats))
-
 	rw.Header().Add("Content-Type", "text/csv")
 
 	out := csv.NewWriter(rw)
@@ -381,7 +347,7 @@
 
 	for i := range results {
 		r := &results[i]
-		record[0] = fmt.Sprintf("%02d-%02d", r.key.day, r.key.month)
+		record[0] = fmt.Sprintf("%02d-%02d", r.day, r.month)
 		record[1] = float64format(r.min)
 		record[2] = float64format(r.max)
 		record[3] = float64format(r.mean)