Mercurial > gemma
view pkg/controllers/fwa.go @ 5490:5f47eeea988d logging
Use own logging package.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Mon, 20 Sep 2021 17:45:39 +0200 |
parents | d71ebe576c76 |
children |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018, 2019, 2020 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> package controllers import ( "context" "database/sql" "encoding/csv" "fmt" "net/http" "sort" "strconv" "strings" "time" "github.com/gorilla/mux" "gemma.intevation.de/gemma/pkg/common" "gemma.intevation.de/gemma/pkg/log" "gemma.intevation.de/gemma/pkg/middleware" ) const ( selectBottlenecksLimitingSQL = ` SELECT lower(validity), upper(validity), limiting FROM waterway.bottlenecks WHERE bottleneck_id = $1 AND validity && tstzrange($2, $3)` selectSymbolBottlenecksSQL = ` SELECT distinct(b.bottleneck_id) FROM %s s, waterway.bottlenecks b WHERE ST_Intersects(b.area, s.area) AND s.name = $1 AND b.validity && tstzrange($2, $3)` selectLDCsSQL = ` SELECT lower(grwl.validity), upper(grwl.validity), grwl.value FROM waterway.gauges_reference_water_levels grwl JOIN waterway.bottlenecks bns ON grwl.location = bns.gauge_location WHERE grwl.depth_reference like 'LDC%' AND bns.bottleneck_id = $1 AND grwl.validity && tstzrange($2, $3)` selectMeasurementsSQL = ` WITH data AS ( SELECT efa.measure_date, efa.available_depth_value, efa.available_width_value, efa.water_level_value FROM waterway.effective_fairway_availability efa JOIN waterway.fairway_availability fa ON efa.fairway_availability_id = fa.id JOIN waterway.bottlenecks bn ON fa.bottleneck_id = bn.bottleneck_id WHERE bn.validity @> efa.measure_date AND bn.bottleneck_id = $1 AND efa.level_of_service = $2 AND efa.measure_type = 'Measured' AND (efa.available_depth_value IS NOT NULL OR efa.available_width_value IS NOT NULL) AND efa.water_level_value IS NOT NULL ), before AS ( SELECT * FROM data WHERE measure_date < $3 ORDER BY measure_date DESC LIMIT 1 ), inside AS ( SELECT * FROM data WHERE measure_date BETWEEN $3 AND $4 ), after AS ( SELECT * FROM data WHERE measure_date > $4 ORDER BY measure_date LIMIT 1 ) SELECT * FROM before UNION ALL SELECT * FROM inside UNION ALL SELECT * FROM after ORDER BY measure_date` ) type ( timeRange struct { lower time.Time upper time.Time } ldc struct { timeRange value []float64 } ldcs []*ldc limitingFactor int limitingValidity struct { timeRange limiting limitingFactor ldcs ldcs } limitingValidities []limitingValidity availMeasurement struct { when time.Time depth int16 width int16 value int16 } availMeasurements []availMeasurement bottleneck struct { id string validities limitingValidities measurements availMeasurements } bottlenecks []bottleneck fwaMode int ) const ( fwaMonthly fwaMode = iota fwaQuarterly fwaYearly ) const ( limitingDepth limitingFactor = iota limitingWidth ) var limitingAccess = [...]func(*availMeasurement) float64{ limitingDepth: (*availMeasurement).getDepth, limitingWidth: (*availMeasurement).getWidth, } // afdRefs are the typical available fairway depth reference values. var afdRefs = []float64{ 230, 250, } func (ls ldcs) find(from, to time.Time) *ldc { for _, l := range ls { if l.intersects(from, to) { return l } } return nil } func fairwayAvailability(rw http.ResponseWriter, req *http.Request) { from, to, ok := parseFromTo(rw, req) if !ok { return } vars := mux.Vars(req) name := vars["name"] if name == "" { http.Error(rw, "missing 'name' parameter.", http.StatusBadRequest) return } los, ok := parseFormInt(rw, req, "los", 3) if !ok { return } ctx := req.Context() conn := middleware.GetDBConn(req) var bns bottlenecks var err error switch vars["kind"] { case "bottleneck": bns = bottlenecks{{id: name}} case "stretch": bns, err = loadSymbolBottlenecks(ctx, conn, "users.stretches", name, from, to) case "section": bns, err = loadSymbolBottlenecks(ctx, conn, "waterway.sections", name, from, to) default: http.Error(rw, "Invalid kind type.", http.StatusBadRequest) return } if err != nil { log.Errorf("%v\n", err) http.Error(rw, "cannot extract bottlenecks", http.StatusBadRequest) return } // If there are no bottlenecks there is nothing to do. if len(bns) == 0 { http.Error(rw, "No bottlenecks found.", http.StatusNotFound) return } // load validities and limiting factors for i := range bns { if err := bns[i].loadLimitingValidities(ctx, conn, from, to); err != nil { log.Errorf("%v\n", err) http.Error(rw, "cannot load validities", http.StatusInternalServerError) return } // load LCDs if err := bns[i].loadLDCs(ctx, conn, from, to); err != nil { log.Errorf("%v\n", err) http.Error(rw, "cannot load LDCs", http.StatusInternalServerError) return } // load values if err := bns[i].loadValues(ctx, conn, from, to, los); err != nil { log.Errorf("%v\n", err) http.Error(rw, "cannot load values", http.StatusInternalServerError) return } } // separate breaks for depth and width breaks, ok := parseBreaks(rw, req, "breaks", afdRefs) if !ok { return } depthBreaks, ok := parseBreaks(rw, req, "depthbreaks", breaks) if !ok { return } widthBreaks, ok := parseBreaks(rw, req, "widthbreaks", breaks) if !ok { return } chooseBreaks := [...][]float64{ limitingDepth: depthBreaks, limitingWidth: widthBreaks, } useDepth := bns.hasLimiting(limitingDepth, from, to) useWidth := bns.hasLimiting(limitingWidth, from, to) if useDepth && useWidth && len(widthBreaks) != len(depthBreaks) { http.Error( rw, fmt.Sprintf("class breaks lengths differ: %d != %d", len(widthBreaks), len(depthBreaks)), http.StatusBadRequest, ) return } availability := vars["type"] == "availability" var record []string if !availability { // in days record = makeHeader(useDepth && useWidth, 1, breaks, 'd') } else { // percentage record = makeHeader(useDepth && useWidth, 3, breaks, '%') } rw.Header().Add("Content-Type", "text/csv") out := csv.NewWriter(rw) if err := out.Write(record); err != nil { // Too late for HTTP status message. log.Errorf("%v\n", err) return } for i := range record[1:] { record[i+1] = "0" } // For every day on every bottleneck we need to find out if this day is valid. validities := make([]func(time.Time, time.Time) *limitingValidity, len(bns)) for i := range bns { validities[i] = bns[i].validities.find() } // Mode reflects if we use monthly, quarterly od yearly intervals. mode := parseFWAMode(req.FormValue("mode")) label, finish := interval(mode, from) var ( totalDays, overLDCDays int missingLDCs = make([]int, len(validities)) counters = make([]int, len(breaks)+1) ) var current, next time.Time write := func() error { record[0] = label(current) if !availability { record[1] = strconv.Itoa(totalDays - overLDCDays) record[2] = strconv.Itoa(overLDCDays) for i, c := range counters { record[3+i] = strconv.Itoa(c) } } else { overPerc := float64(overLDCDays) * 100 / float64(totalDays) record[1] = fmt.Sprintf("%.3f", 100-overPerc) record[2] = fmt.Sprintf("%.3f", overPerc) for i, c := range counters { perc := float64(c) * 100 / float64(totalDays) record[3+i] = fmt.Sprintf("%.3f", perc) } } return out.Write(record) } // Stop yesterday end := common.MinTime(common.Dusk(time.Now()).Add(-time.Nanosecond), to) // We step through the time in steps of one day. for current = from; current.Before(end); { next = current.AddDate(0, 0, 1) // Assume that a bottleneck is over LDC. overLDC := true lowest := len(counters) - 1 var hasValid bool // check all bottlenecks for i, validity := range validities { // Check if bottleneck is available for this day. vs := validity(current, next) if vs == nil { continue } // Let's see if we have a LDC for this day. ldc := vs.ldcs.find(current, next) if ldc == nil { missingLDCs[i]++ continue } hasValid = true if overLDC { // If its already not shipable we need no further tests. result := bns[i].measurements.classify( current, next, ldc.value, (*availMeasurement).getValue) if (result[0] != 0 || result[1] != 0) && result[1] < 12*time.Hour { overLDC = false } } classes := bns[i].measurements.classify( current, next, chooseBreaks[vs.limiting], limitingAccess[vs.limiting]) if min := minClass(classes, 12*time.Hour); min < lowest { lowest = min } } if hasValid { if overLDC { overLDCDays++ } counters[lowest]++ } else { // assume that all is in best conditions overLDCDays++ counters[len(counters)-1]++ } totalDays++ if finish(next) { if err := write(); err != nil { // Too late for HTTP status message. log.Errorf("%v\n", err) return } // Reset counters overLDCDays, totalDays = 0, 0 for i := range counters { counters[i] = 0 } } current = next } // Write rest if last period was not finished. if totalDays > 0 { if err := write(); err != nil { // Too late for HTTP status message. log.Errorf("%v\n", err) return } } for i, days := range missingLDCs { if missingLDCs[i] > 0 { log.Warnf("warn: Missing LDCs for %s on %d days.\n", bns[i].id, days) } } out.Flush() if err := out.Error(); err != nil { // Too late for HTTP status message. log.Errorf("%v\n", err) } } func minClass(classes []time.Duration, threshold time.Duration) int { var sum time.Duration for i, v := range classes { if sum += v; sum >= threshold { return i } } return len(classes) - 1 } func parseFromTo( rw http.ResponseWriter, req *http.Request, ) (time.Time, time.Time, bool) { from, ok := parseFormTime(rw, req, "from", time.Now().AddDate(-1, 0, 0)) if !ok { return time.Time{}, time.Time{}, false } to, ok := parseFormTime(rw, req, "to", from.AddDate(1, 0, 0)) if !ok { return time.Time{}, time.Time{}, false } from, to = common.OrderTime(from, to) // Operate on daily basis so go to full days. return common.Dusk(from), common.Dawn(to), true } func parseFWAMode(mode string) fwaMode { switch strings.ToLower(mode) { case "monthly": return fwaMonthly case "quarterly": return fwaQuarterly case "yearly": return fwaYearly default: return fwaMonthly } } func breaksToReferenceValue(breaks string) ([]float64, error) { parts := strings.Split(breaks, ",") var values []float64 for _, part := range parts { part = strings.TrimSpace(part) v, err := strconv.ParseFloat(part, 64) if err != nil { return nil, err } values = append(values, v) } return common.DedupFloat64s(values), nil } func parseBreaks( rw http.ResponseWriter, req *http.Request, parameter string, defaults []float64, ) ([]float64, bool) { breaks := strings.TrimSpace(req.FormValue(parameter)) if breaks == "" { return defaults, true } defaults, err := breaksToReferenceValue(breaks) if err != nil { msg := fmt.Sprintf("Parameter '%s' is invalid: %s.", parameter, err) log.Errorf("%s\n", msg) http.Error(rw, msg, http.StatusBadRequest) return nil, false } return defaults, true } func (tr *timeRange) intersects(from, to time.Time) bool { return !(to.Before(tr.lower) || from.After(tr.upper)) } func (tr *timeRange) toUTC() { tr.lower = tr.lower.UTC() tr.upper = tr.upper.UTC() } func (lvs limitingValidities) find() func(from, to time.Time) *limitingValidity { var last *limitingValidity return func(from, to time.Time) *limitingValidity { if last != nil && last.intersects(from, to) { return last } for i := range lvs { if lv := &lvs[i]; lv.intersects(from, to) { last = lv return lv } } return nil } } func (lvs limitingValidities) hasLimiting(limiting limitingFactor, from, to time.Time) bool { for i := range lvs { if lvs[i].limiting == limiting && lvs[i].intersects(from, to) { return true } } return false } func (bns bottlenecks) hasLimiting(limiting limitingFactor, from, to time.Time) bool { for i := range bns { if bns[i].validities.hasLimiting(limiting, from, to) { return true } } return false } func parseLimitingFactor(limiting string) limitingFactor { switch limiting { case "depth": return limitingDepth case "width": return limitingWidth default: log.Warnf("unknown limitation '%s'. default to 'depth'\n", limiting) return limitingDepth } } func loadLimitingValidities( ctx context.Context, conn *sql.Conn, bottleneckID string, from, to time.Time, ) (limitingValidities, error) { var lvs limitingValidities rows, err := conn.QueryContext( ctx, selectBottlenecksLimitingSQL, bottleneckID, from, to) if err != nil { return nil, err } defer rows.Close() for rows.Next() { var ( lv limitingValidity limiting string upper sql.NullTime ) if err := rows.Scan( &lv.lower, &upper, &limiting, ); err != nil { return nil, err } if upper.Valid { lv.upper = upper.Time } else { lv.upper = to.Add(24 * time.Hour) } lv.toUTC() lv.limiting = parseLimitingFactor(limiting) lvs = append(lvs, lv) } return lvs, rows.Err() } func loadSymbolBottlenecks( ctx context.Context, conn *sql.Conn, what, name string, from, to time.Time, ) (bottlenecks, error) { rows, err := conn.QueryContext( ctx, fmt.Sprintf(selectSymbolBottlenecksSQL, what), name, from, to) if err != nil { return nil, err } defer rows.Close() var bns bottlenecks for rows.Next() { var b bottleneck if err := rows.Scan(&b.id); err != nil { return nil, err } bns = append(bns, b) } return bns, rows.Err() } func (bn *bottleneck) loadLimitingValidities( ctx context.Context, conn *sql.Conn, from, to time.Time, ) error { vs, err := loadLimitingValidities( ctx, conn, bn.id, from, to) if err == nil { bn.validities = vs } return err } func (bn *bottleneck) loadLDCs( ctx context.Context, conn *sql.Conn, from, to time.Time, ) error { rows, err := conn.QueryContext( ctx, selectLDCsSQL, bn.id, from, to) if err != nil { return err } defer rows.Close() for rows.Next() { l := ldc{value: []float64{0}} var upper sql.NullTime if err := rows.Scan(&l.lower, &upper, &l.value[0]); err != nil { return err } if upper.Valid { l.upper = upper.Time } else { l.upper = to.Add(24 * time.Hour) } l.toUTC() for i := range bn.validities { vs := &bn.validities[i] if vs.intersects(l.lower, l.upper) { vs.ldcs = append(vs.ldcs, &l) } } } return rows.Err() } func (bn *bottleneck) loadValues( ctx context.Context, conn *sql.Conn, from, to time.Time, los int, ) error { rows, err := conn.QueryContext( ctx, selectMeasurementsSQL, bn.id, los, from, to) if err != nil { return err } defer rows.Close() var ms availMeasurements for rows.Next() { var m availMeasurement if err := rows.Scan( &m.when, &m.depth, &m.width, &m.value, ); err != nil { return err } m.when = m.when.UTC() ms = append(ms, m) } if err := rows.Err(); err != nil { return err } bn.measurements = ms return nil } func (measurement *availMeasurement) getDepth() float64 { return float64(measurement.depth) } func (measurement *availMeasurement) getValue() float64 { return float64(measurement.value) } func (measurement *availMeasurement) getWidth() float64 { return float64(measurement.width) } func (measurements availMeasurements) classify( from, to time.Time, breaks []float64, access func(*availMeasurement) float64, ) []time.Duration { if len(breaks) == 0 { return []time.Duration{} } result := make([]time.Duration, len(breaks)+1) classes := make([]float64, len(breaks)+2) values := make([]time.Time, len(classes)) // Add sentinels classes[0] = breaks[0] - 9999 classes[len(classes)-1] = breaks[len(breaks)-1] + 9999 for i := range breaks { classes[i+1] = breaks[i] } idx := sort.Search(len(measurements), func(i int) bool { // All values before from can be ignored. return !measurements[i].when.Before(from) }) if idx >= len(measurements) { return result } // Be safe for interpolation. if idx > 0 { idx-- } measurements = measurements[idx:] for i := 0; i < len(measurements)-1; i++ { p1 := &measurements[i] p2 := &measurements[i+1] if p1.when.After(to) { return result } if p2.when.Before(from) { continue } // TODO: Discuss if we want somethinh like this. if false && p2.when.Sub(p1.when).Hours() > 1.5 { // Don't interpolate ranges bigger then one and a half hour continue } lo, hi := common.MaxTime(p1.when, from), common.MinTime(p2.when, to) m1, m2 := access(p1), access(p2) if m1 == m2 { // The whole interval is in only one class. for j := 0; j < len(classes)-1; j++ { if classes[j] <= m1 && m1 <= classes[j+1] { result[j] += hi.Sub(lo) break } } continue } f := common.InterpolateTime( p1.when, m1, p2.when, m2, ) for j, c := range classes { values[j] = f(c) } for j := 0; j < len(values)-1; j++ { start, end := common.OrderTime(values[j], values[j+1]) if start.After(hi) || end.Before(lo) { continue } start, end = common.MaxTime(start, lo), common.MinTime(end, hi) result[j] += end.Sub(start) } } return result } func interval(mode fwaMode, t time.Time) ( label func(time.Time) string, finish func(time.Time) bool, ) { switch mode { case fwaMonthly: label, finish = monthLabel, otherMonth(t) case fwaQuarterly: label, finish = quarterLabel, otherQuarter(t) case fwaYearly: label, finish = yearLabel, otherYear(t) default: panic("Unknown mode") } return } func monthLabel(t time.Time) string { return fmt.Sprintf("%02d-%d", t.Month(), t.Year()) } func quarterLabel(t time.Time) string { return fmt.Sprintf("Q%d-%d", (int(t.Month())-1)/3+1, t.Year()) } func yearLabel(t time.Time) string { return strconv.Itoa(t.Year()) } func otherMonth(t time.Time) func(time.Time) bool { return func(x time.Time) bool { flag := t.Day() == x.Day() if flag { t = x } return flag } } func otherQuarter(t time.Time) func(time.Time) bool { return func(x time.Time) bool { flag := (t.Month()-1)/3 != (x.Month()-1)/3 if flag { t = x } return flag } } func otherYear(t time.Time) func(time.Time) bool { return func(x time.Time) bool { flag := t.Year() != x.Year() if flag { t = x } return flag } } func makeHeader(flag bool, prec int, breaks []float64, unit rune) []string { record := make([]string, 1+2+len(breaks)+1) record[0] = "# time" record[1] = fmt.Sprintf("# < LDC [%c]", unit) record[2] = fmt.Sprintf("# >= LDC [%c]", unit) for i, v := range breaks { if flag { if i == 0 { record[3] = fmt.Sprintf("# < break_1 [%c]", unit) } record[i+4] = fmt.Sprintf("# >= break_%d [%c]", i+1, unit) } else { if i == 0 { record[3] = fmt.Sprintf("# < %.*f [%c]", prec, v, unit) } record[i+4] = fmt.Sprintf("# >= %.*f [%c]", prec, v, unit) } } return record }