Mercurial > gemma
view pkg/controllers/stretches.go @ 4343:63c25eb9c07c
FA: be optimistic about missing data.
According to clarification, missing data has to be interpreted as the best case.
This is because the services do not provide data for bottlenecks which are not
considered a limiting factor on the waterway at a given time.
author | Sascha Wilde <wilde@intevation.de> |
---|---|
date | Fri, 06 Sep 2019 17:22:42 +0200 |
parents | 0f467a839fe2 |
children | 97312d7954ba |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2019 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> package controllers import ( "context" "database/sql" "encoding/csv" "fmt" "log" "net/http" "runtime" "strings" "sync" "time" "github.com/gorilla/mux" "gemma.intevation.de/gemma/pkg/middleware" ) const ( selectSectionBottlenecks = ` SELECT distinct(b.objnam), b.limiting FROM waterway.sections s, waterway.bottlenecks b WHERE b.validity @> current_timestamp AND ST_Intersects(b.area, s.area) AND s.name = $1` selectStretchBottlenecks = ` SELECT distinct(b.objnam), b.limiting FROM waterway.stretches s, waterway.bottlenecks b WHERE b.validity @> current_timestamp AND ST_Intersects(b.area, s.area) AND s.name = $1` ) type ( stretchBottleneck struct { name string limiting string } stretchBottlenecks []stretchBottleneck fullStretchBottleneck struct { *stretchBottleneck measurements availMeasurements ldc []float64 breaks []float64 access func(*availMeasurement) float64 } ) func (bns stretchBottlenecks) contains(limiting string) bool { for i := range bns { if bns[i].limiting == limiting { return true } } return false } func loadFullStretchBottleneck( ctx context.Context, conn *sql.Conn, bn *stretchBottleneck, los int, from, to time.Time, depthbreaks, widthbreaks []float64, ) (*fullStretchBottleneck, error) { measurements, err := loadDepthValues(ctx, conn, bn.name, los, from, to) if err != nil { return nil, err } ldc, err := loadLDCReferenceValue(ctx, conn, bn.name) if err != nil { return nil, err } var access func(*availMeasurement) float64 var breaks []float64 switch bn.limiting { case "width": access = (*availMeasurement).getWidth breaks = 
widthbreaks case "depth": access = (*availMeasurement).getDepth breaks = depthbreaks default: log.Printf( "warn: unknown limitation '%s'. default to 'depth'.\n", bn.limiting) access = (*availMeasurement).getDepth breaks = depthbreaks } return &fullStretchBottleneck{ stretchBottleneck: bn, measurements: measurements, ldc: ldc, breaks: breaks, access: access, }, nil } func loadStretchBottlenecks( ctx context.Context, conn *sql.Conn, stretch bool, name string, ) (stretchBottlenecks, error) { var sql string if stretch { sql = selectStretchBottlenecks } else { sql = selectSectionBottlenecks } rows, err := conn.QueryContext(ctx, sql, name) if err != nil { return nil, err } defer rows.Close() var bns stretchBottlenecks for rows.Next() { var bn stretchBottleneck if err := rows.Scan( &bn.name, &bn.limiting, ); err != nil { return nil, err } bns = append(bns, bn) } if err := rows.Err(); err != nil { return nil, err } return bns, nil } func stretchAvailableFairwayDepth(rw http.ResponseWriter, req *http.Request) { vars := mux.Vars(req) stretch := vars["kind"] == "stretch" name := vars["name"] mode := intervalMode(req.FormValue("mode")) depthbreaks, widthbreaks := afdRefs, afdRefs from, ok := parseFormTime(rw, req, "from", time.Now().AddDate(-1, 0, 0)) if !ok { return } to, ok := parseFormTime(rw, req, "to", from.AddDate(1, 0, 0)) if !ok { return } if to.Before(from) { to, from = from, to } los, ok := parseFormInt(rw, req, "los", 1) if !ok { return } conn := middleware.GetDBConn(req) ctx := req.Context() bns, err := loadStretchBottlenecks(ctx, conn, stretch, name) if err != nil { http.Error( rw, fmt.Sprintf("DB error: %v.", err), http.StatusInternalServerError) return } if len(bns) == 0 { http.Error(rw, "No bottlenecks found.", http.StatusNotFound) return } if b := req.FormValue("depthbreaks"); b != "" { depthbreaks = breaksToReferenceValue(b) } if b := req.FormValue("widthbreaks"); b != "" { widthbreaks = breaksToReferenceValue(b) } useDepth, useWidth := bns.contains("depth"), 
bns.contains("width") if useDepth && useWidth && len(widthbreaks) != len(depthbreaks) { http.Error( rw, fmt.Sprintf("class breaks lengths differ: %d != %d", len(widthbreaks), len(depthbreaks)), http.StatusBadRequest, ) return } log.Printf("info: time interval: (%v - %v)\n", from, to) var loaded []*fullStretchBottleneck var errors []error for i := range bns { l, err := loadFullStretchBottleneck( ctx, conn, &bns[i], los, from, to, depthbreaks, widthbreaks, ) if err != nil { log.Printf("error: %v\n", err) errors = append(errors, err) continue } loaded = append(loaded, l) } if len(loaded) == 0 { http.Error( rw, fmt.Sprintf("No bottleneck loaded: %v", joinErrors(errors)), http.StatusInternalServerError, ) return } n := runtime.NumCPU() / 2 if n == 0 { n = 1 } type result struct { label string from time.Time to time.Time ldc []time.Duration breaks []time.Duration } jobCh := make(chan *result) var wg sync.WaitGroup for i := 0; i < n; i++ { wg.Add(1) go func() { defer wg.Done() for res := range jobCh { var ldc, breaks []time.Duration now := time.Now() for _, bn := range loaded { // Don't interpolate for the future if now.Sub(res.to) < 0 { res.to = now } l := bn.measurements.classify( res.from, res.to, bn.ldc, bn.access, ) b := bn.measurements.classify( res.from, res.to, bn.breaks, bn.access, ) if ldc == nil { ldc, breaks = l, b } else { for i, v := range l { ldc[i] += v } for i, v := range b { breaks[i] += v } } } res.ldc = ldc res.breaks = breaks } }() } var results []*result interval := intervals[mode](from, to) var breaks []float64 if useDepth { breaks = depthbreaks } else { breaks = widthbreaks } for pfrom, pto, label := interval(); label != ""; pfrom, pto, label = interval() { res := &result{ label: label, from: pfrom, to: pto, } results = append(results, res) jobCh <- res } close(jobCh) wg.Wait() rw.Header().Add("Content-Type", "text/csv") out := csv.NewWriter(rw) // label, lnwl, classes record := make([]string, 1+2+len(breaks)+1) record[0] = "# time" record[1] = "# 
< LDC [h]" record[2] = "# >= LDC [h]" for i, v := range breaks { if useDepth && useWidth { if i == 0 { record[3] = "# < break_1 [h]" } record[i+4] = fmt.Sprintf("# >= break_%d", i+1) } else { if i == 0 { record[3] = fmt.Sprintf("# < %.1f [h]", v) } record[i+4] = fmt.Sprintf("# >= %.1f [h]", v) } } if err := out.Write(record); err != nil { // Too late for HTTP status message. log.Printf("error: %v\n", err) return } // Normalize to look like as we have only one bottleneck. scale := 1 / float64(len(loaded)) empty := fmt.Sprintf("%.3f", 0.0) for i := range record[1:] { record[i+1] = empty } for _, r := range results { record[0] = r.label for i, v := range r.ldc { record[1+i] = fmt.Sprintf("%.3f", v.Hours()*scale) } for i, d := range r.breaks { record[3+i] = fmt.Sprintf("%.3f", d.Hours()*scale) } if err := out.Write(record); err != nil { // Too late for HTTP status message. log.Printf("error: %v\n", err) return } } out.Flush() if err := out.Error(); err != nil { // Too late for HTTP status message. 
log.Printf("error: %v\n", err) } } func joinErrors(errors []error) string { var b strings.Builder for _, err := range errors { if b.Len() > 0 { b.WriteString(", ") } b.WriteString(err.Error()) } return b.String() } func stretchAvailabilty(rw http.ResponseWriter, req *http.Request) { vars := mux.Vars(req) stretch := vars["kind"] == "stretch" name := vars["name"] mode := intervalMode(req.FormValue("mode")) if name == "" { http.Error( rw, fmt.Sprintf("Missing %s name", vars["kind"]), http.StatusBadRequest, ) return } from, ok := parseFormTime(rw, req, "from", time.Now().AddDate(-1, 0, 0)) if !ok { return } to, ok := parseFormTime(rw, req, "to", from.AddDate(1, 0, 0)) if !ok { return } if to.Before(from) { to, from = from, to } los, ok := parseFormInt(rw, req, "los", 1) if !ok { return } depthbreaks, widthbreaks := afdRefs, afdRefs if b := req.FormValue("depthbreaks"); b != "" { depthbreaks = breaksToReferenceValue(b) } if b := req.FormValue("widthbreaks"); b != "" { widthbreaks = breaksToReferenceValue(b) } conn := middleware.GetDBConn(req) ctx := req.Context() bns, err := loadStretchBottlenecks(ctx, conn, stretch, name) if err != nil { http.Error( rw, fmt.Sprintf("DB error: %v.", err), http.StatusInternalServerError) return } if len(bns) == 0 { http.Error( rw, "No bottlenecks found.", http.StatusNotFound, ) return } useDepth, useWidth := bns.contains("depth"), bns.contains("width") if useDepth && useWidth && len(widthbreaks) != len(depthbreaks) { http.Error( rw, fmt.Sprintf("class breaks lengths differ: %d != %d", len(widthbreaks), len(depthbreaks)), http.StatusBadRequest, ) return } log.Printf("info: time interval: (%v - %v)\n", from, to) var loaded []*fullStretchBottleneck var errors []error for i := range bns { l, err := loadFullStretchBottleneck( ctx, conn, &bns[i], los, from, to, depthbreaks, widthbreaks, ) if err != nil { log.Printf("error: %v\n", err) errors = append(errors, err) continue } loaded = append(loaded, l) } if len(loaded) == 0 { http.Error( rw, 
fmt.Sprintf("No bottleneck loaded: %v", joinErrors(errors)), http.StatusInternalServerError, ) return } n := runtime.NumCPU() / 2 if n == 0 { n = 1 } type result struct { label string from time.Time to time.Time ldc []float64 breaks []float64 } jobCh := make(chan *result) var wg sync.WaitGroup for i := 0; i < n; i++ { wg.Add(1) go func() { defer wg.Done() for res := range jobCh { var ldc, breaks []time.Duration now := time.Now() for _, bn := range loaded { // Don't interpolate for the future if now.Sub(res.to) < 0 { res.to = now } l := bn.measurements.classify( res.from, res.to, bn.ldc, (*availMeasurement).getValue, ) b := bn.measurements.classify( res.from, res.to, bn.breaks, bn.access, ) if ldc == nil { ldc, breaks = l, b } else { for i, v := range l { ldc[i] += v } for i, v := range b { breaks[i] += v } } } duration := res.to.Sub(res.from) * time.Duration(len(loaded)) res.ldc = durationsToPercentage(duration, ldc) res.breaks = durationsToPercentage(duration, breaks) } }() } var results []*result interval := intervals[mode](from, to) var breaks []float64 if useDepth { breaks = depthbreaks } else { breaks = widthbreaks } for pfrom, pto, label := interval(); label != ""; pfrom, pto, label = interval() { res := &result{ label: label, from: pfrom, to: pto, } results = append(results, res) jobCh <- res } close(jobCh) wg.Wait() rw.Header().Add("Content-Type", "text/csv") out := csv.NewWriter(rw) // label, lnwl, classes record := make([]string, 1+2+len(breaks)+1) record[0] = "# time" record[1] = "# < LDC [%%]" record[2] = "# >= LDC [%%]" for i, v := range breaks { if useDepth && useWidth { if i == 0 { record[3] = "# < break_1 [%%]" } record[i+4] = fmt.Sprintf("# >= break_%d [%%]", i+1) } else { if i == 0 { record[3] = fmt.Sprintf("# < %.3f [%%]", v) } record[i+4] = fmt.Sprintf("# >= %.3f [%%]", v) } } if err := out.Write(record); err != nil { // Too late for HTTP status message. 
log.Printf("error: %v\n", err) return } empty := fmt.Sprintf("%.3f", 0.0) for i := range record[1:] { record[i+1] = empty } for _, res := range results { record[0] = res.label for i, v := range res.ldc { record[1+i] = fmt.Sprintf("%.3f", v) } for i, v := range res.breaks { record[3+i] = fmt.Sprintf("%.3f", v) } if err := out.Write(record); err != nil { // Too late for HTTP status message. log.Printf("error: %v\n", err) return } } out.Flush() if err := out.Error(); err != nil { // Too late for HTTP status message. log.Printf("error: %v\n", err) } }