Mercurial > gemma
comparison pkg/controllers/gauges.go @ 2800:db1052bc162a
Added GET /data/longterm-waterlevels/{gauge}
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Mon, 25 Mar 2019 17:31:56 +0100 |
parents | 2806821cfd63 |
children | 46a9a7c1281f |
comparison
equal
deleted
inserted
replaced
2799:e19fac818aab | 2800:db1052bc162a |
---|---|
17 "database/sql" | 17 "database/sql" |
18 "encoding/csv" | 18 "encoding/csv" |
19 "fmt" | 19 "fmt" |
20 "log" | 20 "log" |
21 "net/http" | 21 "net/http" |
22 "runtime" | |
22 "sort" | 23 "sort" |
23 "strconv" | 24 "strconv" |
25 "sync" | |
24 "time" | 26 "time" |
25 | 27 |
26 "github.com/gorilla/mux" | 28 "github.com/gorilla/mux" |
27 "gonum.org/v1/gonum/stat" | 29 "gonum.org/v1/gonum/stat" |
28 | 30 |
62 value_max, | 64 value_max, |
63 predicted | 65 predicted |
64 FROM waterway.gauge_measurements | 66 FROM waterway.gauge_measurements |
65 WHERE | 67 WHERE |
66 ` | 68 ` |
69 | |
70 selectAllWaterlevelsMeasuredSQL = ` | |
71 SELECT | |
72 measure_date, | |
73 water_level | |
74 FROM waterway.gauge_measurements | |
75 WHERE | |
76 NOT predicted | |
77 AND fk_gauge_id = ( | |
78 $1::char(2), | |
79 $2::char(3), | |
80 $3::char(5), | |
81 $4::char(5), | |
82 $5::int | |
83 ) | |
84 ` | |
67 selectWaterlevelsMeasuredSQL = ` | 85 selectWaterlevelsMeasuredSQL = ` |
68 SELECT | 86 SELECT |
69 measure_date, | 87 measure_date, |
70 water_level | 88 water_level |
71 FROM waterway.gauge_measurements | 89 FROM waterway.gauge_measurements |
101 return "t" | 119 return "t" |
102 } | 120 } |
103 return "f" | 121 return "f" |
104 } | 122 } |
105 | 123 |
124 func longtermWaterlevels(rw http.ResponseWriter, req *http.Request) { | |
125 | |
126 gauge := mux.Vars(req)["gauge"] | |
127 | |
128 isrs, err := models.IsrsFromString(gauge) | |
129 if err != nil { | |
130 http.Error( | |
131 rw, fmt.Sprintf("error: Invalid ISRS code: %v", err), | |
132 http.StatusBadRequest) | |
133 return | |
134 } | |
135 | |
136 conn := middleware.GetDBConn(req) | |
137 | |
138 ctx := req.Context() | |
139 | |
140 rows, err := conn.QueryContext( | |
141 ctx, | |
142 selectAllWaterlevelsMeasuredSQL, | |
143 isrs.CountryCode, | |
144 isrs.LoCode, | |
145 isrs.FairwaySection, | |
146 isrs.Orc, | |
147 isrs.Hectometre, | |
148 ) | |
149 if err != nil { | |
150 http.Error( | |
151 rw, fmt.Sprintf("error: %v", err), | |
152 http.StatusInternalServerError) | |
153 return | |
154 } | |
155 defer rows.Close() | |
156 | |
157 type dayKey struct { | |
158 month byte | |
159 day byte | |
160 } | |
161 | |
162 entries := make(map[dayKey][]float64) | |
163 | |
164 start := time.Now() | |
165 | |
166 for rows.Next() { | |
167 var value float64 | |
168 var date time.Time | |
169 if err := rows.Scan(&date, &value); err != nil { | |
170 http.Error( | |
171 rw, fmt.Sprintf("error: %v", err), | |
172 http.StatusInternalServerError) | |
173 } | |
174 key := dayKey{ | |
175 month: byte(date.Month()), | |
176 day: byte(date.Day()), | |
177 } | |
178 entries[key] = append(entries[key], value) | |
179 } | |
180 | |
181 if err := rows.Err(); err != nil { | |
182 http.Error( | |
183 rw, fmt.Sprintf("error: %v", err), | |
184 http.StatusInternalServerError) | |
185 return | |
186 } | |
187 | |
188 log.Printf("info: loading entries took %s\n", time.Since(start)) | |
189 | |
190 log.Printf("info: days found: %d\n", len(entries)) | |
191 | |
192 stats := time.Now() | |
193 | |
194 type result struct { | |
195 key dayKey | |
196 values []float64 | |
197 min float64 | |
198 max float64 | |
199 mean float64 | |
200 median float64 | |
201 q25 float64 | |
202 q75 float64 | |
203 } | |
204 | |
205 results := make([]result, len(entries)) | |
206 | |
207 jobs := make(chan int) | |
208 | |
209 var wg sync.WaitGroup | |
210 | |
211 for i, n := 0, runtime.NumCPU(); i < n; i++ { | |
212 wg.Add(1) | |
213 go func() { | |
214 defer wg.Done() | |
215 for job := range jobs { | |
216 r := &results[job] | |
217 if len(r.values) == 0 { | |
218 continue | |
219 } | |
220 sort.Float64s(r.values) | |
221 r.min = r.values[0] | |
222 r.max = r.values[len(r.values)-1] | |
223 r.median = r.values[len(r.values)/2] | |
224 r.mean = stat.Mean(r.values, nil) | |
225 r.q25 = stat.Quantile(0.25, stat.Empirical, r.values, nil) | |
226 r.q75 = stat.Quantile(0.75, stat.Empirical, r.values, nil) | |
227 } | |
228 }() | |
229 } | |
230 | |
231 var i int | |
232 for k, v := range entries { | |
233 results[i].key = k | |
234 results[i].values = v | |
235 jobs <- i | |
236 i++ | |
237 } | |
238 close(jobs) | |
239 wg.Wait() | |
240 | |
241 sort.Slice(results, func(i, j int) bool { | |
242 ki, kj := results[i].key, results[j].key | |
243 if d := int(ki.month) - int(kj.month); d != 0 { | |
244 return d < 0 | |
245 } | |
246 return ki.day < kj.day | |
247 }) | |
248 | |
249 log.Printf("info: calculating stats took %s\n", time.Since(stats)) | |
250 | |
251 rw.Header().Add("Content-Type", "text/csv") | |
252 | |
253 out := csv.NewWriter(rw) | |
254 | |
255 record := []string{ | |
256 "#date", | |
257 "#min", | |
258 "#max", | |
259 "#mean", | |
260 "#median", | |
261 "#q25", | |
262 "#q75", | |
263 } | |
264 | |
265 if err := out.Write(record); err != nil { | |
266 log.Printf("error: %v\n", err) | |
267 // Too late for an HTTP error code. | |
268 return | |
269 } | |
270 | |
271 for i := range results { | |
272 r := &results[i] | |
273 record[0] = fmt.Sprintf("%02d-%02d", r.key.day, r.key.month) | |
274 record[1] = float64format(r.min) | |
275 record[2] = float64format(r.max) | |
276 record[3] = float64format(r.mean) | |
277 record[4] = float64format(r.median) | |
278 record[5] = float64format(r.q25) | |
279 record[6] = float64format(r.q75) | |
280 if err := out.Write(record); err != nil { | |
281 log.Printf("error: %v\n", err) | |
282 // Too late for an HTTP error code. | |
283 return | |
284 } | |
285 } | |
286 | |
287 out.Flush() | |
288 if err := out.Error(); err != nil { | |
289 log.Printf("error: %v", err) | |
290 // Too late for an HTTP error code. | |
291 return | |
292 } | |
293 } | |
294 | |
106 func averageWaterlevels(rw http.ResponseWriter, req *http.Request) { | 295 func averageWaterlevels(rw http.ResponseWriter, req *http.Request) { |
107 gauge := mux.Vars(req)["gauge"] | 296 gauge := mux.Vars(req)["gauge"] |
108 | 297 |
109 isrs, err := models.IsrsFromString(gauge) | 298 isrs, err := models.IsrsFromString(gauge) |
110 if err != nil { | 299 if err != nil { |