view pkg/imports/isr.go @ 5560:f2204f91d286

Join the log lines of imports to the log exports to recover data from them. Used in SR export to extract information that was in the meta JSON but is now only found in the log.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Wed, 09 Feb 2022 18:34:40 +0100
parents 5f47eeea988d
children 2dd155cc95ec
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"time"

	"gemma.intevation.de/gemma/pkg/log"
	"gemma.intevation.de/gemma/pkg/mesh"
	"gemma.intevation.de/gemma/pkg/models"
)

// IsoRefresh is an import job to refresh the pre-calculated
// iso morphology data when the class breaks were changed.
type IsoRefresh struct {
	// ClassBreaks holds the class breaks in textual form. It is
	// (de-)serialized under the JSON key "class-breaks" and parsed
	// with mesh.ParseClassBreaks when the job is executed.
	ClassBreaks string `json:"class-breaks"`
}

// ISRJobKind is the unique name of this import job type.
// It is the kind under which the job creator of the
// iso refresh import is registered.
const ISRJobKind JobKind = "isr"

func init() { RegisterJobCreator(ISRJobKind, isrJobCreator{}) }

// isrJobCreator is the stateless creator for iso refresh jobs.
type isrJobCreator struct{}

// Description returns a short human readable text
// describing this kind of job.
func (isrJobCreator) Description() string {
	const description = "refresh iso lines"
	return description
}

// AutoAccept always returns true for iso refresh jobs.
func (isrJobCreator) AutoAccept() bool {
	return true
}

// Create returns a new, zero-valued IsoRefresh job.
func (isrJobCreator) Create() Job {
	return &IsoRefresh{}
}

// StageDone does nothing for IsoRefresh imports. All arguments
// are ignored and the returned error is always nil.
func (isrJobCreator) StageDone(
	_ context.Context,
	_ *sql.Tx,
	_ int64,
	_ Feedback,
) error {
	return nil
}

// Depends reports the dependencies of an iso refresh job.
// They are the same ones the srJobCreator declares.
func (isrJobCreator) Depends() [2][]string {
	deps := srJobCreator{}.Depends()
	return deps
}

// SQL statements used by the iso refresh import.
const (
	// fetchSoundingResultsIDsSQL lists bottleneck id, id and survey type
	// of all sounding results. The rows are ordered by bottleneck id so
	// that fetchBottleneckResults can group consecutive rows.
	fetchSoundingResultsIDsSQL = `
SELECT
  bottleneck_id,
  id,
  surtyp
FROM waterway.sounding_results
ORDER BY bottleneck_id`

	// deleteIsoAreasSQL removes the stored iso areas of the
	// sounding result with the id given as $1.
	deleteIsoAreasSQL = `
DELETE FROM waterway.sounding_results_iso_areas
WHERE sounding_result_id = $1`

	// fetchMarkingPointsSQL loads the stored marking points of the
	// sounding result with the id given as $1 as WKB encoded
	// MULTIPOINTZ geometries.
	fetchMarkingPointsSQL = `
SELECT ST_AsBinary(points::geometry(MULTIPOINTZ))
FROM waterway.sounding_results_marking_points
WHERE sounding_result_id = $1`

	// deleteMarkingPointsSQL removes the stored marking points of the
	// sounding result with the id given as $1.
	deleteMarkingPointsSQL = `
DELETE FROM waterway.sounding_results_marking_points
WHERE sounding_result_id = $1`
)

// CleanUp is a NOP for an iso refresh import.
// It always returns nil.
func (isr *IsoRefresh) CleanUp() error {
	return nil
}

type (
	// scanResult identifies a single sounding result by its
	// database id together with its survey type.
	scanResult struct {
		id         int64
		surveyType models.SurveyType
	}
	// bottleneckSoundingResults groups the sounding results (srs)
	// which belong to the bottleneck with the id bn.
	bottleneckSoundingResults struct {
		bn  string
		srs []scanResult
	}
)

// fetchBottleneckResults loads the ids and survey types of all
// sounding results and groups them by their bottleneck.
//
// The grouping relies on the rows being delivered ordered by
// bottleneck id: a row is added to the most recent group if it has
// the same bottleneck id, otherwise a new group is started.
// If the query yields no rows a nil slice is returned.
func fetchBottleneckResults(
	ctx context.Context,
	conn *sql.Conn,
) ([]bottleneckSoundingResults, error) {

	rows, err := conn.QueryContext(ctx, fetchSoundingResultsIDsSQL)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var groups []bottleneckSoundingResults

	for rows.Next() {
		var (
			bottleneck string
			srID       int64
			surtyp     string
		)
		if err = rows.Scan(&bottleneck, &srID, &surtyp); err != nil {
			return nil, err
		}
		entry := scanResult{
			id:         srID,
			surveyType: models.SurveyType(surtyp),
		}

		// Same bottleneck as the previous row: extend the current group.
		if last := len(groups) - 1; last >= 0 && groups[last].bn == bottleneck {
			groups[last].srs = append(groups[last].srs, entry)
			continue
		}

		// First row or a new bottleneck: open a new group.
		groups = append(groups, bottleneckSoundingResults{
			bn:  bottleneck,
			srs: []scanResult{entry},
		})
	}
	if err = rows.Err(); err != nil {
		return nil, err
	}
	return groups, nil
}

// Do executes the actual refreshing of the iso areas.
//
// It parses and de-duplicates the configured class breaks, loads
// all sounding results grouped by bottleneck, prepares the needed
// SQL statements and processes the bottlenecks one after another.
// The first error stops the processing and is returned.
// The returned summary value is always nil.
func (isr *IsoRefresh) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	const banner = "Regenerating iso areas for sounding results " +
		"after configuration change"

	began := time.Now()
	feedback.Info(banner)
	// Report the overall duration regardless of the outcome.
	defer func() {
		feedback.Info(
			"Processing all sounding results took %v.",
			time.Since(began))
	}()

	breaks, err := mesh.ParseClassBreaks(isr.ClassBreaks)
	if err != nil {
		return nil, err
	}
	breaks = breaks.Dedup()

	bottlenecks, err := fetchBottleneckResults(ctx, conn)
	if err != nil {
		return nil, err
	}

	stmts, err := newISRStmts(ctx, conn)
	if err != nil {
		return nil, err
	}
	defer stmts.close()

	for idx := range bottlenecks {
		current := &bottlenecks[idx]

		t0 := time.Now()
		feedback.Info("Processing bottleneck '%s' ...", current.bn)
		procErr := isr.processBottleneck(
			ctx, conn, feedback,
			stmts,
			breaks,
			current,
		)
		// The duration is reported even if the bottleneck failed.
		feedback.Info("Processing took %v.", time.Since(t0))
		if procErr != nil {
			return nil, procErr
		}
	}

	return nil, nil
}

// isrStmts bundles the prepared statements needed to refresh
// the iso areas and the marking points of sounding results.
// The statements are prepared by newISRStmts and released
// by the close method.
type isrStmts struct {
	insertAreas         *sql.Stmt
	deleteAreas         *sql.Stmt
	fetchMarkingPoints  *sql.Stmt
	deleteMarkingPoints *sql.Stmt
	insertMarkingPoints *sql.Stmt
}

// newISRStmts prepares all statements of an isrStmts on the given
// connection. If preparing one of them fails the statements
// prepared so far are closed again and the error is returned.
func newISRStmts(ctx context.Context, conn *sql.Conn) (*isrStmts, error) {
	// binding ties a statement field to the query it is prepared from.
	type binding struct {
		target **sql.Stmt
		query  string
	}

	stmts := new(isrStmts)

	bindings := [...]binding{
		{target: &stmts.insertAreas, query: insertIsoAreasSQL},
		{target: &stmts.deleteAreas, query: deleteIsoAreasSQL},
		{target: &stmts.fetchMarkingPoints, query: fetchMarkingPointsSQL},
		{target: &stmts.deleteMarkingPoints, query: deleteMarkingPointsSQL},
		{target: &stmts.insertMarkingPoints, query: insertMarkingPointsSQL},
	}

	for i := range bindings {
		b := &bindings[i]
		prepared, err := conn.PrepareContext(ctx, b.query)
		*b.target = prepared
		if err != nil {
			stmts.close()
			return nil, err
		}
	}
	return stmts, nil
}

// close closes all statements that have been prepared.
// Statements which are still nil are skipped. Errors from
// closing are ignored.
func (isrs *isrStmts) close() {
	all := [...]*sql.Stmt{
		isrs.insertAreas,
		isrs.deleteAreas,
		isrs.fetchMarkingPoints,
		isrs.deleteMarkingPoints,
		isrs.insertMarkingPoints,
	}
	for _, stmt := range all {
		if stmt == nil {
			continue
		}
		stmt.Close()
	}
}

// processBottleneck refreshes the derived data of all sounding results
// of a single bottleneck inside one database transaction.
//
// Depending on the survey type of a sounding result it either
//
//   - re-classifies the stored marking points against the class breaks
//     in heights and stores them again (marking surveys), or
//   - re-traces the iso areas from the mesh of the sounding result
//     and stores them again (single and multi beam surveys).
//
// Sounding results with an unknown survey type are logged and skipped.
// The prepared statements in isrs are bound to the transaction.
// Any error aborts the processing of the bottleneck and is returned;
// the deferred rollback then discards the changes made so far.
func (isr *IsoRefresh) processBottleneck(
	ctx context.Context,
	conn *sql.Conn,
	feedback Feedback,
	isrs *isrStmts,
	heights mesh.ClassBreaks,
	bn *bottleneckSoundingResults,
) error {
	// Do one transaction per bottleneck.
	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		// Report the failure instead of pretending that
		// the bottleneck was processed successfully.
		return err
	}
	// After a successful Commit this rollback has no effect.
	defer tx.Rollback()

	// Bind the prepared statements to this transaction.
	var (
		insertAreas         = tx.StmtContext(ctx, isrs.insertAreas)
		deleteAreas         = tx.StmtContext(ctx, isrs.deleteAreas)
		fetchMarkingPoints  = tx.StmtContext(ctx, isrs.fetchMarkingPoints)
		deleteMarkingPoints = tx.StmtContext(ctx, isrs.deleteMarkingPoints)
		insertMarkingPoints = tx.StmtContext(ctx, isrs.insertMarkingPoints)
	)

	// Counters for the summary reported at the end.
	var markings, beams int

	// For all sounding results in bottleneck.
	for _, sr := range bn.srs {
		switch sr.surveyType {
		case models.SurveyTypeMarking:
			markings++

			// Read all points back in.

			var points mesh.MultiPointZ

			// The closure makes sure the rows are closed before
			// the points are deleted and re-inserted.
			if err := func() error {
				rows, err := fetchMarkingPoints.QueryContext(ctx, sr.id)
				if err != nil {
					return err
				}
				defer rows.Close()

				for rows.Next() {
					var buf []byte
					if err := rows.Scan(&buf); err != nil {
						return err
					}
					var npoints mesh.MultiPointZ
					if err := npoints.FromWKB(buf); err != nil {
						return err
					}
					points = append(points, npoints...)
				}
				return rows.Err()
			}(); err != nil {
				return err
			}

			// Delete old points
			if _, err := deleteMarkingPoints.ExecContext(ctx, sr.id); err != nil {
				return err
			}

			// Re-classify points.
			classes := heights.Classify(points)

			// Re-insert points
			for i, class := range classes {
				// Ignore empty classes
				if len(class) == 0 {
					continue
				}
				// NOTE(review): 4326 is presumably the EPSG code of the
				// stored points - confirm against insertMarkingPointsSQL.
				if _, err := insertMarkingPoints.ExecContext(
					ctx, sr.id, heights[i], 4326, class.AsWKB(),
				); err != nil {
					return err
				}
			}

		case models.SurveyTypeMultiBeam, models.SurveyTypeSingleBeam:
			beams++

			tree, err := mesh.FetchMeshDirectly(ctx, tx, sr.id)
			if err != nil {
				return err
			}
			// Adapt the class breaks to the Z range of this mesh.
			hs := heights.ExtrapolateClassBreaks(tree.Min().Z, tree.Max().Z).Dedup()

			// Delete the old iso areas.
			if _, err := deleteAreas.ExecContext(ctx, sr.id); err != nil {
				return err
			}

			// Calculate and store the iso areas.
			box := mesh.Box2D{
				X1: tree.Min().X,
				Y1: tree.Min().Y,
				X2: tree.Max().X,
				Y2: tree.Max().Y,
			}

			raster := mesh.NewRaster(box, isoCellSize)
			raster.Rasterize(tree.Value)
			areas := raster.Trace(hs)

			for i, a := range areas {
				// Ignore empty areas.
				if len(a) == 0 {
					continue
				}
				if _, err := insertAreas.ExecContext(
					ctx,
					sr.id, hs[i], tree.EPSG(),
					a.AsWKB(),
					contourTolerance,
				); err != nil {
					return err
				}
			}
		default:
			log.Errorf("unknown survey type '%s'\n", sr.surveyType)
		}
	}
	feedback.Info("Scan types: Single/Multi: %d Marking: %d", beams, markings)

	return tx.Commit()
}