Mercurial > gemma
view pkg/controllers/bottlenecks.go @ 3678:8f58851927c0
client: make layer factory only return new layer config for individual maps
instead of each time it is invoked. The purpose of the factory was to support multiple maps with individual layers.
But returning a new config each time it is invoked leads to bugs in code that relies on the layer's state. Now this factory
reuses the same objects it created before, per map.
author | Markus Kottlaender <markus@intevation.de> |
---|---|
date | Mon, 17 Jun 2019 17:31:35 +0200 |
parents | db87f34805fb |
children | 268348a58c9c |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2019 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> package controllers import ( "context" "database/sql" "encoding/csv" "fmt" "log" "net/http" "sort" "strconv" "strings" "time" "github.com/gorilla/mux" "gemma.intevation.de/gemma/pkg/common" "gemma.intevation.de/gemma/pkg/middleware" ) const ( selectLimitingSQL = ` SELECT limiting from waterway.bottlenecks WHERE bn.validity @> current_timestamp AND objnam = $1 ` selectAvailableDepthSQL = ` WITH data AS ( SELECT efa.measure_date, efa.available_depth_value, efa.available_width_value, efa.water_level_value FROM waterway.effective_fairway_availability efa JOIN waterway.fairway_availability fa ON efa.fairway_availability_id = fa.id JOIN waterway.bottlenecks bn ON fa.bottleneck_id = bn.id WHERE bn.validity @> current_timestamp AND bn.objnam = $1 AND efa.level_of_service = $2 AND efa.measure_type = 'Measured' AND (efa.available_depth_value IS NOT NULL OR efa.available_width_value IS NOT NULL) AND efa.water_level_value IS NOT NULL ), before AS ( SELECT * FROM data WHERE measure_date < $3 ORDER BY measure_date DESC LIMIT 1 ), inside AS ( SELECT * FROM data WHERE measure_date BETWEEN $3 AND $4 ), after AS ( SELECT * FROM data WHERE measure_date > $4 ORDER BY measure_date LIMIT 1 ) SELECT * FROM before UNION ALL SELECT * FROM inside UNION ALL SELECT * FROM after ORDER BY measure_date ` selectGaugeLDCSQL = ` SELECT grwl.value FROM waterway.gauges_reference_water_levels grwl JOIN waterway.bottlenecks bns ON grwl.location = bns.gauge_location AND grwl.validity = bns.gauge_validity WHERE bns.validity @> current_timestamp AND bns.objnam = $1 AND 
grwl.depth_reference like 'LDC%' ` ) type ( availMeasurement struct { when time.Time depth int16 width int16 value int16 } availMeasurements []availMeasurement ) // afdRefs are the typical available fairway depth reference values. var afdRefs = []float64{ 230, 250, } func (measurement *availMeasurement) getDepth() float64 { return float64(measurement.depth) } func (measurement *availMeasurement) getValue() float64 { return float64(measurement.value) } func (measurement *availMeasurement) getWidth() float64 { return float64(measurement.width) } func limitingFactor(limiting string) func(*availMeasurement) float64 { switch limiting { case "depth": return (*availMeasurement).getDepth case "width": return (*availMeasurement).getWidth default: log.Printf("warn: unknown limitation '%s'. default to 'depth'\n", limiting) return (*availMeasurement).getDepth } } func (measurements availMeasurements) classify( from, to time.Time, breaks []float64, access func(*availMeasurement) float64, ) []time.Duration { if len(breaks) == 0 { return []time.Duration{} } result := make([]time.Duration, len(breaks)+1) classes := make([]float64, len(breaks)+2) values := make([]time.Time, len(classes)) // Add sentinels classes[0] = breaks[0] - 9999 classes[len(classes)-1] = breaks[len(breaks)-1] + 9999 for i := range breaks { classes[i+1] = breaks[i] } idx := sort.Search(len(measurements), func(i int) bool { // All values before from can be ignored. return !measurements[i].when.Before(from) }) if idx >= len(measurements) { return result } // Be safe for interpolation. if idx > 0 { idx-- } measurements = measurements[idx:] for i := 0; i < len(measurements)-1; i++ { p1 := &measurements[i] p2 := &measurements[i+1] if p1.when.After(to) { return result } if p2.when.Before(from) { continue } lo, hi := maxTime(p1.when, from), minTime(p2.when, to) m1, m2 := access(p1), access(p2) if m1 == m2 { // The whole interval is in only one class. 
for j := 0; j < len(classes)-1; j++ { if classes[j] <= m1 && m1 <= classes[j+1] { result[j] += hi.Sub(lo) break } } continue } f := common.InterpolateTime( p1.when, m1, p2.when, m2, ) for j, c := range classes { values[j] = f(c) } for j := 0; j < len(values)-1; j++ { start, end := orderTime(values[j], values[j+1]) if start.After(hi) || end.Before(lo) { continue } start, end = maxTime(start, lo), minTime(end, hi) result[j] += end.Sub(start) } } return result } func orderTime(a, b time.Time) (time.Time, time.Time) { if a.Before(b) { return a, b } return b, a } func minTime(a, b time.Time) time.Time { if a.Before(b) { return a } return b } func maxTime(a, b time.Time) time.Time { if a.After(b) { return a } return b } func durationsToPercentage(duration time.Duration, classes []time.Duration) []float64 { percents := make([]float64, len(classes)) total := 100 / duration.Seconds() for i, v := range classes { percents[i] = v.Seconds() * total } return percents } func parseFormTime( rw http.ResponseWriter, req *http.Request, field string, def time.Time, ) (time.Time, bool) { f := req.FormValue(field) if f == "" { return def.UTC(), true } v, err := time.Parse(common.TimeFormat, f) if err != nil { http.Error( rw, fmt.Sprintf("Invalid format for '%s': %v.", field, err), http.StatusBadRequest, ) return time.Time{}, false } return v.UTC(), true } func parseFormInt( rw http.ResponseWriter, req *http.Request, field string, def int, ) (int, bool) { f := req.FormValue(field) if f == "" { return def, true } v, err := strconv.Atoi(f) if err != nil { http.Error( rw, fmt.Sprintf("Invalid format for '%s': %v.", field, err), http.StatusBadRequest, ) return 0, false } return v, true } func intervalMode(mode string) int { switch strings.ToLower(mode) { case "monthly": return 0 case "quarterly": return 1 case "yearly": return 2 default: return 0 } } func loadDepthValues( ctx context.Context, conn *sql.Conn, bottleneck string, los int, from, to time.Time, ) (availMeasurements, error) { rows, 
err := conn.QueryContext( ctx, selectAvailableDepthSQL, bottleneck, los, from, to) if err != nil { return nil, err } defer rows.Close() var ms availMeasurements for rows.Next() { var m availMeasurement if err := rows.Scan( &m.when, &m.depth, &m.width, &m.value, ); err != nil { return nil, err } m.when = m.when.UTC() ms = append(ms, m) } if err := rows.Err(); err != nil { return nil, err } return ms, nil } func loadLDCReferenceValue( ctx context.Context, conn *sql.Conn, bottleneck string, ) ([]float64, error) { var value float64 err := conn.QueryRowContext(ctx, selectGaugeLDCSQL, bottleneck).Scan(&value) switch { case err == sql.ErrNoRows: return nil, nil case err != nil: return nil, err } return []float64{value}, nil } func breaksToReferenceValue(breaks string) []float64 { parts := strings.Split(breaks, ",") var values []float64 for _, part := range parts { part = strings.TrimSpace(part) if v, err := strconv.ParseFloat(part, 64); err == nil { values = append(values, v) } } sort.Float64s(values) // dedup for i := 1; i < len(values); { if values[i-1] == values[i] { copy(values[i:], values[i+1:]) values = values[:len(values)-1] } else { i++ } } return values } func bottleneckAvailabilty(rw http.ResponseWriter, req *http.Request) { mode := intervalMode(req.FormValue("mode")) bn := mux.Vars(req)["objnam"] if bn == "" { http.Error( rw, "Missing objnam of bottleneck", http.StatusBadRequest, ) return } from, ok := parseFormTime(rw, req, "from", time.Now().AddDate(-1, 0, 0)) if !ok { return } to, ok := parseFormTime(rw, req, "to", from.AddDate(1, 0, 0)) if !ok { return } if to.Before(from) { to, from = from, to } los, ok := parseFormInt(rw, req, "los", 1) if !ok { return } conn := middleware.GetDBConn(req) ctx := req.Context() ldcRefs, err := loadLDCReferenceValue(ctx, conn, bn) if err != nil { http.Error( rw, fmt.Sprintf("Internal server error: %v", err), http.StatusInternalServerError, ) return } if len(ldcRefs) == 0 { http.Error( rw, "No gauge reference values found for 
bottleneck", http.StatusNotFound, ) return } var breaks []float64 if b := req.FormValue("breaks"); b != "" { breaks = breaksToReferenceValue(b) } else { breaks = afdRefs } log.Printf("info: time interval: (%v - %v)\n", from, to) var ms availMeasurements if ms, err = loadDepthValues(ctx, conn, bn, los, from, to); err != nil { return } if len(ms) == 0 { http.Error( rw, "No available fairway depth values found", http.StatusNotFound, ) return } rw.Header().Add("Content-Type", "text/csv") out := csv.NewWriter(rw) record := make([]string, 1+2+len(breaks)+1) record[0] = "#time" record[1] = fmt.Sprintf("# < LDC (%.1f) [h]", ldcRefs[0]) record[2] = fmt.Sprintf("# >= LDC (%.1f) [h]", ldcRefs[0]) for i, v := range breaks { if i == 0 { record[3] = fmt.Sprintf("#d < %.1f [%%]", v) } record[i+4] = fmt.Sprintf("#d >= %.1f [%%]", v) } if err := out.Write(record); err != nil { // Too late for HTTP status message. log.Printf("error: %v\n", err) return } interval := intervals[mode](from, to) for pfrom, pto, label := interval(); label != ""; pfrom, pto, label = interval() { lnwl := ms.classify( pfrom, pto, ldcRefs, (*availMeasurement).getValue, ) afd := ms.classify( pfrom, pto, breaks, (*availMeasurement).getDepth, ) duration := pto.Sub(pfrom) lnwlPercents := durationsToPercentage(duration, lnwl) afdPercents := durationsToPercentage(duration, afd) record[0] = label for i, v := range lnwlPercents { record[1+i] = fmt.Sprintf("%.3f", v) } for i, v := range afdPercents { record[3+i] = fmt.Sprintf("%.3f", v) } if err := out.Write(record); err != nil { // Too late for HTTP status message. log.Printf("error: %v\n", err) return } } out.Flush() if err := out.Error(); err != nil { // Too late for HTTP status message. 
log.Printf("error: %v\n", err) } return } func bottleneckAvailableFairwayDepth(rw http.ResponseWriter, req *http.Request) { mode := intervalMode(req.FormValue("mode")) bn := mux.Vars(req)["objnam"] if bn == "" { http.Error( rw, "Missing objnam of bottleneck", http.StatusBadRequest) return } from, ok := parseFormTime(rw, req, "from", time.Now().AddDate(-1, 0, 0)) if !ok { return } to, ok := parseFormTime(rw, req, "to", from.AddDate(1, 0, 0)) if !ok { return } if to.Before(from) { to, from = from, to } los, ok := parseFormInt(rw, req, "los", 1) if !ok { return } conn := middleware.GetDBConn(req) ctx := req.Context() var limiting string err := conn.QueryRowContext(ctx, selectLimitingSQL, bn).Scan(&limiting) switch { case err == sql.ErrNoRows: http.Error( rw, fmt.Sprintf("Unknown limitation for %s.", bn), http.StatusNotFound) return case err != nil: http.Error( rw, fmt.Sprintf("DB error: %v.", err), http.StatusInternalServerError) return } access := limitingFactor(limiting) log.Printf("info: time interval: (%v - %v)\n", from, to) // load the measurements ms, err := loadDepthValues(ctx, conn, bn, los, from, to) if err != nil { http.Error( rw, fmt.Sprintf("Loading measurements failed: %v.", err), http.StatusInternalServerError) return } ldcRefs, err := loadLDCReferenceValue(ctx, conn, bn) if err != nil { http.Error( rw, fmt.Sprintf("Loading LDC failed: %v.", err), http.StatusInternalServerError) return } if len(ldcRefs) == 0 { http.Error(rw, "No LDC found", http.StatusNotFound) return } var breaks []float64 if b := req.FormValue("breaks"); b != "" { breaks = breaksToReferenceValue(b) } else { breaks = afdRefs } rw.Header().Add("Content-Type", "text/csv") out := csv.NewWriter(rw) // label, ldc, classes record := make([]string, 1+2+len(breaks)+1) record[0] = "#time" record[1] = fmt.Sprintf("# < LDC (%.1f) [h]", ldcRefs[0]) record[2] = fmt.Sprintf("# >= LDC (%.1f) [h]", ldcRefs[0]) for i, v := range breaks { if i == 0 { record[3] = fmt.Sprintf("# < %.1f [h]", v) } 
record[i+4] = fmt.Sprintf("# >= %.1f [h]", v) } if err := out.Write(record); err != nil { // Too late for HTTP status message. log.Printf("error: %v\n", err) return } //log.Println(len(ms)) //for i := range ms { // log.Println(ms[i].when, ms[i].depth) //} log.Printf("info: measurements: %d\n", len(ms)) if len(ms) > 1 { log.Printf("info: first: %v\n", ms[0].when) log.Printf("info: last: %v\n", ms[len(ms)-1].when) log.Printf("info: interval: %.2f [h]\n", ms[len(ms)-1].when.Sub(ms[0].when).Hours()) } interval := intervals[mode](from, to) for pfrom, pto, label := interval(); label != ""; pfrom, pto, label = interval() { ldc := ms.classify( pfrom, pto, ldcRefs, access, ) ranges := ms.classify( pfrom, pto, breaks, access, ) record[0] = label for i, v := range ldc { record[i+1] = fmt.Sprintf("%.3f", v.Hours()) } for i, d := range ranges { record[3+i] = fmt.Sprintf("%.3f", d.Hours()) } if err := out.Write(record); err != nil { // Too late for HTTP status message. log.Printf("error: %v\n", err) return } } out.Flush() if err := out.Error(); err != nil { // Too late for HTTP status message. 
log.Printf("error: %v\n", err) } } var intervals = []func(time.Time, time.Time) func() (time.Time, time.Time, string){ monthly, quarterly, yearly, } func monthly(from, to time.Time) func() (time.Time, time.Time, string) { pfrom := from return func() (time.Time, time.Time, string) { if pfrom.After(to) { return time.Time{}, time.Time{}, "" } f := pfrom pfrom = pfrom.AddDate(0, 1, 0) label := fmt.Sprintf("%02d-%d", f.Month(), f.Year()) return f, f.AddDate(0, 1, 0).Add(-time.Nanosecond), label } } func quarterly(from, to time.Time) func() (time.Time, time.Time, string) { pfrom := from return func() (time.Time, time.Time, string) { if pfrom.After(to) { return time.Time{}, time.Time{}, "" } f := pfrom pfrom = pfrom.AddDate(0, 3, 0) label := fmt.Sprintf("Q%d-%d", (int(f.Month())-1)/3+1, f.Year()) return f, f.AddDate(0, 3, 0).Add(-time.Nanosecond), label } } func yearly(from, to time.Time) func() (time.Time, time.Time, string) { pfrom := from return func() (time.Time, time.Time, string) { if pfrom.After(to) { return time.Time{}, time.Time{}, "" } f := pfrom pfrom = pfrom.AddDate(1, 0, 0) label := fmt.Sprintf("%d", f.Year()) return f, f.AddDate(1, 0, 0).Add(-time.Nanosecond), label } }