view pkg/controllers/stretches.go @ 3424:0a666ba899fa

fairway availabilty: removed level from class breaks.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Thu, 23 May 2019 15:49:59 +0200
parents b9fc6c546610
children 6994602d2935
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2099 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package controllers

import (
	"context"
	"database/sql"
	"encoding/csv"
	"fmt"
	"log"
	"net/http"
	"runtime"
	"strconv"
	"strings"
	"sync"
	"time"

	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/middleware"
	"github.com/gorilla/mux"
)

const (
	selectSectionBottlenecks = `
SELECT
  distinct(b.objnam),
  b.limiting
FROM waterway.sections s, waterway.bottlenecks b
WHERE ST_Intersects(b.area, s.area) AND s.name = $1`

	selectStretchBottlenecks = `
SELECT
  distinct(b.objnam),
  b.limiting
FROM waterway.stretches s, waterway.bottlenecks b
WHERE ST_Intersects(b.area, s.area) AND s.name = $1`
)

type (
	stretchBottleneck struct {
		name     string
		limiting string
	}

	stretchBottlenecks []stretchBottleneck

	fullStretchBottleneck struct {
		*stretchBottleneck
		measurements availMeasurements
		ldc          []float64
		breaks       []float64
		access       func(*availMeasurement) float64
	}
)

func (bns stretchBottlenecks) contains(limiting string) bool {
	for i := range bns {
		if bns[i].limiting == limiting {
			return true
		}
	}
	return false
}

func loadFullStretchBottleneck(
	ctx context.Context,
	conn *sql.Conn,
	bn *stretchBottleneck,
	los int,
	from, to time.Time,
	depthbreaks, widthbreaks []float64,
) (*fullStretchBottleneck, error) {
	measurements, err := loadDepthValues(ctx, conn, bn.name, los, from, to)
	if err != nil {
		return nil, err
	}
	ldc, err := loadLDCReferenceValue(ctx, conn, bn.name)
	if err != nil {
		return nil, err
	}

	var access func(*availMeasurement) float64
	var breaks []float64

	switch bn.limiting {
	case "width":
		access = (*availMeasurement).getWidth
		breaks = widthbreaks
	case "depth":
		access = (*availMeasurement).getDepth
		breaks = depthbreaks
	default:
		log.Printf(
			"warn: unknown limitation '%s'. default to 'depth'.\n",
			bn.limiting)
		access = (*availMeasurement).getDepth
		breaks = depthbreaks
	}

	return &fullStretchBottleneck{
		stretchBottleneck: bn,
		measurements:      measurements,
		ldc:               ldc,
		breaks:            breaks,
		access:            access,
	}, nil
}

func loadStretchBottlenecks(
	ctx context.Context,
	conn *sql.Conn,
	stretch bool,
	name string,
) (stretchBottlenecks, error) {
	var sql string
	if stretch {
		sql = selectStretchBottlenecks
	} else {
		sql = selectSectionBottlenecks
	}

	rows, err := conn.QueryContext(ctx, sql, name)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var bns stretchBottlenecks

	for rows.Next() {
		var bn stretchBottleneck
		if err := rows.Scan(
			&bn.name,
			&bn.limiting,
		); err != nil {
			return nil, err
		}
		bns = append(bns, bn)
	}

	if err := rows.Err(); err != nil {
		return nil, err
	}

	return bns, nil
}

func stretchAvailableFairwayDepth(rw http.ResponseWriter, req *http.Request) {

	vars := mux.Vars(req)
	stretch := vars["kind"] == "stretch"
	name := vars["name"]

	mode := intervalMode(req.FormValue("mode"))
	var from, to time.Time
	var los int

	depthbreaks, widthbreaks := afdRefs, afdRefs

	if f := req.FormValue("from"); f != "" {
		var err error
		if from, err = time.Parse(common.TimeFormat, f); err != nil {
			http.Error(
				rw, fmt.Sprintf("Invalid format for 'from': %v.", err),
				http.StatusBadRequest)
			return
		}
	} else {
		from = time.Now().AddDate(-1, 0, 0)
	}
	from = from.UTC()

	if t := req.FormValue("to"); t != "" {
		var err error
		if to, err = time.Parse(common.TimeFormat, t); err != nil {
			http.Error(
				rw, fmt.Sprintf("Invalid format for 'to': %v.", err),
				http.StatusBadRequest)
			return
		}
	} else {
		to = from.AddDate(1, 0, 0)
	}
	to = to.UTC()

	if to.Before(from) {
		to, from = from, to
	}

	if l := req.FormValue("los"); l != "" {
		var err error
		if los, err = strconv.Atoi(l); err != nil {
			http.Error(
				rw, fmt.Sprintf("Invalid format for 'los': %v.", err),
				http.StatusBadRequest)
			return
		}
	} else {
		los = 1
	}

	conn := middleware.GetDBConn(req)
	ctx := req.Context()

	bns, err := loadStretchBottlenecks(ctx, conn, stretch, name)
	if err != nil {
		http.Error(
			rw, fmt.Sprintf("DB error: %v.", err),
			http.StatusInternalServerError)
		return
	}
	if len(bns) == 0 {
		http.Error(rw, "No bottlenecks found.", http.StatusNotFound)
		return
	}

	if b := req.FormValue("depthbreaks"); b != "" {
		depthbreaks = breaksToReferenceValue(b)
	}

	if b := req.FormValue("widthbreaks"); b != "" {
		widthbreaks = breaksToReferenceValue(b)
	}

	useDepth, useWidth := bns.contains("depth"), bns.contains("width")

	if useDepth && useWidth && len(widthbreaks) != len(depthbreaks) {
		http.Error(
			rw,
			fmt.Sprintf("class breaks lengths differ: %d != %d",
				len(widthbreaks), len(depthbreaks)),
			http.StatusBadRequest,
		)
		return
	}

	log.Printf("info: time interval: (%v - %v)\n", from, to)

	var loaded []*fullStretchBottleneck
	var errors []error

	for i := range bns {
		l, err := loadFullStretchBottleneck(
			ctx,
			conn,
			&bns[i],
			los,
			from, to,
			depthbreaks, widthbreaks,
		)
		if err != nil {
			errors = append(errors, err)
			continue
		}
		loaded = append(loaded, l)
	}

	if len(loaded) == 0 {
		http.Error(
			rw,
			fmt.Sprintf("No bottleneck loaded: %v", joinErrors(errors)),
			http.StatusInternalServerError,
		)
		return
	}

	type (
		result struct {
			label       string
			from, to    time.Time
			ldc, breaks []time.Duration
		}

		calculation struct {
			result      *result
			ldc, breaks []time.Duration
		}

		job struct {
			result *result
			bn     *fullStretchBottleneck
		}
	)

	n := runtime.NumCPU() / 2
	if n == 0 {
		n = 1
	}

	jobCh := make(chan job)
	calcCh := make(chan calculation, n)
	done := make(chan struct{})

	var wg sync.WaitGroup

	go func() {
		defer close(done)
		for calc := range calcCh {
			ldc := calc.result.ldc
			for i, v := range calc.ldc {
				ldc[i] += v
			}
			breaks := calc.result.breaks
			for i, v := range calc.breaks {
				breaks[i] += v
			}
		}
	}()

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for job := range jobCh {
				bn := job.bn
				res := job.result
				ldc := bn.measurements.classify(
					res.from, res.to,
					bn.ldc,
					bn.access,
				)
				breaks := bn.measurements.classify(
					res.from, res.to,
					bn.breaks,
					bn.access,
				)
				calcCh <- calculation{
					result: res,
					breaks: breaks,
					ldc:    ldc,
				}
			}
		}()
	}

	var results []*result

	interval := intervals[mode](from, to)

	var breaks []float64

	if useDepth {
		breaks = depthbreaks
	} else {
		breaks = widthbreaks
	}

	for pfrom, pto, label := interval(); label != ""; pfrom, pto, label = interval() {

		res := &result{
			label:  label,
			from:   pfrom,
			to:     pto,
			ldc:    make([]time.Duration, 2),
			breaks: make([]time.Duration, len(breaks)+1),
		}
		results = append(results, res)

		for _, bn := range loaded {
			jobCh <- job{bn: bn, result: res}
		}
	}
	close(jobCh)
	wg.Wait()
	close(calcCh)
	<-done

	rw.Header().Add("Content-Type", "text/csv")

	out := csv.NewWriter(rw)

	// label, classes, lnwl
	record := make([]string, 1+1+len(breaks)+1)
	record[0] = "#label"
	record[1] = "# >= LDC [h]"
	for i, v := range breaks {
		if useDepth && useWidth {
			if i == 0 {
				record[2] = "# < break_1 [h]"
			}
			record[i+3] = fmt.Sprintf("# >= break_%d", i+1)
		} else {
			if i == 0 {
				record[2] = fmt.Sprintf("# < %.2f [h]", v)
			}
			record[i+3] = fmt.Sprintf("# >= %.2f [h]", v)
		}
	}

	if err := out.Write(record); err != nil {
		// Too late for HTTP status message.
		log.Printf("error: %v\n", err)
		return
	}

	// Normalize to look like as we have only one bottleneck.
	scale := 1 / float64(len(loaded))

	for _, r := range results {
		record[0] = r.label
		record[1] = fmt.Sprintf("%.3f", r.ldc[1].Hours()*scale)

		for i, d := range r.breaks {
			record[2+i] = fmt.Sprintf("%.3f", d.Hours()*scale)
		}

		if err := out.Write(record); err != nil {
			// Too late for HTTP status message.
			log.Printf("error: %v\n", err)
			return
		}
	}

	out.Flush()
	if err := out.Error(); err != nil {
		// Too late for HTTP status message.
		log.Printf("error: %v\n", err)
	}
}

func joinErrors(errors []error) string {
	var b strings.Builder
	for _, err := range errors {
		if b.Len() > 0 {
			b.WriteString(", ")
		}
		b.WriteString(err.Error())
	}
	return b.String()
}

func stretchAvailabilty(
	_ interface{},
	req *http.Request,
	conn *sql.Conn,
) (jr JSONResult, err error) {
	// TODO: Implement me!
	return
}