view pkg/imports/sec.go @ 5520:05db984d3db1

Improve performance of bottleneck area calculation. Avoid buffer calculations by replacing them with simple distance comparisons, and calculate the boundary of the result geometry only once per iteration. In some edge cases with very large numbers of iterations, this reduced the runtime of a bottleneck import by a factor of more than twenty.
author Tom Gottfried <tom@intevation.de>
date Thu, 21 Oct 2021 19:50:39 +0200
parents 59a99655f34d
children f2204f91d286
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018,2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"strings"
	"time"

	"gemma.intevation.de/gemma/pkg/models"
)

// Section is a Job to create a section in the database.
//
// The struct is the JSON payload of a section import; its fields are passed
// as parameters to the insert statement executed by Do.
type Section struct {
	// Name is stored in the name column of the section; StageDone uses it
	// to find already staged sections carrying the same name.
	Name      string      `json:"name"`
	// From is one of the two ISRS codes bounding the section.
	From      models.Isrs `json:"from"`
	// To is the other ISRS code bounding the section.
	To        models.Isrs `json:"to"`
	// Tolerance is handed to ISRSrange_axis when the waterway axis is
	// derived (reported to the user as the tolerance used to snap the axis).
	Tolerance float32     `json:"tolerance"`
	// ObjNam is stored in the objnam column.
	ObjNam    string      `json:"objnam"`
	// NObjNam is optional; a nil pointer is stored as SQL NULL in nobjnam.
	NObjNam   *string     `json:"nobjnam"`
	// Source is stored in the source_organization column.
	Source    string      `json:"source-organization"`
	// Date is stored in the date_info column; when its time is zero, Do
	// replaces it with the time the import started.
	Date      models.Date `json:"date-info"`
}

// Description summarizes the import as a single line: the section name, the
// object name and the string forms of both ISRS bounds, separated by "|".
// The returned error is always nil.
func (sec *Section) Description() (string, error) {
	fields := [...]string{
		sec.Name,
		sec.ObjNam,
		sec.From.String(),
		sec.To.String(),
	}
	return strings.Join(fields[:], "|"), nil
}

// SECJobKind is the import queue type identifier under which the section
// job creator is registered.
const SECJobKind JobKind = "sec"

// secJobCreator is the stateless creator registered for SECJobKind. It
// creates Section jobs and moves finished imports out of staging.
type secJobCreator struct{}

func init() { RegisterJobCreator(SECJobKind, secJobCreator{}) }

// Description returns the short human readable name of this kind of import.
func (secJobCreator) Description() string {
	const description = "section"
	return description
}

// AutoAccept reports false for section imports.
// NOTE(review): presumably this means the import has to be reviewed before
// it leaves staging; confirm against the job queue.
func (secJobCreator) AutoAccept() bool {
	return false
}

// Create returns a fresh, zero-valued Section job.
func (secJobCreator) Create() Job {
	return &Section{}
}

// Depends lists the relation names this import depends on, in two groups.
// NOTE(review): the first group presumably names the relations the import
// modifies and the second the ones it only reads; confirm against the
// JobCreator contract.
func (secJobCreator) Depends() [2][]string {
	var deps [2][]string
	deps[0] = []string{"sections"}
	deps[1] = []string{
		"distance_marks_virtual",
		"waterway_axis",
		"waterway_area",
	}
	return deps
}

const (
	// secDeleteSQL removes already staged sections (staging_done) that
	// carry the same name as the not yet staged section tracked for the
	// import with ID $1.
	secDeleteSQL = `
DELETE FROM waterway.sections WHERE
staging_done AND name = (
  SELECT name
  FROM waterway.sections WHERE
  id = (
    SELECT key from import.track_imports
    WHERE import_id = $1 AND
      relation = 'waterway.sections'::regclass)
  AND NOT staging_done
)`

	// secStageDoneSQL flags every section tracked for the import with
	// ID $1 as staging_done.
	secStageDoneSQL = `
UPDATE waterway.sections SET staging_done = true
WHERE id IN (
  SELECT key from import.track_imports
  WHERE import_id = $1 AND
        relation = 'waterway.sections'::regclass)`

	// secInsertSQL inserts a new section and returns its id.
	//
	// Parameters:
	//   $1-$5   components of the first ISRS code
	//   $6-$10  components of the second ISRS code
	//   $11     name
	//   $12     objnam
	//   $13     nobjnam
	//   $14     date_info
	//   $15     source_organization
	//   $16     tolerance handed to ISRSrange_axis
	//
	// The two ISRS codes are ordered with the <~ and >~ operators before
	// the range is built, so the order in which they are passed should not
	// matter. The area is derived by ISRSrange_area from the axis and a
	// buffer of 150 around it, then transformed to EPSG:4326.
	// NOTE(review): the buffer is presumably in metres of the axis'
	// projected SRS; confirm against ISRSrange_axis.
	secInsertSQL = `
WITH
bounds (b) AS (VALUES (
    ($1::char(2),
     $2::char(3),
     $3::char(5),
     $4::char(5),
     $5::int)::isrs
  ), (
    ($6::char(2),
     $7::char(3),
     $8::char(5),
     $9::char(5),
     $10::int)::isrs)),
r AS (SELECT isrsrange(
    (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY),
    (SELECT b FROM bounds ORDER BY b USING >~ FETCH FIRST ROW ONLY)) AS r),
axs AS (
  SELECT ISRSrange_axis((SELECT r FROM r), $16::double precision) AS axs)
INSERT INTO waterway.sections (
  name,
  section,
  area,
  objnam,
  nobjnam,
  date_info,
  source_organization
) VALUES (
  $11,
  (SELECT r FROM r),
  ST_Transform(ISRSrange_area(
      (SELECT axs FROM axs),
      (SELECT ST_Buffer(axs, 150) FROM axs)),
    4326),
  $12,
  $13,
  $14,
  $15)
RETURNING id`
)

// StageDone moves the imported section out of the staging area.
//
// Inside the given transaction it first deletes previously staged sections
// with the same name as the one imported by id and then flags the imported
// section as staging_done. The first failing statement aborts the sequence
// and its error is returned unchanged.
func (secJobCreator) StageDone(
	ctx context.Context,
	tx *sql.Tx,
	id int64,
	_ Feedback,
) error {
	// Order matters: the old rows have to go before the new one is flagged,
	// otherwise the delete could no longer tell them apart.
	for _, stmt := range [...]string{secDeleteSQL, secStageDoneSQL} {
		if _, err := tx.ExecContext(ctx, stmt, id); err != nil {
			return err
		}
	}
	return nil
}

// CleanUp has nothing to do for a section import and always returns nil.
func (*Section) CleanUp() error {
	return nil
}

// Do executes the actual section import.
//
// It opens a transaction on conn, inserts the section with secInsertSQL,
// records the new row for importID via track and commits. Progress is
// reported through feedback. On success the receiver itself is returned as
// the summary; on any error the transaction is rolled back and the error is
// returned unchanged together with a nil summary.
func (sec *Section) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {
	begin := time.Now()

	// A job without an explicit date is stamped with the import start time.
	if sec.Date.Time.IsZero() {
		sec.Date = models.Date{Time: begin}
	}

	feedback.Info("Storing section '%s'", sec.Name)

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	// Covers every early return below; after a successful Commit the
	// rollback has no effect and its error is deliberately ignored.
	defer tx.Rollback()

	// A missing national object name stays an invalid NullString and is
	// therefore written as NULL.
	var nationalName sql.NullString
	if sec.NObjNam != nil {
		nationalName.String, nationalName.Valid = *sec.NObjNam, true
	}

	feedback.Info("Section from %s to %s.", sec.From.String(), sec.To.String())
	feedback.Info("Tolerance used to snap waterway axis: %g", sec.Tolerance)

	// Positional arguments $1..$16 of secInsertSQL.
	args := []interface{}{
		sec.From.CountryCode,
		sec.From.LoCode,
		sec.From.FairwaySection,
		sec.From.Orc,
		sec.From.Hectometre,
		sec.To.CountryCode,
		sec.To.LoCode,
		sec.To.FairwaySection,
		sec.To.Orc,
		sec.To.Hectometre,
		sec.Name,
		sec.ObjNam,
		nationalName,
		sec.Date.Time,
		sec.Source,
		sec.Tolerance,
	}

	var sectionID int64
	row := tx.QueryRowContext(ctx, secInsertSQL, args...)
	if err = row.Scan(&sectionID); err != nil {
		return nil, err
	}

	// Remember which row belongs to this import so StageDone can find it.
	err = track(ctx, tx, importID, "waterway.sections", sectionID)
	if err != nil {
		return nil, err
	}

	feedback.Info("Storing section '%s' took %s", sec.Name, time.Since(begin))
	if err = tx.Commit(); err != nil {
		return nil, err
	}
	feedback.Info("Import of section was successful")

	// The job itself serves as summary to provide full data for review.
	return sec, nil
}