view pkg/controllers/diff.go @ 5591:0011f50cf216 surveysperbottleneckid

Removed the no longer used alternative API for the surveys/ endpoint. As bottlenecks in the summary for SR imports are now identified by their id and no longer by the (not guaranteed to be unique!) name, there is no longer any need to request survey data by the name+date tuple (which isn't reliable anyway). So the workaround has now been reverted.
author Sascha Wilde <wilde@sha-bang.de>
date Wed, 06 Apr 2022 13:30:29 +0200
parents 5f47eeea988d
children
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018, 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package controllers

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"net/http"
	"runtime"
	"time"

	"gemma.intevation.de/gemma/pkg/auth"
	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/log"
	"gemma.intevation.de/gemma/pkg/mesh"
	"gemma.intevation.de/gemma/pkg/models"

	mw "gemma.intevation.de/gemma/pkg/middleware"
)

// maxNumberWorkers is the upper bound for the number of worker
// goroutines processing diffs at the same time.
const maxNumberWorkers = 2

// maxQueueLength is the capacity of the job queue, i.e. the number
// of diffs that may wait for a free worker.
const maxQueueLength = 5

// contourTolerance is the tolerance handed to the database when
// simplifying the traced iso areas before they are stored.
const contourTolerance = 0.1

// contourStep is the height step used to sample difference heights
// when no class breaks could be loaded.
const contourStep = 0.1

// isoCellSize is the side length of a single raster cell used
// when tracing iso areas.
const isoCellSize = 0.5

const (
	// diffIDSQL looks up the id of an already stored sounding
	// difference. Parameters: $1 bottleneck id, $2 date of the
	// minuend survey, $3 date of the subtrahend survey.
	diffIDSQL = `
SELECT sd.id FROM
  caching.sounding_differences sd JOIN
  waterway.sounding_results srm ON sd.minuend = srm.id JOIN
  waterway.sounding_results srs ON sd.subtrahend = srs.id
WHERE srm.bottleneck_id = srs.bottleneck_id AND
  srm.bottleneck_id = $1 AND
  srm.date_info = $2::date AND
  srs.date_info = $3::date
`
	// insertDiffSQL creates the sounding difference entry for the
	// sounding results of one bottleneck and returns the new id.
	// Parameters: $1 bottleneck id, $2 date of the minuend survey,
	// $3 date of the subtrahend survey.
	insertDiffSQL = `
WITH soundings AS (
  SELECT sr.id AS id, sr.date_info AS date_info FROM
  waterway.sounding_results sr
  WHERE sr.bottleneck_id = $1
)
INSERT INTO caching.sounding_differences (minuend, subtrahend)
SELECT m.id, s.id FROM soundings m, soundings s
WHERE m.date_info = $2::date AND s.date_info = $3::date
RETURNING id
`
	// insertDiffIsoAreasQL stores the iso areas of one height for a
	// sounding difference. The geometry is simplified, reduced to its
	// polygonal parts (collection extract type 3), converted to a
	// multi geometry and transformed to EPSG:4326.
	// Parameters: $1 id of the sounding difference, $2 height,
	// $3 EPSG code of the WKB geometry, $4 geometry as WKB,
	// $5 simplification tolerance.
	insertDiffIsoAreasQL = `
INSERT INTO caching.sounding_differences_iso_areas (
  sounding_differences_id,
  height,
  areas
)
SELECT
  $1,
  $2,
  ST_Transform(
    ST_Multi(
      ST_Collectionextract(
        ST_SimplifyPreserveTopology(ST_GeomFromWKB($4, $3::integer), $5),
        3
      )
    ),
    4326
  )
`
	// commonDiffBBoxSQL calculates the bounding box of the intersection
	// of the areas of two surveys of one bottleneck. It yields
	// x-min, y-min, x-max and y-max in the requested reference system.
	// Parameters: $1 bottleneck id, $2 EPSG code the areas are
	// transformed to, $3 date of the first survey, $4 date of the
	// second survey.
	commonDiffBBoxSQL = `
WITH joined AS (
  SELECT
    sr.area      AS area,
    sr.date_info AS date_info
  FROM waterway.sounding_results sr
  WHERE sr.bottleneck_id = $1
),
bbox AS (
  SELECT ST_Extent(ST_intersection(
    (SELECT ST_Transform(area::geometry, $2::int) FROM joined WHERE date_info = $3::date),
    (SELECT ST_Transform(area::geometry, $2::int) FROM joined WHERE date_info = $4::date)
  )) AS area
)
SELECT ST_XMin(area), ST_YMin(area), ST_XMax(area), ST_YMax(area) FROM bbox
`
)

// diffResult is the outcome of a difference calculation request:
// either the id of the stored difference or the error which
// prevented it.
type diffResult struct {
	id  int64
	err error
}

// diffCalculationManager coordinates the calculation of differences.
// All access to its state is serialized by sending functions over
// cmds, which are executed one after another by the run loop.
type diffCalculationManager struct {
	// waiting maps the hash of a calculation input (see diffCalcHash)
	// to the channels of the requests waiting for its result.
	waiting map[string][]chan diffResult
	// cmds transports the functions to be executed by the run loop.
	cmds chan func(*diffCalculationManager)
	// jobs is the queue of calculations consumed by the workers.
	jobs chan *models.DiffCalculationInput
}

var (
	// errBufferFull is reported to a requester if the job queue
	// cannot take another difference calculation.
	errBufferFull = errors.New("buffer full")

	// diffCalcMng is the package wide manager of difference
	// calculations. It is started when the package is initialized.
	diffCalcMng = startDiffCalculationManager()
)

// diffCalcHash builds the key under which a difference calculation
// is tracked: bottleneck, minuend date and subtrahend date joined
// by slashes. The dates are formatted with common.DateFormat.
func diffCalcHash(dci *models.DiffCalculationInput) string {
	const sep = "/"
	minuend := dci.Minuend.Format(common.DateFormat)
	subtrahend := dci.Subtrahend.Format(common.DateFormat)
	return dci.Bottleneck + sep + minuend + sep + subtrahend
}

// startDiffCalculationManager creates a new manager, starts its
// run loop in a goroutine of its own and returns the manager.
// The job queue is buffered with maxQueueLength entries, the
// command channel is unbuffered.
func startDiffCalculationManager() *diffCalculationManager {
	mng := &diffCalculationManager{
		waiting: make(map[string][]chan diffResult),
		cmds:    make(chan func(*diffCalculationManager)),
		jobs:    make(chan *models.DiffCalculationInput, maxQueueLength),
	}
	go mng.run()
	return mng
}

// run starts the workers and then executes the commands received
// over dcm.cmds one by one. The number of workers is the number of
// CPUs, but never more than maxNumberWorkers.
func (dcm *diffCalculationManager) run() {
	workers := maxNumberWorkers
	if cpus := runtime.NumCPU(); cpus < workers {
		workers = cpus
	}
	for ; workers > 0; workers-- {
		go dcm.calculate()
	}

	// Commands are the only way the manager state is touched,
	// so executing them sequentially here serializes all access.
	for command := range dcm.cmds {
		command(dcm)
	}
}

// doDiff calculates the difference between the minuend and the
// subtrahend survey of the bottleneck given in dci and stores the
// resulting iso areas in the database.
//
// The minuend mesh is loaded and rasterized over the bounding box of
// the intersection of both survey areas, then the subtrahend mesh is
// subtracted from that raster. The iso areas traced from the raster
// are written together with a new sounding difference entry inside
// a single transaction.
//
// It returns the id of the new sounding difference entry. An error is
// returned if one of the surveys cannot be found, the meshes have
// different EPSG codes, the intersection is empty, the scans have no
// common points or a database operation fails.
func doDiff(ctx context.Context, conn *sql.Conn, dci *models.DiffCalculationInput) (int64, error) {

	start := time.Now()
	// begin is kept untouched to report the overall duration at the end.
	begin := start

	minuendTree, err := mesh.FromCache(
		ctx, conn,
		dci.Bottleneck, dci.Minuend.Time)

	log.Infof("loading minuend mesh took %s\n", time.Since(start))
	if err != nil {
		return 0, err
	}

	if minuendTree == nil {
		return 0, fmt.Errorf("cannot find survey for %s/%s",
			dci.Bottleneck,
			dci.Minuend.Format(common.DateFormat))
	}

	epsg := minuendTree.EPSG()

	var box mesh.Box2D

	// NOTE(review): the query selects from an aggregate, so it presumably
	// always yields a row; a missing survey may then surface as a scan
	// error instead of sql.ErrNoRows - confirm.
	switch err := conn.QueryRowContext(
		ctx,
		commonDiffBBoxSQL,
		dci.Bottleneck,
		epsg,
		dci.Minuend.Time,
		dci.Subtrahend.Time,
	).Scan(&box.X1, &box.Y1, &box.X2, &box.Y2); {
	case errors.Is(err, sql.ErrNoRows):
		return 0, errors.New("no such intersection")
	case err != nil:
		return 0, err
	}

	if box.Empty() {
		return 0, errors.New("intersection is empty")
	}

	log.Infof("bbox of intersection: (%.2f, %.2f) - (%.2f, %.2f)\n",
		box.X1, box.Y1, box.X2, box.Y2)

	start = time.Now()
	raster := mesh.NewRaster(box, isoCellSize)
	raster.Rasterize(minuendTree.Value)
	log.Infof("rasterizing minuend took %v\n", time.Since(start))

	// Drop the reference: the minuend mesh is not needed any more.
	minuendTree = nil

	start = time.Now()

	subtrahendTree, err := mesh.FromCache(
		ctx, conn,
		dci.Bottleneck, dci.Subtrahend.Time)

	log.Infof("loading subtrahend mesh took %s\n", time.Since(start))
	if err != nil {
		return 0, err
	}

	if subtrahendTree == nil {
		return 0, fmt.Errorf("cannot find survey for %s/%s",
			dci.Bottleneck,
			dci.Subtrahend.Format(common.DateFormat))
	}

	// We need a slow path implementation for this.
	if epsg != subtrahendTree.EPSG() {
		return 0, errors.New("calculating differences between two different " +
			"EPSG code meshes are not supported, yet")
	}

	start = time.Now()
	raster.Diff(subtrahendTree.Value)
	log.Infof("A - B took %v\n", time.Since(start))
	// Drop the reference: the subtrahend mesh is not needed any more.
	subtrahendTree = nil

	// XXX: Maybe we should start this transaction earlier!?
	var tx *sql.Tx
	if tx, err = conn.BeginTx(ctx, nil); err != nil {
		return 0, err
	}
	// Rolls back on every early return; after a successful Commit
	// the rollback has nothing left to undo.
	defer tx.Rollback()

	zMin, zMax, ok := raster.ZExtent()
	if !ok {
		return 0, errors.New("scans do not have common points")
	}

	log.Infof("z range: %.3f - %.3f\n", zMin, zMax)

	var heights mesh.ClassBreaks

	heights, err = mesh.LoadClassBreaks(
		ctx, tx,
		"morphology_classbreaks_compare")
	if err != nil {
		// Missing class breaks are not fatal: fall back to heights
		// sampled in contourStep steps over the z range.
		log.Warnf("Loading class breaks failed: %v\n", err)
		err = nil
		heights = mesh.SampleDiffHeights(zMin, zMax, contourStep)
	} else {
		heights = heights.ExtrapolateClassBreaks(zMin, zMax)
	}

	heights = common.DedupFloat64s(heights)

	log.Infof("num heights: %d\n", len(heights))

	var isoStmt *sql.Stmt
	if isoStmt, err = tx.PrepareContext(ctx, insertDiffIsoAreasQL); err != nil {
		return 0, err
	}
	defer isoStmt.Close()

	var id int64

	if err = tx.QueryRowContext(
		ctx,
		insertDiffSQL,
		dci.Bottleneck,
		dci.Minuend.Time,
		dci.Subtrahend.Time,
	).Scan(&id); err != nil {
		return 0, err
	}

	// Restart the timer so that the duration logged below covers only
	// tracing and storing the iso areas and not the A - B step again.
	start = time.Now()

	areas := raster.Trace(heights)

	// Drop the reference: the raster is not needed any more.
	raster = nil

	// size sums up the bytes of all WKBs sent to the database.
	var size int

	// areas[i] holds the areas traced for heights[i].
	for i, a := range areas {
		if len(a) == 0 {
			continue
		}
		wkb := a.AsWKB()
		size += len(wkb)
		if _, err = isoStmt.ExecContext(
			ctx,
			id, heights[i], epsg,
			wkb,
			contourTolerance,
		); err != nil {
			return 0, err
		}
	}

	log.Infof("Transferred WKB size: %.2fMB.\n",
		float64(size)/(1024*1024))

	log.Infof("calculating and storing iso areas took %v\n",
		time.Since(start))

	if err = tx.Commit(); err != nil {
		log.Infof("difference calculation failed after %v\n",
			time.Since(begin))
		return 0, err
	}
	log.Infof("difference calculation succeed after %v\n",
		time.Since(begin))

	return id, nil
}

// calculate is the loop of a single worker. It takes the jobs from
// dcm.jobs, runs doDiff for each of them inside
// auth.RunAs(ctx, "sys_admin", ...) and afterwards hands the result to
// all requests registered as waiting for that job.
func (dcm *diffCalculationManager) calculate() {
	for dci := range dcm.jobs {
		ctx := context.Background()
		var id int64
		err := auth.RunAs(ctx, "sys_admin", func(conn *sql.Conn) error {
			var err error
			id, err = doDiff(ctx, conn, dci)
			return err
		})

		// Build key and message before handing over to the manager.
		// The command below runs in the manager goroutine while this
		// worker may already be receiving the next job; with a loop
		// variable shared between iterations (Go before 1.22) reading
		// dci inside the command would race with that and could
		// address the waiters of the wrong job.
		key := diffCalcHash(dci)
		msg := diffResult{id: id, err: err}

		dcm.cmds <- func(dcm *diffCalculationManager) {
			list := dcm.waiting[key]
			delete(dcm.waiting, key)
			// The channels are buffered, so this does not block
			// even if a requester went away.
			for _, ch := range list {
				ch <- msg
			}
		}
	}
}

// enque requests the difference described by dci and returns a
// channel on which exactly one result will be delivered.
//
// The request is handled by the manager loop: If the same difference
// is already being calculated the channel is added to the requests
// waiting for it. Otherwise the database is asked for an already
// stored difference, which is delivered directly if it exists.
// Only if there is none a new job is fed to the workers. If the job
// queue is full errBufferFull is delivered.
func (dcm *diffCalculationManager) enque(dci *models.DiffCalculationInput) chan diffResult {
	// buffer to prevent blocking even if receiver is dead.
	ch := make(chan diffResult, 1)

	dcm.cmds <- func(dcm *diffCalculationManager) {
		// When already running add to list of waitings
		key := diffCalcHash(dci)
		if list := dcm.waiting[key]; list != nil {
			dcm.waiting[key] = append(list, ch)
			return
		}
		// look if it is in database
		var id int64
		ctx := context.Background()

		err := auth.RunAs(ctx, "sys_admin", func(conn *sql.Conn) error {
			return conn.QueryRowContext(
				ctx,
				diffIDSQL,
				dci.Bottleneck,
				dci.Minuend.Time,
				dci.Subtrahend.Time,
			).Scan(&id)
		})

		switch {
		// errors.Is also recognizes the sentinel if it reaches us wrapped.
		case errors.Is(err, sql.ErrNoRows):
			// We need to calculate it.
		case err != nil: // error
			ch <- diffResult{err: err}
			return
		default: // we have it
			ch <- diffResult{id: id}
			return
		}

		// add to waiting
		dcm.waiting[key] = []chan diffResult{ch}
		// feed to workers
		select {
		case dcm.jobs <- dci:
		default: // buffer is full
			// Nobody will calculate it, so nobody must wait for it.
			delete(dcm.waiting, key)
			ch <- diffResult{err: errBufferFull}
		}
	}

	return ch
}

// unregister removes the channel ch from the requests waiting for
// the difference described by dci. If ch is not registered nothing
// happens. The entry for the difference is dropped entirely when no
// waiting request is left. The removal is executed by the manager loop.
func (dcm *diffCalculationManager) unregister(dci *models.DiffCalculationInput, ch chan diffResult) {
	dcm.cmds <- func(dcm *diffCalculationManager) {
		key := diffCalcHash(dci)
		waiters := dcm.waiting[key]

		// Locate the first occurrence of the channel.
		pos := -1
		for i := range waiters {
			if waiters[i] == ch {
				pos = i
				break
			}
		}
		if pos < 0 {
			return
		}

		// Close the gap keeping the order and clear the freed slot
		// so the slice does not hold on to the channel.
		last := len(waiters) - 1
		copy(waiters[pos:], waiters[pos+1:])
		waiters[last] = nil
		waiters = waiters[:last]

		if len(waiters) > 0 {
			dcm.waiting[key] = waiters
			return
		}
		delete(dcm.waiting, key)
	}
}

// diffCalculation is the handler to request a difference calculation.
// It expects the JSON input of the request to be a
// *models.DiffCalculationInput, enqueues it at the calculation manager
// and waits for the outcome or the cancellation of the request.
//
// On success the result contains the id of the difference under the
// key "id". A full job queue is answered with a JSON error carrying
// http.StatusTooManyRequests, other errors are passed through. If
// the request is canceled an empty result and no error are returned.
func diffCalculation(req *http.Request) (jr mw.JSONResult, err error) {

	input := mw.JSONInput(req).(*models.DiffCalculationInput)

	done := req.Context().Done()

	results := diffCalcMng.enque(input)
	// Always sign off again so the manager does not keep the channel.
	defer diffCalcMng.unregister(input, results)

	select {
	case <-done:
		log.Infoln("request canceled")
		return jr, err

	case outcome := <-results:
		if outcome.err == errBufferFull {
			return jr, mw.JSONError{
				Code:    http.StatusTooManyRequests,
				Message: "Too many differences are waiting to be processed",
			}
		}
		if outcome.err != nil {
			return jr, outcome.err
		}
		return mw.JSONResult{
			Result: map[string]int64{"id": outcome.id},
		}, nil
	}
}