view pkg/imports/isr.go @ 5412:34bc6041e61e marking-single-beam

Added a type for class breaks.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Wed, 07 Jul 2021 10:58:14 +0200
parents 56c589f7435d
children c9da747d4109
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"time"

	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/mesh"
)

// IsoRefresh is an import job to refresh the pre-calculated
// iso morphology data when the class breaks were changed.
type IsoRefresh struct {
	// ClassBreaks holds the class breaks in textual form. It is
	// parsed with mesh.ParseClassBreaks when the job is executed and
	// is (de)serialized under the JSON key "class-breaks".
	ClassBreaks string `json:"class-breaks"`
}

// ISRJobKind is the unique name of this import job type.
// The job creator for iso refresh jobs is registered under this kind.
const ISRJobKind JobKind = "isr"

// init registers the creator of iso refresh jobs under ISRJobKind.
func init() {
	RegisterJobCreator(ISRJobKind, isrJobCreator{})
}

// isrJobCreator is the stateless creator of iso refresh jobs.
type isrJobCreator struct{}

// Description gives a short human readable description of the job kind.
func (isrJobCreator) Description() string {
	return "refresh iso lines"
}

// AutoAccept reports true for this job kind.
// NOTE(review): presumably this means the import needs no manual
// review step -- confirm against the job creator interface.
func (isrJobCreator) AutoAccept() bool {
	return true
}

// Create returns a fresh, zero valued iso refresh job.
func (isrJobCreator) Create() Job {
	return &IsoRefresh{}
}

// StageDone has nothing to do for iso refresh imports:
// all arguments are ignored and nil is returned.
func (isrJobCreator) StageDone(
	_ context.Context,
	_ *sql.Tx,
	_ int64,
	_ Feedback,
) error {
	return nil
}

// Depends reports the very same dependencies as srJobCreator does;
// the call is simply forwarded to a zero value of that creator.
func (isrJobCreator) Depends() [2][]string {
	var sr srJobCreator
	return sr.Depends()
}

const (
	// fetchSoundingResultsIDsSQL lists the id of every sounding result
	// together with the id of its bottleneck. The rows are ordered by
	// bottleneck id so results of one bottleneck arrive consecutively.
	fetchSoundingResultsIDsSQL = `
SELECT bottleneck_id, id
FROM waterway.sounding_results
ORDER BY bottleneck_id
`
	// deleteIsoAreasSQL removes the iso areas stored for the sounding
	// result whose id is passed as $1.
	deleteIsoAreasSQL = `
DELETE FROM waterway.sounding_results_iso_areas
WHERE sounding_result_id = $1
`
)

// CleanUp is a NOP for an iso refresh import; it always returns nil.
func (*IsoRefresh) CleanUp() error {
	return nil
}

// bottleneckSoundingResults groups the ids of the sounding results
// which share the same bottleneck id.
type bottleneckSoundingResults struct {
	bn  string  // bottleneck id as scanned from the bottleneck_id column
	srs []int64 // ids of the sounding results of this bottleneck
}

// fetchBottleneckResults loads the ids of all sounding results and
// groups them by bottleneck. The grouping relies on the query
// delivering the rows ordered by bottleneck id: a new group is opened
// whenever the bottleneck id differs from the one of the previous row.
// Without any rows a nil slice is returned.
func fetchBottleneckResults(
	ctx context.Context,
	conn *sql.Conn,
) ([]bottleneckSoundingResults, error) {

	rows, err := conn.QueryContext(ctx, fetchSoundingResultsIDsSQL)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var groups []bottleneckSoundingResults

	for rows.Next() {
		var (
			bottleneck string
			result     int64
		)
		if err := rows.Scan(&bottleneck, &result); err != nil {
			return nil, err
		}
		// Same bottleneck as in the previous row: extend its group.
		if last := len(groups) - 1; last >= 0 && groups[last].bn == bottleneck {
			groups[last].srs = append(groups[last].srs, result)
			continue
		}
		// Very first row or a change of the bottleneck: open a new group.
		groups = append(groups, bottleneckSoundingResults{
			bn:  bottleneck,
			srs: []int64{result},
		})
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return groups, nil
}

// Do executes the actual refreshing of the iso areas.
// The configured class breaks are parsed, the sounding results are
// loaded grouped by bottleneck and then the bottlenecks are processed
// one after the other. The first error aborts the run. Progress and
// timings are reported via feedback. The returned result is always nil.
func (isr *IsoRefresh) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	begin := time.Now()
	feedback.Info("Regenerating iso areas for sounding results " +
		"after configuration change")
	// Report the overall duration regardless of how the run ends.
	defer func() {
		feedback.Info(
			"Processing all sounding results took %v.",
			time.Since(begin))
	}()

	breaks, err := mesh.ParseClassBreaks(isr.ClassBreaks)
	if err != nil {
		return nil, err
	}

	bottlenecks, err := fetchBottleneckResults(ctx, conn)
	if err != nil {
		return nil, err
	}

	for idx := range bottlenecks {
		current := &bottlenecks[idx]
		began := time.Now()
		feedback.Info("Processing bottleneck '%s' ...", current.bn)
		err := isr.processBottleneck(ctx, conn, breaks, current)
		// The duration is logged before the error check so it is
		// reported for failed bottlenecks, too.
		feedback.Info("Processing took %v.", time.Since(began))
		if err != nil {
			return nil, err
		}
	}

	return nil, nil
}

// processBottleneck re-generates the iso areas of all sounding results
// of the bottleneck bn within one transaction. For each sounding
// result the mesh is fetched, the class breaks in heights are
// extrapolated to the Z range of the mesh and de-duplicated, the old
// iso areas are deleted and the newly traced, non-empty areas are
// inserted. The transaction is committed only if every sounding result
// was processed; any error is returned and the transaction is rolled
// back.
func (isr *IsoRefresh) processBottleneck(
	ctx context.Context,
	conn *sql.Conn,
	heights mesh.ClassBreaks,
	bn *bottleneckSoundingResults,
) error {
	// Do one transaction per bottleneck.
	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		// Hand the failure to the caller. Returning nil here would
		// silently skip the bottleneck and report success.
		return err
	}
	// Rolls back on every error path; it has no effect once the
	// transaction was committed.
	defer tx.Rollback()

	// Prepare with the context so cancellation is honoured here as
	// it is by the other database calls of this function.
	insertAreasStmt, err := tx.PrepareContext(ctx, insertIsoAreasSQL)
	if err != nil {
		return err
	}
	defer insertAreasStmt.Close()

	// For all sounding results in bottleneck.
	for _, sr := range bn.srs {
		tree, err := mesh.FetchMeshDirectly(ctx, tx, sr)
		if err != nil {
			return err
		}
		// Extent of the mesh, used for the class breaks and the raster.
		lo, hi := tree.Min(), tree.Max()

		hs := heights.ExtrapolateClassBreaks(lo.Z, hi.Z)
		hs = common.DedupFloat64s(hs)

		// Delete the old iso areas.
		if _, err := tx.ExecContext(ctx, deleteIsoAreasSQL, sr); err != nil {
			return err
		}

		// Calculate and store the iso areas.
		box := mesh.Box2D{
			X1: lo.X,
			Y1: lo.Y,
			X2: hi.X,
			Y2: hi.Y,
		}

		raster := mesh.NewRaster(box, isoCellSize)
		raster.Rasterize(tree.Value)
		areas := raster.Trace(hs)

		// areas[i] is stored together with the class break hs[i].
		for i, a := range areas {
			if len(a) == 0 {
				// Nothing to store for this class break.
				continue
			}
			if _, err := insertAreasStmt.ExecContext(
				ctx,
				sr, hs[i], tree.EPSG(),
				a.AsWKB(),
				contourTolerance,
			); err != nil {
				return err
			}
		}
	}

	return tx.Commit()
}