view pkg/controllers/diff.go @ 5412:34bc6041e61e marking-single-beam

Added a type for class breaks.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Wed, 07 Jul 2021 10:58:14 +0200
parents 4847ac70103a
children 5f47eeea988d
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018, 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package controllers

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"log"
	"net/http"
	"runtime"
	"time"

	"gemma.intevation.de/gemma/pkg/auth"
	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/mesh"
	"gemma.intevation.de/gemma/pkg/models"

	mw "gemma.intevation.de/gemma/pkg/middleware"
)

// Tuning knobs of the difference calculation.
const (
	// maxNumberWorkers is the maximum number of diffs calculated
	// concurrently. On machines with fewer CPUs only that many
	// workers are started.
	maxNumberWorkers = 2
	// maxQueueLength is the number of diffs which may wait for a free
	// worker. Further requests are rejected while the queue is full.
	maxQueueLength = 5

	// contourTolerance is the tolerance handed to
	// ST_SimplifyPreserveTopology when storing iso areas.
	contourTolerance = 0.1
	// contourStep is the distance between two sampled heights used
	// when no class breaks could be loaded.
	contourStep = 0.1

	// isoCellSize is the side length of a raster cell when tracing
	// iso areas.
	isoCellSize = 0.5
)

// SQL statements used by the difference calculation.
const (
	// diffIDSQL looks up the id of an already stored difference between
	// the survey of bottleneck $1 at date $2 (minuend) and the survey of
	// the same bottleneck at date $3 (subtrahend).
	diffIDSQL = `
SELECT sd.id FROM
  caching.sounding_differences sd JOIN
  waterway.sounding_results srm ON sd.minuend = srm.id JOIN
  waterway.sounding_results srs ON sd.subtrahend = srs.id
WHERE srm.bottleneck_id = srs.bottleneck_id AND
  srm.bottleneck_id = $1 AND
  srm.date_info = $2::date AND
  srs.date_info = $3::date
`
	// insertDiffSQL registers a new difference between the surveys of
	// bottleneck $1 at dates $2 (minuend) and $3 (subtrahend) and returns
	// the id of the new row.
	insertDiffSQL = `
WITH soundings AS (
  SELECT sr.id AS id, sr.date_info AS date_info FROM
  waterway.sounding_results sr
  WHERE sr.bottleneck_id = $1
)
INSERT INTO caching.sounding_differences (minuend, subtrahend)
SELECT m.id, s.id FROM soundings m, soundings s
WHERE m.date_info = $2::date AND s.date_info = $3::date
RETURNING id
`
	// insertDiffIsoAreasQL stores the iso areas of difference $1 at
	// height $2. $4 is a WKB geometry in the spatial reference system $3.
	// It is simplified with tolerance $5 and reduced to its polygonal parts
	// (ST_CollectionExtract type 3). It is then turned into a multi geometry
	// and transformed to EPSG:4326.
	// NOTE(review): the name lacks the "S" of "SQL"; it is kept because it
	// may be referenced elsewhere in the package.
	insertDiffIsoAreasQL = `
INSERT INTO caching.sounding_differences_iso_areas (
  sounding_differences_id,
  height,
  areas
)
SELECT
  $1,
  $2,
  ST_Transform(
    ST_Multi(
      ST_Collectionextract(
        ST_SimplifyPreserveTopology(ST_GeomFromWKB($4, $3::integer), $5),
        3
      )
    ),
    4326
  )
`
	// commonDiffBBoxSQL calculates the bounding box (xmin, ymin, xmax, ymax)
	// of the intersection of the survey areas of bottleneck $1 at dates $3
	// and $4, transformed into the spatial reference system $2.
	// ST_Extent is an aggregate, so the query always yields exactly one row.
	// Presumably the coordinates are NULL when the areas do not intersect or
	// one of the surveys is missing. Confirm this before relying on it.
	commonDiffBBoxSQL = `
WITH joined AS (
  SELECT
    sr.area      AS area,
    sr.date_info AS date_info
  FROM waterway.sounding_results sr
  WHERE sr.bottleneck_id = $1
),
bbox AS (
  SELECT ST_Extent(ST_intersection(
    (SELECT ST_Transform(area::geometry, $2::int) FROM joined WHERE date_info = $3::date),
    (SELECT ST_Transform(area::geometry, $2::int) FROM joined WHERE date_info = $4::date)
  )) AS area
)
SELECT ST_XMin(area), ST_YMin(area), ST_XMax(area), ST_YMax(area) FROM bbox
`
)

// diffResult is what a waiting request receives once a difference
// calculation has finished: either the id of the difference or an error.
type diffResult struct {
	id  int64
	err error
}

// diffCalculationManager serializes the bookkeeping of difference
// calculations and feeds the jobs to a pool of worker goroutines.
type diffCalculationManager struct {
	// waiting maps a calculation key (see diffCalcHash) to the result
	// channels of the requests waiting for that calculation.
	waiting map[string][]chan diffResult
	// cmds carries closures which the manager's run loop executes one
	// after another; all access to waiting happens inside them.
	cmds chan func(*diffCalculationManager)
	// jobs is the bounded queue of calculations for the workers.
	jobs chan *models.DiffCalculationInput
}

var (
	// errBufferFull is reported when the job queue cannot take
	// another calculation.
	errBufferFull = errors.New("buffer full")

	// diffCalcMng is the package wide manager, started when the
	// package is initialized.
	diffCalcMng = startDiffCalculationManager()
)

// diffCalcHash returns the key identifying a difference calculation in the
// form "<bottleneck>/<minuend date>/<subtrahend date>". The dates are
// formatted with common.DateFormat.
func diffCalcHash(dci *models.DiffCalculationInput) string {
	minuend := dci.Minuend.Format(common.DateFormat)
	subtrahend := dci.Subtrahend.Format(common.DateFormat)
	return dci.Bottleneck + "/" + minuend + "/" + subtrahend
}

// startDiffCalculationManager creates a manager with no waiting requests,
// an unbuffered command channel and a job queue of maxQueueLength entries.
// It launches the manager's run loop in the background and returns it.
func startDiffCalculationManager() *diffCalculationManager {
	dcm := &diffCalculationManager{
		waiting: map[string][]chan diffResult{},
		cmds:    make(chan func(*diffCalculationManager)),
		jobs:    make(chan *models.DiffCalculationInput, maxQueueLength),
	}
	go dcm.run()
	return dcm
}

// run starts min(runtime.NumCPU(), maxNumberWorkers) worker goroutines.
// It then executes the commands arriving on dcm.cmds one at a time until
// that channel is closed. Because every command runs on this single
// goroutine, the commands may touch dcm.waiting without further locking.
func (dcm *diffCalculationManager) run() {
	workers := maxNumberWorkers
	if cpus := runtime.NumCPU(); cpus < workers {
		workers = cpus
	}
	for ; workers > 0; workers-- {
		go dcm.calculate()
	}

	for {
		cmd, ok := <-dcm.cmds
		if !ok {
			return
		}
		cmd(dcm)
	}
}

// doDiff calculates the difference A - B of two surveys of the bottleneck
// dci.Bottleneck. A is the survey at date dci.Minuend and B the survey at
// date dci.Subtrahend.
//
// Both meshes are rasterized onto a common raster covering the bounding box
// of the intersection of the two survey areas, in the EPSG code of the
// minuend mesh. The difference raster is traced into iso areas at the
// configured class breaks. A new caching.sounding_differences row and its
// iso areas are stored in a single transaction.
//
// It returns the id of the new difference row, or an error if a survey
// is missing, the surveys do not overlap, the meshes use different EPSG
// codes, or a database operation fails.
func doDiff(ctx context.Context, conn *sql.Conn, dci *models.DiffCalculationInput) (int64, error) {

	start := time.Now()
	begin := start

	minuendTree, err := mesh.FromCache(
		ctx, conn,
		dci.Bottleneck, dci.Minuend.Time)

	log.Printf("info: loading minuend mesh took %s\n", time.Since(start))
	if err != nil {
		return 0, err
	}

	if minuendTree == nil {
		return 0, fmt.Errorf("cannot find survey for %s/%s",
			dci.Bottleneck,
			dci.Minuend.Format(common.DateFormat))
	}

	// All further geometry is calculated in the projection of the minuend.
	epsg := minuendTree.EPSG()

	// The bounding box query aggregates with ST_Extent, so it always returns
	// exactly one row. Without an intersection its coordinates are NULL,
	// which cannot be scanned into plain float64 values. Scan into nullable
	// values so this case is reported as such.
	var x1, y1, x2, y2 sql.NullFloat64

	switch err := conn.QueryRowContext(
		ctx,
		commonDiffBBoxSQL,
		dci.Bottleneck,
		epsg,
		dci.Minuend.Time,
		dci.Subtrahend.Time,
	).Scan(&x1, &y1, &x2, &y2); {
	case err == sql.ErrNoRows:
		return 0, errors.New("no such intersection")
	case err != nil:
		return 0, err
	case !x1.Valid || !y1.Valid || !x2.Valid || !y2.Valid:
		return 0, errors.New("no such intersection")
	}

	box := mesh.Box2D{
		X1: x1.Float64,
		Y1: y1.Float64,
		X2: x2.Float64,
		Y2: y2.Float64,
	}

	if box.Empty() {
		return 0, errors.New("intersection is empty")
	}

	log.Printf("info: bbox of intersection: (%.2f, %.2f) - (%.2f, %.2f)\n",
		box.X1, box.Y1, box.X2, box.Y2)

	start = time.Now()
	raster := mesh.NewRaster(box, isoCellSize)
	raster.Rasterize(minuendTree.Value)
	log.Printf("info: rasterizing minuend took %v\n", time.Since(start))

	// Drop our reference to the minuend mesh; it is not needed any more.
	minuendTree = nil

	start = time.Now()

	subtrahendTree, err := mesh.FromCache(
		ctx, conn,
		dci.Bottleneck, dci.Subtrahend.Time)

	log.Printf("info: loading subtrahend mesh took %s\n", time.Since(start))
	if err != nil {
		return 0, err
	}

	if subtrahendTree == nil {
		return 0, fmt.Errorf("cannot find survey for %s/%s",
			dci.Bottleneck,
			dci.Subtrahend.Format(common.DateFormat))
	}

	// We need a slow path implementation for this.
	if epsg != subtrahendTree.EPSG() {
		return 0, errors.New("calculating differences between two different " +
			"EPSG code meshes are not supported, yet")
	}

	start = time.Now()
	raster.Diff(subtrahendTree.Value)
	log.Printf("info: A - B took %v\n", time.Since(start))
	subtrahendTree = nil

	// Check for common points before opening a transaction which
	// would only be rolled back.
	zMin, zMax, ok := raster.ZExtent()
	if !ok {
		return 0, errors.New("scans do not have common points")
	}

	log.Printf("info: z range: %.3f - %.3f\n", zMin, zMax)

	// XXX: Maybe we should start this transaction earlier!?
	var tx *sql.Tx
	if tx, err = conn.BeginTx(ctx, nil); err != nil {
		return 0, err
	}
	defer tx.Rollback()

	var heights mesh.ClassBreaks

	heights, err = mesh.LoadClassBreaks(
		ctx, tx,
		"morphology_classbreaks_compare")
	if err != nil {
		// Not fatal: fall back to heights sampled between zMin and zMax
		// with a distance of contourStep.
		// NOTE(review): if this failure was a database error, PostgreSQL
		// has aborted the transaction and the statements below will fail
		// as well. Consider loading the class breaks before BeginTx.
		log.Printf("warn: Loading class breaks failed: %v\n", err)
		heights = mesh.SampleDiffHeights(zMin, zMax, contourStep)
	} else {
		heights = heights.ExtrapolateClassBreaks(zMin, zMax)
	}

	heights = common.DedupFloat64s(heights)

	log.Printf("info: num heights: %d\n", len(heights))

	var isoStmt *sql.Stmt
	if isoStmt, err = tx.PrepareContext(ctx, insertDiffIsoAreasQL); err != nil {
		return 0, err
	}
	defer isoStmt.Close()

	var id int64

	if err = tx.QueryRowContext(
		ctx,
		insertDiffSQL,
		dci.Bottleneck,
		dci.Minuend.Time,
		dci.Subtrahend.Time,
	).Scan(&id); err != nil {
		return 0, err
	}

	// Time only the tracing and storing of the iso areas.
	start = time.Now()

	areas := raster.Trace(heights)

	raster = nil

	var size int

	// areas[i] belongs to heights[i].
	for i, a := range areas {
		if len(a) == 0 {
			continue
		}
		wkb := a.AsWKB()
		size += len(wkb)
		if _, err = isoStmt.ExecContext(
			ctx,
			id, heights[i], epsg,
			wkb,
			contourTolerance,
		); err != nil {
			return 0, err
		}
	}

	log.Printf("info: Transferred WKB size: %.2fMB.\n",
		float64(size)/(1024*1024))

	log.Printf("info: calculating and storing iso areas took %v\n",
		time.Since(start))

	if err = tx.Commit(); err != nil {
		log.Printf("info: difference calculation failed after %v\n",
			time.Since(begin))
		return 0, err
	}
	log.Printf("info: difference calculation succeed after %v\n",
		time.Since(begin))

	return id, nil
}

// calculate is the body of a worker goroutine. It takes calculation jobs
// from dcm.jobs and runs doDiff for each as "sys_admin". It then hands the
// outcome to the manager goroutine, which delivers it to every request
// still waiting for that calculation and forgets the calculation.
func (dcm *diffCalculationManager) calculate() {
	for dci := range dcm.jobs {
		// Derive everything the manager needs here, in the worker. The
		// closure below runs later on the manager goroutine. Before Go 1.22
		// the range variable dci is shared by all iterations, so by then it
		// may already hold the next job.
		key := diffCalcHash(dci)

		ctx := context.Background()
		var id int64
		err := auth.RunAs(ctx, "sys_admin", func(conn *sql.Conn) error {
			var err error
			id, err = doDiff(ctx, conn, dci)
			return err
		})
		msg := diffResult{id: id, err: err}

		dcm.cmds <- func(dcm *diffCalculationManager) {
			list := dcm.waiting[key]
			delete(dcm.waiting, key)
			// Result channels have a buffer of one, so this never blocks.
			for _, ch := range list {
				ch <- msg
			}
		}
	}
}

// enque asks for the difference described by dci and returns a channel
// which receives exactly one diffResult:
//   - if the same calculation is already in flight, the request joins it;
//   - if the difference is already stored, its id is sent at once;
//   - otherwise the calculation is queued for the workers, or errBufferFull
//     is sent when the queue is full.
//
// The bookkeeping runs on the manager goroutine.
func (dcm *diffCalculationManager) enque(dci *models.DiffCalculationInput) chan diffResult {
	// One slot of buffer lets the manager deliver the result without
	// blocking, even if the receiving request has gone away.
	result := make(chan diffResult, 1)

	dcm.cmds <- func(m *diffCalculationManager) {
		key := diffCalcHash(dci)

		// Somebody already waits for this calculation: wait with them.
		if pending := m.waiting[key]; pending != nil {
			m.waiting[key] = append(pending, result)
			return
		}

		// Maybe the difference was calculated earlier.
		ctx := context.Background()
		var id int64
		err := auth.RunAs(ctx, "sys_admin", func(conn *sql.Conn) error {
			row := conn.QueryRowContext(
				ctx,
				diffIDSQL,
				dci.Bottleneck,
				dci.Minuend.Time,
				dci.Subtrahend.Time,
			)
			return row.Scan(&id)
		})

		if err == nil {
			result <- diffResult{id: id}
			return
		}
		if err != sql.ErrNoRows {
			result <- diffResult{err: err}
			return
		}

		// Not stored yet: register as first waiter and hand over the job.
		m.waiting[key] = []chan diffResult{result}
		select {
		case m.jobs <- dci:
		default:
			// The job queue is full.
			delete(m.waiting, key)
			result <- diffResult{err: errBufferFull}
		}
	}

	return result
}

// unregister removes ch from the requests waiting for the calculation
// described by dci. It is called when a request stops waiting, either
// after receiving its result or after being canceled. Channels not
// registered (any more) are ignored.
//
// When the last waiter of a still queued or running calculation goes away,
// the now empty list is kept in the waiting map instead of being deleted.
// The list is empty but non-nil, so enque still sees the calculation as in
// flight: a repeated request joins it instead of queuing a duplicate
// calculation of the same difference. The worker deletes the key when it
// delivers the result.
func (dcm *diffCalculationManager) unregister(dci *models.DiffCalculationInput, ch chan diffResult) {
	dcm.cmds <- func(dcm *diffCalculationManager) {
		key := diffCalcHash(dci)
		list := dcm.waiting[key]
		for i, x := range list {
			if x != ch {
				continue
			}
			last := len(list) - 1
			copy(list[i:], list[i+1:])
			// Clear the vacated slot so the channel can be collected.
			list[last] = nil
			// Slicing a non-nil slice yields a non-nil slice, even with
			// length zero, which keeps the in-flight marker intact.
			dcm.waiting[key] = list[:last]
			return
		}
	}
}

// diffCalculation is the HTTP endpoint requesting the difference between
// two surveys of a bottleneck. It enqueues the calculation, or joins one
// already in flight, and waits until the result arrives or the request is
// canceled.
//
// On success the result is {"id": <difference id>}. A full job queue is
// reported with status 429 Too Many Requests. A canceled request returns
// neither a result nor an error.
func diffCalculation(req *http.Request) (jr mw.JSONResult, err error) {

	dci := mw.JSONInput(req).(*models.DiffCalculationInput)
	done := req.Context().Done()

	results := diffCalcMng.enque(dci)
	// Once the handler returns, this request no longer waits for the result.
	defer diffCalcMng.unregister(dci, results)

	var res diffResult
	select {
	case res = <-results:
	case <-done:
		log.Println("request canceled")
		return jr, nil
	}

	if res.err == errBufferFull {
		return jr, mw.JSONError{
			Code:    http.StatusTooManyRequests,
			Message: "Too many differences are waiting to be processed",
		}
	}
	if res.err != nil {
		return jr, res.err
	}

	jr = mw.JSONResult{
		Result: map[string]int64{"id": res.id},
	}
	return jr, nil
}