diff pkg/controllers/gauges.go @ 2800:db1052bc162a

Added GET /data/longterm-waterlevels/{gauge}
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Mon, 25 Mar 2019 17:31:56 +0100
parents 2806821cfd63
children 46a9a7c1281f
line wrap: on
line diff
--- a/pkg/controllers/gauges.go	Mon Mar 25 16:16:58 2019 +0100
+++ b/pkg/controllers/gauges.go	Mon Mar 25 17:31:56 2019 +0100
@@ -19,8 +19,10 @@
 	"fmt"
 	"log"
 	"net/http"
+	"runtime"
 	"sort"
 	"strconv"
+	"sync"
 	"time"
 
 	"github.com/gorilla/mux"
@@ -64,6 +66,22 @@
 FROM waterway.gauge_measurements
 WHERE
 `
+
+	selectAllWaterlevelsMeasuredSQL = `
+SELECT
+  measure_date,
+  water_level
+FROM waterway.gauge_measurements
+WHERE
+  NOT predicted
+  AND fk_gauge_id = (
+    $1::char(2),
+    $2::char(3),
+    $3::char(5),
+    $4::char(5),
+    $5::int
+  )
+`
 	selectWaterlevelsMeasuredSQL = `
 SELECT
   measure_date,
@@ -103,6 +121,177 @@
 	return "f"
 }
 
+func longtermWaterlevels(rw http.ResponseWriter, req *http.Request) {
+
+	gauge := mux.Vars(req)["gauge"]
+
+	isrs, err := models.IsrsFromString(gauge)
+	if err != nil {
+		http.Error(
+			rw, fmt.Sprintf("error: Invalid ISRS code: %v", err),
+			http.StatusBadRequest)
+		return
+	}
+
+	conn := middleware.GetDBConn(req)
+
+	ctx := req.Context()
+
+	rows, err := conn.QueryContext(
+		ctx,
+		selectAllWaterlevelsMeasuredSQL,
+		isrs.CountryCode,
+		isrs.LoCode,
+		isrs.FairwaySection,
+		isrs.Orc,
+		isrs.Hectometre,
+	)
+	if err != nil {
+		http.Error(
+			rw, fmt.Sprintf("error: %v", err),
+			http.StatusInternalServerError)
+		return
+	}
+	defer rows.Close()
+
+	type dayKey struct {
+		month byte
+		day   byte
+	}
+
+	entries := make(map[dayKey][]float64)
+
+	start := time.Now()
+
+	for rows.Next() {
+		var value float64
+		var date time.Time
+		if err := rows.Scan(&date, &value); err != nil {
+			http.Error(
+				rw, fmt.Sprintf("error: %v", err),
+				http.StatusInternalServerError)
+		}
+		key := dayKey{
+			month: byte(date.Month()),
+			day:   byte(date.Day()),
+		}
+		entries[key] = append(entries[key], value)
+	}
+
+	if err := rows.Err(); err != nil {
+		http.Error(
+			rw, fmt.Sprintf("error: %v", err),
+			http.StatusInternalServerError)
+		return
+	}
+
+	log.Printf("info: loading entries took %s\n", time.Since(start))
+
+	log.Printf("info: days found: %d\n", len(entries))
+
+	stats := time.Now()
+
+	type result struct {
+		key    dayKey
+		values []float64
+		min    float64
+		max    float64
+		mean   float64
+		median float64
+		q25    float64
+		q75    float64
+	}
+
+	results := make([]result, len(entries))
+
+	jobs := make(chan int)
+
+	var wg sync.WaitGroup
+
+	for i, n := 0, runtime.NumCPU(); i < n; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for job := range jobs {
+				r := &results[job]
+				if len(r.values) == 0 {
+					continue
+				}
+				sort.Float64s(r.values)
+				r.min = r.values[0]
+				r.max = r.values[len(r.values)-1]
+				r.median = r.values[len(r.values)/2]
+				r.mean = stat.Mean(r.values, nil)
+				r.q25 = stat.Quantile(0.25, stat.Empirical, r.values, nil)
+				r.q75 = stat.Quantile(0.75, stat.Empirical, r.values, nil)
+			}
+		}()
+	}
+
+	var i int
+	for k, v := range entries {
+		results[i].key = k
+		results[i].values = v
+		jobs <- i
+		i++
+	}
+	close(jobs)
+	wg.Wait()
+
+	sort.Slice(results, func(i, j int) bool {
+		ki, kj := results[i].key, results[j].key
+		if d := int(ki.month) - int(kj.month); d != 0 {
+			return d < 0
+		}
+		return ki.day < kj.day
+	})
+
+	log.Printf("info: calculating stats took %s\n", time.Since(stats))
+
+	rw.Header().Add("Content-Type", "text/csv")
+
+	out := csv.NewWriter(rw)
+
+	record := []string{
+		"#date",
+		"#min",
+		"#max",
+		"#mean",
+		"#median",
+		"#q25",
+		"#q75",
+	}
+
+	if err := out.Write(record); err != nil {
+		log.Printf("error: %v\n", err)
+		// Too late for an HTTP error code.
+		return
+	}
+
+	for i := range results {
+		r := &results[i]
+		record[0] = fmt.Sprintf("%02d-%02d", r.key.day, r.key.month)
+		record[1] = float64format(r.min)
+		record[2] = float64format(r.max)
+		record[3] = float64format(r.mean)
+		record[4] = float64format(r.median)
+		record[5] = float64format(r.q25)
+		record[6] = float64format(r.q75)
+		if err := out.Write(record); err != nil {
+			log.Printf("error: %v\n", err)
+			// Too late for an HTTP error code.
+			return
+		}
+	}
+
+	out.Flush()
+	if err := out.Error(); err != nil {
+		log.Printf("error: %v", err)
+		// Too late for an HTTP error code.
+		return
+	}
+}
+
 func averageWaterlevels(rw http.ResponseWriter, req *http.Request) {
 	gauge := mux.Vars(req)["gauge"]