view pkg/imports/wx.go @ 5560:f2204f91d286

Join the log lines of imports to the log exports to recover data from them. Used in SR export to extract information that where in the meta json but now are only found in the log.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Wed, 09 Feb 2022 18:34:40 +0100
parents 6fe3662aafeb
children 6270951dda28
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018, 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
//  * Tom Gottfried <tom.gottfried@intevation.de>

package imports

// WXJobKind is the import queue type identifier.
const WXJobKind JobKind = "wx"

func init() {
	RegisterJobCreator(WXJobKind,
		&WFSFeatureJobCreator{
			description: "waterway axis",
			depends:     [2][]string{{"waterway_axis"}},
			newConsumer: newSQLConsumer(
				prepareStmnts(insertWaterwayAxisSQL),
				consume,
				axisInvalidation,
				newMultiLineFeature(func() interface{} {
					return new(waterwayAxisProperties)
				}),
			),
		})
}

type waterwayAxisProperties struct {
	ObjNam  string  `json:"hydro_objnam"`
	NObjNnm *string `json:"hydro_nobjnm"`
}

const (
	insertWaterwayAxisSQL = `
WITH resp AS (
  SELECT users.current_user_area_utm() AS a
),
g AS (
  SELECT
    ST_Multi(ST_Node(ST_CollectionExtract(ST_Transform(new_ax, 4326), 2)))
      AS new_ax
  FROM ST_GeomFromWKB($1, $2::integer) AS new_line (new_line),
    LATERAL (SELECT
      CASE WHEN pg_has_role('sys_admin', 'MEMBER')
          OR ST_Covers((SELECT a FROM resp),
            ST_Transform(new_line, (SELECT ST_SRID(a) FROM resp)))
        THEN new_line
        ELSE ST_Intersection((SELECT ST_Buffer(a, -0.0001) FROM resp),
          ST_Node(ST_Transform(new_line, (SELECT ST_SRID(a) FROM resp))))
        END) AS new_ax (new_ax)
  -- Do nothing if intersection is empty:
  WHERE NOT ST_IsEmpty(new_ax)
),
t AS (
  UPDATE waterway.waterway_axis SET last_found = current_timestamp
  -- The first condition is just to help the PostgreSQL query planner
  -- to avoid evaluating more costly conditions including those
  -- introduced by row level security policies:
  WHERE (SELECT new_ax FROM g) IS NOT NULL
    AND validity @> current_timestamp
    AND (
        wtwaxs, objnam, nobjnam
      ) IS NOT DISTINCT FROM (
        (SELECT new_ax FROM g), $3, $4)
  RETURNING 1
)
INSERT INTO waterway.waterway_axis (wtwaxs, objnam, nobjnam)
SELECT new_ax, $3, $4
  FROM g
  WHERE NOT EXISTS(SELECT 1 FROM t)
RETURNING id
`
	invalidateAxisSQL = `
UPDATE waterway.waterway_axis
  SET validity = tstzrange(lower(validity), current_timestamp)
  WHERE validity @> current_timestamp
    AND last_found < current_timestamp
`
)

func axisInvalidation(spc *SQLGeometryConsumer) error {
	res, err := spc.tx.ExecContext(spc.ctx, invalidateAxisSQL)
	if err != nil {
		return err
	}
	old, err := res.RowsAffected()
	if err != nil {
		return err
	}
	if old == 0 {
		return ErrFeaturesUnmodified
	}
	spc.feedback.Info(
		"Number of features removed from data source: %d", old)
	return nil
}