Mercurial > gemma
view pkg/controllers/bottlenecks.go @ 3676:f107e82b64ae
import_configuration: enhance schedule with country of user
author | Thomas Junk <thomas.junk@intevation.de> |
---|---|
date | Mon, 17 Jun 2019 17:12:00 +0200 |
parents | db87f34805fb |
children | 268348a58c9c |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2019 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> package controllers import ( "context" "database/sql" "encoding/csv" "fmt" "log" "net/http" "sort" "strconv" "strings" "time" "github.com/gorilla/mux" "gemma.intevation.de/gemma/pkg/common" "gemma.intevation.de/gemma/pkg/middleware" ) const ( selectLimitingSQL = ` SELECT limiting from waterway.bottlenecks WHERE bn.validity @> current_timestamp AND objnam = $1 ` selectAvailableDepthSQL = ` WITH data AS ( SELECT efa.measure_date, efa.available_depth_value, efa.available_width_value, efa.water_level_value FROM waterway.effective_fairway_availability efa JOIN waterway.fairway_availability fa ON efa.fairway_availability_id = fa.id JOIN waterway.bottlenecks bn ON fa.bottleneck_id = bn.id WHERE bn.validity @> current_timestamp AND bn.objnam = $1 AND efa.level_of_service = $2 AND efa.measure_type = 'Measured' AND (efa.available_depth_value IS NOT NULL OR efa.available_width_value IS NOT NULL) AND efa.water_level_value IS NOT NULL ), before AS ( SELECT * FROM data WHERE measure_date < $3 ORDER BY measure_date DESC LIMIT 1 ), inside AS ( SELECT * FROM data WHERE measure_date BETWEEN $3 AND $4 ), after AS ( SELECT * FROM data WHERE measure_date > $4 ORDER BY measure_date LIMIT 1 ) SELECT * FROM before UNION ALL SELECT * FROM inside UNION ALL SELECT * FROM after ORDER BY measure_date ` selectGaugeLDCSQL = ` SELECT grwl.value FROM waterway.gauges_reference_water_levels grwl JOIN waterway.bottlenecks bns ON grwl.location = bns.gauge_location AND grwl.validity = bns.gauge_validity WHERE bns.validity @> current_timestamp AND bns.objnam = $1 AND grwl.depth_reference like 'LDC%' ` ) type ( availMeasurement struct { when time.Time depth int16 width int16 value int16 } availMeasurements []availMeasurement ) // afdRefs are the typical available fairway depth reference values. var afdRefs = []float64{ 230, 250, } func (measurement *availMeasurement) getDepth() float64 { return float64(measurement.depth) } func (measurement *availMeasurement) getValue() float64 { return float64(measurement.value) } func (measurement *availMeasurement) getWidth() float64 { return float64(measurement.width) } func limitingFactor(limiting string) func(*availMeasurement) float64 { switch limiting { case "depth": return (*availMeasurement).getDepth case "width": return (*availMeasurement).getWidth default: log.Printf("warn: unknown limitation '%s'. default to 'depth'\n", limiting) return (*availMeasurement).getDepth } } func (measurements availMeasurements) classify( from, to time.Time, breaks []float64, access func(*availMeasurement) float64, ) []time.Duration { if len(breaks) == 0 { return []time.Duration{} } result := make([]time.Duration, len(breaks)+1) classes := make([]float64, len(breaks)+2) values := make([]time.Time, len(classes)) // Add sentinels classes[0] = breaks[0] - 9999 classes[len(classes)-1] = breaks[len(breaks)-1] + 9999 for i := range breaks { classes[i+1] = breaks[i] } idx := sort.Search(len(measurements), func(i int) bool { // All values before from can be ignored. return !measurements[i].when.Before(from) }) if idx >= len(measurements) { return result } // Be safe for interpolation. if idx > 0 { idx-- } measurements = measurements[idx:] for i := 0; i < len(measurements)-1; i++ { p1 := &measurements[i] p2 := &measurements[i+1] if p1.when.After(to) { return result } if p2.when.Before(from) { continue } lo, hi := maxTime(p1.when, from), minTime(p2.when, to) m1, m2 := access(p1), access(p2) if m1 == m2 { // The whole interval is in only one class. for j := 0; j < len(classes)-1; j++ { if classes[j] <= m1 && m1 <= classes[j+1] { result[j] += hi.Sub(lo) break } } continue } f := common.InterpolateTime( p1.when, m1, p2.when, m2, ) for j, c := range classes { values[j] = f(c) } for j := 0; j < len(values)-1; j++ { start, end := orderTime(values[j], values[j+1]) if start.After(hi) || end.Before(lo) { continue } start, end = maxTime(start, lo), minTime(end, hi) result[j] += end.Sub(start) } } return result } func orderTime(a, b time.Time) (time.Time, time.Time) { if a.Before(b) { return a, b } return b, a } func minTime(a, b time.Time) time.Time { if a.Before(b) { return a } return b } func maxTime(a, b time.Time) time.Time { if a.After(b) { return a } return b } func durationsToPercentage(duration time.Duration, classes []time.Duration) []float64 { percents := make([]float64, len(classes)) total := 100 / duration.Seconds() for i, v := range classes { percents[i] = v.Seconds() * total } return percents } func parseFormTime( rw http.ResponseWriter, req *http.Request, field string, def time.Time, ) (time.Time, bool) { f := req.FormValue(field) if f == "" { return def.UTC(), true } v, err := time.Parse(common.TimeFormat, f) if err != nil { http.Error( rw, fmt.Sprintf("Invalid format for '%s': %v.", field, err), http.StatusBadRequest, ) return time.Time{}, false } return v.UTC(), true } func parseFormInt( rw http.ResponseWriter, req *http.Request, field string, def int, ) (int, bool) { f := req.FormValue(field) if f == "" { return def, true } v, err := strconv.Atoi(f) if err != nil { http.Error( rw, fmt.Sprintf("Invalid format for '%s': %v.", field, err), http.StatusBadRequest, ) return 0, false } return v, true } func intervalMode(mode string) int { switch strings.ToLower(mode) { case "monthly": return 0 case "quarterly": return 1 case "yearly": return 2 default: return 0 } } func loadDepthValues( ctx context.Context, conn *sql.Conn, bottleneck string, los int, from, to time.Time, ) (availMeasurements, error) { rows, err := conn.QueryContext( ctx, selectAvailableDepthSQL, bottleneck, los, from, to) if err != nil { return nil, err } defer rows.Close() var ms availMeasurements for rows.Next() { var m availMeasurement if err := rows.Scan( &m.when, &m.depth, &m.width, &m.value, ); err != nil { return nil, err } m.when = m.when.UTC() ms = append(ms, m) } if err := rows.Err(); err != nil { return nil, err } return ms, nil } func loadLDCReferenceValue( ctx context.Context, conn *sql.Conn, bottleneck string, ) ([]float64, error) { var value float64 err := conn.QueryRowContext(ctx, selectGaugeLDCSQL, bottleneck).Scan(&value) switch { case err == sql.ErrNoRows: return nil, nil case err != nil: return nil, err } return []float64{value}, nil } func breaksToReferenceValue(breaks string) []float64 { parts := strings.Split(breaks, ",") var values []float64 for _, part := range parts { part = strings.TrimSpace(part) if v, err := strconv.ParseFloat(part, 64); err == nil { values = append(values, v) } } sort.Float64s(values) // dedup for i := 1; i < len(values); { if values[i-1] == values[i] { copy(values[i:], values[i+1:]) values = values[:len(values)-1] } else { i++ } } return values } func bottleneckAvailabilty(rw http.ResponseWriter, req *http.Request) { mode := intervalMode(req.FormValue("mode")) bn := mux.Vars(req)["objnam"] if bn == "" { http.Error( rw, "Missing objnam of bottleneck", http.StatusBadRequest, ) return } from, ok := parseFormTime(rw, req, "from", time.Now().AddDate(-1, 0, 0)) if !ok { return } to, ok := parseFormTime(rw, req, "to", from.AddDate(1, 0, 0)) if !ok { return } if to.Before(from) { to, from = from, to } los, ok := parseFormInt(rw, req, "los", 1) if !ok { return } conn := middleware.GetDBConn(req) ctx := req.Context() ldcRefs, err := loadLDCReferenceValue(ctx, conn, bn) if err != nil { http.Error( rw, fmt.Sprintf("Internal server error: %v", err), http.StatusInternalServerError, ) return } if len(ldcRefs) == 0 { http.Error( rw, "No gauge reference values found for bottleneck", http.StatusNotFound, ) return } var breaks []float64 if b := req.FormValue("breaks"); b != "" { breaks = breaksToReferenceValue(b) } else { breaks = afdRefs } log.Printf("info: time interval: (%v - %v)\n", from, to) var ms availMeasurements if ms, err = loadDepthValues(ctx, conn, bn, los, from, to); err != nil { return } if len(ms) == 0 { http.Error( rw, "No available fairway depth values found", http.StatusNotFound, ) return } rw.Header().Add("Content-Type", "text/csv") out := csv.NewWriter(rw) record := make([]string, 1+2+len(breaks)+1) record[0] = "#time" record[1] = fmt.Sprintf("# < LDC (%.1f) [h]", ldcRefs[0]) record[2] = fmt.Sprintf("# >= LDC (%.1f) [h]", ldcRefs[0]) for i, v := range breaks { if i == 0 { record[3] = fmt.Sprintf("#d < %.1f [%%]", v) } record[i+4] = fmt.Sprintf("#d >= %.1f [%%]", v) } if err := out.Write(record); err != nil { // Too late for HTTP status message. log.Printf("error: %v\n", err) return } interval := intervals[mode](from, to) for pfrom, pto, label := interval(); label != ""; pfrom, pto, label = interval() { lnwl := ms.classify( pfrom, pto, ldcRefs, (*availMeasurement).getValue, ) afd := ms.classify( pfrom, pto, breaks, (*availMeasurement).getDepth, ) duration := pto.Sub(pfrom) lnwlPercents := durationsToPercentage(duration, lnwl) afdPercents := durationsToPercentage(duration, afd) record[0] = label for i, v := range lnwlPercents { record[1+i] = fmt.Sprintf("%.3f", v) } for i, v := range afdPercents { record[3+i] = fmt.Sprintf("%.3f", v) } if err := out.Write(record); err != nil { // Too late for HTTP status message. log.Printf("error: %v\n", err) return } } out.Flush() if err := out.Error(); err != nil { // Too late for HTTP status message. log.Printf("error: %v\n", err) } return } func bottleneckAvailableFairwayDepth(rw http.ResponseWriter, req *http.Request) { mode := intervalMode(req.FormValue("mode")) bn := mux.Vars(req)["objnam"] if bn == "" { http.Error( rw, "Missing objnam of bottleneck", http.StatusBadRequest) return } from, ok := parseFormTime(rw, req, "from", time.Now().AddDate(-1, 0, 0)) if !ok { return } to, ok := parseFormTime(rw, req, "to", from.AddDate(1, 0, 0)) if !ok { return } if to.Before(from) { to, from = from, to } los, ok := parseFormInt(rw, req, "los", 1) if !ok { return } conn := middleware.GetDBConn(req) ctx := req.Context() var limiting string err := conn.QueryRowContext(ctx, selectLimitingSQL, bn).Scan(&limiting) switch { case err == sql.ErrNoRows: http.Error( rw, fmt.Sprintf("Unknown limitation for %s.", bn), http.StatusNotFound) return case err != nil: http.Error( rw, fmt.Sprintf("DB error: %v.", err), http.StatusInternalServerError) return } access := limitingFactor(limiting) log.Printf("info: time interval: (%v - %v)\n", from, to) // load the measurements ms, err := loadDepthValues(ctx, conn, bn, los, from, to) if err != nil { http.Error( rw, fmt.Sprintf("Loading measurements failed: %v.", err), http.StatusInternalServerError) return } ldcRefs, err := loadLDCReferenceValue(ctx, conn, bn) if err != nil { http.Error( rw, fmt.Sprintf("Loading LDC failed: %v.", err), http.StatusInternalServerError) return } if len(ldcRefs) == 0 { http.Error(rw, "No LDC found", http.StatusNotFound) return } var breaks []float64 if b := req.FormValue("breaks"); b != "" { breaks = breaksToReferenceValue(b) } else { breaks = afdRefs } rw.Header().Add("Content-Type", "text/csv") out := csv.NewWriter(rw) // label, ldc, classes record := make([]string, 1+2+len(breaks)+1) record[0] = "#time" record[1] = fmt.Sprintf("# < LDC (%.1f) [h]", ldcRefs[0]) record[2] = fmt.Sprintf("# >= LDC (%.1f) [h]", ldcRefs[0]) for i, v := range breaks { if i == 0 { record[3] = fmt.Sprintf("# < %.1f [h]", v) } record[i+4] = fmt.Sprintf("# >= %.1f [h]", v) } if err := out.Write(record); err != nil { // Too late for HTTP status message. log.Printf("error: %v\n", err) return } //log.Println(len(ms)) //for i := range ms { // log.Println(ms[i].when, ms[i].depth) //} log.Printf("info: measurements: %d\n", len(ms)) if len(ms) > 1 { log.Printf("info: first: %v\n", ms[0].when) log.Printf("info: last: %v\n", ms[len(ms)-1].when) log.Printf("info: interval: %.2f [h]\n", ms[len(ms)-1].when.Sub(ms[0].when).Hours()) } interval := intervals[mode](from, to) for pfrom, pto, label := interval(); label != ""; pfrom, pto, label = interval() { ldc := ms.classify( pfrom, pto, ldcRefs, access, ) ranges := ms.classify( pfrom, pto, breaks, access, ) record[0] = label for i, v := range ldc { record[i+1] = fmt.Sprintf("%.3f", v.Hours()) } for i, d := range ranges { record[3+i] = fmt.Sprintf("%.3f", d.Hours()) } if err := out.Write(record); err != nil { // Too late for HTTP status message. log.Printf("error: %v\n", err) return } } out.Flush() if err := out.Error(); err != nil { // Too late for HTTP status message. log.Printf("error: %v\n", err) } } var intervals = []func(time.Time, time.Time) func() (time.Time, time.Time, string){ monthly, quarterly, yearly, } func monthly(from, to time.Time) func() (time.Time, time.Time, string) { pfrom := from return func() (time.Time, time.Time, string) { if pfrom.After(to) { return time.Time{}, time.Time{}, "" } f := pfrom pfrom = pfrom.AddDate(0, 1, 0) label := fmt.Sprintf("%02d-%d", f.Month(), f.Year()) return f, f.AddDate(0, 1, 0).Add(-time.Nanosecond), label } } func quarterly(from, to time.Time) func() (time.Time, time.Time, string) { pfrom := from return func() (time.Time, time.Time, string) { if pfrom.After(to) { return time.Time{}, time.Time{}, "" } f := pfrom pfrom = pfrom.AddDate(0, 3, 0) label := fmt.Sprintf("Q%d-%d", (int(f.Month())-1)/3+1, f.Year()) return f, f.AddDate(0, 3, 0).Add(-time.Nanosecond), label } } func yearly(from, to time.Time) func() (time.Time, time.Time, string) { pfrom := from return func() (time.Time, time.Time, string) { if pfrom.After(to) { return time.Time{}, time.Time{}, "" } f := pfrom pfrom = pfrom.AddDate(1, 0, 0) label := fmt.Sprintf("%d", f.Year()) return f, f.AddDate(1, 0, 0).Add(-time.Nanosecond), label } }