Mercurial > gemma
view pkg/controllers/stretches.go @ 4304:fee5794f3b7a
Return stretch as a ZIPed ESRI shape file from end point. TODO: Write the actual geometries to the shape file.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Mon, 02 Sep 2019 17:57:44 +0200 |
parents | 95786a675d70 |
children | 0f467a839fe2 |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2099 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> package controllers import ( "archive/zip" "context" "database/sql" "encoding/csv" "fmt" "io" "io/ioutil" "log" "net/http" "os" "path/filepath" "runtime" "strings" "sync" "time" "github.com/gorilla/mux" "github.com/jonas-p/go-shp" "gemma.intevation.de/gemma/pkg/config" "gemma.intevation.de/gemma/pkg/middleware" "gemma.intevation.de/gemma/pkg/wkb" ) const ( selectSectionBottlenecks = ` SELECT distinct(b.objnam), b.limiting FROM waterway.sections s, waterway.bottlenecks b WHERE b.validity @> current_timestamp AND ST_Intersects(b.area, s.area) AND s.name = $1` selectStretchBottlenecks = ` SELECT distinct(b.objnam), b.limiting FROM waterway.stretches s, waterway.bottlenecks b WHERE b.validity @> current_timestamp AND ST_Intersects(b.area, s.area) AND s.name = $1` ) type ( stretchBottleneck struct { name string limiting string } stretchBottlenecks []stretchBottleneck fullStretchBottleneck struct { *stretchBottleneck measurements availMeasurements ldc []float64 breaks []float64 access func(*availMeasurement) float64 } ) func (bns stretchBottlenecks) contains(limiting string) bool { for i := range bns { if bns[i].limiting == limiting { return true } } return false } func loadFullStretchBottleneck( ctx context.Context, conn *sql.Conn, bn *stretchBottleneck, los int, from, to time.Time, depthbreaks, widthbreaks []float64, ) (*fullStretchBottleneck, error) { measurements, err := loadDepthValues(ctx, conn, bn.name, los, from, to) if err != nil { return nil, err } ldc, err := loadLDCReferenceValue(ctx, conn, bn.name) if err != nil { return nil, err } var access func(*availMeasurement) float64 var breaks []float64 switch bn.limiting { case "width": access = (*availMeasurement).getWidth breaks = widthbreaks case "depth": access = (*availMeasurement).getDepth breaks = depthbreaks default: log.Printf( "warn: unknown limitation '%s'. default to 'depth'.\n", bn.limiting) access = (*availMeasurement).getDepth breaks = depthbreaks } return &fullStretchBottleneck{ stretchBottleneck: bn, measurements: measurements, ldc: ldc, breaks: breaks, access: access, }, nil } func loadStretchBottlenecks( ctx context.Context, conn *sql.Conn, stretch bool, name string, ) (stretchBottlenecks, error) { var sql string if stretch { sql = selectStretchBottlenecks } else { sql = selectSectionBottlenecks } rows, err := conn.QueryContext(ctx, sql, name) if err != nil { return nil, err } defer rows.Close() var bns stretchBottlenecks for rows.Next() { var bn stretchBottleneck if err := rows.Scan( &bn.name, &bn.limiting, ); err != nil { return nil, err } bns = append(bns, bn) } if err := rows.Err(); err != nil { return nil, err } return bns, nil } func stretchAvailableFairwayDepth(rw http.ResponseWriter, req *http.Request) { vars := mux.Vars(req) stretch := vars["kind"] == "stretch" name := vars["name"] mode := intervalMode(req.FormValue("mode")) depthbreaks, widthbreaks := afdRefs, afdRefs from, ok := parseFormTime(rw, req, "from", time.Now().AddDate(-1, 0, 0)) if !ok { return } to, ok := parseFormTime(rw, req, "to", from.AddDate(1, 0, 0)) if !ok { return } if to.Before(from) { to, from = from, to } los, ok := parseFormInt(rw, req, "los", 1) if !ok { return } conn := middleware.GetDBConn(req) ctx := req.Context() bns, err := loadStretchBottlenecks(ctx, conn, stretch, name) if err != nil { http.Error( rw, fmt.Sprintf("DB error: %v.", err), http.StatusInternalServerError) return } if len(bns) == 0 { http.Error(rw, "No bottlenecks found.", http.StatusNotFound) return } if b := req.FormValue("depthbreaks"); b != "" { depthbreaks = breaksToReferenceValue(b) } if b := req.FormValue("widthbreaks"); b != "" { widthbreaks = breaksToReferenceValue(b) } useDepth, useWidth := bns.contains("depth"), bns.contains("width") if useDepth && useWidth && len(widthbreaks) != len(depthbreaks) { http.Error( rw, fmt.Sprintf("class breaks lengths differ: %d != %d", len(widthbreaks), len(depthbreaks)), http.StatusBadRequest, ) return } log.Printf("info: time interval: (%v - %v)\n", from, to) var loaded []*fullStretchBottleneck var errors []error for i := range bns { l, err := loadFullStretchBottleneck( ctx, conn, &bns[i], los, from, to, depthbreaks, widthbreaks, ) if err != nil { log.Printf("error: %v\n", err) errors = append(errors, err) continue } loaded = append(loaded, l) } if len(loaded) == 0 { http.Error( rw, fmt.Sprintf("No bottleneck loaded: %v", joinErrors(errors)), http.StatusInternalServerError, ) return } n := runtime.NumCPU() / 2 if n == 0 { n = 1 } type result struct { label string from time.Time to time.Time ldc []time.Duration breaks []time.Duration } jobCh := make(chan *result) var wg sync.WaitGroup for i := 0; i < n; i++ { wg.Add(1) go func() { defer wg.Done() for res := range jobCh { var ldc, breaks []time.Duration for _, bn := range loaded { l := bn.measurements.classify( res.from, res.to, bn.ldc, bn.access, ) b := bn.measurements.classify( res.from, res.to, bn.breaks, bn.access, ) if ldc == nil { ldc, breaks = l, b } else { for i, v := range l { ldc[i] += v } for i, v := range b { breaks[i] += v } } } res.ldc = ldc res.breaks = breaks } }() } var results []*result interval := intervals[mode](from, to) var breaks []float64 if useDepth { breaks = depthbreaks } else { breaks = widthbreaks } for pfrom, pto, label := interval(); label != ""; pfrom, pto, label = interval() { res := &result{ label: label, from: pfrom, to: pto, } results = append(results, res) jobCh <- res } close(jobCh) wg.Wait() rw.Header().Add("Content-Type", "text/csv") out := csv.NewWriter(rw) // label, lnwl, classes record := make([]string, 1+2+len(breaks)+1) record[0] = "# time" record[1] = "# < LDC [h]" record[2] = "# >= LDC [h]" for i, v := range breaks { if useDepth && useWidth { if i == 0 { record[3] = "# < break_1 [h]" } record[i+4] = fmt.Sprintf("# >= break_%d", i+1) } else { if i == 0 { record[3] = fmt.Sprintf("# < %.1f [h]", v) } record[i+4] = fmt.Sprintf("# >= %.1f [h]", v) } } if err := out.Write(record); err != nil { // Too late for HTTP status message. log.Printf("error: %v\n", err) return } // Normalize to look like as we have only one bottleneck. scale := 1 / float64(len(loaded)) empty := fmt.Sprintf("%.3f", 0.0) for i := range record[1:] { record[i+1] = empty } for _, r := range results { record[0] = r.label for i, v := range r.ldc { record[1+i] = fmt.Sprintf("%.3f", v.Hours()*scale) } for i, d := range r.breaks { record[3+i] = fmt.Sprintf("%.3f", d.Hours()*scale) } if err := out.Write(record); err != nil { // Too late for HTTP status message. log.Printf("error: %v\n", err) return } } out.Flush() if err := out.Error(); err != nil { // Too late for HTTP status message. log.Printf("error: %v\n", err) } } func joinErrors(errors []error) string { var b strings.Builder for _, err := range errors { if b.Len() > 0 { b.WriteString(", ") } b.WriteString(err.Error()) } return b.String() } func stretchAvailabilty(rw http.ResponseWriter, req *http.Request) { vars := mux.Vars(req) stretch := vars["kind"] == "stretch" name := vars["name"] mode := intervalMode(req.FormValue("mode")) if name == "" { http.Error( rw, fmt.Sprintf("Missing %s name", vars["kind"]), http.StatusBadRequest, ) return } from, ok := parseFormTime(rw, req, "from", time.Now().AddDate(-1, 0, 0)) if !ok { return } to, ok := parseFormTime(rw, req, "to", from.AddDate(1, 0, 0)) if !ok { return } if to.Before(from) { to, from = from, to } los, ok := parseFormInt(rw, req, "los", 1) if !ok { return } depthbreaks, widthbreaks := afdRefs, afdRefs if b := req.FormValue("depthbreaks"); b != "" { depthbreaks = breaksToReferenceValue(b) } if b := req.FormValue("widthbreaks"); b != "" { widthbreaks = breaksToReferenceValue(b) } conn := middleware.GetDBConn(req) ctx := req.Context() bns, err := loadStretchBottlenecks(ctx, conn, stretch, name) if err != nil { http.Error( rw, fmt.Sprintf("DB error: %v.", err), http.StatusInternalServerError) return } if len(bns) == 0 { http.Error( rw, "No bottlenecks found.", http.StatusNotFound, ) return } useDepth, useWidth := bns.contains("depth"), bns.contains("width") if useDepth && useWidth && len(widthbreaks) != len(depthbreaks) { http.Error( rw, fmt.Sprintf("class breaks lengths differ: %d != %d", len(widthbreaks), len(depthbreaks)), http.StatusBadRequest, ) return } log.Printf("info: time interval: (%v - %v)\n", from, to) var loaded []*fullStretchBottleneck var errors []error for i := range bns { l, err := loadFullStretchBottleneck( ctx, conn, &bns[i], los, from, to, depthbreaks, widthbreaks, ) if err != nil { log.Printf("error: %v\n", err) errors = append(errors, err) continue } loaded = append(loaded, l) } if len(loaded) == 0 { http.Error( rw, fmt.Sprintf("No bottleneck loaded: %v", joinErrors(errors)), http.StatusInternalServerError, ) return } n := runtime.NumCPU() / 2 if n == 0 { n = 1 } type result struct { label string from time.Time to time.Time ldc []float64 breaks []float64 } jobCh := make(chan *result) var wg sync.WaitGroup for i := 0; i < n; i++ { wg.Add(1) go func() { defer wg.Done() for res := range jobCh { var ldc, breaks []time.Duration for _, bn := range loaded { l := bn.measurements.classify( res.from, res.to, bn.ldc, (*availMeasurement).getValue, ) b := bn.measurements.classify( res.from, res.to, bn.breaks, bn.access, ) if ldc == nil { ldc, breaks = l, b } else { for i, v := range l { ldc[i] += v } for i, v := range b { breaks[i] += v } } } duration := res.to.Sub(res.from) * time.Duration(len(loaded)) res.ldc = durationsToPercentage(duration, ldc) res.breaks = durationsToPercentage(duration, breaks) } }() } var results []*result interval := intervals[mode](from, to) var breaks []float64 if useDepth { breaks = depthbreaks } else { breaks = widthbreaks } for pfrom, pto, label := interval(); label != ""; pfrom, pto, label = interval() { res := &result{ label: label, from: pfrom, to: pto, } results = append(results, res) jobCh <- res } close(jobCh) wg.Wait() rw.Header().Add("Content-Type", "text/csv") out := csv.NewWriter(rw) // label, lnwl, classes record := make([]string, 1+2+len(breaks)+1) record[0] = "# time" record[1] = "# < LDC [%%]" record[2] = "# >= LDC [%%]" for i, v := range breaks { if useDepth && useWidth { if i == 0 { record[3] = "# < break_1 [%%]" } record[i+4] = fmt.Sprintf("# >= break_%d [%%]", i+1) } else { if i == 0 { record[3] = fmt.Sprintf("# < %.3f [%%]", v) } record[i+4] = fmt.Sprintf("# >= %.3f [%%]", v) } } if err := out.Write(record); err != nil { // Too late for HTTP status message. log.Printf("error: %v\n", err) return } empty := fmt.Sprintf("%.3f", 0.0) for i := range record[1:] { record[i+1] = empty } for _, res := range results { record[0] = res.label for i, v := range res.ldc { record[1+i] = fmt.Sprintf("%.3f", v) } for i, v := range res.breaks { record[3+i] = fmt.Sprintf("%.3f", v) } if err := out.Write(record); err != nil { // Too late for HTTP status message. log.Printf("error: %v\n", err) return } } out.Flush() if err := out.Error(); err != nil { // Too late for HTTP status message. log.Printf("error: %v\n", err) } } const ( selectStretchSQL = ` SELECT id, isrs_asText(lower(stretch)), isrs_asText(upper(stretch)), ST_AsBinary(area::geometry), objnam, nobjnam, date_info, source_organization FROM waterway.stretches WHERE staging_done AND name = $1` selectStretchCountriesSQL = ` SELECT country_code FROM waterway.stretch_countries WHERE stretches_id = $1 ORDER BY country_code` ) func stretchShapeDownload(rw http.ResponseWriter, req *http.Request) { vars := mux.Vars(req) name := vars["name"] conn := middleware.GetDBConn(req) ctx := req.Context() tx, err := conn.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) if err != nil { http.Error( rw, fmt.Sprintf("DB error: %v.", err), http.StatusInternalServerError) return } defer tx.Rollback() var ( id int64 lower, upper string data []byte objnam string nobjnam sql.NullString dateInfo time.Time source string ) switch err := tx.QueryRowContext(ctx, selectStretchSQL, name).Scan( &id, &lower, &upper, &data, &objnam, &nobjnam, &dateInfo, &source, ); { case err == sql.ErrNoRows: http.NotFound(rw, req) return case err != nil: http.Error( rw, fmt.Sprintf("DB error: %v.", err), http.StatusInternalServerError) return } var countries []string if err := func() error { rows, err := tx.Query(selectStretchCountriesSQL, id) if err != nil { return err } defer rows.Close() for rows.Next() { var country string if err := rows.Scan(&country); err != nil { return err } countries = append(countries, country) } return rows.Err() }(); err != nil { http.Error( rw, fmt.Sprintf("DB error: %v.", err), http.StatusInternalServerError) return } var geom wkb.MultiPolygonGeom if err := geom.FromWKB(data); err != nil { http.Error( rw, fmt.Sprintf("Decoding WKB error: %v.", err), http.StatusInternalServerError) return } tmp := config.TmpDir() dir, err := ioutil.TempDir(tmp, "stretch-download") if err != nil { http.Error( rw, fmt.Sprintf("Cannot create temp dir: %v.", err), http.StatusInternalServerError) return } defer os.RemoveAll(dir) shpDir := filepath.Join(dir, "stretch") if err := os.Mkdir(shpDir, os.ModePerm); err != nil { http.Error( rw, fmt.Sprintf("Cannot create temp dir: %v.", err), http.StatusInternalServerError) return } filename := filepath.Join(shpDir, "stretch") if err := func() error { writer, err := shp.Create(filename, shp.POLYGON) if err != nil { return err } defer writer.Close() // TODO: Write geometry return nil }(); err != nil { http.Error( rw, fmt.Sprintf("creating shapefile failed: %v.", err), http.StatusInternalServerError) return } entries, err := func() ([]os.FileInfo, error) { f, err := os.Open(shpDir) if err != nil { return nil, err } defer f.Close() return f.Readdir(-1) }() if err != nil { http.Error( rw, fmt.Sprintf("cannot read directory: %v.", err), http.StatusInternalServerError) return } rw.Header().Set("Content-Type", "application/zip") rw.Header().Set("Content-Disposition", `attachment; filename="stretch.zip"`) zipfile := zip.NewWriter(rw) defer zipfile.Close() now := time.Now() base := filepath.Base(shpDir) for _, info := range entries { if !info.Mode().IsRegular() { continue } if err := func() error { srcFile, err := os.Open(filepath.Join(shpDir, info.Name())) if err != nil { return err } defer srcFile.Close() header := &zip.FileHeader{ Name: base + "/" + info.Name(), Method: zip.Deflate, Modified: now, } out, err := zipfile.CreateHeader(header) if err != nil { return err } _, err = io.Copy(out, srcFile) return err }(); err != nil { log.Printf("error: cannot write file: %v\n", err) return } } }