view pkg/controllers/stretches.go @ 3535:337e9f85f84c

Prevent non-erased gauge version to have empty validity range This is a follow-up to revision ba0339118d9c, that did not introduce such constraint by virtue of missing that we have the information which gauge is 'current' readily at hand in the erased flag.
author Tom Gottfried <tom@intevation.de>
date Wed, 29 May 2019 18:41:35 +0200
parents a6128caca3c1
children 02951a62e8c6
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2099 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package controllers

import (
	"context"
	"database/sql"
	"encoding/csv"
	"fmt"
	"log"
	"net/http"
	"runtime"
	"strings"
	"sync"
	"time"

	"gemma.intevation.de/gemma/pkg/middleware"
	"github.com/gorilla/mux"
)

const (
	selectSectionBottlenecks = `
SELECT
  distinct(b.objnam),
  b.limiting
FROM waterway.sections s, waterway.bottlenecks b
WHERE ST_Intersects(b.area, s.area) AND s.name = $1`

	selectStretchBottlenecks = `
SELECT
  distinct(b.objnam),
  b.limiting
FROM waterway.stretches s, waterway.bottlenecks b
WHERE ST_Intersects(b.area, s.area) AND s.name = $1`
)

type (
	stretchBottleneck struct {
		name     string
		limiting string
	}

	stretchBottlenecks []stretchBottleneck

	fullStretchBottleneck struct {
		*stretchBottleneck
		measurements availMeasurements
		ldc          []float64
		breaks       []float64
		access       func(*availMeasurement) float64
	}
)

func (bns stretchBottlenecks) contains(limiting string) bool {
	for i := range bns {
		if bns[i].limiting == limiting {
			return true
		}
	}
	return false
}

func loadFullStretchBottleneck(
	ctx context.Context,
	conn *sql.Conn,
	bn *stretchBottleneck,
	los int,
	from, to time.Time,
	depthbreaks, widthbreaks []float64,
) (*fullStretchBottleneck, error) {
	measurements, err := loadDepthValues(ctx, conn, bn.name, los, from, to)
	if err != nil {
		return nil, err
	}
	ldc, err := loadLDCReferenceValue(ctx, conn, bn.name)
	if err != nil {
		return nil, err
	}

	var access func(*availMeasurement) float64
	var breaks []float64

	switch bn.limiting {
	case "width":
		access = (*availMeasurement).getWidth
		breaks = widthbreaks
	case "depth":
		access = (*availMeasurement).getDepth
		breaks = depthbreaks
	default:
		log.Printf(
			"warn: unknown limitation '%s'. default to 'depth'.\n",
			bn.limiting)
		access = (*availMeasurement).getDepth
		breaks = depthbreaks
	}

	return &fullStretchBottleneck{
		stretchBottleneck: bn,
		measurements:      measurements,
		ldc:               ldc,
		breaks:            breaks,
		access:            access,
	}, nil
}

func loadStretchBottlenecks(
	ctx context.Context,
	conn *sql.Conn,
	stretch bool,
	name string,
) (stretchBottlenecks, error) {
	var sql string
	if stretch {
		sql = selectStretchBottlenecks
	} else {
		sql = selectSectionBottlenecks
	}

	rows, err := conn.QueryContext(ctx, sql, name)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var bns stretchBottlenecks

	for rows.Next() {
		var bn stretchBottleneck
		if err := rows.Scan(
			&bn.name,
			&bn.limiting,
		); err != nil {
			return nil, err
		}
		bns = append(bns, bn)
	}

	if err := rows.Err(); err != nil {
		return nil, err
	}

	return bns, nil
}

func stretchAvailableFairwayDepth(rw http.ResponseWriter, req *http.Request) {

	vars := mux.Vars(req)
	stretch := vars["kind"] == "stretch"
	name := vars["name"]
	mode := intervalMode(req.FormValue("mode"))

	depthbreaks, widthbreaks := afdRefs, afdRefs

	from, ok := parseFormTime(rw, req, "from", time.Now().AddDate(-1, 0, 0))
	if !ok {
		return
	}

	to, ok := parseFormTime(rw, req, "to", from.AddDate(1, 0, 0))
	if !ok {
		return
	}

	if to.Before(from) {
		to, from = from, to
	}

	los, ok := parseFormInt(rw, req, "los", 1)
	if !ok {
		return
	}

	conn := middleware.GetDBConn(req)
	ctx := req.Context()

	bns, err := loadStretchBottlenecks(ctx, conn, stretch, name)
	if err != nil {
		http.Error(
			rw, fmt.Sprintf("DB error: %v.", err),
			http.StatusInternalServerError)
		return
	}

	if len(bns) == 0 {
		http.Error(rw, "No bottlenecks found.", http.StatusNotFound)
		return
	}

	if b := req.FormValue("depthbreaks"); b != "" {
		depthbreaks = breaksToReferenceValue(b)
	}

	if b := req.FormValue("widthbreaks"); b != "" {
		widthbreaks = breaksToReferenceValue(b)
	}

	useDepth, useWidth := bns.contains("depth"), bns.contains("width")

	if useDepth && useWidth && len(widthbreaks) != len(depthbreaks) {
		http.Error(
			rw,
			fmt.Sprintf("class breaks lengths differ: %d != %d",
				len(widthbreaks), len(depthbreaks)),
			http.StatusBadRequest,
		)
		return
	}

	log.Printf("info: time interval: (%v - %v)\n", from, to)

	var loaded []*fullStretchBottleneck
	var errors []error

	for i := range bns {
		l, err := loadFullStretchBottleneck(
			ctx,
			conn,
			&bns[i],
			los,
			from, to,
			depthbreaks, widthbreaks,
		)
		if err != nil {
			log.Printf("error: %v\n", err)
			errors = append(errors, err)
			continue
		}
		loaded = append(loaded, l)
	}

	if len(loaded) == 0 {
		http.Error(
			rw,
			fmt.Sprintf("No bottleneck loaded: %v", joinErrors(errors)),
			http.StatusInternalServerError,
		)
		return
	}

	n := runtime.NumCPU() / 2
	if n == 0 {
		n = 1
	}

	type result struct {
		label  string
		from   time.Time
		to     time.Time
		ldc    []time.Duration
		breaks []time.Duration
	}

	jobCh := make(chan *result)

	var wg sync.WaitGroup

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for res := range jobCh {

				var ldc, breaks []time.Duration

				for _, bn := range loaded {
					l := bn.measurements.classify(
						res.from, res.to,
						bn.ldc,
						bn.access,
					)
					b := bn.measurements.classify(
						res.from, res.to,
						bn.breaks,
						bn.access,
					)

					if ldc == nil {
						ldc, breaks = l, b
					} else {
						for i, v := range l {
							ldc[i] += v
						}
						for i, v := range b {
							breaks[i] += v
						}
					}
				}

				res.ldc = ldc
				res.breaks = breaks
			}
		}()
	}

	var results []*result

	interval := intervals[mode](from, to)

	var breaks []float64

	if useDepth {
		breaks = depthbreaks
	} else {
		breaks = widthbreaks
	}

	for pfrom, pto, label := interval(); label != ""; pfrom, pto, label = interval() {

		res := &result{
			label: label,
			from:  pfrom,
			to:    pto,
		}
		results = append(results, res)
		jobCh <- res
	}

	close(jobCh)
	wg.Wait()

	rw.Header().Add("Content-Type", "text/csv")

	out := csv.NewWriter(rw)

	// label, lnwl, classes
	record := make([]string, 1+2+len(breaks)+1)
	record[0] = "# time"
	record[1] = "# < LDC [h]"
	record[2] = "# >= LDC [h]"
	for i, v := range breaks {
		if useDepth && useWidth {
			if i == 0 {
				record[3] = "# < break_1 [h]"
			}
			record[i+4] = fmt.Sprintf("# >= break_%d", i+1)
		} else {
			if i == 0 {
				record[3] = fmt.Sprintf("# < %.1f [h]", v)
			}
			record[i+4] = fmt.Sprintf("# >= %.1f [h]", v)
		}
	}

	if err := out.Write(record); err != nil {
		// Too late for HTTP status message.
		log.Printf("error: %v\n", err)
		return
	}

	// Normalize to look like as we have only one bottleneck.
	scale := 1 / float64(len(loaded))

	empty := fmt.Sprintf("%.3f", 0.0)
	for i := range record[1:] {
		record[i+1] = empty
	}

	for _, r := range results {
		record[0] = r.label
		for i, v := range r.ldc {
			record[1+i] = fmt.Sprintf("%.3f", v.Hours()*scale)
		}

		for i, d := range r.breaks {
			record[3+i] = fmt.Sprintf("%.3f", d.Hours()*scale)
		}

		if err := out.Write(record); err != nil {
			// Too late for HTTP status message.
			log.Printf("error: %v\n", err)
			return
		}
	}

	out.Flush()
	if err := out.Error(); err != nil {
		// Too late for HTTP status message.
		log.Printf("error: %v\n", err)
	}
}

func joinErrors(errors []error) string {
	var b strings.Builder
	for _, err := range errors {
		if b.Len() > 0 {
			b.WriteString(", ")
		}
		b.WriteString(err.Error())
	}
	return b.String()
}

func stretchAvailabilty(rw http.ResponseWriter, req *http.Request) {

	vars := mux.Vars(req)
	stretch := vars["kind"] == "stretch"
	name := vars["name"]
	mode := intervalMode(req.FormValue("mode"))

	if name == "" {
		http.Error(
			rw,
			fmt.Sprintf("Missing %s name", vars["kind"]),
			http.StatusBadRequest,
		)
		return
	}

	from, ok := parseFormTime(rw, req, "from", time.Now().AddDate(-1, 0, 0))
	if !ok {
		return
	}

	to, ok := parseFormTime(rw, req, "to", from.AddDate(1, 0, 0))
	if !ok {
		return
	}

	if to.Before(from) {
		to, from = from, to
	}

	los, ok := parseFormInt(rw, req, "los", 1)
	if !ok {
		return
	}

	depthbreaks, widthbreaks := afdRefs, afdRefs

	if b := req.FormValue("depthbreaks"); b != "" {
		depthbreaks = breaksToReferenceValue(b)
	}

	if b := req.FormValue("widthbreaks"); b != "" {
		widthbreaks = breaksToReferenceValue(b)
	}

	conn := middleware.GetDBConn(req)
	ctx := req.Context()

	bns, err := loadStretchBottlenecks(ctx, conn, stretch, name)
	if err != nil {
		http.Error(
			rw, fmt.Sprintf("DB error: %v.", err),
			http.StatusInternalServerError)
		return
	}

	if len(bns) == 0 {
		http.Error(
			rw,
			"No bottlenecks found.",
			http.StatusNotFound,
		)
		return
	}

	useDepth, useWidth := bns.contains("depth"), bns.contains("width")

	if useDepth && useWidth && len(widthbreaks) != len(depthbreaks) {
		http.Error(
			rw,
			fmt.Sprintf("class breaks lengths differ: %d != %d",
				len(widthbreaks), len(depthbreaks)),
			http.StatusBadRequest,
		)
		return
	}

	log.Printf("info: time interval: (%v - %v)\n", from, to)

	var loaded []*fullStretchBottleneck
	var errors []error

	for i := range bns {
		l, err := loadFullStretchBottleneck(
			ctx,
			conn,
			&bns[i],
			los,
			from, to,
			depthbreaks, widthbreaks,
		)
		if err != nil {
			log.Printf("error: %v\n", err)
			errors = append(errors, err)
			continue
		}
		loaded = append(loaded, l)
	}

	if len(loaded) == 0 {
		http.Error(
			rw,
			fmt.Sprintf("No bottleneck loaded: %v", joinErrors(errors)),
			http.StatusInternalServerError,
		)
		return
	}

	n := runtime.NumCPU() / 2
	if n == 0 {
		n = 1
	}

	type result struct {
		label  string
		from   time.Time
		to     time.Time
		ldc    []float64
		breaks []float64
	}

	jobCh := make(chan *result)

	var wg sync.WaitGroup

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for res := range jobCh {
				var ldc, breaks []time.Duration

				for _, bn := range loaded {
					l := bn.measurements.classify(
						from, to,
						bn.ldc,
						(*availMeasurement).getValue,
					)

					b := bn.measurements.classify(
						from, to,
						bn.breaks,
						bn.access,
					)

					if ldc == nil {
						ldc, breaks = l, b
					} else {
						for i, v := range l {
							ldc[i] += v
						}
						for i, v := range b {
							breaks[i] += v
						}
					}
				}

				duration := res.to.Sub(res.from) * time.Duration(len(loaded))

				res.ldc = durationsToPercentage(duration, ldc)
				res.breaks = durationsToPercentage(duration, breaks)
			}
		}()
	}

	var results []*result

	interval := intervals[mode](from, to)

	var breaks []float64

	if useDepth {
		breaks = depthbreaks
	} else {
		breaks = widthbreaks
	}

	for pfrom, pto, label := interval(); label != ""; pfrom, pto, label = interval() {

		res := &result{
			label: label,
			from:  pfrom,
			to:    pto,
		}
		results = append(results, res)

		jobCh <- res
	}

	close(jobCh)
	wg.Wait()

	rw.Header().Add("Content-Type", "text/csv")

	out := csv.NewWriter(rw)

	// label, lnwl, classes
	record := make([]string, 1+2+len(breaks)+1)
	record[0] = "# time"
	record[1] = "# < LDC [%%]"
	record[2] = "# >= LDC [%%]"
	for i, v := range breaks {
		if useDepth && useWidth {
			if i == 0 {
				record[3] = "# < break_1 [%%]"
			}
			record[i+4] = fmt.Sprintf("# >= break_%d [%%]", i+1)
		} else {
			if i == 0 {
				record[3] = fmt.Sprintf("# < %.3f [%%]", v)
			}
			record[i+4] = fmt.Sprintf("# >= %.3f [%%]", v)
		}
	}

	if err := out.Write(record); err != nil {
		// Too late for HTTP status message.
		log.Printf("error: %v\n", err)
		return
	}

	empty := fmt.Sprintf("%.3f", 0.0)
	for i := range record[1:] {
		record[i+1] = empty
	}

	for _, res := range results {
		record[0] = res.label

		for i, v := range res.ldc {
			record[1+i] = fmt.Sprintf("%.3f", v)
		}

		for i, v := range res.breaks {
			record[3+i] = fmt.Sprintf("%.3f", v)
		}

		if err := out.Write(record); err != nil {
			// Too late for HTTP status message.
			log.Printf("error: %v\n", err)
			return
		}
	}

	out.Flush()
	if err := out.Error(); err != nil {
		// Too late for HTTP status message.
		log.Printf("error: %v\n", err)
	}
}