view pkg/imports/isr.go @ 4640:fb09a43b062e

Decouple the tracing of the iso areas from the octree data structure.
author Sascha L. Teichmann <teichmann@intevation.de>
date Fri, 11 Oct 2019 20:09:37 +0200
parents c657dec6b0fa
children 4bbfe3dd2ab5
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"time"

	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/octree"
)

type IsoRefresh struct {
	ClassBreaks string `json:"class-breaks"`
}

// ISRJobKind is the unique name of this import job type.
const ISRJobKind JobKind = "isr"

func init() { RegisterJobCreator(ISRJobKind, isrJobCreator{}) }

type isrJobCreator struct{}

func (isrJobCreator) Description() string { return "refresh iso lines" }

func (isrJobCreator) AutoAccept() bool { return true }

func (isrJobCreator) Create() Job { return new(IsoRefresh) }

func (isrJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
	return nil
}

func (isrJobCreator) Depends() [2][]string {
	return srJobCreator{}.Depends()
}

const (
	fetchSoundingResultsIDsSQL = `
SELECT bottleneck_id, id
FROM waterway.sounding_results
ORDER BY bottleneck_id
`
	deleteIsoAreasSQL = `
DELETE FROM waterway.sounding_results_iso_areas
WHERE sounding_result_id = $1
`
)

func (isr *IsoRefresh) CleanUp() error { return nil }

type bottleneckSoundingResults struct {
	bn  string
	srs []int64
}

func fetchBottleneckResults(
	ctx context.Context,
	conn *sql.Conn,
) ([]bottleneckSoundingResults, error) {

	rows, err := conn.QueryContext(ctx, fetchSoundingResultsIDsSQL)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var ids []bottleneckSoundingResults

	for rows.Next() {
		var bn string
		var sr int64
		if err := rows.Scan(&bn, &sr); err != nil {
			return nil, err
		}
		if len(ids) > 0 {
			if ids[len(ids)-1].bn != bn {
				ids = append(ids, bottleneckSoundingResults{
					bn:  bn,
					srs: []int64{sr},
				})
			} else {
				ids[len(ids)-1].srs = append(ids[len(ids)-1].srs, sr)
			}
		} else {
			ids = []bottleneckSoundingResults{
				{bn: bn, srs: []int64{sr}},
			}
		}
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return ids, nil
}

// Do executes the actual refreshing of the iso lines.
func (isr *IsoRefresh) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	start := time.Now()
	feedback.Info("Regenerating contour lines for sounding results " +
		"after configuration change")
	defer func() {
		feedback.Info(
			"Processing all sounding results took %v.",
			time.Since(start))
	}()

	heights, err := octree.ParseClassBreaks(isr.ClassBreaks)
	if err != nil {
		return nil, err
	}

	bns, err := fetchBottleneckResults(ctx, conn)
	if err != nil {
		return nil, err
	}

	for i := range bns {
		start := time.Now()
		feedback.Info("Processing bottleneck '%s' ...", bns[i].bn)
		err := isr.processBottleneck(
			ctx, conn,
			heights,
			&bns[i],
		)
		feedback.Info("Processing took %v.", time.Since(start))
		if err != nil {
			return nil, err
		}
	}

	return nil, nil
}

func (isr *IsoRefresh) processBottleneck(
	ctx context.Context,
	conn *sql.Conn,
	heights []float64,
	bn *bottleneckSoundingResults,
) error {
	// Do one transaction per bottleneck.
	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil
	}
	defer tx.Rollback()

	insertAreasStmt, err := tx.Prepare(insertIsoAreasSQL)
	if err != nil {
		return err
	}
	defer insertAreasStmt.Close()

	// For all sounding results in bottleneck.
	for _, sr := range bn.srs {
		tree, err := octree.FetchOctreeDirectly(ctx, tx, sr)
		if err != nil {
			return err
		}
		hs := octree.ExtrapolateClassBreaks(heights, tree.Min.Z, tree.Max.Z)
		hs = common.DedupFloat64s(hs)

		// Delete the old iso areas.
		if _, err := tx.ExecContext(ctx, deleteIsoAreasSQL, sr); err != nil {
			return err
		}

		// Calculate and store the iso areas.
		areas := octree.TraceAreas(hs, isoCellSize, tree.Min, tree.Max, tree.Value)
		for i, a := range areas {
			if len(a) == 0 {
				continue
			}
			if _, err := insertAreasStmt.ExecContext(
				ctx,
				sr, hs[i], tree.EPSG,
				a.AsWKB(),
				contourTolerance,
			); err != nil {
				return err
			}
		}
	}

	return tx.Commit()
}