Mercurial > gemma
view pkg/controllers/stretches.go @ 4407:8c0f2377ff47
import_overview: DSR review details added
author | Thomas Junk <thomas.junk@intevation.de> |
---|---|
date | Tue, 17 Sep 2019 10:29:36 +0200 |
parents | 5e38667f740c |
children | 2077347ef345 |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2019 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> package controllers import ( "context" "database/sql" "encoding/csv" "fmt" "log" "net/http" "runtime" "strings" "sync" "time" "github.com/gorilla/mux" "gemma.intevation.de/gemma/pkg/common" "gemma.intevation.de/gemma/pkg/middleware" ) const ( selectSectionBottlenecks = ` SELECT distinct(b.objnam), b.limiting FROM waterway.sections s, waterway.bottlenecks b WHERE b.validity @> current_timestamp AND ST_Intersects(b.area, s.area) AND s.name = $1` selectStretchBottlenecks = ` SELECT distinct(b.objnam), b.limiting FROM users.stretches s, waterway.bottlenecks b WHERE b.validity @> current_timestamp AND ST_Intersects(b.area, s.area) AND s.name = $1` ) type ( stretchBottleneck struct { name string limiting string } stretchBottlenecks []stretchBottleneck fullStretchBottleneck struct { *stretchBottleneck measurements availMeasurements ldc []float64 breaks []float64 access func(*availMeasurement) float64 } ) func (bns stretchBottlenecks) contains(limiting string) bool { for i := range bns { if bns[i].limiting == limiting { return true } } return false } func loadFullStretchBottleneck( ctx context.Context, conn *sql.Conn, bn *stretchBottleneck, los int, from, to time.Time, depthbreaks, widthbreaks []float64, ) (*fullStretchBottleneck, error) { measurements, err := loadDepthValues(ctx, conn, bn.name, los, from, to) if err != nil { return nil, err } ldc, err := loadLDCReferenceValue(ctx, conn, bn.name) if err != nil { return nil, err } var access func(*availMeasurement) float64 var breaks []float64 switch bn.limiting { case "width": access = (*availMeasurement).getWidth breaks = widthbreaks case "depth": access = (*availMeasurement).getDepth breaks = depthbreaks default: log.Printf( "warn: unknown limitation '%s'. default to 'depth'.\n", bn.limiting) access = (*availMeasurement).getDepth breaks = depthbreaks } return &fullStretchBottleneck{ stretchBottleneck: bn, measurements: measurements, ldc: ldc, breaks: breaks, access: access, }, nil } func loadStretchBottlenecks( ctx context.Context, conn *sql.Conn, stretch bool, name string, ) (stretchBottlenecks, error) { var sql string if stretch { sql = selectStretchBottlenecks } else { sql = selectSectionBottlenecks } rows, err := conn.QueryContext(ctx, sql, name) if err != nil { return nil, err } defer rows.Close() var bns stretchBottlenecks for rows.Next() { var bn stretchBottleneck if err := rows.Scan( &bn.name, &bn.limiting, ); err != nil { return nil, err } bns = append(bns, bn) } if err := rows.Err(); err != nil { return nil, err } return bns, nil } func stretchAvailableFairwayDepth(rw http.ResponseWriter, req *http.Request) { vars := mux.Vars(req) stretch := vars["kind"] == "stretch" name := vars["name"] mode := intervalMode(req.FormValue("mode")) depthbreaks, widthbreaks := afdRefs, afdRefs from, ok := parseFormTime(rw, req, "from", time.Now().AddDate(-1, 0, 0)) if !ok { return } to, ok := parseFormTime(rw, req, "to", from.AddDate(1, 0, 0)) if !ok { return } if to.Before(from) { to, from = from, to } los, ok := parseFormInt(rw, req, "los", 1) if !ok { return } conn := middleware.GetDBConn(req) ctx := req.Context() bns, err := loadStretchBottlenecks(ctx, conn, stretch, name) if err != nil { http.Error( rw, fmt.Sprintf("DB error: %v.", err), http.StatusInternalServerError) return } if len(bns) == 0 { http.Error(rw, "No bottlenecks found.", http.StatusNotFound) return } if b := req.FormValue("depthbreaks"); b != "" { depthbreaks = breaksToReferenceValue(b) } if b := req.FormValue("widthbreaks"); b != "" { widthbreaks = breaksToReferenceValue(b) } useDepth, useWidth := bns.contains("depth"), bns.contains("width") if useDepth && useWidth && len(widthbreaks) != len(depthbreaks) { http.Error( rw, fmt.Sprintf("class breaks lengths differ: %d != %d", len(widthbreaks), len(depthbreaks)), http.StatusBadRequest, ) return } log.Printf("info: time interval: (%v - %v)\n", from, to) var loaded []*fullStretchBottleneck var errors []error for i := range bns { l, err := loadFullStretchBottleneck( ctx, conn, &bns[i], los, from, to, depthbreaks, widthbreaks, ) if err != nil { log.Printf("error: %v\n", err) errors = append(errors, err) continue } loaded = append(loaded, l) } if len(loaded) == 0 { http.Error( rw, fmt.Sprintf("No bottleneck loaded: %v", joinErrors(errors)), http.StatusInternalServerError, ) return } n := runtime.NumCPU() / 2 if n == 0 { n = 1 } type result struct { label string from time.Time to time.Time ldc []time.Duration breaks []time.Duration } jobCh := make(chan *result) var wg sync.WaitGroup for i := 0; i < n; i++ { wg.Add(1) go func() { defer wg.Done() for res := range jobCh { var ldc, breaks []time.Duration now := time.Now() for _, bn := range loaded { // Don't interpolate for the future if now.Sub(res.to) < 0 { res.to = now } l := bn.measurements.classify( res.from, res.to, bn.ldc, bn.access, ) b := bn.measurements.classify( res.from, res.to, bn.breaks, bn.access, ) if ldc == nil { ldc, breaks = l, b } else { for i, v := range l { ldc[i] += v } for i, v := range b { breaks[i] += v } } } res.ldc = ldc res.breaks = breaks } }() } var results []*result interval := intervals[mode](from, to) var breaks []float64 if useDepth { breaks = depthbreaks } else { breaks = widthbreaks } for pfrom, pto, label := interval(); label != ""; pfrom, pto, label = interval() { res := &result{ label: label, from: pfrom, to: pto, } results = append(results, res) jobCh <- res } close(jobCh) wg.Wait() rw.Header().Add("Content-Type", "text/csv") out := csv.NewWriter(rw) // label, lnwl, classes record := make([]string, 1+2+len(breaks)+1) record[0] = "# time" record[1] = "# < LDC [d]" record[2] = "# >= LDC [d]" for i, v := range breaks { if useDepth && useWidth { if i == 0 { record[3] = "# < break_1 [d]" } record[i+4] = fmt.Sprintf("# >= break_%d", i+1) } else { if i == 0 { record[3] = fmt.Sprintf("# < %.1f [d]", v) } record[i+4] = fmt.Sprintf("# >= %.1f [d]", v) } } if err := out.Write(record); err != nil { // Too late for HTTP status message. log.Printf("error: %v\n", err) return } // Normalize to look like as we have only one bottleneck. scale := 1 / float64(len(loaded)) empty := fmt.Sprintf("%.3f", 0.0) for i := range record[1:] { record[i+1] = empty } for _, r := range results { // Round to full days for i, v := range r.ldc { r.ldc[i] = time.Duration(float64(v) * scale) } for i, v := range r.breaks { r.breaks[i] = time.Duration(float64(v) * scale) } ldcRounded := common.RoundToFullDays(r.ldc) rangesRounded := common.RoundToFullDays(r.breaks) record[0] = r.label for i, v := range ldcRounded { record[1+i] = fmt.Sprintf("%d", v) } for i, d := range rangesRounded { record[3+i] = fmt.Sprintf("%d", d) } if err := out.Write(record); err != nil { // Too late for HTTP status message. log.Printf("error: %v\n", err) return } } out.Flush() if err := out.Error(); err != nil { // Too late for HTTP status message. log.Printf("error: %v\n", err) } } func joinErrors(errors []error) string { var b strings.Builder for _, err := range errors { if b.Len() > 0 { b.WriteString(", ") } b.WriteString(err.Error()) } return b.String() } func stretchAvailabilty(rw http.ResponseWriter, req *http.Request) { vars := mux.Vars(req) stretch := vars["kind"] == "stretch" name := vars["name"] mode := intervalMode(req.FormValue("mode")) if name == "" { http.Error( rw, fmt.Sprintf("Missing %s name", vars["kind"]), http.StatusBadRequest, ) return } from, ok := parseFormTime(rw, req, "from", time.Now().AddDate(-1, 0, 0)) if !ok { return } to, ok := parseFormTime(rw, req, "to", from.AddDate(1, 0, 0)) if !ok { return } if to.Before(from) { to, from = from, to } los, ok := parseFormInt(rw, req, "los", 1) if !ok { return } depthbreaks, widthbreaks := afdRefs, afdRefs if b := req.FormValue("depthbreaks"); b != "" { depthbreaks = breaksToReferenceValue(b) } if b := req.FormValue("widthbreaks"); b != "" { widthbreaks = breaksToReferenceValue(b) } conn := middleware.GetDBConn(req) ctx := req.Context() bns, err := loadStretchBottlenecks(ctx, conn, stretch, name) if err != nil { http.Error( rw, fmt.Sprintf("DB error: %v.", err), http.StatusInternalServerError) return } if len(bns) == 0 { http.Error( rw, "No bottlenecks found.", http.StatusNotFound, ) return } useDepth, useWidth := bns.contains("depth"), bns.contains("width") if useDepth && useWidth && len(widthbreaks) != len(depthbreaks) { http.Error( rw, fmt.Sprintf("class breaks lengths differ: %d != %d", len(widthbreaks), len(depthbreaks)), http.StatusBadRequest, ) return } log.Printf("info: time interval: (%v - %v)\n", from, to) var loaded []*fullStretchBottleneck var errors []error for i := range bns { l, err := loadFullStretchBottleneck( ctx, conn, &bns[i], los, from, to, depthbreaks, widthbreaks, ) if err != nil { log.Printf("error: %v\n", err) errors = append(errors, err) continue } loaded = append(loaded, l) } if len(loaded) == 0 { http.Error( rw, fmt.Sprintf("No bottleneck loaded: %v", joinErrors(errors)), http.StatusInternalServerError, ) return } n := runtime.NumCPU() / 2 if n == 0 { n = 1 } type result struct { label string from time.Time to time.Time ldc []float64 breaks []float64 } jobCh := make(chan *result) var wg sync.WaitGroup for i := 0; i < n; i++ { wg.Add(1) go func() { defer wg.Done() for res := range jobCh { var ldc, breaks []time.Duration now := time.Now() for _, bn := range loaded { // Don't interpolate for the future if now.Sub(res.to) < 0 { res.to = now } l := bn.measurements.classify( res.from, res.to, bn.ldc, (*availMeasurement).getValue, ) b := bn.measurements.classify( res.from, res.to, bn.breaks, bn.access, ) if ldc == nil { ldc, breaks = l, b } else { for i, v := range l { ldc[i] += v } for i, v := range b { breaks[i] += v } } } duration := res.to.Sub(res.from) * time.Duration(len(loaded)) res.ldc = durationsToPercentage(duration, ldc) res.breaks = durationsToPercentage(duration, breaks) } }() } var results []*result interval := intervals[mode](from, to) var breaks []float64 if useDepth { breaks = depthbreaks } else { breaks = widthbreaks } for pfrom, pto, label := interval(); label != ""; pfrom, pto, label = interval() { res := &result{ label: label, from: pfrom, to: pto, } results = append(results, res) jobCh <- res } close(jobCh) wg.Wait() rw.Header().Add("Content-Type", "text/csv") out := csv.NewWriter(rw) // label, lnwl, classes record := make([]string, 1+2+len(breaks)+1) record[0] = "# time" record[1] = "# < LDC [%%]" record[2] = "# >= LDC [%%]" for i, v := range breaks { if useDepth && useWidth { if i == 0 { record[3] = "# < break_1 [%%]" } record[i+4] = fmt.Sprintf("# >= break_%d [%%]", i+1) } else { if i == 0 { record[3] = fmt.Sprintf("# < %.3f [%%]", v) } record[i+4] = fmt.Sprintf("# >= %.3f [%%]", v) } } if err := out.Write(record); err != nil { // Too late for HTTP status message. log.Printf("error: %v\n", err) return } empty := fmt.Sprintf("%.3f", 0.0) for i := range record[1:] { record[i+1] = empty } for _, res := range results { record[0] = res.label for i, v := range res.ldc { record[1+i] = fmt.Sprintf("%.3f", v) } for i, v := range res.breaks { record[3+i] = fmt.Sprintf("%.3f", v) } if err := out.Write(record); err != nil { // Too late for HTTP status message. log.Printf("error: %v\n", err) return } } out.Flush() if err := out.Error(); err != nil { // Too late for HTTP status message. log.Printf("error: %v\n", err) } }