view pkg/imports/st.go @ 2693:02d012272dd4

Log snapping tolerance
author Tom Gottfried <tom@intevation.de>
date Fri, 15 Mar 2019 18:24:57 +0100
parents dc4fae4bdb8f
children 2b4727a32ce6
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"errors"
	"time"

	"gemma.intevation.de/gemma/pkg/models"
)

type Stretch struct {
	Name      string                 `json:"name"`
	From      models.Isrs            `json:"from"`
	To        models.Isrs            `json:"to"`
	Tolerance float32                `json:"tolerance"`
	ObjNam    string                 `json:"objnam"`
	NObjNam   *string                `json:"nobjnam"`
	Source    string                 `json:"source-organization"`
	Date      models.Date            `json:"date-info"`
	Countries models.UniqueCountries `json:"countries"`
}

const STJobKind JobKind = "st"

type stJobCreator struct{}

func init() {
	RegisterJobCreator(STJobKind, stJobCreator{})
}

func (stJobCreator) Description() string { return "stretch" }

func (stJobCreator) AutoAccept() bool { return false }

func (stJobCreator) Create() Job { return new(Stretch) }

func (stJobCreator) Depends() []string {
	return []string{
		"stretches",
	}
}

const (
	stDeleteSQL = `
DELETE FROM waterway.stretches WHERE
staging_done AND name = (
  SELECT name
  FROM waterway.stretches WHERE
  id = (
    SELECT key from import.track_imports
    WHERE import_id = $1 AND
      relation = 'waterway.stretches'::regclass)
  AND NOT staging_done
)`

	stStageDoneSQL = `
UPDATE waterway.stretches SET staging_done = true
WHERE id IN (
  SELECT key from import.track_imports
  WHERE import_id = $1 AND
        relation = 'waterway.stretches'::regclass)`

	stInsertSQL = `
WITH r AS (
  SELECT isrsrange(
	( $1::char(2),
	  $2::char(3),
	  $3::char(5),
	  $4::char(5),
	  $5::int),
	( $6::char(2),
	  $7::char(3),
	  $8::char(5),
	  $9::char(5),
	  $10::int)) AS r
)
INSERT INTO waterway.stretches (
  name,
  stretch,
  area,
  objnam,
  nobjnam,
  date_info,
  source_organization
) VALUES (
  $11,
  (SELECT r FROM r),
  ISRSrange_area(
    ISRSrange_axis((SELECT r FROM r), $16::double precision),
    (SELECT ST_Collect(CAST(area AS geometry))
      FROM waterway.waterway_area)),
  $12,
  $13,
  $14,
  $15)
RETURNING id`

	stInsertCountrySQL = `
INSERT INTO waterway.stretch_countries (
  stretches_id,
  country_code
) VALUES (
  $1,
  $2
)`
)

// StageDone moves the imported stretch out of the staging area.
func (stJobCreator) StageDone(
	ctx context.Context,
	tx *sql.Tx,
	id int64,
) error {
	if _, err := tx.ExecContext(ctx, stDeleteSQL, id); err != nil {
		return err
	}
	_, err := tx.ExecContext(ctx, stStageDoneSQL, id)
	return err
}

// CleanUp of a stretch import is a NOP.
func (*Stretch) CleanUp() error { return nil }

// Do executes the actual stretch import.
func (st *Stretch) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	start := time.Now()

	if st.Date.Time.IsZero() {
		st.Date = models.Date{Time: start}
	}

	feedback.Info("Storing stretch '%s'", st.Name)

	if len(st.Countries) == 0 {
		return nil, errors.New("List of countries is empty")
	}

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	defer tx.Rollback()

	insertCountryStmt, err := tx.PrepareContext(ctx, stInsertCountrySQL)
	if err != nil {
		return nil, err
	}

	var nobjnm sql.NullString
	if st.NObjNam != nil {
		nobjnm = sql.NullString{String: *st.NObjNam, Valid: true}
	}

	feedback.Info("Stretch from %s to %s.", st.From.String(), st.To.String())
	feedback.Info("Tolerance used to snap waterway axis: %g", st.Tolerance)

	var id int64
	if err := tx.QueryRowContext(
		ctx,
		stInsertSQL,
		st.From.CountryCode,
		st.From.LoCode,
		st.From.FairwaySection,
		st.From.Orc,
		st.From.Hectometre,
		st.To.CountryCode,
		st.To.LoCode,
		st.To.FairwaySection,
		st.To.Orc,
		st.To.Hectometre,
		st.Name,
		st.ObjNam,
		nobjnm,
		st.Date.Time,
		st.Source,
		st.Tolerance,
	).Scan(&id); err != nil {
		return nil, handleError(err)
	}

	// store the associated countries.

	feedback.Info("Countries associated with stretch: %s.", st.Countries)
	for _, c := range st.Countries {
		if _, err := insertCountryStmt.ExecContext(ctx, id, c); err != nil {
			return nil, err
		}
	}

	if err := track(ctx, tx, importID, "waterway.stretches", id); err != nil {
		return nil, err
	}

	feedback.Info("Storing stretch '%s' took %s", st.Name, time.Since(start))
	if err := tx.Commit(); err != nil {
		return nil, err
	}
	feedback.Info("Import of stretch was successful")

	summary := struct {
		Stretch string `json:"stretch"`
	}{
		Stretch: st.Name,
	}

	return &summary, nil
}