Mercurial > gemma
diff pkg/controllers/fwa.go @ 5259:680be197844d
Merged branch new-fwa.
author | Sascha Wilde <wilde@intevation.de> |
---|---|
date | Wed, 13 May 2020 11:28:34 +0200 |
parents | 256ebbeb1252 |
children | ed62f138528a |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pkg/controllers/fwa.go Wed May 13 11:28:34 2020 +0200 @@ -0,0 +1,930 @@ +// This is Free Software under GNU Affero General Public License v >= 3.0 +// without warranty, see README.md and license for details. +// +// SPDX-License-Identifier: AGPL-3.0-or-later +// License-Filename: LICENSES/AGPL-3.0.txt +// +// Copyright (C) 2018, 2019, 2020 by via donau +// – Österreichische Wasserstraßen-Gesellschaft mbH +// Software engineering by Intevation GmbH +// +// Author(s): +// * Sascha L. Teichmann <sascha.teichmann@intevation.de> + +package controllers + +import ( + "context" + "database/sql" + "encoding/csv" + "fmt" + "log" + "net/http" + "sort" + "strconv" + "strings" + "time" + + "github.com/gorilla/mux" + + "gemma.intevation.de/gemma/pkg/common" + "gemma.intevation.de/gemma/pkg/middleware" +) + +const ( + selectBottlenecksLimitingSQL = ` +SELECT + lower(validity), + upper(validity), + limiting +FROM + waterway.bottlenecks +WHERE + bottleneck_id = $1 AND + validity && tstzrange($2, $3)` + + selectSymbolBottlenecksSQL = ` +SELECT + distinct(b.bottleneck_id) +FROM + %s s, waterway.bottlenecks b +WHERE + ST_Intersects(b.area, s.area) + AND s.name = $1 + AND b.validity && tstzrange($2, $3)` + + selectLDCsSQL = ` +SELECT + lower(grwl.validity), + upper(grwl.validity), + grwl.value +FROM + waterway.gauges_reference_water_levels grwl + JOIN waterway.bottlenecks bns + ON grwl.location = bns.gauge_location +WHERE + grwl.depth_reference like 'LDC%' + AND bns.bottleneck_id = $1 + AND grwl.validity && tstzrange($2, $3)` + + selectMeasurementsSQL = ` +WITH data AS ( + SELECT + efa.measure_date, + efa.available_depth_value, + efa.available_width_value, + efa.water_level_value + FROM waterway.effective_fairway_availability efa + JOIN waterway.fairway_availability fa + ON efa.fairway_availability_id = fa.id + JOIN waterway.bottlenecks bn + ON fa.bottleneck_id = bn.bottleneck_id + WHERE + bn.validity @> efa.measure_date AND + bn.bottleneck_id = $1 AND + efa.level_of_service = $2 AND + efa.measure_type = 'Measured' AND + (efa.available_depth_value IS NOT NULL OR + efa.available_width_value IS NOT NULL) AND + efa.water_level_value IS NOT NULL +), +before AS ( + SELECT * FROM data WHERE measure_date < $3 + ORDER BY measure_date DESC LIMIT 1 +), +inside AS ( + SELECT * FROM data WHERE measure_date BETWEEN $3 AND $4 +), +after AS ( + SELECT * FROM data WHERE measure_date > $4 + ORDER BY measure_date LIMIT 1 +) +SELECT * FROM before +UNION ALL +SELECT * FROM inside +UNION ALL +SELECT * FROM after +ORDER BY measure_date` +) + +type ( + timeRange struct { + lower time.Time + upper time.Time + } + + ldc struct { + timeRange + value []float64 + } + + ldcs []*ldc + + limitingFactor int + + limitingValidity struct { + timeRange + limiting limitingFactor + ldcs ldcs + } + + limitingValidities []limitingValidity + + availMeasurement struct { + when time.Time + depth int16 + width int16 + value int16 + } + + availMeasurements []availMeasurement + + bottleneck struct { + id string + validities limitingValidities + measurements availMeasurements + } + + bottlenecks []bottleneck + + fwaMode int +) + +const ( + fwaMonthly fwaMode = iota + fwaQuarterly + fwaYearly +) + +const ( + limitingDepth limitingFactor = iota + limitingWidth +) + +var limitingAccess = [...]func(*availMeasurement) float64{ + limitingDepth: (*availMeasurement).getDepth, + limitingWidth: (*availMeasurement).getWidth, +} + +// afdRefs are the typical available fairway depth reference values. +var afdRefs = []float64{ + 230, + 250, +} + +func (ls ldcs) find(from, to time.Time) *ldc { + for _, l := range ls { + if l.intersects(from, to) { + return l + } + } + return nil +} + +func fairwayAvailability(rw http.ResponseWriter, req *http.Request) { + + from, to, ok := parseFromTo(rw, req) + if !ok { + return + } + + vars := mux.Vars(req) + name := vars["name"] + if name == "" { + http.Error(rw, "missing 'name' parameter.", http.StatusBadRequest) + return + } + + los, ok := parseFormInt(rw, req, "los", 3) + if !ok { + return + } + + ctx := req.Context() + conn := middleware.GetDBConn(req) + + var bns bottlenecks + var err error + + switch vars["kind"] { + case "bottleneck": + bns = bottlenecks{{id: name}} + case "stretch": + bns, err = loadSymbolBottlenecks(ctx, conn, "users.stretches", name, from, to) + case "section": + bns, err = loadSymbolBottlenecks(ctx, conn, "waterway.sections", name, from, to) + default: + http.Error(rw, "Invalid kind type.", http.StatusBadRequest) + return + } + + if err != nil { + log.Printf("error: %v\n", err) + http.Error(rw, "cannot extract bottlenecks", http.StatusBadRequest) + return + } + + // If there are no bottlenecks there is nothing to do. + if len(bns) == 0 { + http.Error(rw, "No bottlenecks found.", http.StatusNotFound) + return + } + + // load validities and limiting factors + for i := range bns { + if err := bns[i].loadLimitingValidities(ctx, conn, from, to); err != nil { + log.Printf("error: %v\n", err) + http.Error(rw, "cannot load validities", http.StatusInternalServerError) + return + } + // load LCDs + if err := bns[i].loadLDCs(ctx, conn, from, to); err != nil { + log.Printf("error: %v\n", err) + http.Error(rw, "cannot load LDCs", http.StatusInternalServerError) + return + } + // load values + if err := bns[i].loadValues(ctx, conn, from, to, los); err != nil { + log.Printf("error: %v\n", err) + http.Error(rw, "cannot load values", http.StatusInternalServerError) + return + } + } + + // separate breaks for depth and width + var ( + breaks = parseBreaks(req.FormValue("breaks"), afdRefs) + depthBreaks = parseBreaks(req.FormValue("depthbreaks"), breaks) + widthBreaks = parseBreaks(req.FormValue("widthbreaks"), breaks) + chooseBreaks = [...][]float64{ + limitingDepth: depthBreaks, + limitingWidth: widthBreaks, + } + + useDepth = bns.hasLimiting(limitingDepth, from, to) + useWidth = bns.hasLimiting(limitingWidth, from, to) + ) + + if useDepth && useWidth && len(widthBreaks) != len(depthBreaks) { + http.Error( + rw, + fmt.Sprintf("class breaks lengths differ: %d != %d", + len(widthBreaks), len(depthBreaks)), + http.StatusBadRequest, + ) + return + } + + availability := vars["type"] == "availability" + + var record []string + if !availability { + // in days + record = makeHeader(useDepth && useWidth, 1, breaks, 'd') + } else { + // percentage + record = makeHeader(useDepth && useWidth, 3, breaks, '%') + } + + rw.Header().Add("Content-Type", "text/csv") + + out := csv.NewWriter(rw) + + if err := out.Write(record); err != nil { + // Too late for HTTP status message. + log.Printf("error: %v\n", err) + return + } + + for i := range record[1:] { + record[i+1] = "0" + } + + // For every day on every bottleneck we need to find out if this day is valid. + validities := make([]func(time.Time, time.Time) *limitingValidity, len(bns)) + for i := range bns { + validities[i] = bns[i].validities.find() + } + + // Mode reflects if we use monthly, quarterly od yearly intervals. + mode := parseFWAMode(req.FormValue("mode")) + + label, finish := interval(mode, from) + + var ( + totalDays, overLDCDays int + missingLDCs = make([]int, len(validities)) + counters = make([]int, len(breaks)+1) + ) + + var current, next time.Time + + write := func() error { + record[0] = label(current) + + if !availability { + record[1] = strconv.Itoa(totalDays - overLDCDays) + record[2] = strconv.Itoa(overLDCDays) + for i, c := range counters { + record[3+i] = strconv.Itoa(c) + } + } else { + overPerc := float64(overLDCDays) * 100 / float64(totalDays) + record[1] = fmt.Sprintf("%.3f", 100-overPerc) + record[2] = fmt.Sprintf("%.3f", overPerc) + for i, c := range counters { + perc := float64(c) * 100 / float64(totalDays) + record[3+i] = fmt.Sprintf("%.3f", perc) + } + } + + return out.Write(record) + } + + // Stop yesterday + end := common.MinTime(dusk(time.Now()).Add(-time.Nanosecond), to) + + // We step through the time in steps of one day. + for current = from; current.Before(end); { + + next = current.AddDate(0, 0, 1) + + // Assume that a bottleneck is over LDC. + overLDC := true + lowest := len(counters) - 1 + + var hasValid bool + + // check all bottlenecks + for i, validity := range validities { + + // Check if bottleneck is available for this day. + vs := validity(current, next) + if vs == nil { + continue + } + + // Let's see if we have a LDC for this day. + ldc := vs.ldcs.find(current, next) + if ldc == nil { + missingLDCs[i]++ + continue + } + + hasValid = true + + if overLDC { // If its already not shipable we need no further tests. + result := bns[i].measurements.classify( + current, next, + ldc.value, + (*availMeasurement).getValue) + + if result[1] < 12*time.Hour { + overLDC = false + } + } + + if min := minClass(bns[i].measurements.classify( + current, next, + chooseBreaks[vs.limiting], + limitingAccess[vs.limiting]), + 12*time.Hour, + ); min < lowest { + lowest = min + } + } + + if hasValid { + if overLDC { + overLDCDays++ + } + counters[lowest]++ + } else { // assume that all is in best conditions + overLDCDays++ + counters[len(counters)-1]++ + } + + totalDays++ + + if finish(next) { + if err := write(); err != nil { + // Too late for HTTP status message. + log.Printf("error: %v\n", err) + return + } + + // Reset counters + overLDCDays, totalDays = 0, 0 + for i := range counters { + counters[i] = 0 + } + } + + current = next + } + + // Write rest if last period was not finished. + if totalDays > 0 { + if err := write(); err != nil { + // Too late for HTTP status message. + log.Printf("error: %v\n", err) + return + } + } + + // TODO: Log missing LDCs + + out.Flush() + if err := out.Error(); err != nil { + // Too late for HTTP status message. + log.Printf("error: %v\n", err) + } +} + +func minClass(classes []time.Duration, threshold time.Duration) int { + var sum time.Duration + for i, v := range classes { + if sum += v; sum >= threshold { + return i + } + } + return len(classes) - 1 +} + +func dusk(t time.Time) time.Time { + return time.Date( + t.Year(), + t.Month(), + t.Day(), + 0, 0, 0, 0, + t.Location()) +} + +func dawn(t time.Time) time.Time { + return time.Date( + t.Year(), + t.Month(), + t.Day(), + 23, 59, 59, 999999999, + t.Location()) +} + +func parseFromTo( + rw http.ResponseWriter, + req *http.Request, +) (time.Time, time.Time, bool) { + from, ok := parseFormTime(rw, req, "from", time.Now().AddDate(-1, 0, 0)) + if !ok { + return time.Time{}, time.Time{}, false + } + + to, ok := parseFormTime(rw, req, "to", from.AddDate(1, 0, 0)) + if !ok { + return time.Time{}, time.Time{}, false + } + + from, to = common.OrderTime(from, to) + // Operate on daily basis so go to full days. + return dusk(from), dawn(to), true +} + +func parseFWAMode(mode string) fwaMode { + switch strings.ToLower(mode) { + case "monthly": + return fwaMonthly + case "quarterly": + return fwaQuarterly + case "yearly": + return fwaYearly + default: + return fwaMonthly + } +} + +func breaksToReferenceValue(breaks string) []float64 { + parts := strings.Split(breaks, ",") + var values []float64 + + for _, part := range parts { + part = strings.TrimSpace(part) + if v, err := strconv.ParseFloat(part, 64); err == nil { + values = append(values, v) + } + } + + return common.DedupFloat64s(values) +} + +func parseBreaks(breaks string, defaults []float64) []float64 { + if breaks != "" { + return breaksToReferenceValue(breaks) + } + return defaults +} + +func (tr *timeRange) intersects(from, to time.Time) bool { + return !(to.Before(tr.lower) || from.After(tr.upper)) +} + +func (tr *timeRange) toUTC() { + tr.lower = tr.lower.UTC() + tr.upper = tr.upper.UTC() +} + +func (lvs limitingValidities) find() func(from, to time.Time) *limitingValidity { + + var last *limitingValidity + + return func(from, to time.Time) *limitingValidity { + if last != nil && last.intersects(from, to) { + return last + } + for i := range lvs { + if lv := &lvs[i]; lv.intersects(from, to) { + last = lv + return lv + } + } + return nil + } +} + +func (lvs limitingValidities) hasLimiting(limiting limitingFactor, from, to time.Time) bool { + for i := range lvs { + if lvs[i].limiting == limiting && lvs[i].intersects(from, to) { + return true + } + } + return false +} + +func (bns bottlenecks) hasLimiting(limiting limitingFactor, from, to time.Time) bool { + for i := range bns { + if bns[i].validities.hasLimiting(limiting, from, to) { + return true + } + } + return false +} + +func parseLimitingFactor(limiting string) limitingFactor { + switch limiting { + case "depth": + return limitingDepth + case "width": + return limitingWidth + default: + log.Printf("warn: unknown limitation '%s'. default to 'depth'\n", limiting) + return limitingDepth + } +} + +func loadLimitingValidities( + ctx context.Context, + conn *sql.Conn, + bottleneckID string, + from, to time.Time, +) (limitingValidities, error) { + + var lvs limitingValidities + + rows, err := conn.QueryContext( + ctx, + selectBottlenecksLimitingSQL, + bottleneckID, + from, to) + + if err != nil { + return nil, err + } + defer rows.Close() + + for rows.Next() { + var ( + lv limitingValidity + limiting string + upper sql.NullTime + ) + if err := rows.Scan( + &lv.lower, + &upper, + &limiting, + ); err != nil { + return nil, err + } + if upper.Valid { + lv.upper = upper.Time + } else { + lv.upper = to.Add(24 * time.Hour) + } + lv.toUTC() + lv.limiting = parseLimitingFactor(limiting) + lvs = append(lvs, lv) + } + + return lvs, rows.Err() +} + +func loadSymbolBottlenecks( + ctx context.Context, + conn *sql.Conn, + what, name string, + from, to time.Time, +) (bottlenecks, error) { + + rows, err := conn.QueryContext( + ctx, + fmt.Sprintf(selectSymbolBottlenecksSQL, what), + name, + from, to) + if err != nil { + return nil, err + } + defer rows.Close() + + var bns bottlenecks + + for rows.Next() { + var b bottleneck + if err := rows.Scan(&b.id); err != nil { + return nil, err + } + bns = append(bns, b) + } + + return bns, rows.Err() +} + +func (bn *bottleneck) loadLimitingValidities( + ctx context.Context, + conn *sql.Conn, + from, to time.Time, +) error { + vs, err := loadLimitingValidities( + ctx, + conn, + bn.id, + from, to) + if err == nil { + bn.validities = vs + } + return err +} + +func (bn *bottleneck) loadLDCs( + ctx context.Context, + conn *sql.Conn, + from, to time.Time, +) error { + rows, err := conn.QueryContext( + ctx, selectLDCsSQL, + bn.id, + from, to) + if err != nil { + return err + } + defer rows.Close() + for rows.Next() { + l := ldc{value: []float64{0}} + var upper sql.NullTime + if err := rows.Scan(&l.lower, &upper, &l.value[0]); err != nil { + return err + } + if upper.Valid { + l.upper = upper.Time + } else { + l.upper = to.Add(24 * time.Hour) + } + l.toUTC() + for i := range bn.validities { + vs := &bn.validities[i] + + if vs.intersects(l.lower, l.upper) { + vs.ldcs = append(vs.ldcs, &l) + } + } + } + return rows.Err() +} + +func (bn *bottleneck) loadValues( + ctx context.Context, + conn *sql.Conn, + from, to time.Time, + los int, +) error { + rows, err := conn.QueryContext( + ctx, selectMeasurementsSQL, + bn.id, + los, + from, to) + if err != nil { + return err + } + defer rows.Close() + + var ms availMeasurements + + for rows.Next() { + var m availMeasurement + if err := rows.Scan( + &m.when, + &m.depth, + &m.width, + &m.value, + ); err != nil { + return err + } + m.when = m.when.UTC() + ms = append(ms, m) + } + if err := rows.Err(); err != nil { + return err + } + bn.measurements = ms + return nil +} + +func (measurement *availMeasurement) getDepth() float64 { + return float64(measurement.depth) +} + +func (measurement *availMeasurement) getValue() float64 { + return float64(measurement.value) +} + +func (measurement *availMeasurement) getWidth() float64 { + return float64(measurement.width) +} + +func (measurements availMeasurements) classify( + from, to time.Time, + breaks []float64, + access func(*availMeasurement) float64, +) []time.Duration { + + if len(breaks) == 0 { + return []time.Duration{} + } + + result := make([]time.Duration, len(breaks)+1) + classes := make([]float64, len(breaks)+2) + values := make([]time.Time, len(classes)) + + // Add sentinels + classes[0] = breaks[0] - 9999 + classes[len(classes)-1] = breaks[len(breaks)-1] + 9999 + for i := range breaks { + classes[i+1] = breaks[i] + } + + idx := sort.Search(len(measurements), func(i int) bool { + // All values before from can be ignored. + return !measurements[i].when.Before(from) + }) + + if idx >= len(measurements) { + return result + } + + // Be safe for interpolation. + if idx > 0 { + idx-- + } + + measurements = measurements[idx:] + + for i := 0; i < len(measurements)-1; i++ { + p1 := &measurements[i] + p2 := &measurements[i+1] + + if p1.when.After(to) { + return result + } + + if p2.when.Before(from) { + continue + } + + // TODO: Discuss if we want somethinh like this. + if false && p2.when.Sub(p1.when).Hours() > 1.5 { + // Don't interpolate ranges bigger then one and a half hour + continue + } + + lo, hi := common.MaxTime(p1.when, from), common.MinTime(p2.when, to) + + m1, m2 := access(p1), access(p2) + if m1 == m2 { // The whole interval is in only one class. + for j := 0; j < len(classes)-1; j++ { + if classes[j] <= m1 && m1 <= classes[j+1] { + result[j] += hi.Sub(lo) + break + } + } + continue + } + + f := common.InterpolateTime( + p1.when, m1, + p2.when, m2, + ) + + for j, c := range classes { + values[j] = f(c) + } + + for j := 0; j < len(values)-1; j++ { + start, end := common.OrderTime(values[j], values[j+1]) + + if start.After(hi) || end.Before(lo) { + continue + } + + start, end = common.MaxTime(start, lo), common.MinTime(end, hi) + result[j] += end.Sub(start) + } + } + + return result +} + +func interval(mode fwaMode, t time.Time) ( + label func(time.Time) string, + finish func(time.Time) bool, +) { + switch mode { + case fwaMonthly: + label, finish = monthLabel, otherMonth(t) + case fwaQuarterly: + label, finish = quarterLabel, otherQuarter(t) + case fwaYearly: + label, finish = yearLabel, otherYear(t) + default: + panic("Unknown mode") + } + return +} + +func monthLabel(t time.Time) string { + return fmt.Sprintf("%02d-%d", t.Month(), t.Year()) +} + +func quarterLabel(t time.Time) string { + return fmt.Sprintf("Q%d-%d", (int(t.Month())-1)/3+1, t.Year()) +} + +func yearLabel(t time.Time) string { + return strconv.Itoa(t.Year()) +} + +func otherMonth(t time.Time) func(time.Time) bool { + return func(x time.Time) bool { + flag := t.Day() == x.Day() + if flag { + t = x + } + return flag + } +} + +func otherQuarter(t time.Time) func(time.Time) bool { + return func(x time.Time) bool { + flag := (t.Month()-1)/3 != (x.Month()-1)/3 + if flag { + t = x + } + return flag + } +} + +func otherYear(t time.Time) func(time.Time) bool { + return func(x time.Time) bool { + flag := t.Year() != x.Year() + if flag { + t = x + } + return flag + } +} + +func makeHeader(flag bool, prec int, breaks []float64, unit rune) []string { + record := make([]string, 1+2+len(breaks)+1) + record[0] = "# time" + record[1] = fmt.Sprintf("# < LDC [%c]", unit) + record[2] = fmt.Sprintf("# >= LDC [%c]", unit) + for i, v := range breaks { + if flag { + if i == 0 { + record[3] = fmt.Sprintf("# < break_1 [%c]", unit) + } + record[i+4] = fmt.Sprintf("# >= break_%d [%c]", i+1, unit) + } else { + if i == 0 { + record[3] = fmt.Sprintf("# < %.*f [%c]", prec, v, unit) + } + record[i+4] = fmt.Sprintf("# >= %.*f [%c]", prec, v, unit) + } + } + return record +}