view pkg/imports/isr.go @ 5711:2dd155cc95ec revive-cleanup

Fix all revive issue (w/o machine generated stuff).
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Tue, 20 Feb 2024 22:22:57 +0100
parents 5f47eeea988d
children 6270951dda28
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"time"

	"gemma.intevation.de/gemma/pkg/log"
	"gemma.intevation.de/gemma/pkg/mesh"
	"gemma.intevation.de/gemma/pkg/models"
)

// IsoRefresh is an import job to refresh the pre-calculated
// iso morphology data when the class breaks were changed.
type IsoRefresh struct {
	// ClassBreaks holds the class breaks in their textual form.
	// Do parses this value with mesh.ParseClassBreaks.
	ClassBreaks string `json:"class-breaks"`
}

// ISRJobKind is the unique name of this import job type.
// It is the kind under which the job creator is registered in init.
const ISRJobKind JobKind = "isr"

// init registers the creator of iso refresh jobs under ISRJobKind.
func init() {
	RegisterJobCreator(ISRJobKind, isrJobCreator{})
}

// isrJobCreator is the stateless job creator that is
// registered for ISRJobKind.
type isrJobCreator struct{}

// Description returns a short human readable text
// describing this kind of job.
func (isrJobCreator) Description() string {
	return "refresh iso lines"
}

// AutoAccept always reports true for iso refresh jobs.
// NOTE(review): presumably this means that no manual review
// step is needed -- confirm against the JobCreator contract.
func (isrJobCreator) AutoAccept() bool {
	return true
}

// Create returns a fresh IsoRefresh job with zero values.
func (isrJobCreator) Create() Job {
	return &IsoRefresh{}
}

// StageDone has nothing to do for IsoRefresh imports.
// All arguments are ignored and it always returns nil.
func (isrJobCreator) StageDone(
	_ context.Context,
	_ *sql.Tx,
	_ int64,
	_ Feedback,
) error {
	return nil
}

// Depends reports the dependencies of this job kind. It delegates
// to srJobCreator, so both job kinds share the same dependencies.
func (isrJobCreator) Depends() [2][]string {
	var sr srJobCreator
	return sr.Depends()
}

// SQL statements used to refresh the derived data of sounding results.
const (
	// fetchSoundingResultsIDsSQL lists bottleneck id, id and survey type
	// of all sounding results. The rows are ordered by bottleneck id,
	// so the results of one bottleneck arrive adjacent to each other.
	fetchSoundingResultsIDsSQL = `
SELECT
  bottleneck_id,
  id,
  surtyp
FROM waterway.sounding_results
ORDER BY bottleneck_id`

	// deleteIsoAreasSQL removes the stored iso areas of
	// the sounding result with the id given as $1.
	deleteIsoAreasSQL = `
DELETE FROM waterway.sounding_results_iso_areas
WHERE sounding_result_id = $1`

	// fetchMarkingPointsSQL loads the stored marking points of the
	// sounding result with the id given as $1 as WKB encoded
	// MULTIPOINTZ geometries.
	fetchMarkingPointsSQL = `
SELECT ST_AsBinary(points::geometry(MULTIPOINTZ))
FROM waterway.sounding_results_marking_points
WHERE sounding_result_id = $1`

	// deleteMarkingPointsSQL removes the stored marking points of
	// the sounding result with the id given as $1.
	deleteMarkingPointsSQL = `
DELETE FROM waterway.sounding_results_marking_points
WHERE sounding_result_id = $1`
)

// CleanUp of an iso refresh import is a NOP.
// It always returns nil.
func (*IsoRefresh) CleanUp() error {
	return nil
}

type (
	// scanResult identifies a single sounding result
	// together with its survey type.
	scanResult struct {
		id         int64             // id of the sounding result
		surveyType models.SurveyType // survey type of the sounding result
	}
	// bottleneckSoundingResults groups the sounding results
	// that belong to the same bottleneck.
	bottleneckSoundingResults struct {
		bn  string       // id of the bottleneck
		srs []scanResult // sounding results of this bottleneck
	}
)

// fetchBottleneckResults loads the ids and survey types of all
// sounding results and groups them by their bottleneck.
//
// The grouping relies on the ORDER BY of fetchSoundingResultsIDsSQL:
// A new group is started whenever the bottleneck id differs from the
// one of the previously read row.
// On any database error no groups are returned.
func fetchBottleneckResults(
	ctx context.Context,
	conn *sql.Conn,
) ([]bottleneckSoundingResults, error) {

	rows, err := conn.QueryContext(ctx, fetchSoundingResultsIDsSQL)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var groups []bottleneckSoundingResults

	for rows.Next() {
		var (
			bottleneck string
			srID       int64
			surtyp     string
		)
		if err := rows.Scan(&bottleneck, &srID, &surtyp); err != nil {
			return nil, err
		}
		entry := scanResult{
			id:         srID,
			surveyType: models.SurveyType(surtyp),
		}

		// Same bottleneck as in the previous row: extend its group.
		if n := len(groups); n > 0 && groups[n-1].bn == bottleneck {
			groups[n-1].srs = append(groups[n-1].srs, entry)
			continue
		}

		// First row or a different bottleneck: open a new group.
		groups = append(groups, bottleneckSoundingResults{
			bn:  bottleneck,
			srs: []scanResult{entry},
		})
	}

	if err := rows.Err(); err != nil {
		return nil, err
	}
	return groups, nil
}

// Do executes the actual refreshing of the iso areas.
//
// It parses and de-duplicates the configured class breaks, loads
// all sounding results grouped by bottleneck, prepares the needed
// statements and then processes one bottleneck after the other.
// Processing stops at the first bottleneck that fails.
// The second argument is ignored. The returned summary is always nil.
func (isr *IsoRefresh) Do(
	ctx context.Context,
	_ int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	began := time.Now()
	feedback.Info("Regenerating iso areas for sounding results " +
		"after configuration change")
	// Report the overall duration regardless of how we leave.
	defer func() {
		feedback.Info(
			"Processing all sounding results took %v.",
			time.Since(began))
	}()

	breaks, err := mesh.ParseClassBreaks(isr.ClassBreaks)
	if err != nil {
		return nil, err
	}
	breaks = breaks.Dedup()

	bottlenecks, err := fetchBottleneckResults(ctx, conn)
	if err != nil {
		return nil, err
	}

	stmts, err := newISRStmts(ctx, conn)
	if err != nil {
		return nil, err
	}
	defer stmts.close()

	for idx := range bottlenecks {
		current := &bottlenecks[idx]

		bnBegan := time.Now()
		feedback.Info("Processing bottleneck '%s' ...", current.bn)
		procErr := isr.processBottleneck(
			ctx, conn, feedback,
			stmts,
			breaks,
			current,
		)
		// The duration is reported even if the processing failed.
		feedback.Info("Processing took %v.", time.Since(bnBegan))
		if procErr != nil {
			return nil, procErr
		}
	}

	return nil, nil
}

// isrStmts bundles the prepared statements needed to refresh
// the derived data of sounding results. The statements are
// prepared by newISRStmts and released by close.
type isrStmts struct {
	insertAreas         *sql.Stmt // prepared from insertIsoAreasSQL
	deleteAreas         *sql.Stmt // prepared from deleteIsoAreasSQL
	fetchMarkingPoints  *sql.Stmt // prepared from fetchMarkingPointsSQL
	deleteMarkingPoints *sql.Stmt // prepared from deleteMarkingPointsSQL
	insertMarkingPoints *sql.Stmt // prepared from insertMarkingPointsSQL
}

// newISRStmts prepares all statements of isrStmts on the given
// connection. If preparing one of them fails, the statements
// prepared so far are closed and the error is returned.
func newISRStmts(ctx context.Context, conn *sql.Conn) (*isrStmts, error) {
	stmts := new(isrStmts)

	// Pairs each query with the field that receives its statement.
	type preparation struct {
		query  string
		target **sql.Stmt
	}
	plan := []preparation{
		{insertIsoAreasSQL, &stmts.insertAreas},
		{deleteIsoAreasSQL, &stmts.deleteAreas},
		{fetchMarkingPointsSQL, &stmts.fetchMarkingPoints},
		{deleteMarkingPointsSQL, &stmts.deleteMarkingPoints},
		{insertMarkingPointsSQL, &stmts.insertMarkingPoints},
	}

	for i := range plan {
		prepared, err := conn.PrepareContext(ctx, plan[i].query)
		if err != nil {
			stmts.close()
			return nil, err
		}
		*plan[i].target = prepared
	}
	return stmts, nil
}

// close releases every statement that has been prepared.
// Statements that are still nil are skipped which makes it safe
// to call on a partially initialized isrStmts.
// Errors from closing the statements are ignored.
func (isrs *isrStmts) close() {
	for _, stmt := range []*sql.Stmt{
		isrs.insertAreas,
		isrs.deleteAreas,
		isrs.fetchMarkingPoints,
		isrs.deleteMarkingPoints,
		isrs.insertMarkingPoints,
	} {
		if stmt == nil {
			continue
		}
		stmt.Close()
	}
}

// processBottleneck regenerates the derived data of all sounding
// results of the bottleneck bn within a single transaction.
//
// Depending on the survey type of a sounding result:
//   - Marking: the stored points are read back, deleted, re-classified
//     with heights and the non-empty classes are inserted again.
//   - Multi beam / single beam: the mesh is loaded, the class breaks
//     are extrapolated to the Z range of the mesh, the old iso areas
//     are deleted and the newly traced non-empty areas are inserted.
//   - Anything else is logged as an error and skipped.
//
// isrs provides the prepared statements which are bound to the
// transaction here. The transaction is committed only if all
// sounding results were processed; the first error aborts the
// processing, rolls the transaction back and is returned.
func (isr *IsoRefresh) processBottleneck(
	ctx context.Context,
	conn *sql.Conn,
	feedback Feedback,
	isrs *isrStmts,
	heights mesh.ClassBreaks,
	bn *bottleneckSoundingResults,
) error {
	// Do one transaction per bottleneck.
	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		// Report the failure. Returning nil here would make the
		// caller believe that the bottleneck was refreshed.
		return err
	}
	// Undo everything on early returns. After a successful
	// Commit the rollback has no effect.
	defer tx.Rollback()

	// Bind the prepared statements to this transaction.
	var (
		insertAreas         = tx.StmtContext(ctx, isrs.insertAreas)
		deleteAreas         = tx.StmtContext(ctx, isrs.deleteAreas)
		fetchMarkingPoints  = tx.StmtContext(ctx, isrs.fetchMarkingPoints)
		deleteMarkingPoints = tx.StmtContext(ctx, isrs.deleteMarkingPoints)
		insertMarkingPoints = tx.StmtContext(ctx, isrs.insertMarkingPoints)
	)

	// Counters for the summary written to the feedback.
	var markings, beams int

	// For all sounding results in bottleneck.
	for _, sr := range bn.srs {
		switch sr.surveyType {
		case models.SurveyTypeMarking:
			markings++

			// Read all points back in.

			var points mesh.MultiPointZ

			// Wrapped in a function so that the rows are closed
			// before the points get deleted below.
			if err := func() error {
				rows, err := fetchMarkingPoints.QueryContext(ctx, sr.id)
				if err != nil {
					return err
				}
				defer rows.Close()

				for rows.Next() {
					var buf []byte
					if err := rows.Scan(&buf); err != nil {
						return err
					}
					var npoints mesh.MultiPointZ
					if err := npoints.FromWKB(buf); err != nil {
						return err
					}
					points = append(points, npoints...)
				}
				return rows.Err()
			}(); err != nil {
				return err
			}

			// Delete old points
			if _, err := deleteMarkingPoints.ExecContext(ctx, sr.id); err != nil {
				return err
			}

			// Re-classify points.
			classes := heights.Classify(points)

			// Re-insert points
			for i, class := range classes {
				// Ignore empty classes
				if len(class) == 0 {
					continue
				}
				// NOTE(review): 4326 is presumably the EPSG code of
				// the stored points -- confirm against
				// insertMarkingPointsSQL.
				if _, err := insertMarkingPoints.ExecContext(
					ctx, sr.id, heights[i], 4326, class.AsWKB(),
				); err != nil {
					return err
				}
			}

		case models.SurveyTypeMultiBeam, models.SurveyTypeSingleBeam:
			beams++

			tree, err := mesh.FetchMeshDirectly(ctx, tx, sr.id)
			if err != nil {
				return err
			}
			// Adjust the class breaks to the Z range of this mesh.
			hs := heights.ExtrapolateClassBreaks(tree.Min().Z, tree.Max().Z).Dedup()

			// Delete the old iso areas.
			if _, err := deleteAreas.ExecContext(ctx, sr.id); err != nil {
				return err
			}

			// Calculate and store the iso areas.
			box := mesh.Box2D{
				X1: tree.Min().X,
				Y1: tree.Min().Y,
				X2: tree.Max().X,
				Y2: tree.Max().Y,
			}

			raster := mesh.NewRaster(box, isoCellSize)
			raster.Rasterize(tree.Value)
			areas := raster.Trace(hs)

			for i, a := range areas {
				// Ignore empty areas.
				if len(a) == 0 {
					continue
				}
				if _, err := insertAreas.ExecContext(
					ctx,
					sr.id, hs[i], tree.EPSG(),
					a.AsWKB(),
					contourTolerance,
				); err != nil {
					return err
				}
			}
		default:
			// Unknown survey types are only logged. They neither
			// abort the processing nor show up in the summary.
			log.Errorf("unknown survey type '%s'\n", sr.surveyType)
		}
	}
	feedback.Info("Scan types: Single/Multi: %d Marking: %d", beams, markings)

	return tx.Commit()
}