view pkg/imports/sec.go @ 5670:b75d0b303328

Various fixes and improvements of gauges import: - Allow update of erased data (and thereby set erased to false) - Fix source_organization to work with ERDMS2 - Give ISRS of new and updated gauges in summary - Fixed reference of null pointers if revlevels are missing - Fixed reference of null pointer on update errors - Added ISRS to reference_code warning
author Sascha Wilde <wilde@sha-bang.de>
date Fri, 08 Dec 2023 17:29:56 +0100
parents f2204f91d286
children 6270951dda28
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018,2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"strings"
	"time"

	"gemma.intevation.de/gemma/pkg/models"
)

// Section is a Job to create a section in the database.
//
// The fields are filled from the JSON description of the import and are
// written to waterway.sections by Do.
type Section struct {
	// Name is stored in the name column of the section.
	Name      string      `json:"name"`
	// From is one ISRS bound of the section.
	From      models.Isrs `json:"from"`
	// To is the other ISRS bound of the section.
	To        models.Isrs `json:"to"`
	// Tolerance is handed to the database when the waterway axis
	// of the section is determined.
	Tolerance float32     `json:"tolerance"`
	// ObjNam is stored in the objnam column.
	ObjNam    string      `json:"objnam"`
	// NObjNam is optional; nil is stored as NULL in the nobjnam column.
	NObjNam   *string     `json:"nobjnam"`
	// Source is stored in the source_organization column.
	Source    string      `json:"source-organization"`
	// Date is stored in the date_info column. Do replaces a zero
	// value with the time the import was started.
	Date      models.Date `json:"date-info"`
}

// Description builds a one-line summary of the import: name, object
// name and both ISRS bounds, separated by '|'. The argument is ignored
// and the returned error is always nil.
func (sec *Section) Description([]string) (string, error) {
	from := sec.From.String()
	to := sec.To.String()
	facts := []string{sec.Name, sec.ObjNam, from, to}
	return strings.Join(facts, "|"), nil
}

// SECJobKind is the import queue type identifier for section imports.
// It is the kind under which the section job creator is registered.
const SECJobKind JobKind = "sec"

// secJobCreator is the stateless job creator registered for SECJobKind;
// its Create method produces Section jobs.
type secJobCreator struct{}

// init registers a secJobCreator for SECJobKind via RegisterJobCreator.
func init() {
	RegisterJobCreator(SECJobKind, secJobCreator{})
}

// Description returns the short label "section" for this kind of import.
func (secJobCreator) Description() string {
	return "section"
}

// AutoAccept always reports false for section imports.
// NOTE(review): presumably this means sections stay in staging until
// they are reviewed -- confirm against the import queue.
func (secJobCreator) AutoAccept() bool {
	return false
}

// Create returns a fresh, zero-valued Section job.
func (secJobCreator) Create() Job {
	return &Section{}
}

// Depends returns two lists of relation names the section import
// depends on.
// NOTE(review): presumably the first list holds the relations written
// and the second the relations only read -- confirm against the
// JobCreator interface.
func (secJobCreator) Depends() [2][]string {
	var deps [2][]string
	deps[0] = []string{"sections"}
	deps[1] = []string{
		"distance_marks_virtual",
		"waterway_axis",
		"waterway_area",
	}
	return deps
}

const (
	// secDeleteSQL removes sections that are already out of staging
	// (staging_done) and carry the same name as the not yet staged
	// section tracked for the import with ID $1.
	secDeleteSQL = `
DELETE FROM waterway.sections WHERE
staging_done AND name = (
  SELECT name
  FROM waterway.sections WHERE
  id = (
    SELECT key from import.track_imports
    WHERE import_id = $1 AND
      relation = 'waterway.sections'::regclass)
  AND NOT staging_done
)`

	// secStageDoneSQL marks all sections tracked for the import with
	// ID $1 as staging_done.
	secStageDoneSQL = `
UPDATE waterway.sections SET staging_done = true
WHERE id IN (
  SELECT key from import.track_imports
  WHERE import_id = $1 AND
        relation = 'waterway.sections'::regclass)`

	// secInsertSQL inserts a new section and returns its id.
	//
	// Parameters:
	//   $1-$5   components of the first ISRS bound
	//   $6-$10  components of the second ISRS bound
	//   $11     name
	//   $12     objnam
	//   $13     nobjnam
	//   $14     date_info
	//   $15     source_organization
	//   $16     tolerance handed to ISRSrange_axis
	//
	// The two bounds are ordered with the <~ and >~ operators before
	// the range is built, so the statement does not appear to depend
	// on the order in which they are passed. The area is derived from
	// the axis of the range and a buffer of 150 around it and is
	// transformed to SRID 4326.
	// NOTE(review): the unit of the buffer width depends on the CRS
	// returned by ISRSrange_axis -- confirm.
	secInsertSQL = `
WITH
bounds (b) AS (VALUES (
    ($1::char(2),
     $2::char(3),
     $3::char(5),
     $4::char(5),
     $5::int)::isrs
  ), (
    ($6::char(2),
     $7::char(3),
     $8::char(5),
     $9::char(5),
     $10::int)::isrs)),
r AS (SELECT isrsrange(
    (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY),
    (SELECT b FROM bounds ORDER BY b USING >~ FETCH FIRST ROW ONLY)) AS r),
axs AS (
  SELECT ISRSrange_axis((SELECT r FROM r), $16::double precision) AS axs)
INSERT INTO waterway.sections (
  name,
  section,
  area,
  objnam,
  nobjnam,
  date_info,
  source_organization
) VALUES (
  $11,
  (SELECT r FROM r),
  ST_Transform(ISRSrange_area(
      (SELECT axs FROM axs),
      (SELECT ST_Buffer(axs, 150) FROM axs)),
    4326),
  $12,
  $13,
  $14,
  $15)
RETURNING id`
)

// StageDone moves the imported section out of the staging area.
//
// Within the given transaction it first deletes already staged sections
// that share the name of the section of import id and then flags the
// sections tracked for that import as staging_done. The first failing
// statement aborts the sequence and its error is returned.
func (secJobCreator) StageDone(
	ctx context.Context,
	tx *sql.Tx,
	id int64,
	_ Feedback,
) error {
	// Order matters: the old section has to go before the new one
	// is marked as done.
	for _, stmt := range []string{secDeleteSQL, secStageDoneSQL} {
		if _, err := tx.ExecContext(ctx, stmt, id); err != nil {
			return err
		}
	}
	return nil
}

// CleanUp does nothing for a section import and always returns nil.
func (*Section) CleanUp() error {
	return nil
}

// Do executes the actual section import.
//
// The section is inserted into waterway.sections inside a single
// transaction, the new row is registered for importID via track and
// the transaction is committed. A zero date-info is replaced by the
// time the import was started. Progress is reported through feedback.
//
// On success the section itself is returned as the summary. On any
// error nil and the error are returned and the deferred Rollback
// discards the transaction.
func (sec *Section) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	began := time.Now()

	// Fall back to the start of the import if no date was given.
	if sec.Date.Time.IsZero() {
		sec.Date = models.Date{Time: began}
	}

	feedback.Info("Storing section '%s'", sec.Name)

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	// Has no effect once the transaction has been committed.
	defer tx.Rollback()

	// A missing national object name is stored as NULL.
	nationalName := sql.NullString{Valid: sec.NObjNam != nil}
	if nationalName.Valid {
		nationalName.String = *sec.NObjNam
	}

	feedback.Info("Section from %s to %s.", sec.From.String(), sec.To.String())
	feedback.Info("Tolerance used to snap waterway axis: %g", sec.Tolerance)

	// Positional arguments $1 to $16 of secInsertSQL.
	args := []interface{}{
		sec.From.CountryCode,
		sec.From.LoCode,
		sec.From.FairwaySection,
		sec.From.Orc,
		sec.From.Hectometre,
		sec.To.CountryCode,
		sec.To.LoCode,
		sec.To.FairwaySection,
		sec.To.Orc,
		sec.To.Hectometre,
		sec.Name,
		sec.ObjNam,
		nationalName,
		sec.Date.Time,
		sec.Source,
		sec.Tolerance,
	}

	var sectionID int64
	err = tx.QueryRowContext(ctx, secInsertSQL, args...).Scan(&sectionID)
	if err != nil {
		return nil, err
	}

	// Remember the new row as belonging to this import.
	err = track(ctx, tx, importID, "waterway.sections", sectionID)
	if err != nil {
		return nil, err
	}

	feedback.Info("Storing section '%s' took %s", sec.Name, time.Since(began))
	if err = tx.Commit(); err != nil {
		return nil, err
	}
	feedback.Info("Import of section was successful")

	// The section itself serves as summary to provide full data
	// for review.
	return sec, nil
}