Mercurial > gemma
view pkg/controllers/bottlenecks.go @ 5220:142ac550bd9a new-fwa
Unify headers for CSV export.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Tue, 12 May 2020 10:15:32 +0200 |
parents | de417840dfee |
children |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2019 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> // * Sascha Wilde <wilde@intevation.de> package controllers import ( "context" "database/sql" "encoding/csv" "fmt" "log" "net/http" "time" "github.com/gorilla/mux" "gemma.intevation.de/gemma/pkg/common" "gemma.intevation.de/gemma/pkg/middleware" ) const ( selectLimitingSQL = ` SELECT limiting FROM waterway.bottlenecks bn WHERE bn.validity @> current_timestamp AND objnam = $1 ` selectAvailableDepthSQL = ` WITH data AS ( SELECT efa.measure_date, efa.available_depth_value, efa.available_width_value, efa.water_level_value FROM waterway.effective_fairway_availability efa JOIN waterway.fairway_availability fa ON efa.fairway_availability_id = fa.id JOIN waterway.bottlenecks bn ON fa.bottleneck_id = bn.bottleneck_id WHERE bn.validity @> current_timestamp AND bn.objnam = $1 AND efa.level_of_service = $2 AND efa.measure_type = 'Measured' AND (efa.available_depth_value IS NOT NULL OR efa.available_width_value IS NOT NULL) AND efa.water_level_value IS NOT NULL ), before AS ( SELECT * FROM data WHERE measure_date < $3 ORDER BY measure_date DESC LIMIT 1 ), inside AS ( SELECT * FROM data WHERE measure_date BETWEEN $3 AND $4 ), after AS ( SELECT * FROM data WHERE measure_date > $4 ORDER BY measure_date LIMIT 1 ) SELECT * FROM before UNION ALL SELECT * FROM inside UNION ALL SELECT * FROM after ORDER BY measure_date ` selectGaugeLDCSQL = ` SELECT grwl.value FROM waterway.gauges_reference_water_levels grwl JOIN waterway.bottlenecks bns ON grwl.location = bns.gauge_location AND grwl.validity @> COALESCE(upper(bns.validity), current_timestamp) WHERE lower(bns.validity) = (SELECT max(lower(validity)) FROM waterway.bottlenecks WHERE objnam = $1) AND bns.objnam = $1 AND grwl.depth_reference like 'LDC%' ` ) // According to clarification, it has to be assumed, that at times // with no data, the best case (which by convention is the highest // class created by classify()) should be assumed. That is due to the // fact, that at times where bottlenecks are not a limiting factor on // the waterway, services don't provide any data for the bottleneck in // question. // // FIXME: A potential improvement could be to intersect the time // ranges with the time ranges where bottlenecks were "active" (this // _might_ be derivable from the validity periods in the bottleneck // data. So it _might_ be possible do detect actual missing data (BN // valid, but no data from FA service). Anyway, this is left out for // now, as many clarification regarding the base assumtions would be // needed and the results still might be unrelyable. func optimisticPadClassification( from, to time.Time, classified []time.Duration, ) []time.Duration { var actualDuration time.Duration for _, v := range classified { actualDuration += v } // If the actual duration is smaller than the length // of the classifaction interval extend the // time spend in the highest class by the difference. if delta := to.Sub(from) - actualDuration; delta > 0 { classified[len(classified)-1] += delta } return classified } func durationsToPercentage(duration time.Duration, classes []time.Duration) []float64 { percents := make([]float64, len(classes)) total := 100 / duration.Seconds() for i, v := range classes { percents[i] = v.Seconds() * total } return percents } func loadDepthValues( ctx context.Context, conn *sql.Conn, bottleneck string, los int, from, to time.Time, ) (availMeasurements, error) { rows, err := conn.QueryContext( ctx, selectAvailableDepthSQL, bottleneck, los, from, to) if err != nil { return nil, err } defer rows.Close() var ms availMeasurements for rows.Next() { var m availMeasurement if err := rows.Scan( &m.when, &m.depth, &m.width, &m.value, ); err != nil { return nil, err } m.when = m.when.UTC() ms = append(ms, m) } if err := rows.Err(); err != nil { return nil, err } return ms, nil } func loadLDCReferenceValue( ctx context.Context, conn *sql.Conn, bottleneck string, ) ([]float64, error) { var value float64 err := conn.QueryRowContext(ctx, selectGaugeLDCSQL, bottleneck).Scan(&value) switch { case err == sql.ErrNoRows: return nil, nil case err != nil: return nil, err } return []float64{value}, nil } func bottleneckAvailabilty(rw http.ResponseWriter, req *http.Request) { mode := parseFWAMode(req.FormValue("mode")) bn := mux.Vars(req)["objnam"] if bn == "" { http.Error( rw, "Missing objnam of bottleneck", http.StatusBadRequest, ) return } from, to, ok := parseFromTo(rw, req) if !ok { return } los, ok := parseFormInt(rw, req, "los", 1) if !ok { return } conn := middleware.GetDBConn(req) ctx := req.Context() var limiting string err := conn.QueryRowContext(ctx, selectLimitingSQL, bn).Scan(&limiting) switch { case err == sql.ErrNoRows: http.Error( rw, fmt.Sprintf("Unknown limitation for %s.", bn), http.StatusNotFound) return case err != nil: http.Error( rw, fmt.Sprintf("DB error: %v.", err), http.StatusInternalServerError) return } access := limitingAccess[parseLimitingFactor(limiting)] ldcRefs, err := loadLDCReferenceValue(ctx, conn, bn) if err != nil { http.Error( rw, fmt.Sprintf("Internal server error: %v", err), http.StatusInternalServerError, ) return } if len(ldcRefs) == 0 { http.Error( rw, "No gauge reference values found for bottleneck", http.StatusNotFound, ) return } breaks := parseBreaks(req.FormValue("breaks"), afdRefs) log.Printf("info: time interval: (%v - %v)\n", from, to) var ms availMeasurements if ms, err = loadDepthValues(ctx, conn, bn, los, from, to); err != nil { return } if len(ms) == 0 { http.Error( rw, "No available fairway depth values found", http.StatusNotFound, ) return } rw.Header().Add("Content-Type", "text/csv") out := csv.NewWriter(rw) record := makeHeader(false, 1, breaks, '%') if err := out.Write(record); err != nil { // Too late for HTTP status message. log.Printf("error: %v\n", err) return } interval := intervals[mode](from, to) now := time.Now() for pfrom, pto, label := interval(); label != ""; pfrom, pto, label = interval() { // Don't interpolate for the future if now.Sub(pto) < 0 { pto = now } lnwl := ms.classify( pfrom, pto, ldcRefs, (*availMeasurement).getValue, ) afd := ms.classify( pfrom, pto, breaks, access, ) duration := pto.Sub(pfrom) lnwlPercents := durationsToPercentage(duration, lnwl) afdPercents := durationsToPercentage(duration, afd) record[0] = label for i, v := range lnwlPercents { record[1+i] = fmt.Sprintf("%.3f", v) } for i, v := range afdPercents { record[3+i] = fmt.Sprintf("%.3f", v) } if err := out.Write(record); err != nil { // Too late for HTTP status message. log.Printf("error: %v\n", err) return } } out.Flush() if err := out.Error(); err != nil { // Too late for HTTP status message. log.Printf("error: %v\n", err) } } func bottleneckAvailableFairwayDepth(rw http.ResponseWriter, req *http.Request) { mode := parseFWAMode(req.FormValue("mode")) bn := mux.Vars(req)["objnam"] if bn == "" { http.Error( rw, "Missing objnam of bottleneck", http.StatusBadRequest) return } from, ok := parseFormTime(rw, req, "from", time.Now().AddDate(-1, 0, 0)) if !ok { return } to, ok := parseFormTime(rw, req, "to", from.AddDate(1, 0, 0)) if !ok { return } if to.Before(from) { to, from = from, to } los, ok := parseFormInt(rw, req, "los", 1) if !ok { return } conn := middleware.GetDBConn(req) ctx := req.Context() var limiting string err := conn.QueryRowContext(ctx, selectLimitingSQL, bn).Scan(&limiting) switch { case err == sql.ErrNoRows: http.Error( rw, fmt.Sprintf("Unknown limitation for %s.", bn), http.StatusNotFound) return case err != nil: http.Error( rw, fmt.Sprintf("DB error: %v.", err), http.StatusInternalServerError) return } access := limitingAccess[parseLimitingFactor(limiting)] log.Printf("info: time interval: (%v - %v)\n", from, to) // load the measurements ms, err := loadDepthValues(ctx, conn, bn, los, from, to) if err != nil { http.Error( rw, fmt.Sprintf("Loading measurements failed: %v.", err), http.StatusInternalServerError) return } ldcRefs, err := loadLDCReferenceValue(ctx, conn, bn) if err != nil { http.Error( rw, fmt.Sprintf("Loading LDC failed: %v.", err), http.StatusInternalServerError) return } if len(ldcRefs) == 0 { http.Error(rw, "No LDC found", http.StatusNotFound) return } breaks := parseBreaks(req.FormValue("breaks"), afdRefs) rw.Header().Add("Content-Type", "text/csv") out := csv.NewWriter(rw) // label, ldc, classes record := makeHeader(false, 1, breaks, 'd') if err := out.Write(record); err != nil { // Too late for HTTP status message. log.Printf("error: %v\n", err) return } //log.Println(len(ms)) //for i := range ms { // log.Println(ms[i].when, ms[i].depth) //} log.Printf("info: measurements: %d\n", len(ms)) if len(ms) > 1 { log.Printf("info: first: %v\n", ms[0].when) log.Printf("info: last: %v\n", ms[len(ms)-1].when) log.Printf("info: interval: %.2f [h]\n", ms[len(ms)-1].when.Sub(ms[0].when).Hours()) } interval := intervals[mode](from, to) now := time.Now() for pfrom, pto, label := interval(); label != ""; pfrom, pto, label = interval() { // Don't interpolate for the future if now.Sub(pto) < 0 { pto = now } ldc := ms.classify( pfrom, pto, ldcRefs, (*availMeasurement).getValue, ) ranges := ms.classify( pfrom, pto, breaks, access, ) // Round to full days ldcRounded := common.RoundToFullDays(ldc) rangesRounded := common.RoundToFullDays(ranges) record[0] = label for i, v := range ldcRounded { record[i+1] = fmt.Sprintf("%d", v) } for i, d := range rangesRounded { record[3+i] = fmt.Sprintf("%d", d) } if err := out.Write(record); err != nil { // Too late for HTTP status message. log.Printf("error: %v\n", err) return } } out.Flush() if err := out.Error(); err != nil { // Too late for HTTP status message. log.Printf("error: %v\n", err) } } var intervals = []func(time.Time, time.Time) func() (time.Time, time.Time, string){ fwaMonthly: monthly, fwaQuarterly: quarterly, fwaYearly: yearly, } func monthly(from, to time.Time) func() (time.Time, time.Time, string) { pfrom := from return func() (time.Time, time.Time, string) { if pfrom.After(to) { return time.Time{}, time.Time{}, "" } f := pfrom pfrom = pfrom.AddDate(0, 1, 0) label := fmt.Sprintf("%02d-%d", f.Month(), f.Year()) return f, f.AddDate(0, 1, 0).Add(-time.Nanosecond), label } } func quarterly(from, to time.Time) func() (time.Time, time.Time, string) { pfrom := from return func() (time.Time, time.Time, string) { if pfrom.After(to) { return time.Time{}, time.Time{}, "" } f := pfrom pfrom = pfrom.AddDate(0, 3, 0) label := fmt.Sprintf("Q%d-%d", (int(f.Month())-1)/3+1, f.Year()) return f, f.AddDate(0, 3, 0).Add(-time.Nanosecond), label } } func yearly(from, to time.Time) func() (time.Time, time.Time, string) { pfrom := from return func() (time.Time, time.Time, string) { if pfrom.After(to) { return time.Time{}, time.Time{}, "" } f := pfrom pfrom = pfrom.AddDate(1, 0, 0) label := fmt.Sprintf("%d", f.Year()) return f, f.AddDate(1, 0, 0).Add(-time.Nanosecond), label } }