diff pkg/imports/isr.go @ 4214:49564382ffff

Added a import queue job to recalculate the contour lines of the sounding results if the heights have changed.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Fri, 16 Aug 2019 13:15:34 +0200
parents
children e453d3bf7663
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pkg/imports/isr.go	Fri Aug 16 13:15:34 2019 +0200
@@ -0,0 +1,199 @@
+// This is Free Software under GNU Affero General Public License v >= 3.0
+// without warranty, see README.md and license for details.
+//
+// SPDX-License-Identifier: AGPL-3.0-or-later
+// License-Filename: LICENSES/AGPL-3.0.txt
+//
+// Copyright (C) 2018 by via donau
+//   – Österreichische Wasserstraßen-Gesellschaft mbH
+// Software engineering by Intevation GmbH
+//
+// Author(s):
+//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
+
+package imports
+
+import (
+	"context"
+	"database/sql"
+	"time"
+
+	"gemma.intevation.de/gemma/pkg/octree"
+)
+
+type IsoRefresh struct {
+	ClassBreaks string `json:"class-breaks"`
+}
+
+// ISRJobKind is the unique name of this import job type.
+const ISRJobKind JobKind = "isr"
+
+func init() { RegisterJobCreator(ISRJobKind, isrJobCreator{}) }
+
+type isrJobCreator struct{}
+
+func (isrJobCreator) Description() string { return "refresh iso lines" }
+
+func (isrJobCreator) AutoAccept() bool { return true }
+
+func (isrJobCreator) Create() Job { return new(IsoRefresh) }
+
+func (isrJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
+	return nil
+}
+
+func (isrJobCreator) Depends() [2][]string {
+	return [2][]string{
+		{"sounding_results", "sounding_results_contour_lines"},
+		{},
+	}
+}
+
+const (
+	fetchSoundingResultsIDsSQL = `
+SELECT bottleneck_id, id
+FROM waterway.sounding_results
+ORDER BY bottleneck_id
+`
+	deleteContourLinesSQL = `
+DELETE FROM waterway.sounding_results_contour_lines
+WHERE sounding_result_id = $1
+`
+)
+
+func (isr *IsoRefresh) CleanUp() error { return nil }
+
+type bottleneckSoundingResults struct {
+	bn  int64
+	srs []int64
+}
+
+func fetchBottleneckResults(
+	ctx context.Context,
+	conn *sql.Conn,
+) ([]bottleneckSoundingResults, error) {
+
+	rows, err := conn.QueryContext(ctx, fetchSoundingResultsIDsSQL)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+
+	var ids []bottleneckSoundingResults
+
+	for rows.Next() {
+		var bn, sr int64
+		if err := rows.Scan(&bn, sr); err != nil {
+			return nil, err
+		}
+		if len(ids) > 0 {
+			if ids[len(ids)-1].bn != bn {
+				ids = append(ids, bottleneckSoundingResults{
+					bn:  bn,
+					srs: []int64{sr},
+				})
+			} else {
+				ids[len(ids)-1].srs = append(ids[len(ids)-1].srs, sr)
+			}
+		} else {
+			ids = []bottleneckSoundingResults{
+				{bn: bn, srs: []int64{sr}},
+			}
+		}
+	}
+	if err := rows.Err(); err != nil {
+		return nil, err
+	}
+	return ids, nil
+}
+
+// Do executes the actual refreshing of the iso lines.
+func (isr *IsoRefresh) Do(
+	ctx context.Context,
+	importID int64,
+	conn *sql.Conn,
+	feedback Feedback,
+) (interface{}, error) {
+
+	start := time.Now()
+	defer func() {
+		feedback.Info(
+			"Processing all sounding results took %v.",
+			time.Since(start))
+	}()
+
+	heights, err := octree.ParseClassBreaks(isr.ClassBreaks)
+	if err != nil {
+		return nil, err
+	}
+
+	bns, err := fetchBottleneckResults(ctx, conn)
+	if err != nil {
+		return nil, err
+	}
+
+	for i := range bns {
+		start := time.Now()
+		err := isr.processBottleneck(
+			ctx, conn,
+			heights,
+			&bns[i],
+		)
+		feedback.Info("Processing bottleneck  with ID %d took %v",
+			bns[i].bn,
+			time.Since(start))
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	return nil, nil
+}
+
+func (isr *IsoRefresh) processBottleneck(
+	ctx context.Context,
+	conn *sql.Conn,
+	heights []float64,
+	bn *bottleneckSoundingResults,
+) error {
+	// Do one transaction per bottleneck.
+	tx, err := conn.BeginTx(ctx, nil)
+	if err != nil {
+		return nil
+	}
+	defer tx.Rollback()
+
+	insertStmt, err := tx.Prepare(insertContourSQL)
+	if err != nil {
+		return err
+	}
+
+	// For all sounding results in bottleneck.
+	for _, sr := range bn.srs {
+		tree, err := octree.FetchOctreeDirectly(ctx, tx, sr)
+		if err != nil {
+			return err
+		}
+		hs := octree.ExtrapolateClassBreaks(heights, tree.Min.Z, tree.Max.Z)
+
+		// Delete the old contour lines.
+		if _, err := tx.ExecContext(ctx, deleteContourLinesSQL, sr); err != nil {
+			return err
+		}
+
+		octree.DoContours(tree, hs, func(res *octree.ContourResult) {
+			if err == nil && len(res.Lines) > 0 {
+				_, err = insertStmt.ExecContext(
+					ctx,
+					sr, res.Height, tree.EPSG,
+					res.Lines.AsWKB2D(),
+					contourTolerance)
+			}
+		})
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}