Mercurial > gemma
view pkg/controllers/stretches.go @ 4795:fe838fc3ca69
FA: changed aggregation, so that worst classes are determining the result.
Previously aggregation was done by calculating the mean, but that was
not the expected behavior as the overall availability of the section
can't be better than the availability of the worst included
bottleneck.
author | Sascha Wilde <wilde@intevation.de> |
---|---|
date | Fri, 25 Oct 2019 17:11:37 +0200 |
parents | ef21c1464843 |
children | 199c49f967d2 |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2019 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> // * Sascha Wilde <wilde@intevation.de> package controllers import ( "context" "database/sql" "encoding/csv" "fmt" "log" "net/http" "runtime" "strings" "sync" "time" "github.com/gorilla/mux" "gemma.intevation.de/gemma/pkg/common" "gemma.intevation.de/gemma/pkg/middleware" ) // The following requests are taking _all_ bottlenecks into account, not only // the currently valid ones. This is neccessary, as we are doing reports on // arbitrary time ranges and bottlenecks currently active might have been in the // selected time range. // // FIXME: the better solution would be to limit the bottlenecks to those with: // b.validity && REQUESTED_TIME_RANGE const ( selectSectionBottlenecks = ` SELECT distinct(b.objnam), b.limiting FROM waterway.sections s, waterway.bottlenecks b WHERE ST_Intersects(b.area, s.area) AND s.name = $1` selectStretchBottlenecks = ` SELECT distinct(b.objnam), b.limiting FROM users.stretches s, waterway.bottlenecks b WHERE ST_Intersects(b.area, s.area) AND s.name = $1` ) type ( stretchBottleneck struct { name string limiting string } stretchBottlenecks []stretchBottleneck fullStretchBottleneck struct { *stretchBottleneck measurements availMeasurements ldc []float64 breaks []float64 access func(*availMeasurement) float64 } ) func (bns stretchBottlenecks) contains(limiting string) bool { for i := range bns { if bns[i].limiting == limiting { return true } } return false } func maxDuration(a time.Duration, b time.Duration) time.Duration { if a > b { return a } return b } func sumClassesTo(breaks []time.Duration, to int) time.Duration { var result time.Duration if to < 0 { result = 0 } for i := 0; i <= to; i++ { result += breaks[i] } return result } func aggregateClasses( new []time.Duration, agg []time.Duration, ) []time.Duration { newAgg := make([]time.Duration, len(agg)) fmt.Printf("Old agg: %v\n", agg) fmt.Printf("compwit: %v\n", new) for i := 0; i < len(new)-1; i++ { old_sum := sumClassesTo(agg, i) new_sum := sumClassesTo(new, i) newAgg[i] = maxDuration(new_sum, old_sum) - sumClassesTo(newAgg, i-1) } // adjust highest class so the sum of all classes in agg // matches the original sum of all classes in new. newAgg[len(new)-1] = sumClassesTo(new, len(new)-1) - sumClassesTo(newAgg, len(new)-2) fmt.Printf("New agg: %v\n", newAgg) return newAgg } func loadFullStretchBottleneck( ctx context.Context, conn *sql.Conn, bn *stretchBottleneck, los int, from, to time.Time, depthbreaks, widthbreaks []float64, ) (*fullStretchBottleneck, error) { measurements, err := loadDepthValues(ctx, conn, bn.name, los, from, to) if err != nil { return nil, err } ldc, err := loadLDCReferenceValue(ctx, conn, bn.name) if err != nil { return nil, err } if len(ldc) == 0 { return nil, fmt.Errorf("No LDC found for bottleneck: %s", bn.name) } var access func(*availMeasurement) float64 var breaks []float64 switch bn.limiting { case "width": access = (*availMeasurement).getWidth breaks = widthbreaks case "depth": access = (*availMeasurement).getDepth breaks = depthbreaks default: log.Printf( "warn: unknown limitation '%s'. default to 'depth'.\n", bn.limiting) access = (*availMeasurement).getDepth breaks = depthbreaks } return &fullStretchBottleneck{ stretchBottleneck: bn, measurements: measurements, ldc: ldc, breaks: breaks, access: access, }, nil } func loadStretchBottlenecks( ctx context.Context, conn *sql.Conn, stretch bool, name string, ) (stretchBottlenecks, error) { var sql string if stretch { sql = selectStretchBottlenecks } else { sql = selectSectionBottlenecks } rows, err := conn.QueryContext(ctx, sql, name) if err != nil { return nil, err } defer rows.Close() var bns stretchBottlenecks for rows.Next() { var bn stretchBottleneck if err := rows.Scan( &bn.name, &bn.limiting, ); err != nil { return nil, err } bns = append(bns, bn) } if err := rows.Err(); err != nil { return nil, err } return bns, nil } func stretchAvailableFairwayDepth(rw http.ResponseWriter, req *http.Request) { vars := mux.Vars(req) stretch := vars["kind"] == "stretch" name := vars["name"] mode := intervalMode(req.FormValue("mode")) depthbreaks, widthbreaks := afdRefs, afdRefs from, ok := parseFormTime(rw, req, "from", time.Now().AddDate(-1, 0, 0)) if !ok { return } to, ok := parseFormTime(rw, req, "to", from.AddDate(1, 0, 0)) if !ok { return } if to.Before(from) { to, from = from, to } los, ok := parseFormInt(rw, req, "los", 1) if !ok { return } conn := middleware.GetDBConn(req) ctx := req.Context() bns, err := loadStretchBottlenecks(ctx, conn, stretch, name) if err != nil { http.Error( rw, fmt.Sprintf("DB error: %v.", err), http.StatusInternalServerError) return } if len(bns) == 0 { http.Error(rw, "No bottlenecks found.", http.StatusNotFound) return } if b := req.FormValue("depthbreaks"); b != "" { depthbreaks = breaksToReferenceValue(b) } if b := req.FormValue("widthbreaks"); b != "" { widthbreaks = breaksToReferenceValue(b) } useDepth, useWidth := bns.contains("depth"), bns.contains("width") if useDepth && useWidth && len(widthbreaks) != len(depthbreaks) { http.Error( rw, fmt.Sprintf("class breaks lengths differ: %d != %d", len(widthbreaks), len(depthbreaks)), http.StatusBadRequest, ) return } log.Printf("info: time interval: (%v - %v)\n", from, to) var loaded []*fullStretchBottleneck var errors []error for i := range bns { l, err := loadFullStretchBottleneck( ctx, conn, &bns[i], los, from, to, depthbreaks, widthbreaks, ) if err != nil { log.Printf("error: %v\n", err) errors = append(errors, err) continue } loaded = append(loaded, l) } if len(loaded) == 0 { http.Error( rw, fmt.Sprintf("No bottleneck loaded: %v", joinErrors(errors)), http.StatusInternalServerError, ) return } n := runtime.NumCPU() / 2 if n == 0 { n = 1 } type result struct { label string from time.Time to time.Time ldc []time.Duration breaks []time.Duration } jobCh := make(chan *result) var wg sync.WaitGroup for i := 0; i < n; i++ { wg.Add(1) go func() { defer wg.Done() for res := range jobCh { var ldc, breaks []time.Duration now := time.Now() for _, bn := range loaded { // Don't interpolate for the future if now.Sub(res.to) < 0 { res.to = now } l := bn.measurements.classify( res.from, res.to, bn.ldc, (*availMeasurement).getValue, ) b := bn.measurements.classify( res.from, res.to, bn.breaks, bn.access, ) if ldc == nil { ldc, breaks = l, b } else { ldc = aggregateClasses(l, ldc) breaks = aggregateClasses(b, breaks) } } res.ldc = ldc res.breaks = breaks } }() } var results []*result interval := intervals[mode](from, to) var breaks []float64 if useDepth { breaks = depthbreaks } else { breaks = widthbreaks } for pfrom, pto, label := interval(); label != ""; pfrom, pto, label = interval() { res := &result{ label: label, from: pfrom, to: pto, } results = append(results, res) jobCh <- res } close(jobCh) wg.Wait() rw.Header().Add("Content-Type", "text/csv") out := csv.NewWriter(rw) // label, lnwl, classes record := make([]string, 1+2+len(breaks)+1) record[0] = "# time" record[1] = "# < LDC [d]" record[2] = "# >= LDC [d]" for i, v := range breaks { if useDepth && useWidth { if i == 0 { record[3] = "# < break_1 [d]" } record[i+4] = fmt.Sprintf("# >= break_%d", i+1) } else { if i == 0 { record[3] = fmt.Sprintf("# < %.1f [d]", v) } record[i+4] = fmt.Sprintf("# >= %.1f [d]", v) } } if err := out.Write(record); err != nil { // Too late for HTTP status message. log.Printf("error: %v\n", err) return } empty := fmt.Sprintf("%.3f", 0.0) for i := range record[1:] { record[i+1] = empty } for _, r := range results { // Round to full days ldcRounded := common.RoundToFullDays(r.ldc) rangesRounded := common.RoundToFullDays(r.breaks) record[0] = r.label for i, v := range ldcRounded { record[1+i] = fmt.Sprintf("%d", v) } for i, d := range rangesRounded { record[3+i] = fmt.Sprintf("%d", d) } if err := out.Write(record); err != nil { // Too late for HTTP status message. log.Printf("error: %v\n", err) return } } out.Flush() if err := out.Error(); err != nil { // Too late for HTTP status message. log.Printf("error: %v\n", err) } } func joinErrors(errors []error) string { var b strings.Builder for _, err := range errors { if b.Len() > 0 { b.WriteString(", ") } b.WriteString(err.Error()) } return b.String() } func stretchAvailabilty(rw http.ResponseWriter, req *http.Request) { vars := mux.Vars(req) stretch := vars["kind"] == "stretch" name := vars["name"] mode := intervalMode(req.FormValue("mode")) if name == "" { http.Error( rw, fmt.Sprintf("Missing %s name", vars["kind"]), http.StatusBadRequest, ) return } from, ok := parseFormTime(rw, req, "from", time.Now().AddDate(-1, 0, 0)) if !ok { return } to, ok := parseFormTime(rw, req, "to", from.AddDate(1, 0, 0)) if !ok { return } if to.Before(from) { to, from = from, to } los, ok := parseFormInt(rw, req, "los", 1) if !ok { return } depthbreaks, widthbreaks := afdRefs, afdRefs if b := req.FormValue("depthbreaks"); b != "" { depthbreaks = breaksToReferenceValue(b) } if b := req.FormValue("widthbreaks"); b != "" { widthbreaks = breaksToReferenceValue(b) } conn := middleware.GetDBConn(req) ctx := req.Context() bns, err := loadStretchBottlenecks(ctx, conn, stretch, name) if err != nil { http.Error( rw, fmt.Sprintf("DB error: %v.", err), http.StatusInternalServerError) return } if len(bns) == 0 { http.Error( rw, "No bottlenecks found.", http.StatusNotFound, ) return } useDepth, useWidth := bns.contains("depth"), bns.contains("width") if useDepth && useWidth && len(widthbreaks) != len(depthbreaks) { http.Error( rw, fmt.Sprintf("class breaks lengths differ: %d != %d", len(widthbreaks), len(depthbreaks)), http.StatusBadRequest, ) return } log.Printf("info: time interval: (%v - %v)\n", from, to) var loaded []*fullStretchBottleneck var errors []error for i := range bns { l, err := loadFullStretchBottleneck( ctx, conn, &bns[i], los, from, to, depthbreaks, widthbreaks, ) if err != nil { log.Printf("error: %v\n", err) errors = append(errors, err) continue } loaded = append(loaded, l) } if len(loaded) == 0 { http.Error( rw, fmt.Sprintf("No bottleneck loaded: %v", joinErrors(errors)), http.StatusInternalServerError, ) return } n := runtime.NumCPU() / 2 if n == 0 { n = 1 } type result struct { label string from time.Time to time.Time ldc []float64 breaks []float64 } jobCh := make(chan *result) var wg sync.WaitGroup for i := 0; i < n; i++ { wg.Add(1) go func() { defer wg.Done() for res := range jobCh { var ldc, breaks []time.Duration now := time.Now() for _, bn := range loaded { // Don't interpolate for the future if now.Sub(res.to) < 0 { res.to = now } l := bn.measurements.classify( res.from, res.to, bn.ldc, (*availMeasurement).getValue, ) b := bn.measurements.classify( res.from, res.to, bn.breaks, bn.access, ) if ldc == nil { ldc, breaks = l, b } else { ldc = aggregateClasses(l, ldc) breaks = aggregateClasses(b, breaks) } } duration := res.to.Sub(res.from) res.ldc = durationsToPercentage(duration, ldc) res.breaks = durationsToPercentage(duration, breaks) } }() } var results []*result interval := intervals[mode](from, to) var breaks []float64 if useDepth { breaks = depthbreaks } else { breaks = widthbreaks } for pfrom, pto, label := interval(); label != ""; pfrom, pto, label = interval() { res := &result{ label: label, from: pfrom, to: pto, } results = append(results, res) jobCh <- res } close(jobCh) wg.Wait() rw.Header().Add("Content-Type", "text/csv") out := csv.NewWriter(rw) // label, lnwl, classes record := make([]string, 1+2+len(breaks)+1) record[0] = "# time" record[1] = "# < LDC [%%]" record[2] = "# >= LDC [%%]" for i, v := range breaks { if useDepth && useWidth { if i == 0 { record[3] = "# < break_1 [%%]" } record[i+4] = fmt.Sprintf("# >= break_%d [%%]", i+1) } else { if i == 0 { record[3] = fmt.Sprintf("# < %.3f [%%]", v) } record[i+4] = fmt.Sprintf("# >= %.3f [%%]", v) } } if err := out.Write(record); err != nil { // Too late for HTTP status message. log.Printf("error: %v\n", err) return } empty := fmt.Sprintf("%.3f", 0.0) for i := range record[1:] { record[i+1] = empty } for _, res := range results { record[0] = res.label for i, v := range res.ldc { record[1+i] = fmt.Sprintf("%.3f", v) } for i, v := range res.breaks { record[3+i] = fmt.Sprintf("%.3f", v) } if err := out.Write(record); err != nil { // Too late for HTTP status message. log.Printf("error: %v\n", err) return } } out.Flush() if err := out.Error(); err != nil { // Too late for HTTP status message. log.Printf("error: %v\n", err) } }