view pkg/imports/isr.go @ 4562:5cc4042cf07c iso-areas

Started with integrating iso area generation into gemma server. WIP
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Wed, 02 Oct 2019 18:35:09 +0200
parents 671441357db0
children a413b4a89bcc
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"time"

	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/octree"
)

// IsoRefresh is the job that regenerates the contour (iso) lines of the
// stored sounding results.
type IsoRefresh struct {
	// ClassBreaks is the textual class break configuration. It is parsed
	// with octree.ParseClassBreaks when the job is executed.
	ClassBreaks string `json:"class-breaks"`
}

// ISRJobKind is the unique name of this import job type.
// The job creator for iso refreshes is registered under this kind.
const ISRJobKind JobKind = "isr"

func init() { RegisterJobCreator(ISRJobKind, isrJobCreator{}) }

// isrJobCreator is the stateless job creator registered for ISRJobKind.
type isrJobCreator struct{}

// Description returns a short human-readable description of the job type.
func (isrJobCreator) Description() string {
	const description = "refresh iso lines"
	return description
}

// AutoAccept always reports true for iso refresh jobs.
// NOTE(review): presumably this means the job needs no manual review
// step - confirm against the JobCreator interface.
func (isrJobCreator) AutoAccept() bool {
	return true
}

// Create returns a fresh, zero-valued IsoRefresh job.
func (isrJobCreator) Create() Job {
	return &IsoRefresh{}
}

// StageDone is a no-op for iso refresh jobs: it ignores all of its
// arguments and always returns nil.
func (isrJobCreator) StageDone(
	_ context.Context,
	_ *sql.Tx,
	_ int64,
) error {
	return nil
}

// Depends returns the two dependency lists of the job. The first list
// names sounding_results and sounding_results_contour_lines, the second
// list is empty.
// NOTE(review): the meaning of the two slots is defined by the JobCreator
// interface, which is not visible here - confirm before relying on it.
func (isrJobCreator) Depends() [2][]string {
	var deps [2][]string
	deps[0] = []string{"sounding_results", "sounding_results_contour_lines"}
	// Keep the second list empty but non-nil.
	deps[1] = []string{}
	return deps
}

const (
	// fetchSoundingResultsIDsSQL selects the bottleneck id and the id of
	// every sounding result. The rows are ordered by bottleneck id, so all
	// sounding results of one bottleneck are adjacent in the result.
	fetchSoundingResultsIDsSQL = `
SELECT bottleneck_id, id
FROM waterway.sounding_results
ORDER BY bottleneck_id
`
	// deleteContourLinesSQL deletes all contour lines that belong to the
	// sounding result whose id is passed as $1.
	deleteContourLinesSQL = `
DELETE FROM waterway.sounding_results_contour_lines
WHERE sounding_result_id = $1
`
)

// CleanUp has nothing to clean up for this job and always returns nil.
func (isr *IsoRefresh) CleanUp() error {
	return nil
}

// bottleneckSoundingResults groups the ids of the sounding results
// that belong to one bottleneck.
type bottleneckSoundingResults struct {
	// bn is the bottleneck id as read from waterway.sounding_results.
	bn string
	// srs holds the ids of the sounding results of this bottleneck.
	srs []int64
}

// fetchBottleneckResults loads the ids of all sounding results and
// groups them by bottleneck. Consecutive rows with the same bottleneck id
// end up in the same group. It returns a nil slice if there are no
// sounding results, and the first database error otherwise encountered.
func fetchBottleneckResults(
	ctx context.Context,
	conn *sql.Conn,
) ([]bottleneckSoundingResults, error) {

	rows, err := conn.QueryContext(ctx, fetchSoundingResultsIDsSQL)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var grouped []bottleneckSoundingResults

	for rows.Next() {
		var (
			bottleneck string
			resultID   int64
		)
		if err := rows.Scan(&bottleneck, &resultID); err != nil {
			return nil, err
		}

		// The query orders by bottleneck, so a row either extends
		// the most recent group or starts a new one.
		if last := len(grouped) - 1; last >= 0 && grouped[last].bn == bottleneck {
			grouped[last].srs = append(grouped[last].srs, resultID)
			continue
		}
		grouped = append(grouped, bottleneckSoundingResults{
			bn:  bottleneck,
			srs: []int64{resultID},
		})
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return grouped, nil
}

// Do regenerates the contour lines of all sounding results.
//
// It parses isr.ClassBreaks, loads the sounding result ids grouped by
// bottleneck and processes the bottlenecks one after another, stopping
// at the first error. Progress and timings are reported via feedback.
// The importID is not used. On success both return values are nil.
func (isr *IsoRefresh) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	begin := time.Now()
	feedback.Info("Regenerating contour lines for sounding results " +
		"after configuration change")
	// Report the overall duration on every exit path.
	defer func() {
		feedback.Info(
			"Processing all sounding results took %v.",
			time.Since(begin))
	}()

	heights, err := octree.ParseClassBreaks(isr.ClassBreaks)
	if err != nil {
		return nil, err
	}

	bottlenecks, err := fetchBottleneckResults(ctx, conn)
	if err != nil {
		return nil, err
	}

	for idx := range bottlenecks {
		current := &bottlenecks[idx]
		began := time.Now()
		feedback.Info("Processing bottleneck '%s' ...", current.bn)
		procErr := isr.processBottleneck(ctx, conn, heights, current)
		// The duration is logged even if processing failed.
		feedback.Info("Processing took %v.", time.Since(began))
		if procErr != nil {
			return nil, procErr
		}
	}

	return nil, nil
}

// processBottleneck regenerates the contour lines of all sounding
// results of the bottleneck bn for the class breaks given in heights.
//
// All work for one bottleneck runs inside a single transaction: for each
// sounding result the octree is fetched, the old contour lines are
// deleted and the newly computed ones are inserted. The transaction is
// committed only if every step succeeded; on any error it is rolled back
// and that error is returned.
func (isr *IsoRefresh) processBottleneck(
	ctx context.Context,
	conn *sql.Conn,
	heights []float64,
	bn *bottleneckSoundingResults,
) error {
	// Do one transaction per bottleneck.
	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		// Report the failure; returning nil here would make the caller
		// believe the bottleneck had been processed.
		return err
	}
	// After a successful Commit this Rollback has no effect.
	defer tx.Rollback()

	// Prepare with the context so cancellation is honored like in the
	// other database calls of this function.
	insertStmt, err := tx.PrepareContext(ctx, insertContourSQL)
	if err != nil {
		return err
	}

	// For all sounding results in bottleneck.
	for _, sr := range bn.srs {
		tree, err := octree.FetchOctreeDirectly(ctx, tx, sr)
		if err != nil {
			return err
		}
		// Derive the heights to trace from the class breaks and the
		// Z extent of this tree, then remove duplicates.
		hs := octree.ExtrapolateClassBreaks(heights, tree.Min.Z, tree.Max.Z)
		hs = common.DedupFloat64s(hs)

		// Delete the old contour lines.
		if _, err := tx.ExecContext(ctx, deleteContourLinesSQL, sr); err != nil {
			return err
		}

		// Remember the first insert error; once one occurred the
		// remaining results of this sounding result are skipped.
		var storeErr error
		octree.DoContours(tree, hs, func(res *octree.ContourResult) {
			if storeErr == nil && len(res.Lines) > 0 {
				_, storeErr = insertStmt.ExecContext(
					ctx,
					sr, res.Height, tree.EPSG,
					res.Lines.AsWKB2D(),
					contourTolerance)
			}
		})
		if storeErr != nil {
			return storeErr
		}
	}

	return tx.Commit()
}