Mercurial > gemma
view pkg/controllers/stretches.go @ 3430:6994602d2935
fairway availibilty: Implemented for streches and sections.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Thu, 23 May 2019 17:28:14 +0200 |
parents | 0a666ba899fa |
children | d7ddb21f7017 |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2099 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> package controllers import ( "context" "database/sql" "encoding/csv" "fmt" "log" "net/http" "runtime" "strconv" "strings" "sync" "time" "gemma.intevation.de/gemma/pkg/common" "gemma.intevation.de/gemma/pkg/middleware" "github.com/gorilla/mux" ) const ( selectSectionBottlenecks = ` SELECT distinct(b.objnam), b.limiting FROM waterway.sections s, waterway.bottlenecks b WHERE ST_Intersects(b.area, s.area) AND s.name = $1` selectStretchBottlenecks = ` SELECT distinct(b.objnam), b.limiting FROM waterway.stretches s, waterway.bottlenecks b WHERE ST_Intersects(b.area, s.area) AND s.name = $1` ) type ( stretchBottleneck struct { name string limiting string } stretchBottlenecks []stretchBottleneck fullStretchBottleneck struct { *stretchBottleneck measurements availMeasurements ldc []float64 breaks []float64 access func(*availMeasurement) float64 } ) func (bns stretchBottlenecks) contains(limiting string) bool { for i := range bns { if bns[i].limiting == limiting { return true } } return false } func loadFullStretchBottleneck( ctx context.Context, conn *sql.Conn, bn *stretchBottleneck, los int, from, to time.Time, depthbreaks, widthbreaks []float64, ) (*fullStretchBottleneck, error) { measurements, err := loadDepthValues(ctx, conn, bn.name, los, from, to) if err != nil { return nil, err } ldc, err := loadLDCReferenceValue(ctx, conn, bn.name) if err != nil { return nil, err } var access func(*availMeasurement) float64 var breaks []float64 switch bn.limiting { case "width": access = (*availMeasurement).getWidth breaks = widthbreaks case "depth": access = (*availMeasurement).getDepth breaks = depthbreaks default: log.Printf( "warn: unknown limitation '%s'. default to 'depth'.\n", bn.limiting) access = (*availMeasurement).getDepth breaks = depthbreaks } return &fullStretchBottleneck{ stretchBottleneck: bn, measurements: measurements, ldc: ldc, breaks: breaks, access: access, }, nil } func loadStretchBottlenecks( ctx context.Context, conn *sql.Conn, stretch bool, name string, ) (stretchBottlenecks, error) { var sql string if stretch { sql = selectStretchBottlenecks } else { sql = selectSectionBottlenecks } rows, err := conn.QueryContext(ctx, sql, name) if err != nil { return nil, err } defer rows.Close() var bns stretchBottlenecks for rows.Next() { var bn stretchBottleneck if err := rows.Scan( &bn.name, &bn.limiting, ); err != nil { return nil, err } bns = append(bns, bn) } if err := rows.Err(); err != nil { return nil, err } return bns, nil } func stretchAvailableFairwayDepth(rw http.ResponseWriter, req *http.Request) { vars := mux.Vars(req) stretch := vars["kind"] == "stretch" name := vars["name"] mode := intervalMode(req.FormValue("mode")) var from, to time.Time var los int depthbreaks, widthbreaks := afdRefs, afdRefs if f := req.FormValue("from"); f != "" { var err error if from, err = time.Parse(common.TimeFormat, f); err != nil { http.Error( rw, fmt.Sprintf("Invalid format for 'from': %v.", err), http.StatusBadRequest) return } } else { from = time.Now().AddDate(-1, 0, 0) } from = from.UTC() if t := req.FormValue("to"); t != "" { var err error if to, err = time.Parse(common.TimeFormat, t); err != nil { http.Error( rw, fmt.Sprintf("Invalid format for 'to': %v.", err), http.StatusBadRequest) return } } else { to = from.AddDate(1, 0, 0) } to = to.UTC() if to.Before(from) { to, from = from, to } if l := req.FormValue("los"); l != "" { var err error if los, err = strconv.Atoi(l); err != nil { http.Error( rw, fmt.Sprintf("Invalid format for 'los': %v.", err), http.StatusBadRequest) return } } else { los = 1 } conn := middleware.GetDBConn(req) ctx := req.Context() bns, err := loadStretchBottlenecks(ctx, conn, stretch, name) if err != nil { http.Error( rw, fmt.Sprintf("DB error: %v.", err), http.StatusInternalServerError) return } if len(bns) == 0 { http.Error(rw, "No bottlenecks found.", http.StatusNotFound) return } if b := req.FormValue("depthbreaks"); b != "" { depthbreaks = breaksToReferenceValue(b) } if b := req.FormValue("widthbreaks"); b != "" { widthbreaks = breaksToReferenceValue(b) } useDepth, useWidth := bns.contains("depth"), bns.contains("width") if useDepth && useWidth && len(widthbreaks) != len(depthbreaks) { http.Error( rw, fmt.Sprintf("class breaks lengths differ: %d != %d", len(widthbreaks), len(depthbreaks)), http.StatusBadRequest, ) return } log.Printf("info: time interval: (%v - %v)\n", from, to) var loaded []*fullStretchBottleneck var errors []error for i := range bns { l, err := loadFullStretchBottleneck( ctx, conn, &bns[i], los, from, to, depthbreaks, widthbreaks, ) if err != nil { log.Printf("error: %v\n", err) errors = append(errors, err) continue } loaded = append(loaded, l) } if len(loaded) == 0 { http.Error( rw, fmt.Sprintf("No bottleneck loaded: %v", joinErrors(errors)), http.StatusInternalServerError, ) return } type ( result struct { label string from, to time.Time ldc, breaks []time.Duration } calculation struct { result *result ldc, breaks []time.Duration } job struct { result *result bn *fullStretchBottleneck } ) n := runtime.NumCPU() / 2 if n == 0 { n = 1 } jobCh := make(chan job) calcCh := make(chan calculation, n) done := make(chan struct{}) var wg sync.WaitGroup go func() { defer close(done) for calc := range calcCh { ldc := calc.result.ldc for i, v := range calc.ldc { ldc[i] += v } breaks := calc.result.breaks for i, v := range calc.breaks { breaks[i] += v } } }() for i := 0; i < n; i++ { wg.Add(1) go func() { defer wg.Done() for job := range jobCh { bn := job.bn res := job.result ldc := bn.measurements.classify( res.from, res.to, bn.ldc, bn.access, ) breaks := bn.measurements.classify( res.from, res.to, bn.breaks, bn.access, ) calcCh <- calculation{ result: res, breaks: breaks, ldc: ldc, } } }() } var results []*result interval := intervals[mode](from, to) var breaks []float64 if useDepth { breaks = depthbreaks } else { breaks = widthbreaks } for pfrom, pto, label := interval(); label != ""; pfrom, pto, label = interval() { res := &result{ label: label, from: pfrom, to: pto, ldc: make([]time.Duration, 2), breaks: make([]time.Duration, len(breaks)+1), } results = append(results, res) for _, bn := range loaded { jobCh <- job{bn: bn, result: res} } } close(jobCh) wg.Wait() close(calcCh) <-done rw.Header().Add("Content-Type", "text/csv") out := csv.NewWriter(rw) // label, classes, lnwl record := make([]string, 1+1+len(breaks)+1) record[0] = "#label" record[1] = "# >= LDC [h]" for i, v := range breaks { if useDepth && useWidth { if i == 0 { record[2] = "# < break_1 [h]" } record[i+3] = fmt.Sprintf("# >= break_%d", i+1) } else { if i == 0 { record[2] = fmt.Sprintf("# < %.2f [h]", v) } record[i+3] = fmt.Sprintf("# >= %.2f [h]", v) } } if err := out.Write(record); err != nil { // Too late for HTTP status message. log.Printf("error: %v\n", err) return } // Normalize to look like as we have only one bottleneck. scale := 1 / float64(len(loaded)) for _, r := range results { record[0] = r.label record[1] = fmt.Sprintf("%.3f", r.ldc[1].Hours()*scale) for i, d := range r.breaks { record[2+i] = fmt.Sprintf("%.3f", d.Hours()*scale) } if err := out.Write(record); err != nil { // Too late for HTTP status message. log.Printf("error: %v\n", err) return } } out.Flush() if err := out.Error(); err != nil { // Too late for HTTP status message. log.Printf("error: %v\n", err) } } func joinErrors(errors []error) string { var b strings.Builder for _, err := range errors { if b.Len() > 0 { b.WriteString(", ") } b.WriteString(err.Error()) } return b.String() } func stretchAvailabilty( _ interface{}, req *http.Request, conn *sql.Conn, ) (jr JSONResult, err error) { vars := mux.Vars(req) stretch := vars["kind"] == "stretch" name := vars["name"] var from, to time.Time var los int depthbreaks, widthbreaks := afdRefs, afdRefs if f := req.FormValue("from"); f != "" { if from, err = parseTime(f, "from"); err != nil { return } } else { from = time.Now().AddDate(-1, 0, 0).UTC() } if t := req.FormValue("to"); t != "" { if to, err = parseTime(t, "to"); err != nil { return } } else { to = from.AddDate(1, 0, 0).UTC() } if to.Before(from) { to, from = from, to } if l := req.FormValue("los"); l != "" { if los, err = parseInt(l, "los"); err != nil { return } } else { los = 1 } if b := req.FormValue("depthbreaks"); b != "" { depthbreaks = breaksToReferenceValue(b) } if b := req.FormValue("widthbreaks"); b != "" { widthbreaks = breaksToReferenceValue(b) } ctx := req.Context() var bns stretchBottlenecks if bns, err = loadStretchBottlenecks(ctx, conn, stretch, name); err != nil { return } if len(bns) == 0 { err = JSONError{ Code: http.StatusNotFound, Message: "No bottlenecks found.", } return } useDepth, useWidth := bns.contains("depth"), bns.contains("width") if useDepth && useWidth && len(widthbreaks) != len(depthbreaks) { err = JSONError{ Code: http.StatusBadRequest, Message: fmt.Sprintf("class breaks lengths differ: %d != %d", len(widthbreaks), len(depthbreaks)), } return } log.Printf("info: time interval: (%v - %v)\n", from, to) var loaded []*fullStretchBottleneck var errors []error for i := range bns { l, err := loadFullStretchBottleneck( ctx, conn, &bns[i], los, from, to, depthbreaks, widthbreaks, ) if err != nil { log.Printf("error: %v\n", err) errors = append(errors, err) continue } loaded = append(loaded, l) } if len(loaded) == 0 { err = JSONError{ Code: http.StatusInternalServerError, Message: fmt.Sprintf("No bottleneck loaded: %v", joinErrors(errors)), } return } type calculation struct { ldc, afd []time.Duration } n := runtime.NumCPU() / 2 if n == 0 { n = 1 } var breaks []float64 if useDepth { breaks = depthbreaks } else { breaks = widthbreaks } jobCh := make(chan *fullStretchBottleneck) calcCh := make(chan calculation, n) done := make(chan struct{}) ldc := make([]time.Duration, 2) afd := make([]time.Duration, len(breaks)+1) var wg sync.WaitGroup go func() { defer close(done) for calc := range calcCh { for i, v := range calc.ldc { ldc[i] += v } for i, v := range calc.afd { afd[i] += v } } }() for i := 0; i < n; i++ { wg.Add(1) go func() { defer wg.Done() for bn := range jobCh { ldc := bn.measurements.classify( from, to, bn.ldc, (*availMeasurement).getValue, ) afd := bn.measurements.classify( from, to, bn.breaks, bn.access, ) calcCh <- calculation{ldc: ldc, afd: afd} } }() } for _, bn := range loaded { jobCh <- bn } close(jobCh) wg.Wait() close(calcCh) <-done duration := to.Sub(from) * time.Duration(len(loaded)) lnwlPercents := durationsToPercentage(duration, ldc) afdPercents := durationsToPercentage(duration, afd) type ldcOutput struct { Value float64 `json:"value"` Below float64 `json:"below"` Above float64 `json:"above"` } type intervalOutput struct { From *float64 `json:"from,omitempty"` To *float64 `json:"to,omitempty"` Percent float64 `json:"percent"` } type output struct { LDC []intervalOutput `json:"ldc"` AFD []intervalOutput `json:"afd"` } out := output{ LDC: []intervalOutput{ {To: new(float64), Percent: lnwlPercents[0]}, {From: new(float64), Percent: lnwlPercents[1]}, }, } // XXX: In mixed mode the breaks are not correct! for i, percent := range afdPercents { var from, to *float64 switch { case i == 0: to = &breaks[i] case i == len(afdPercents)-1: from = &breaks[len(breaks)-1] default: from = &breaks[i-1] to = &breaks[i] } out.AFD = append(out.AFD, intervalOutput{ From: from, To: to, Percent: percent, }) } jr = JSONResult{Result: &out} return }