Mercurial > gemma
view pkg/controllers/bottlenecks.go @ 4343:63c25eb9c07c
FA: be optimistic about missing data.
According to clarification, missing data has to be interpreted as the best case;
this is because the services do not provide data for bottlenecks which are not
considered a limiting factor on the waterway at a given time.
author | Sascha Wilde <wilde@intevation.de> |
---|---|
date | Fri, 06 Sep 2019 17:22:42 +0200 |
parents | f543f9d4a0b5 |
children | 6ac94171a994 |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2019 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> // * Sascha Wilde <wilde@intevation.de> package controllers import ( "context" "database/sql" "encoding/csv" "fmt" "log" "net/http" "sort" "strconv" "strings" "time" "github.com/gorilla/mux" "gemma.intevation.de/gemma/pkg/common" "gemma.intevation.de/gemma/pkg/middleware" ) const ( selectLimitingSQL = ` SELECT limiting FROM waterway.bottlenecks bn WHERE bn.validity @> current_timestamp AND objnam = $1 ` selectAvailableDepthSQL = ` WITH data AS ( SELECT efa.measure_date, efa.available_depth_value, efa.available_width_value, efa.water_level_value FROM waterway.effective_fairway_availability efa JOIN waterway.fairway_availability fa ON efa.fairway_availability_id = fa.id JOIN waterway.bottlenecks bn ON fa.bottleneck_id = bn.bottleneck_id WHERE bn.validity @> current_timestamp AND bn.objnam = $1 AND efa.level_of_service = $2 AND efa.measure_type = 'Measured' AND (efa.available_depth_value IS NOT NULL OR efa.available_width_value IS NOT NULL) AND efa.water_level_value IS NOT NULL ), before AS ( SELECT * FROM data WHERE measure_date < $3 ORDER BY measure_date DESC LIMIT 1 ), inside AS ( SELECT * FROM data WHERE measure_date BETWEEN $3 AND $4 ), after AS ( SELECT * FROM data WHERE measure_date > $4 ORDER BY measure_date LIMIT 1 ) SELECT * FROM before UNION ALL SELECT * FROM inside UNION ALL SELECT * FROM after ORDER BY measure_date ` selectGaugeLDCSQL = ` SELECT grwl.value FROM waterway.gauges_reference_water_levels grwl JOIN waterway.bottlenecks bns ON grwl.location = bns.gauge_location AND grwl.validity @> current_timestamp WHERE bns.validity 
@> current_timestamp AND bns.objnam = $1 AND grwl.depth_reference like 'LDC%' ` ) type ( availMeasurement struct { when time.Time depth int16 width int16 value int16 } availMeasurements []availMeasurement ) // afdRefs are the typical available fairway depth reference values. var afdRefs = []float64{ 230, 250, } func (measurement *availMeasurement) getDepth() float64 { return float64(measurement.depth) } func (measurement *availMeasurement) getValue() float64 { return float64(measurement.value) } func (measurement *availMeasurement) getWidth() float64 { return float64(measurement.width) } func limitingFactor(limiting string) func(*availMeasurement) float64 { switch limiting { case "depth": return (*availMeasurement).getDepth case "width": return (*availMeasurement).getWidth default: log.Printf("warn: unknown limitation '%s'. default to 'depth'\n", limiting) return (*availMeasurement).getDepth } } // According to clarification, it has to be assumed, that at times // with no data, the best case (which by convention is the highest // class created by classify()) should be assumed. That is due to the // fact, that at times where bottlenecks are ot a limiting factor on // the waterway, services don't brovide any data for the bottleneck in // question. // // FIXME: A potentional improvement could be to intercest the time // ranges with the time ranges where bottlenecks were "active" (this // _might_ be derivable fromt the validity periods in the bottleneck // data. So it _might_ be possible do detect actual missing data (BN // valid, but no data from FA service). Anyway, this is left out for // now, as many clarification regarding the base assumtions would bee // needed and the results still might be unrelyable. 
func optimisticPadClassification( from, to time.Time, classified []time.Duration, ) []time.Duration { var actualDuration time.Duration maxDuration := to.Sub(from) for i := 0; i < len(classified); i++ { actualDuration += classified[i] } delta := maxDuration - actualDuration if delta > 0 { log.Printf("info: time interval: (%v - %v)\n", from, to) log.Printf("info: found only data for %.2f hours, padding by %.2f hours\n", actualDuration.Hours(), delta.Hours()) classified[len(classified)-1] = classified[len(classified)-1] + delta } return classified } func (measurements availMeasurements) classify( from, to time.Time, breaks []float64, access func(*availMeasurement) float64, ) []time.Duration { if len(breaks) == 0 { return []time.Duration{} } result := make([]time.Duration, len(breaks)+1) classes := make([]float64, len(breaks)+2) values := make([]time.Time, len(classes)) // Add sentinels classes[0] = breaks[0] - 9999 classes[len(classes)-1] = breaks[len(breaks)-1] + 9999 for i := range breaks { classes[i+1] = breaks[i] } idx := sort.Search(len(measurements), func(i int) bool { // All values before from can be ignored. return !measurements[i].when.Before(from) }) if idx >= len(measurements) { return optimisticPadClassification(from, to, result) } // Be safe for interpolation. if idx > 0 { idx-- } measurements = measurements[idx:] for i := 0; i < len(measurements)-1; i++ { p1 := &measurements[i] p2 := &measurements[i+1] if p1.when.After(to) { return optimisticPadClassification(from, to, result) } if p2.when.Before(from) { continue } lo, hi := maxTime(p1.when, from), minTime(p2.when, to) m1, m2 := access(p1), access(p2) if m1 == m2 { // The whole interval is in only one class. 
for j := 0; j < len(classes)-1; j++ { if classes[j] <= m1 && m1 <= classes[j+1] { result[j] += hi.Sub(lo) break } } continue } f := common.InterpolateTime( p1.when, m1, p2.when, m2, ) for j, c := range classes { values[j] = f(c) } for j := 0; j < len(values)-1; j++ { start, end := orderTime(values[j], values[j+1]) if start.After(hi) || end.Before(lo) { continue } start, end = maxTime(start, lo), minTime(end, hi) result[j] += end.Sub(start) } } return optimisticPadClassification(from, to, result) } func orderTime(a, b time.Time) (time.Time, time.Time) { if a.Before(b) { return a, b } return b, a } func minTime(a, b time.Time) time.Time { if a.Before(b) { return a } return b } func maxTime(a, b time.Time) time.Time { if a.After(b) { return a } return b } func durationsToPercentage(duration time.Duration, classes []time.Duration) []float64 { percents := make([]float64, len(classes)) total := 100 / duration.Seconds() for i, v := range classes { percents[i] = v.Seconds() * total } return percents } func parseFormTime( rw http.ResponseWriter, req *http.Request, field string, def time.Time, ) (time.Time, bool) { f := req.FormValue(field) if f == "" { return def.UTC(), true } v, err := common.ParseTime(f) if err != nil { http.Error( rw, fmt.Sprintf("Invalid format for '%s': %v.", field, err), http.StatusBadRequest, ) return time.Time{}, false } return v.UTC(), true } func parseFormInt( rw http.ResponseWriter, req *http.Request, field string, def int, ) (int, bool) { f := req.FormValue(field) if f == "" { return def, true } v, err := strconv.Atoi(f) if err != nil { http.Error( rw, fmt.Sprintf("Invalid format for '%s': %v.", field, err), http.StatusBadRequest, ) return 0, false } return v, true } func intervalMode(mode string) int { switch strings.ToLower(mode) { case "monthly": return 0 case "quarterly": return 1 case "yearly": return 2 default: return 0 } } func loadDepthValues( ctx context.Context, conn *sql.Conn, bottleneck string, los int, from, to time.Time, ) 
(availMeasurements, error) { rows, err := conn.QueryContext( ctx, selectAvailableDepthSQL, bottleneck, los, from, to) if err != nil { return nil, err } defer rows.Close() var ms availMeasurements for rows.Next() { var m availMeasurement if err := rows.Scan( &m.when, &m.depth, &m.width, &m.value, ); err != nil { return nil, err } m.when = m.when.UTC() ms = append(ms, m) } if err := rows.Err(); err != nil { return nil, err } return ms, nil } func loadLDCReferenceValue( ctx context.Context, conn *sql.Conn, bottleneck string, ) ([]float64, error) { var value float64 err := conn.QueryRowContext(ctx, selectGaugeLDCSQL, bottleneck).Scan(&value) switch { case err == sql.ErrNoRows: return nil, nil case err != nil: return nil, err } return []float64{value}, nil } func breaksToReferenceValue(breaks string) []float64 { parts := strings.Split(breaks, ",") var values []float64 for _, part := range parts { part = strings.TrimSpace(part) if v, err := strconv.ParseFloat(part, 64); err == nil { values = append(values, v) } } sort.Float64s(values) // dedup for i := 1; i < len(values); { if values[i-1] == values[i] { copy(values[i:], values[i+1:]) values = values[:len(values)-1] } else { i++ } } return values } func bottleneckAvailabilty(rw http.ResponseWriter, req *http.Request) { mode := intervalMode(req.FormValue("mode")) bn := mux.Vars(req)["objnam"] if bn == "" { http.Error( rw, "Missing objnam of bottleneck", http.StatusBadRequest, ) return } from, ok := parseFormTime(rw, req, "from", time.Now().AddDate(-1, 0, 0)) if !ok { return } to, ok := parseFormTime(rw, req, "to", from.AddDate(1, 0, 0)) if !ok { return } if to.Before(from) { to, from = from, to } los, ok := parseFormInt(rw, req, "los", 1) if !ok { return } conn := middleware.GetDBConn(req) ctx := req.Context() ldcRefs, err := loadLDCReferenceValue(ctx, conn, bn) if err != nil { http.Error( rw, fmt.Sprintf("Internal server error: %v", err), http.StatusInternalServerError, ) return } if len(ldcRefs) == 0 { http.Error( rw, 
"No gauge reference values found for bottleneck", http.StatusNotFound, ) return } var breaks []float64 if b := req.FormValue("breaks"); b != "" { breaks = breaksToReferenceValue(b) } else { breaks = afdRefs } log.Printf("info: time interval: (%v - %v)\n", from, to) var ms availMeasurements if ms, err = loadDepthValues(ctx, conn, bn, los, from, to); err != nil { return } if len(ms) == 0 { http.Error( rw, "No available fairway depth values found", http.StatusNotFound, ) return } rw.Header().Add("Content-Type", "text/csv") out := csv.NewWriter(rw) record := make([]string, 1+2+len(breaks)+1) record[0] = "#time" record[1] = fmt.Sprintf("# < LDC (%.1f) [h]", ldcRefs[0]) record[2] = fmt.Sprintf("# >= LDC (%.1f) [h]", ldcRefs[0]) for i, v := range breaks { if i == 0 { record[3] = fmt.Sprintf("#d < %.1f [%%]", v) } record[i+4] = fmt.Sprintf("#d >= %.1f [%%]", v) } if err := out.Write(record); err != nil { // Too late for HTTP status message. log.Printf("error: %v\n", err) return } interval := intervals[mode](from, to) now := time.Now() for pfrom, pto, label := interval(); label != ""; pfrom, pto, label = interval() { // Don't interpolate for the future if now.Sub(pto) < 0 { pto = now } lnwl := ms.classify( pfrom, pto, ldcRefs, (*availMeasurement).getValue, ) afd := ms.classify( pfrom, pto, breaks, (*availMeasurement).getDepth, ) duration := pto.Sub(pfrom) lnwlPercents := durationsToPercentage(duration, lnwl) afdPercents := durationsToPercentage(duration, afd) record[0] = label for i, v := range lnwlPercents { record[1+i] = fmt.Sprintf("%.3f", v) } for i, v := range afdPercents { record[3+i] = fmt.Sprintf("%.3f", v) } if err := out.Write(record); err != nil { // Too late for HTTP status message. log.Printf("error: %v\n", err) return } } out.Flush() if err := out.Error(); err != nil { // Too late for HTTP status message. 
log.Printf("error: %v\n", err) } } func bottleneckAvailableFairwayDepth(rw http.ResponseWriter, req *http.Request) { mode := intervalMode(req.FormValue("mode")) bn := mux.Vars(req)["objnam"] if bn == "" { http.Error( rw, "Missing objnam of bottleneck", http.StatusBadRequest) return } from, ok := parseFormTime(rw, req, "from", time.Now().AddDate(-1, 0, 0)) if !ok { return } to, ok := parseFormTime(rw, req, "to", from.AddDate(1, 0, 0)) if !ok { return } if to.Before(from) { to, from = from, to } los, ok := parseFormInt(rw, req, "los", 1) if !ok { return } conn := middleware.GetDBConn(req) ctx := req.Context() var limiting string err := conn.QueryRowContext(ctx, selectLimitingSQL, bn).Scan(&limiting) switch { case err == sql.ErrNoRows: http.Error( rw, fmt.Sprintf("Unknown limitation for %s.", bn), http.StatusNotFound) return case err != nil: http.Error( rw, fmt.Sprintf("DB error: %v.", err), http.StatusInternalServerError) return } access := limitingFactor(limiting) log.Printf("info: time interval: (%v - %v)\n", from, to) // load the measurements ms, err := loadDepthValues(ctx, conn, bn, los, from, to) if err != nil { http.Error( rw, fmt.Sprintf("Loading measurements failed: %v.", err), http.StatusInternalServerError) return } ldcRefs, err := loadLDCReferenceValue(ctx, conn, bn) if err != nil { http.Error( rw, fmt.Sprintf("Loading LDC failed: %v.", err), http.StatusInternalServerError) return } if len(ldcRefs) == 0 { http.Error(rw, "No LDC found", http.StatusNotFound) return } var breaks []float64 if b := req.FormValue("breaks"); b != "" { breaks = breaksToReferenceValue(b) } else { breaks = afdRefs } rw.Header().Add("Content-Type", "text/csv") out := csv.NewWriter(rw) // label, ldc, classes record := make([]string, 1+2+len(breaks)+1) record[0] = "#time" record[1] = fmt.Sprintf("# < LDC (%.1f) [h]", ldcRefs[0]) record[2] = fmt.Sprintf("# >= LDC (%.1f) [h]", ldcRefs[0]) for i, v := range breaks { if i == 0 { record[3] = fmt.Sprintf("# < %.1f [h]", v) } record[i+4] = 
fmt.Sprintf("# >= %.1f [h]", v) } if err := out.Write(record); err != nil { // Too late for HTTP status message. log.Printf("error: %v\n", err) return } //log.Println(len(ms)) //for i := range ms { // log.Println(ms[i].when, ms[i].depth) //} log.Printf("info: measurements: %d\n", len(ms)) if len(ms) > 1 { log.Printf("info: first: %v\n", ms[0].when) log.Printf("info: last: %v\n", ms[len(ms)-1].when) log.Printf("info: interval: %.2f [h]\n", ms[len(ms)-1].when.Sub(ms[0].when).Hours()) } interval := intervals[mode](from, to) now := time.Now() for pfrom, pto, label := interval(); label != ""; pfrom, pto, label = interval() { // Don't interpolate for the future if now.Sub(pto) < 0 { pto = now } ldc := ms.classify( pfrom, pto, ldcRefs, access, ) ranges := ms.classify( pfrom, pto, breaks, access, ) record[0] = label for i, v := range ldc { record[i+1] = fmt.Sprintf("%.3f", v.Hours()) } for i, d := range ranges { record[3+i] = fmt.Sprintf("%.3f", d.Hours()) } if err := out.Write(record); err != nil { // Too late for HTTP status message. log.Printf("error: %v\n", err) return } } out.Flush() if err := out.Error(); err != nil { // Too late for HTTP status message. 
log.Printf("error: %v\n", err) } } var intervals = []func(time.Time, time.Time) func() (time.Time, time.Time, string){ monthly, quarterly, yearly, } func monthly(from, to time.Time) func() (time.Time, time.Time, string) { pfrom := from return func() (time.Time, time.Time, string) { if pfrom.After(to) { return time.Time{}, time.Time{}, "" } f := pfrom pfrom = pfrom.AddDate(0, 1, 0) label := fmt.Sprintf("%02d-%d", f.Month(), f.Year()) return f, f.AddDate(0, 1, 0).Add(-time.Nanosecond), label } } func quarterly(from, to time.Time) func() (time.Time, time.Time, string) { pfrom := from return func() (time.Time, time.Time, string) { if pfrom.After(to) { return time.Time{}, time.Time{}, "" } f := pfrom pfrom = pfrom.AddDate(0, 3, 0) label := fmt.Sprintf("Q%d-%d", (int(f.Month())-1)/3+1, f.Year()) return f, f.AddDate(0, 3, 0).Add(-time.Nanosecond), label } } func yearly(from, to time.Time) func() (time.Time, time.Time, string) { pfrom := from return func() (time.Time, time.Time, string) { if pfrom.After(to) { return time.Time{}, time.Time{}, "" } f := pfrom pfrom = pfrom.AddDate(1, 0, 0) label := fmt.Sprintf("%d", f.Year()) return f, f.AddDate(1, 0, 0).Add(-time.Nanosecond), label } }