view pkg/controllers/stretches.go @ 4611:b5aa1eb83bb0 geoserver_sql_views

Add possibility to configure SRS for GeoServer SQL view. Automatic detection of spatial reference system for SQL views in GeoServer does not always find the correct SRS.
author Tom Gottfried <tom@intevation.de>
date Fri, 06 Sep 2019 11:58:03 +0200
parents 0f467a839fe2
children 63c25eb9c07c
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package controllers

import (
	"context"
	"database/sql"
	"encoding/csv"
	"fmt"
	"log"
	"net/http"
	"runtime"
	"strings"
	"sync"
	"time"

	"github.com/gorilla/mux"

	"gemma.intevation.de/gemma/pkg/middleware"
)

const (
	// selectSectionBottlenecks selects name (objnam) and limiting factor
	// of all bottlenecks whose validity range contains the current
	// timestamp and whose area intersects the area of the section
	// with the name given as $1.
	// NOTE(review): in PostgreSQL DISTINCT works on the whole result row
	// (objnam, limiting); the parentheses around b.objnam do not limit
	// it to that column - confirm this is the intended behavior.
	selectSectionBottlenecks = `
SELECT
  distinct(b.objnam),
  b.limiting
FROM waterway.sections s, waterway.bottlenecks b
WHERE b.validity @> current_timestamp
  AND ST_Intersects(b.area, s.area)
  AND s.name = $1`

	// selectStretchBottlenecks is the same query as
	// selectSectionBottlenecks but matches against waterway.stretches
	// instead of waterway.sections.
	selectStretchBottlenecks = `
SELECT
  distinct(b.objnam),
  b.limiting
FROM waterway.stretches s, waterway.bottlenecks b
WHERE b.validity @> current_timestamp
  AND ST_Intersects(b.area, s.area)
  AND s.name = $1`
)

// stretchBottleneck is one row of the stretch/section bottleneck
// queries: the name of the bottleneck and its limiting factor.
type stretchBottleneck struct {
	name     string
	limiting string
}

// stretchBottlenecks is the list of bottlenecks found for a
// stretch or a section.
type stretchBottlenecks []stretchBottleneck

// fullStretchBottleneck is a bottleneck together with everything
// needed to classify its measurements: the measurements themselves,
// the LDC reference values, the class breaks to apply and the
// accessor extracting the value to compare against the breaks.
type fullStretchBottleneck struct {
	*stretchBottleneck
	measurements availMeasurements
	ldc          []float64
	breaks       []float64
	access       func(*availMeasurement) float64
}

// contains reports whether at least one bottleneck of the list
// has the given limiting factor.
func (bns stretchBottlenecks) contains(limiting string) bool {
	for _, bn := range bns {
		if limiting == bn.limiting {
			return true
		}
	}
	return false
}

// loadFullStretchBottleneck loads the measurements (via loadDepthValues)
// and the LDC reference values (via loadLDCReferenceValue) of the
// bottleneck bn and bundles them with the class breaks and the value
// accessor matching the limiting factor of the bottleneck.
//
// A bottleneck limited by "width" is classified with widthbreaks and
// getWidth, one limited by "depth" with depthbreaks and getDepth.
// Any other limiting factor is logged as a warning and handled
// like "depth".
//
// The first error of the two loading steps is returned unchanged.
func loadFullStretchBottleneck(
	ctx context.Context,
	conn *sql.Conn,
	bn *stretchBottleneck,
	los int,
	from, to time.Time,
	depthbreaks, widthbreaks []float64,
) (*fullStretchBottleneck, error) {

	values, err := loadDepthValues(ctx, conn, bn.name, los, from, to)
	if err != nil {
		return nil, err
	}

	refs, err := loadLDCReferenceValue(ctx, conn, bn.name)
	if err != nil {
		return nil, err
	}

	// Start with the depth setup as it is also the fallback
	// for unknown limiting factors.
	full := &fullStretchBottleneck{
		stretchBottleneck: bn,
		measurements:      values,
		ldc:               refs,
		breaks:            depthbreaks,
		access:            (*availMeasurement).getDepth,
	}

	switch bn.limiting {
	case "depth":
		// Nothing to do, depth is already configured.
	case "width":
		full.breaks = widthbreaks
		full.access = (*availMeasurement).getWidth
	default:
		log.Printf(
			"warn: unknown limitation '%s'. default to 'depth'.\n",
			bn.limiting)
	}

	return full, nil
}

// loadStretchBottlenecks queries the bottlenecks intersecting the
// stretch (stretch == true) or the section (stretch == false) with
// the given name.
//
// It returns a nil list if the query yields no rows. Errors of the
// query, of scanning a row or of the row iteration are returned
// unchanged.
func loadStretchBottlenecks(
	ctx context.Context,
	conn *sql.Conn,
	stretch bool,
	name string,
) (stretchBottlenecks, error) {

	query := selectSectionBottlenecks
	if stretch {
		query = selectStretchBottlenecks
	}

	rows, err := conn.QueryContext(ctx, query, name)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var found stretchBottlenecks

	for rows.Next() {
		var objnam, limiting string
		if err := rows.Scan(&objnam, &limiting); err != nil {
			return nil, err
		}
		found = append(found, stretchBottleneck{
			name:     objnam,
			limiting: limiting,
		})
	}

	// Iteration may have stopped because of an error.
	if err := rows.Err(); err != nil {
		return nil, err
	}

	return found, nil
}

// stretchAvailableFairwayDepth is the HTTP handler reporting as CSV
// for how many hours the bottlenecks of a stretch or a section were
// classified below/above the LDC reference values and the class breaks.
//
// Route variables:
//
//	kind  "stretch" selects a stretch, every other value a section.
//	name  name of the stretch or section. Must not be empty.
//
// Form values:
//
//	mode         interval mode, converted by intervalMode.
//	from, to     time range. Swapped if 'to' is before 'from'.
//	los          integer passed on to loadDepthValues.
//	depthbreaks  class breaks for depth limited bottlenecks.
//	widthbreaks  class breaks for width limited bottlenecks.
//
// For each interval the durations of all successfully loaded
// bottlenecks are summed up and divided by the number of these
// bottlenecks before they are written in hours.
//
// Responds with 400 if the name is missing or depth and width
// class breaks are both needed but differ in length, with 404 if no
// bottleneck was found and with 500 on database errors or if none
// of the bottlenecks could be loaded.
func stretchAvailableFairwayDepth(rw http.ResponseWriter, req *http.Request) {

	vars := mux.Vars(req)
	stretch := vars["kind"] == "stretch"
	name := vars["name"]
	mode := intervalMode(req.FormValue("mode"))

	// Same check as in stretchAvailabilty: no need to ask the
	// database for a stretch/section without a name.
	if name == "" {
		http.Error(
			rw,
			fmt.Sprintf("Missing %s name", vars["kind"]),
			http.StatusBadRequest,
		)
		return
	}

	depthbreaks, widthbreaks := afdRefs, afdRefs

	from, ok := parseFormTime(rw, req, "from", time.Now().AddDate(-1, 0, 0))
	if !ok {
		return
	}

	to, ok := parseFormTime(rw, req, "to", from.AddDate(1, 0, 0))
	if !ok {
		return
	}

	if to.Before(from) {
		to, from = from, to
	}

	los, ok := parseFormInt(rw, req, "los", 1)
	if !ok {
		return
	}

	conn := middleware.GetDBConn(req)
	ctx := req.Context()

	bns, err := loadStretchBottlenecks(ctx, conn, stretch, name)
	if err != nil {
		http.Error(
			rw, fmt.Sprintf("DB error: %v.", err),
			http.StatusInternalServerError)
		return
	}

	if len(bns) == 0 {
		http.Error(rw, "No bottlenecks found.", http.StatusNotFound)
		return
	}

	if b := req.FormValue("depthbreaks"); b != "" {
		depthbreaks = breaksToReferenceValue(b)
	}

	if b := req.FormValue("widthbreaks"); b != "" {
		widthbreaks = breaksToReferenceValue(b)
	}

	useDepth, useWidth := bns.contains("depth"), bns.contains("width")

	// The classes of depth and width limited bottlenecks are summed
	// up index by index, so both break lists need the same length.
	if useDepth && useWidth && len(widthbreaks) != len(depthbreaks) {
		http.Error(
			rw,
			fmt.Sprintf("class breaks lengths differ: %d != %d",
				len(widthbreaks), len(depthbreaks)),
			http.StatusBadRequest,
		)
		return
	}

	log.Printf("info: time interval: (%v - %v)\n", from, to)

	var loaded []*fullStretchBottleneck
	var errors []error

	// Bottlenecks failing to load are skipped. Only if none
	// can be loaded the request fails.
	for i := range bns {
		l, err := loadFullStretchBottleneck(
			ctx,
			conn,
			&bns[i],
			los,
			from, to,
			depthbreaks, widthbreaks,
		)
		if err != nil {
			log.Printf("error: %v\n", err)
			errors = append(errors, err)
			continue
		}
		loaded = append(loaded, l)
	}

	if len(loaded) == 0 {
		http.Error(
			rw,
			fmt.Sprintf("No bottleneck loaded: %v", joinErrors(errors)),
			http.StatusInternalServerError,
		)
		return
	}

	// Use half of the CPUs, but at least one worker.
	n := runtime.NumCPU() / 2
	if n == 0 {
		n = 1
	}

	type result struct {
		label  string
		from   time.Time
		to     time.Time
		ldc    []time.Duration
		breaks []time.Duration
	}

	jobCh := make(chan *result)

	var wg sync.WaitGroup

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for res := range jobCh {

				var ldc, breaks []time.Duration

				for _, bn := range loaded {
					// NOTE(review): stretchAvailabilty classifies the LDC
					// with (*availMeasurement).getValue while bn.access is
					// used here - confirm which accessor is intended.
					l := bn.measurements.classify(
						res.from, res.to,
						bn.ldc,
						bn.access,
					)
					b := bn.measurements.classify(
						res.from, res.to,
						bn.breaks,
						bn.access,
					)

					// The slices of the first bottleneck serve as the
					// accumulators for the following ones.
					if ldc == nil {
						ldc, breaks = l, b
					} else {
						for i, v := range l {
							ldc[i] += v
						}
						for i, v := range b {
							breaks[i] += v
						}
					}
				}

				// Each result is written by exactly one worker and only
				// read after wg.Wait() below.
				res.ldc = ldc
				res.breaks = breaks
			}
		}()
	}

	var results []*result

	interval := intervals[mode](from, to)

	// Breaks used to label the CSV columns.
	var breaks []float64

	if useDepth {
		breaks = depthbreaks
	} else {
		breaks = widthbreaks
	}

	// An empty label marks the end of the intervals.
	for pfrom, pto, label := interval(); label != ""; pfrom, pto, label = interval() {

		res := &result{
			label: label,
			from:  pfrom,
			to:    pto,
		}
		results = append(results, res)
		jobCh <- res
	}

	close(jobCh)
	wg.Wait()

	rw.Header().Add("Content-Type", "text/csv")

	out := csv.NewWriter(rw)

	// Columns: label, two LDC columns and len(breaks)+1 class columns.
	record := make([]string, 1+2+len(breaks)+1)
	record[0] = "# time"
	record[1] = "# < LDC [h]"
	record[2] = "# >= LDC [h]"
	for i, v := range breaks {
		if useDepth && useWidth {
			// Depth and width breaks are mixed, so the columns
			// can only be labeled by the index of the break.
			if i == 0 {
				record[3] = "# < break_1 [h]"
			}
			record[i+4] = fmt.Sprintf("# >= break_%d [h]", i+1)
		} else {
			if i == 0 {
				record[3] = fmt.Sprintf("# < %.1f [h]", v)
			}
			record[i+4] = fmt.Sprintf("# >= %.1f [h]", v)
		}
	}

	if err := out.Write(record); err != nil {
		// Too late for HTTP status message.
		log.Printf("error: %v\n", err)
		return
	}

	// Normalize so that it looks as if we had only one bottleneck.
	scale := 1 / float64(len(loaded))

	// Reuse the header record for the data rows and preset
	// all value columns with zero.
	empty := fmt.Sprintf("%.3f", 0.0)
	for i := range record[1:] {
		record[i+1] = empty
	}

	for _, r := range results {
		record[0] = r.label
		for i, v := range r.ldc {
			record[1+i] = fmt.Sprintf("%.3f", v.Hours()*scale)
		}

		for i, d := range r.breaks {
			record[3+i] = fmt.Sprintf("%.3f", d.Hours()*scale)
		}

		if err := out.Write(record); err != nil {
			// Too late for HTTP status message.
			log.Printf("error: %v\n", err)
			return
		}
	}

	out.Flush()
	if err := out.Error(); err != nil {
		// Too late for HTTP status message.
		log.Printf("error: %v\n", err)
	}
}

// joinErrors concatenates the messages of the given errors
// separated by ", ". Empty messages at the beginning of the list
// do not produce a separator. An empty list results in "".
func joinErrors(errors []error) string {
	msgs := make([]string, 0, len(errors))
	for _, err := range errors {
		msg := err.Error()
		// Nothing is written yet and nothing would be written:
		// skip it to not emit a leading separator.
		if len(msgs) == 0 && msg == "" {
			continue
		}
		msgs = append(msgs, msg)
	}
	return strings.Join(msgs, ", ")
}

// stretchAvailabilty is the HTTP handler reporting as CSV which
// percentage of the time the bottlenecks of a stretch or a section
// were classified below/above the LDC reference values and the
// class breaks.
//
// Route variables:
//
//	kind  "stretch" selects a stretch, every other value a section.
//	name  name of the stretch or section. Must not be empty.
//
// Form values:
//
//	mode         interval mode, converted by intervalMode.
//	from, to     time range. Swapped if 'to' is before 'from'.
//	los          integer passed on to loadDepthValues.
//	depthbreaks  class breaks for depth limited bottlenecks.
//	widthbreaks  class breaks for width limited bottlenecks.
//
// For each interval the durations of all successfully loaded
// bottlenecks are summed up and converted by durationsToPercentage
// relative to the length of the interval times the number of these
// bottlenecks.
//
// Responds with 400 if the name is missing or depth and width
// class breaks are both needed but differ in length, with 404 if no
// bottleneck was found and with 500 on database errors or if none
// of the bottlenecks could be loaded.
func stretchAvailabilty(rw http.ResponseWriter, req *http.Request) {

	vars := mux.Vars(req)
	stretch := vars["kind"] == "stretch"
	name := vars["name"]
	mode := intervalMode(req.FormValue("mode"))

	if name == "" {
		http.Error(
			rw,
			fmt.Sprintf("Missing %s name", vars["kind"]),
			http.StatusBadRequest,
		)
		return
	}

	from, ok := parseFormTime(rw, req, "from", time.Now().AddDate(-1, 0, 0))
	if !ok {
		return
	}

	to, ok := parseFormTime(rw, req, "to", from.AddDate(1, 0, 0))
	if !ok {
		return
	}

	if to.Before(from) {
		to, from = from, to
	}

	los, ok := parseFormInt(rw, req, "los", 1)
	if !ok {
		return
	}

	depthbreaks, widthbreaks := afdRefs, afdRefs

	if b := req.FormValue("depthbreaks"); b != "" {
		depthbreaks = breaksToReferenceValue(b)
	}

	if b := req.FormValue("widthbreaks"); b != "" {
		widthbreaks = breaksToReferenceValue(b)
	}

	conn := middleware.GetDBConn(req)
	ctx := req.Context()

	bns, err := loadStretchBottlenecks(ctx, conn, stretch, name)
	if err != nil {
		http.Error(
			rw, fmt.Sprintf("DB error: %v.", err),
			http.StatusInternalServerError)
		return
	}

	if len(bns) == 0 {
		http.Error(
			rw,
			"No bottlenecks found.",
			http.StatusNotFound,
		)
		return
	}

	useDepth, useWidth := bns.contains("depth"), bns.contains("width")

	// The classes of depth and width limited bottlenecks are summed
	// up index by index, so both break lists need the same length.
	if useDepth && useWidth && len(widthbreaks) != len(depthbreaks) {
		http.Error(
			rw,
			fmt.Sprintf("class breaks lengths differ: %d != %d",
				len(widthbreaks), len(depthbreaks)),
			http.StatusBadRequest,
		)
		return
	}

	log.Printf("info: time interval: (%v - %v)\n", from, to)

	var loaded []*fullStretchBottleneck
	var errors []error

	// Bottlenecks failing to load are skipped. Only if none
	// can be loaded the request fails.
	for i := range bns {
		l, err := loadFullStretchBottleneck(
			ctx,
			conn,
			&bns[i],
			los,
			from, to,
			depthbreaks, widthbreaks,
		)
		if err != nil {
			log.Printf("error: %v\n", err)
			errors = append(errors, err)
			continue
		}
		loaded = append(loaded, l)
	}

	if len(loaded) == 0 {
		http.Error(
			rw,
			fmt.Sprintf("No bottleneck loaded: %v", joinErrors(errors)),
			http.StatusInternalServerError,
		)
		return
	}

	// Use half of the CPUs, but at least one worker.
	n := runtime.NumCPU() / 2
	if n == 0 {
		n = 1
	}

	type result struct {
		label  string
		from   time.Time
		to     time.Time
		ldc    []float64
		breaks []float64
	}

	jobCh := make(chan *result)

	var wg sync.WaitGroup

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for res := range jobCh {
				var ldc, breaks []time.Duration

				for _, bn := range loaded {
					l := bn.measurements.classify(
						res.from, res.to,
						bn.ldc,
						(*availMeasurement).getValue,
					)

					b := bn.measurements.classify(
						res.from, res.to,
						bn.breaks,
						bn.access,
					)

					// The slices of the first bottleneck serve as the
					// accumulators for the following ones.
					if ldc == nil {
						ldc, breaks = l, b
					} else {
						for i, v := range l {
							ldc[i] += v
						}
						for i, v := range b {
							breaks[i] += v
						}
					}
				}

				// The sums cover all loaded bottlenecks, so the
				// reference duration has to be scaled accordingly.
				duration := res.to.Sub(res.from) * time.Duration(len(loaded))

				// Each result is written by exactly one worker and only
				// read after wg.Wait() below.
				res.ldc = durationsToPercentage(duration, ldc)
				res.breaks = durationsToPercentage(duration, breaks)
			}
		}()
	}

	var results []*result

	interval := intervals[mode](from, to)

	// Breaks used to label the CSV columns.
	var breaks []float64

	if useDepth {
		breaks = depthbreaks
	} else {
		breaks = widthbreaks
	}

	// An empty label marks the end of the intervals.
	for pfrom, pto, label := interval(); label != ""; pfrom, pto, label = interval() {

		res := &result{
			label: label,
			from:  pfrom,
			to:    pto,
		}
		results = append(results, res)

		jobCh <- res
	}

	close(jobCh)
	wg.Wait()

	rw.Header().Add("Content-Type", "text/csv")

	out := csv.NewWriter(rw)

	// Columns: label, two LDC columns and len(breaks)+1 class columns.
	// Plain string literals need a single '%', only the format strings
	// passed to fmt.Sprintf have to escape it as '%%'.
	record := make([]string, 1+2+len(breaks)+1)
	record[0] = "# time"
	record[1] = "# < LDC [%]"
	record[2] = "# >= LDC [%]"
	for i, v := range breaks {
		if useDepth && useWidth {
			// Depth and width breaks are mixed, so the columns
			// can only be labeled by the index of the break.
			if i == 0 {
				record[3] = "# < break_1 [%]"
			}
			record[i+4] = fmt.Sprintf("# >= break_%d [%%]", i+1)
		} else {
			if i == 0 {
				record[3] = fmt.Sprintf("# < %.3f [%%]", v)
			}
			record[i+4] = fmt.Sprintf("# >= %.3f [%%]", v)
		}
	}

	if err := out.Write(record); err != nil {
		// Too late for HTTP status message.
		log.Printf("error: %v\n", err)
		return
	}

	// Reuse the header record for the data rows and preset
	// all value columns with zero.
	empty := fmt.Sprintf("%.3f", 0.0)
	for i := range record[1:] {
		record[i+1] = empty
	}

	for _, res := range results {
		record[0] = res.label

		for i, v := range res.ldc {
			record[1+i] = fmt.Sprintf("%.3f", v)
		}

		for i, v := range res.breaks {
			record[3+i] = fmt.Sprintf("%.3f", v)
		}

		if err := out.Write(record); err != nil {
			// Too late for HTTP status message.
			log.Printf("error: %v\n", err)
			return
		}
	}

	out.Flush()
	if err := out.Error(); err != nil {
		// Too late for HTTP status message.
		log.Printf("error: %v\n", err)
	}
}