view pkg/imports/st.go @ 5591:0011f50cf216 surveysperbottleneckid

Removed no longer used alternative api for surveys/ endpoint. As bottlenecks in the summary for SR imports are now identified by their id and no longer by the (not guarantied to be unique!) name, there is no longer the need to request survey data by the name+date tuple (which isn't reliable anyway). So the workaround was now reversed.
author Sascha Wilde <wilde@sha-bang.de>
date Wed, 06 Apr 2022 13:30:29 +0200
parents 59a99655f34d
children 6270951dda28
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"errors"
	"time"

	"gemma.intevation.de/gemma/pkg/models"
)

// Stretch is a Job to create a stretch in the database.
type Stretch struct {
	Name      string                 `json:"name"`
	From      models.Isrs            `json:"from"`
	To        models.Isrs            `json:"to"`
	Tolerance float32                `json:"tolerance"`
	ObjNam    string                 `json:"objnam"`
	NObjNam   *string                `json:"nobjnam"`
	Source    string                 `json:"source-organization"`
	Date      models.Date            `json:"date-info"`
	Countries models.UniqueCountries `json:"countries"`
}

// STJobKind is the import queue type identifier.
const STJobKind JobKind = "st"

type stJobCreator struct{}

func init() { RegisterJobCreator(STJobKind, stJobCreator{}) }

func (stJobCreator) Description() string { return "stretch" }

func (stJobCreator) AutoAccept() bool { return false }

func (stJobCreator) Create() Job { return new(Stretch) }

func (stJobCreator) Depends() [2][]string {
	return [2][]string{
		{"stretches", "stretch_countries"},
		{"distance_marks_virtual", "waterway_axis", "waterway_area"},
	}
}

const (
	stDeleteSQL = `
DELETE FROM users.stretches WHERE
staging_done AND name IN (
  SELECT name
  FROM users.stretches WHERE
  id IN (
    SELECT key from import.track_imports
    WHERE import_id = $1 AND
      relation = 'users.stretches'::regclass)
  AND NOT staging_done
)`

	stStageDoneSQL = `
UPDATE users.stretches SET staging_done = true
WHERE id IN (
  SELECT key from import.track_imports
  WHERE import_id = $1 AND
        relation = 'users.stretches'::regclass)`

	stInsertSQL = `
WITH
bounds (b) AS (VALUES (
    ($1::char(2),
     $2::char(3),
     $3::char(5),
     $4::char(5),
     $5::int)::isrs
  ), (
    ($6::char(2),
     $7::char(3),
     $8::char(5),
     $9::char(5),
     $10::int)::isrs)),
r AS (SELECT isrsrange(
    (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY),
    (SELECT b FROM bounds ORDER BY b USING >~ FETCH FIRST ROW ONLY)) AS r),
axs AS (
  SELECT ISRSrange_axis((SELECT r FROM r), $16::double precision) AS axs)
INSERT INTO users.stretches (
  name,
  stretch,
  area,
  objnam,
  nobjnam,
  date_info,
  source_organization
) VALUES (
  $11,
  (SELECT r FROM r),
  ST_Transform(ISRSrange_area(
      (SELECT axs FROM axs),
      (SELECT ST_Buffer(axs, 10000) FROM axs)),
    4326),
  $12,
  $13,
  $14,
  $15)
RETURNING id`

	stInsertCountrySQL = `
INSERT INTO users.stretch_countries (
  stretch_id,
  country
) VALUES (
  $1,
  $2
)`
)

// StageDone moves the imported stretch out of the staging area.
func (stJobCreator) StageDone(
	ctx context.Context,
	tx *sql.Tx,
	id int64,
	_ Feedback,
) error {
	if _, err := tx.ExecContext(ctx, stDeleteSQL, id); err != nil {
		return err
	}
	_, err := tx.ExecContext(ctx, stStageDoneSQL, id)
	return err
}

// CleanUp of a stretch import is a NOP.
func (*Stretch) CleanUp() error { return nil }

// Do executes the actual stretch import.
func (st *Stretch) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	start := time.Now()

	if st.Date.Time.IsZero() {
		st.Date = models.Date{Time: start}
	}

	feedback.Info("Storing stretch '%s'", st.Name)

	if len(st.Countries) == 0 {
		return nil, errors.New("list of countries is empty")
	}

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	defer tx.Rollback()

	insertCountryStmt, err := tx.PrepareContext(ctx, stInsertCountrySQL)
	if err != nil {
		return nil, err
	}
	defer insertCountryStmt.Close()

	var nobjnm sql.NullString
	if st.NObjNam != nil {
		nobjnm = sql.NullString{String: *st.NObjNam, Valid: true}
	}

	feedback.Info("Stretch from %s to %s.", st.From.String(), st.To.String())
	feedback.Info("Tolerance used to snap waterway axis: %g", st.Tolerance)

	var id int64
	if err := tx.QueryRowContext(
		ctx,
		stInsertSQL,
		st.From.CountryCode,
		st.From.LoCode,
		st.From.FairwaySection,
		st.From.Orc,
		st.From.Hectometre,
		st.To.CountryCode,
		st.To.LoCode,
		st.To.FairwaySection,
		st.To.Orc,
		st.To.Hectometre,
		st.Name,
		st.ObjNam,
		nobjnm,
		st.Date.Time,
		st.Source,
		st.Tolerance,
	).Scan(&id); err != nil {
		return nil, err
	}

	// store the associated countries.

	feedback.Info("Countries associated with stretch: %s.", st.Countries)
	for _, c := range st.Countries {
		if _, err := insertCountryStmt.ExecContext(ctx, id, c); err != nil {
			return nil, err
		}
	}

	if err := track(ctx, tx, importID, "users.stretches", id); err != nil {
		return nil, err
	}

	feedback.Info("Storing stretch '%s' took %s", st.Name, time.Since(start))
	if err := tx.Commit(); err != nil {
		return nil, err
	}
	feedback.Info("Import of stretch was successful")

	summary := st // provide full information for summary

	return summary, nil
}