view pkg/controllers/diff.go @ 4723:baabc2b2f094

Avoid creating user profiles without matching role. The INSTEAD OF triggers on users.list_users did that already, but profile data coming e.g. via restoring a dump had been added also if there was no matching database role in the cluster. This also unifies the errors occurring on creation of users with existing role names that differed between roles with and without profile before. Note this is no referential integrity. A dropped role still leaves an orphaned profile behind.
author Tom Gottfried <tom@intevation.de>
date Thu, 17 Oct 2019 18:56:59 +0200
parents 1d6f28e45696
children c91e759007da
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018, 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package controllers

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"log"
	"net/http"
	"runtime"
	"sync"
	"time"

	"gemma.intevation.de/gemma/pkg/auth"
	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/models"
	"gemma.intevation.de/gemma/pkg/octree"

	mw "gemma.intevation.de/gemma/pkg/middleware"
)

const (
	// maxNumberWorkers is the upper bound for the number of worker
	// goroutines processing diffs at a time. Fewer workers are
	// started if the machine reports fewer CPUs.
	maxNumberWorkers = 2
	// maxQueueLength is the capacity of the job queue, i.e. the number
	// of diffs that can wait for a worker to become idle.
	maxQueueLength = 5
)

// contourTolerance is handed to the iso area insert statement as the
// tolerance of ST_SimplifyPreserveTopology. contourStep is handed to
// octree.SampleDiffHeights when no class breaks could be loaded.
const (
	contourTolerance = 0.1
	contourStep      = 0.1
)

const (
	// isoCellSize is the side length of a raster cell when tracing
	// iso areas.
	isoCellSize = 0.5
)

const (
	// diffIDSQL selects the id of an already stored sounding difference.
	// Parameters: $1 bottleneck id, $2 date of the minuend survey,
	// $3 date of the subtrahend survey.
	diffIDSQL = `
SELECT sd.id FROM
  caching.sounding_differences sd JOIN
  waterway.sounding_results srm ON sd.minuend = srm.id JOIN
  waterway.sounding_results srs ON sd.subtrahend = srs.id
WHERE srm.bottleneck_id = srs.bottleneck_id AND
  srm.bottleneck_id = $1 AND
  srm.date_info = $2::date AND
  srs.date_info = $3::date
`
	// insertDiffSQL stores a new sounding difference for the sounding
	// results of a bottleneck and returns the id of the new row.
	// Parameters: $1 bottleneck id, $2 date of the minuend survey,
	// $3 date of the subtrahend survey.
	insertDiffSQL = `
WITH soundings AS (
  SELECT sr.id AS id, sr.date_info AS date_info FROM
  waterway.sounding_results sr
  WHERE sr.bottleneck_id = $1
)
INSERT INTO caching.sounding_differences (minuend, subtrahend)
SELECT m.id, s.id FROM soundings m, soundings s
WHERE m.date_info = $2::date AND s.date_info = $3::date
RETURNING id
`
	// insertDiffIsoAreasQL stores the iso areas of one height of a
	// sounding difference. The geometry is simplified, made valid,
	// reduced to (multi) polygons and transformed to EPSG:4326.
	// Parameters: $1 id of the sounding difference, $2 height,
	// $3 SRID of the incoming geometry, $4 geometry as WKB,
	// $5 simplification tolerance.
	insertDiffIsoAreasQL = `
INSERT INTO caching.sounding_differences_iso_areas (
  sounding_differences_id,
  height,
  areas
)
SELECT
  $1,
  $2,
  ST_Transform(
    ST_Multi(
      ST_CollectionExtract(
        ST_MakeValid(
          ST_Multi(
             ST_Collectionextract(
                ST_SimplifyPreserveTopology(ST_GeomFromWKB($4, $3::integer), $5),
                3
             )
            )
        ),
        3
      )
    ),
    4326
  )
`
)

type (
	// diffResult is the outcome of a difference calculation request:
	// id is the id of the sounding difference, err is set if the
	// request failed.
	diffResult struct {
		id  int64
		err error
	}

	// diffCalculationManager coordinates difference calculations.
	//
	// waiting maps the hash of a calculation input (see diffCalcHash)
	// to the channels of the requesters waiting for its result.
	// cmds carries closures which are executed one after another by
	// the run loop; the waiting map is modified from these closures.
	// jobs is the queue of inputs consumed by the calculate workers.
	diffCalculationManager struct {
		waiting map[string][]chan diffResult
		cmds    chan func(*diffCalculationManager)
		jobs    chan *models.DiffCalculationInput
	}
)

// errBufferFull is reported to a requester if the job queue
// has no room left for a new calculation.
var errBufferFull = errors.New("buffer full")

// diffCalcMng is the package wide manager for difference calculations.
// It is started when the package is initialized.
var diffCalcMng = startDiffCalculationManager()

// diffCalcHash builds the key identifying a difference calculation:
// the bottleneck followed by the formatted dates of minuend and
// subtrahend, separated by slashes.
func diffCalcHash(dci *models.DiffCalculationInput) string {
	minuend := dci.Minuend.Format(common.DateFormat)
	subtrahend := dci.Subtrahend.Format(common.DateFormat)
	return dci.Bottleneck + "/" + minuend + "/" + subtrahend
}

// startDiffCalculationManager creates a difference calculation manager,
// launches its run loop in a goroutine of its own and returns it.
func startDiffCalculationManager() *diffCalculationManager {
	mgr := &diffCalculationManager{
		// The job queue is buffered, the command channel is not.
		jobs:    make(chan *models.DiffCalculationInput, maxQueueLength),
		cmds:    make(chan func(*diffCalculationManager)),
		waiting: make(map[string][]chan diffResult),
	}
	go mgr.run()
	return mgr
}

// run starts the calculation workers and then executes the incoming
// commands one after another until the command channel is closed.
func (dcm *diffCalculationManager) run() {
	// One worker per CPU, but never more than maxNumberWorkers.
	workers := maxNumberWorkers
	if cpus := runtime.NumCPU(); cpus < workers {
		workers = cpus
	}
	for ; workers > 0; workers-- {
		go dcm.calculate()
	}

	for {
		cmd, ok := <-dcm.cmds
		if !ok {
			return
		}
		cmd(dcm)
	}
}

// doDiff calculates the difference between the surveys of the minuend
// and the subtrahend date of a bottleneck and stores the result.
//
// The steps are: load both meshes from the cache, build the point
// difference, triangulate it while the clipping polygon is loaded,
// clip the triangulation, build the final mesh, and finally, inside
// one transaction, insert the sounding difference together with the
// iso areas traced for each height.
//
// It returns the id of the inserted sounding difference. An error is
// returned if one of the surveys cannot be found, if the meshes use
// different EPSG codes, or if one of the processing or database steps
// fails. The durations of the individual steps are logged.
func doDiff(ctx context.Context, conn *sql.Conn, dci *models.DiffCalculationInput) (int64, error) {

	// start is reset before each timed step, begin stays fixed
	// to report the overall duration at the end.
	start := time.Now()
	begin := start

	minuendTree, err := octree.FromCache(
		ctx, conn,
		dci.Bottleneck, dci.Minuend.Time)

	log.Printf("info: loading minuend mesh took %s\n", time.Since(start))
	if err != nil {
		return 0, err
	}

	// A nil tree without an error is treated as a missing survey.
	if minuendTree == nil {
		return 0, fmt.Errorf("Cannot find survey for %s/%s.",
			dci.Bottleneck,
			dci.Minuend.Format(common.DateFormat))
	}

	start = time.Now()

	subtrahendTree, err := octree.FromCache(
		ctx, conn,
		dci.Bottleneck, dci.Subtrahend.Time)

	log.Printf("info: loading subtrahend mesh took %s\n", time.Since(start))
	if err != nil {
		return 0, err
	}

	if subtrahendTree == nil {
		return 0, fmt.Errorf("Cannot find survey for %s/%s.",
			dci.Bottleneck,
			dci.Subtrahend.Format(common.DateFormat))
	}

	// We need a slow path implementation for this.
	epsg := minuendTree.EPSG()
	if epsg != subtrahendTree.EPSG() {
		return 0, errors.New("Calculating differences between two different " +
			"EPSG code meshes are not supported, yet.")
	}

	start = time.Now()
	points := minuendTree.Diff(subtrahendTree)
	log.Printf("info: A - B took %v\n", time.Since(start))

	// The meshes are not used below this point; presumably the
	// references are dropped to let the garbage collector reclaim them.
	minuendTree, subtrahendTree = nil, nil

	// The Triangulation and the loading of the clipping
	// polygon can be done concurrently.

	jobs := make(chan func())

	// Two goroutines, one for each of the two jobs fed below.
	wg := new(sync.WaitGroup)
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for job := range jobs {
				job()
			}
		}()
	}

	// Each job writes only its own pair of variables. They are
	// read after wg.Wait() has returned.
	var (
		tri     *octree.Triangulation
		triErr  error
		clip    *octree.Polygon
		clipErr error
	)

	jobs <- func() {
		start := time.Now()
		tri, triErr = points.Triangulate()
		log.Printf("info: triangulation took %v\n", time.Since(start))
	}

	// This is the only job using the database connection.
	jobs <- func() {
		start := time.Now()
		clip, clipErr = octree.LoadClippingPolygon(
			ctx, conn,
			epsg,
			dci.Bottleneck,
			dci.Minuend.Time,
			dci.Subtrahend.Time)
		log.Printf("info: loading clipping polygon took %v\n", time.Since(start))
	}
	close(jobs)
	wg.Wait()

	// Report both errors if both jobs failed.
	switch {
	case triErr != nil && clipErr != nil:
		return 0, fmt.Errorf("%v %v", triErr, clipErr)
	case triErr != nil:
		return 0, triErr
	case clipErr != nil:
		return 0, clipErr
	}

	start = time.Now()
	tin := tri.Tin()
	removed := tin.Clip(clip)
	clip = nil
	log.Printf("info: clipping TIN took %v\n", time.Since(start))

	log.Printf("info: Number of triangles to clip: %d\n", len(removed))

	start = time.Now()
	var tree octree.STRTree

	tree.BuildWithout(tin, removed)

	log.Printf("info: Building final mesh took: %v\n", time.Since(start))

	start = time.Now()

	// XXX: Maybe we should start this transaction earlier!?
	var tx *sql.Tx
	if tx, err = conn.BeginTx(ctx, nil); err != nil {
		return 0, err
	}
	// Takes effect on all early returns below. After a successful
	// Commit the transaction is done and the rollback changes nothing.
	defer tx.Rollback()

	var heights []float64

	// Prefer the configured class breaks; if they cannot be loaded
	// fall back to heights sampled over the z range of the TIN.
	// NOTE(review): if the loading failed on the database side the
	// transaction may already be aborted, which would make the
	// statements below fail as well — confirm.
	heights, err = octree.LoadClassBreaks(
		ctx, tx,
		"morphology_classbreaks_compare")
	if err != nil {
		log.Printf("warn: Loading class breaks failed: %v\n", err)
		err = nil
		heights = octree.SampleDiffHeights(tin.Min.Z, tin.Max.Z, contourStep)
	} else {
		heights = octree.ExtrapolateClassBreaks(heights, tin.Min.Z, tin.Max.Z)
		// heights = octree.InBetweenClassBreaks(heights, 0.05, 2)
	}

	log.Printf("info: z range: %.3f - %.3f\n", tin.Min.Z, tin.Max.Z)

	log.Printf("info: num heights: %d\n", len(heights))

	// Prepared once and executed for every non-empty iso area below.
	var isoStmt *sql.Stmt
	if isoStmt, err = tx.PrepareContext(ctx, insertDiffIsoAreasQL); err != nil {
		return 0, err
	}
	defer isoStmt.Close()

	var id int64

	// Insert the difference itself; its id is referenced by the iso areas.
	if err = tx.QueryRowContext(
		ctx,
		insertDiffSQL,
		dci.Bottleneck,
		dci.Minuend.Time,
		dci.Subtrahend.Time,
	).Scan(&id); err != nil {
		return 0, err
	}

	heights = common.DedupFloat64s(heights)

	areas := octree.TraceAreas(heights, isoCellSize, tin.Min, tin.Max, tree.Value)

	// size sums up the bytes of WKB sent to the database (for logging only).
	var size int

	// Assumes areas[i] belongs to heights[i] — TODO confirm against
	// octree.TraceAreas.
	for i, a := range areas {
		if len(a) == 0 {
			continue
		}
		wkb := a.AsWKB()
		size += len(wkb)
		if _, err = isoStmt.ExecContext(
			ctx,
			id, heights[i], epsg,
			wkb,
			contourTolerance,
		); err != nil {
			return 0, err
		}
	}

	log.Printf("info: Transferred WKB size: %.2fMB.\n",
		float64(size)/(1024*1024))

	log.Printf("info: calculating and storing iso lines took %v\n",
		time.Since(start))

	if err = tx.Commit(); err != nil {
		log.Printf("info: difference calculation failed after %v\n",
			time.Since(begin))
		return 0, err
	}
	log.Printf("info: difference calculation succeed after %v\n",
		time.Since(begin))

	return id, nil
}

// calculate is the loop of a calculation worker. It takes inputs from
// the job queue, calculates the difference as database user "sys_admin"
// and afterwards asks the manager loop to deliver the result to all
// requesters waiting for this input. It ends when the job queue is
// closed.
func (dcm *diffCalculationManager) calculate() {
	for dci := range dcm.jobs {
		// Everything the delivery callback needs is computed here, in
		// the worker. The callback is executed later by the manager
		// loop, at a time when this loop may already have received the
		// next job. With loop variables shared between iterations
		// (Go before 1.22) reading dci inside the callback could
		// therefore pick up the wrong job and notify the wrong waiters.
		key := diffCalcHash(dci)

		ctx := context.Background()
		var id int64
		err := auth.RunAs(ctx, "sys_admin", func(conn *sql.Conn) error {
			var err error
			id, err = doDiff(ctx, conn, dci)
			return err
		})
		msg := diffResult{id: id, err: err}

		dcm.cmds <- func(dcm *diffCalculationManager) {
			list := dcm.waiting[key]
			delete(dcm.waiting, key)
			// The channels handed out by enque are buffered,
			// so these sends do not block the manager loop.
			for _, ch := range list {
				ch <- msg
			}
		}
	}
}

// enque requests the difference described by dci and returns the
// channel on which the result will be delivered.
//
// The request is handled inside the manager loop: if the same
// difference is already being calculated the requester joins the
// waiting list; if it is already stored in the database its id is
// delivered at once; otherwise the input is handed to the workers.
// If the job queue is full errBufferFull is delivered.
func (dcm *diffCalculationManager) enque(dci *models.DiffCalculationInput) chan diffResult {
	// One slot of buffer: the reply never blocks the sender,
	// even if the requester has stopped listening.
	reply := make(chan diffResult, 1)

	register := func(mgr *diffCalculationManager) {
		hash := diffCalcHash(dci)

		// The same difference is already in progress: join its waiters.
		if waiters := mgr.waiting[hash]; waiters != nil {
			mgr.waiting[hash] = append(waiters, reply)
			return
		}

		// Not in progress: it may have been stored in the database before.
		var stored int64
		bg := context.Background()
		lookup := func(conn *sql.Conn) error {
			row := conn.QueryRowContext(
				bg, diffIDSQL,
				dci.Bottleneck, dci.Minuend.Time, dci.Subtrahend.Time)
			return row.Scan(&stored)
		}

		// Anything but "no rows" settles the request right away,
		// either with the error or with the id found.
		if err := auth.RunAs(bg, "sys_admin", lookup); err != sql.ErrNoRows {
			if err != nil {
				reply <- diffResult{err: err}
			} else {
				reply <- diffResult{id: stored}
			}
			return
		}

		// Unknown so far: register the requester and try to pass the
		// input to the workers without blocking the manager loop.
		mgr.waiting[hash] = []chan diffResult{reply}
		select {
		case mgr.jobs <- dci:
		default:
			// No room left in the job queue.
			delete(mgr.waiting, hash)
			reply <- diffResult{err: errBufferFull}
		}
	}

	dcm.cmds <- register
	return reply
}

// unregister removes the channel ch from the list of requesters waiting
// for the difference described by dci. If the list becomes empty the
// entry is dropped from the waiting map. Unknown channels are ignored.
// The removal is executed inside the manager loop.
func (dcm *diffCalculationManager) unregister(dci *models.DiffCalculationInput, ch chan diffResult) {
	dcm.cmds <- func(mgr *diffCalculationManager) {
		hash := diffCalcHash(dci)
		waiters := mgr.waiting[hash]

		// Locate the first occurrence of the channel.
		pos := -1
		for i := range waiters {
			if waiters[i] == ch {
				pos = i
				break
			}
		}
		if pos < 0 {
			return
		}

		// Close the gap and clear the slot that becomes unused, so the
		// backing array does not keep the channel alive.
		last := len(waiters) - 1
		copy(waiters[pos:], waiters[pos+1:])
		waiters[last] = nil
		waiters = waiters[:last]

		if len(waiters) > 0 {
			mgr.waiting[hash] = waiters
			return
		}
		delete(mgr.waiting, hash)
	}
}

// diffCalculation is the handler requesting the calculation of a
// sounding difference. It takes the calculation input from the JSON
// input of the request, enqueues it and waits for the result.
//
// On success the result carries the id of the difference. A full job
// queue is answered with a "too many requests" JSON error, other
// errors are passed through. If the request is canceled before a
// result arrives, this is logged and an empty result without an
// error is returned.
func diffCalculation(req *http.Request) (jr mw.JSONResult, err error) {
	input := mw.JSONInput(req).(*models.DiffCalculationInput)

	pending := diffCalcMng.enque(input)
	// Whatever happens, stop waiting for the result when leaving.
	defer diffCalcMng.unregister(input, pending)

	select {
	case <-req.Context().Done():
		log.Println("request canceled")

	case outcome := <-pending:
		if outcome.err == errBufferFull {
			err = mw.JSONError{
				Code:    http.StatusTooManyRequests,
				Message: "Too many differences are waiting to be processed",
			}
		} else if outcome.err != nil {
			err = outcome.err
		} else {
			jr = mw.JSONResult{
				Result: map[string]int64{"id": outcome.id},
			}
		}
	}

	return jr, err
}