view pkg/imports/wx.go @ 5670:b75d0b303328

Various fixes and improvements of gauges import: - Allow update of erased data (and thereby set erased to false) - Fix source_organization to work with ERDMS2 - Give ISRS of new and updated gauges in summary - Fixed reference of null pointers if revlevels are missing - Fixed reference of null pointer on update errors - Added ISRS to reference_code warning
author Sascha Wilde <wilde@sha-bang.de>
date Fri, 08 Dec 2023 17:29:56 +0100
parents 6fe3662aafeb
children 6270951dda28
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018, 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
//  * Tom Gottfried <tom.gottfried@intevation.de>

package imports

// WXJobKind is the import queue type identifier for waterway axis
// imports. The package's init function registers the matching job
// creator under this kind.
const WXJobKind JobKind = "wx"

func init() {
	RegisterJobCreator(WXJobKind,
		&WFSFeatureJobCreator{
			description: "waterway axis",
			depends:     [2][]string{{"waterway_axis"}},
			newConsumer: newSQLConsumer(
				prepareStmnts(insertWaterwayAxisSQL),
				consume,
				axisInvalidation,
				newMultiLineFeature(func() interface{} {
					return new(waterwayAxisProperties)
				}),
			),
		})
}

// waterwayAxisProperties is the property container handed out by the
// multi-line feature factory registered in init, one fresh instance per
// call.
//
// ObjNam is mapped to the JSON key "hydro_objnam" and NObjNnm to
// "hydro_nobjnm". NObjNnm is a pointer, so it can represent a missing
// value as nil (presumably these end up as the objnam/nobjnam
// parameters of insertWaterwayAxisSQL; verify against the consumer).
type waterwayAxisProperties struct {
	ObjNam  string  `json:"hydro_objnam"`
	NObjNnm *string `json:"hydro_nobjnm"`
}

const (
	// insertWaterwayAxisSQL stores one axis geometry unless an identical,
	// currently valid row already exists.
	//
	// Parameters: $1 geometry as WKB, $2 its SRID, $3 objnam, $4 nobjnam.
	//
	// CTE "resp" fetches the current user's area via
	// users.current_user_area_utm().
	//
	// CTE "g" yields the geometry to store: the input line unchanged if
	// the current role is a member of sys_admin or the user's area covers
	// the line, otherwise the noded line intersected with the user's area
	// shrunk by a buffer of -0.0001. An empty result produces no row, so
	// nothing is updated or inserted. The geometry is then transformed to
	// SRID 4326, reduced to its line string components
	// (ST_CollectionExtract type 2), noded and turned into a multi
	// geometry.
	//
	// CTE "t" sets last_found to current_timestamp on every currently
	// valid row whose (wtwaxs, objnam, nobjnam) is not distinct from the
	// new values, i.e. a NULL-safe equality check.
	//
	// The final INSERT only happens if "t" touched no row; it returns the
	// id of the inserted row.
	insertWaterwayAxisSQL = `
WITH resp AS (
  SELECT users.current_user_area_utm() AS a
),
g AS (
  SELECT
    ST_Multi(ST_Node(ST_CollectionExtract(ST_Transform(new_ax, 4326), 2)))
      AS new_ax
  FROM ST_GeomFromWKB($1, $2::integer) AS new_line (new_line),
    LATERAL (SELECT
      CASE WHEN pg_has_role('sys_admin', 'MEMBER')
          OR ST_Covers((SELECT a FROM resp),
            ST_Transform(new_line, (SELECT ST_SRID(a) FROM resp)))
        THEN new_line
        ELSE ST_Intersection((SELECT ST_Buffer(a, -0.0001) FROM resp),
          ST_Node(ST_Transform(new_line, (SELECT ST_SRID(a) FROM resp))))
        END) AS new_ax (new_ax)
  -- Do nothing if intersection is empty:
  WHERE NOT ST_IsEmpty(new_ax)
),
t AS (
  UPDATE waterway.waterway_axis SET last_found = current_timestamp
  -- The first condition is just to help the PostgreSQL query planner
  -- to avoid evaluating more costly conditions including those
  -- introduced by row level security policies:
  WHERE (SELECT new_ax FROM g) IS NOT NULL
    AND validity @> current_timestamp
    AND (
        wtwaxs, objnam, nobjnam
      ) IS NOT DISTINCT FROM (
        (SELECT new_ax FROM g), $3, $4)
  RETURNING 1
)
INSERT INTO waterway.waterway_axis (wtwaxs, objnam, nobjnam)
SELECT new_ax, $3, $4
  FROM g
  WHERE NOT EXISTS(SELECT 1 FROM t)
RETURNING id
`
	// invalidateAxisSQL ends the validity range at current_timestamp for
	// every currently valid row whose last_found is older than
	// current_timestamp. It takes no parameters.
	//
	// In PostgreSQL current_timestamp is the start time of the current
	// transaction, so rows whose last_found was refreshed by
	// insertWaterwayAxisSQL in the same transaction are left untouched.
	// NOTE(review): whether freshly inserted rows are spared as well
	// depends on the column default of last_found, which is not visible
	// here.
	invalidateAxisSQL = `
UPDATE waterway.waterway_axis
  SET validity = tstzrange(lower(validity), current_timestamp)
  WHERE validity @> current_timestamp
    AND last_found < current_timestamp
`
)

// axisInvalidation executes invalidateAxisSQL in the consumer's
// transaction and reports the number of affected rows via the
// consumer's feedback.
//
// It returns ErrFeaturesUnmodified when the statement affected no row,
// and passes through any error from executing the statement or from
// reading the affected row count.
func axisInvalidation(spc *SQLGeometryConsumer) error {
	result, err := spc.tx.ExecContext(spc.ctx, invalidateAxisSQL)
	if err != nil {
		return err
	}

	switch removed, err := result.RowsAffected(); {
	case err != nil:
		return err
	case removed == 0:
		return ErrFeaturesUnmodified
	default:
		spc.feedback.Info(
			"Number of features removed from data source: %d", removed)
		return nil
	}
}