view pkg/imports/wx.go @ 5013:7dff1015283d

Add row level security policies for waterway axis. Enforcing the area of responsibility this way, instead of leaving it up to the importer implementation, will also reduce the complexity of the statements needed to implement keeping of historic axis data.
author Tom Gottfried <tom@intevation.de>
date Thu, 12 Mar 2020 14:49:19 +0100
parents e8b2dc771f9e
children cf25b23e3eec
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018, 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
//  * Tom Gottfried <tom.gottfried@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"time"

	"gemma.intevation.de/gemma/pkg/pgxutils"
	"gemma.intevation.de/gemma/pkg/wfs"
)

// WaterwayAxis is an import job to import
// the waterway axes in form of line string geometries
// and attribute data from a WFS service.
//
// The fields are the job's configuration and are (de)serialized
// as JSON using the tags given below.
type WaterwayAxis struct {
	// URL is the GetCapabilities URL of the WFS service.
	URL string `json:"url"`
	// FeatureType selects the feature type of the WFS service
	// that is looked up in the capabilities and downloaded.
	FeatureType string `json:"feature-type"`
	// SortBy works around misconfigured services to
	// establish a sort order to get the features.
	// An empty string means no explicit sort order is announced.
	SortBy string `json:"sort-by"`
	// User is an optional username for Basic Auth.
	// It is passed to the feature download together with Password.
	User string `json:"user,omitempty"`
	// Password is an optional password for Basic Auth.
	Password string `json:"password,omitempty"`
}

// Description summarizes this import as "<URL>|<FeatureType>".
// The returned error is always nil.
func (wx *WaterwayAxis) Description() (string, error) {
	summary := fmt.Sprintf("%s|%s", wx.URL, wx.FeatureType)
	return summary, nil
}

// WXJobKind is the import queue type identifier under which
// the waterway axis job creator is registered.
const WXJobKind JobKind = "wx"

// wxJobCreator is the stateless job creator that is registered
// for WXJobKind; its methods describe and create waterway axis
// import jobs.
type wxJobCreator struct{}

func init() {
	RegisterJobCreator(WXJobKind, wxJobCreator{})
}

// Description returns the human readable name of this kind of import.
func (wxJobCreator) Description() string {
	return "waterway axis"
}

// AutoAccept always reports true for waterway axis imports.
func (wxJobCreator) AutoAccept() bool {
	return true
}

// Create returns a fresh, zero-valued waterway axis import job.
func (wxJobCreator) Create() Job {
	return &WaterwayAxis{}
}

// Depends returns the dependency declaration of waterway axis imports:
// the first list names "waterway_axis", the second list is empty.
func (wxJobCreator) Depends() [2][]string {
	var deps [2][]string
	deps[0] = []string{"waterway_axis"}
	deps[1] = []string{}
	return deps
}

// StageDone does nothing for waterway axis imports and
// always returns nil; all arguments are ignored.
func (wxJobCreator) StageDone(_ context.Context, _ *sql.Tx, _ int64) error {
	return nil
}

// CleanUp has nothing to do for waterway axis imports
// and always returns nil.
func (*WaterwayAxis) CleanUp() error {
	return nil
}

// waterwayAxisProperties holds the attribute data decoded from
// the JSON properties of a single downloaded feature.
type waterwayAxisProperties struct {
	// ObjNam is read from the "hydro_objnam" property.
	ObjNam string `json:"hydro_objnam"`
	// NObjNnm is read from the "hydro_nobjnm" property.
	// It is a pointer so that a missing value stays nil.
	NObjNnm *string `json:"hydro_nobjnm"`
}

const (
	// deleteWaterwayAxisSQL deletes from waterway.waterway_axis
	// without any WHERE clause.
	// NOTE(review): which rows are actually affected may additionally
	// depend on row level security policies in the database - confirm.
	deleteWaterwayAxisSQL = `
DELETE FROM waterway.waterway_axis
`

	// insertWaterwayAxisSQL inserts one axis geometry with its names
	// and returns the id of the new row.
	//
	// Parameters:
	//   $1 geometry as WKB
	//   $2 SRID of the geometry (cast to integer)
	//   $3 objnam
	//   $4 nobjnam
	//
	// For members of the role 'sys_admin' the geometry is taken as is.
	// Otherwise it is transformed to the SRID of the area returned by
	// users.current_user_area_utm(), noded and intersected with that
	// area shrunk by a buffer of -0.0001. The result is transformed
	// to EPSG 4326, reduced to its line strings, noded and stored as
	// a multi geometry. If the resulting geometry is empty no row is
	// inserted and therefore no id is returned.
	insertWaterwayAxisSQL = `
WITH resp AS (
  SELECT users.current_user_area_utm() AS a
)
INSERT INTO waterway.waterway_axis (wtwaxs, objnam, nobjnam)
SELECT
    ST_Multi(ST_Node(ST_CollectionExtract(ST_Transform(new_ax, 4326), 2))),
    $3, $4
  FROM ST_GeomFromWKB($1, $2::integer) AS new_line (new_line),
    LATERAL (SELECT
      CASE WHEN pg_has_role('sys_admin', 'MEMBER')
        THEN new_line
        ELSE ST_Intersection((SELECT ST_Buffer(a, -0.0001) FROM resp),
          ST_Node(ST_Transform(new_line, (SELECT ST_SRID(a) FROM resp))))
        END) AS new_ax (new_ax)
  -- Do nothing if intersection is empty:
  WHERE NOT ST_IsEmpty(new_ax)
RETURNING id
`
)

// Do executes the actual waterway axis import.
//
// It loads the capabilities of the WFS service at wx.URL, looks up the
// configured feature type and downloads its features. Inside a single
// transaction the existing waterway axis rows are deleted and every
// downloaded LineString / MultiLineString feature is handed to
// storeLinestring for insertion. The transaction is only committed if
// at least one feature was stored; otherwise an error is returned and
// the deferred rollback discards the delete again.
//
// Progress, warnings and errors are reported through feedback.
// The first return value is always nil.
func (wx *WaterwayAxis) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	start := time.Now()

	feedback.Info("Import waterway axis")

	feedback.Info("Loading capabilities from %s", wx.URL)
	caps, err := wfs.GetCapabilities(wx.URL)
	if err != nil {
		feedback.Error("Loading capabilities failed: %v", err)
		return nil, err
	}

	ft := caps.FindFeatureType(wx.FeatureType)
	if ft == nil {
		return nil, fmt.Errorf("unknown feature type '%s'", wx.FeatureType)
	}

	feedback.Info("Found feature type '%s'", wx.FeatureType)

	// Default EPSG code of the feature type. It may be replaced below
	// by the CRS announced in a downloaded feature collection.
	epsg, err := wfs.CRSToEPSG(ft.DefaultCRS)
	if err != nil {
		feedback.Error("Unsupported CRS: '%s'", ft.DefaultCRS)
		return nil, err
	}

	if wx.SortBy != "" {
		feedback.Info("Features will be sorted by '%s'", wx.SortBy)
	}

	dl, err := wfs.GetFeatures(caps, wx.FeatureType, wx.SortBy)
	if err != nil {
		feedback.Error("Cannot create GetFeature URLs. %v", err)
		return nil, err
	}

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	// Every return path before the commit discards all changes.
	defer tx.Rollback()

	insertStmt, err := tx.PrepareContext(ctx, insertWaterwayAxisSQL)
	if err != nil {
		return nil, err
	}
	defer insertStmt.Close()

	// Delete the old features.
	if _, err := tx.ExecContext(ctx, deleteWaterwayAxisSQL); err != nil {
		return nil, err
	}

	// Statistics collected over all downloaded documents.
	var (
		unsupported       = stringCounter{}
		missingProperties int
		badProperties     int
		outside           int
		features          int
	)

	if err := dl.Download(wx.User, wx.Password, func(url string, r io.Reader) error {
		feedback.Info("Get features from: '%s'", url)
		rfc, err := wfs.ParseRawFeatureCollection(r)
		if err != nil {
			return fmt.Errorf("parsing GetFeature document failed: %v", err)
		}
		// A CRS given in the document overrides the current EPSG code.
		if rfc.CRS != nil {
			crsName := rfc.CRS.Properties.Name
			if epsg, err = wfs.CRSToEPSG(crsName); err != nil {
				// crsName is a string, so it has to be formatted with %s.
				feedback.Error("Unsupported CRS: '%s'", crsName)
				return err
			}
		}

		// No features -> ignore.
		if rfc.Features == nil {
			return nil
		}

		feedback.Info("Using EPSG: %d", epsg)

		// NOTE(review): presumably wraps each insert in a database
		// savepoint named "feature" so that a failing insert does not
		// invalidate the whole transaction - confirm against Savepoint.
		savepoint := Savepoint(ctx, tx, "feature")

		for _, feature := range rfc.Features {
			// Features without properties or coordinates are only counted.
			if feature.Properties == nil || feature.Geometry.Coordinates == nil {
				missingProperties++
				continue
			}

			var props waterwayAxisProperties

			if err := json.Unmarshal(*feature.Properties, &props); err != nil {
				badProperties++
				continue
			}

			// A missing nobjnam is passed to the database as NULL.
			var nobjnam sql.NullString
			if props.NObjNnm != nil {
				nobjnam = sql.NullString{String: *props.NObjNnm, Valid: true}
			}

			// Both supported geometry types are handled as a
			// multi line; a LineString becomes a one-element multi line.
			var ls multiLineSlice
			switch feature.Geometry.Type {
			case "LineString":
				var l lineSlice
				if err := json.Unmarshal(*feature.Geometry.Coordinates, &l); err != nil {
					return err
				}
				ls = append(ls, l)
			case "MultiLineString":
				if err := json.Unmarshal(*feature.Geometry.Coordinates, &ls); err != nil {
					return err
				}
			default:
				unsupported[feature.Geometry.Type]++
				continue
			}
			if err := storeLinestring(
				ctx,
				savepoint,
				feedback,
				ls,
				epsg,
				props,
				nobjnam,
				&outside,
				&features,
				insertStmt); err != nil {
				return err
			}
		}
		return nil
	}); err != nil {
		return nil, err
	}

	if badProperties > 0 {
		feedback.Warn("Bad properties: %d", badProperties)
	}

	if missingProperties > 0 {
		feedback.Warn("Missing properties: %d", missingProperties)
	}

	if len(unsupported) != 0 {
		feedback.Warn("Unsupported types found: %s", unsupported)
	}

	if outside > 0 {
		feedback.Info("Features outside responsibility area: %d", outside)
	}

	// Without any stored feature the import fails and the deferred
	// rollback restores the previously deleted rows.
	if features == 0 {
		return nil, errors.New("no features found")
	}

	if err = tx.Commit(); err == nil {
		feedback.Info("Storing %d features took %s",
			features, time.Since(start))
	}

	return nil, err
}

// storeLinestring inserts a single multi line geometry together with
// its attributes using the prepared insert statement.
//
// The insert is executed through the given savepoint wrapper with the
// geometry as WKB, the EPSG code, props.ObjNam and nobjnam as arguments.
//
//   - If the statement yields no row (sql.ErrNoRows) the feature was
//     filtered out by the statement and *outside is incremented.
//   - Any other error is reported through feedback but deliberately not
//     returned, so a single bad feature does not stop the import.
//   - On success *features is incremented.
//
// The returned error is always nil.
func storeLinestring(
	ctx context.Context,
	savepoint func(func() error) error,
	feedback Feedback,
	l multiLineSlice,
	epsg int,
	props waterwayAxisProperties,
	nobjnam sql.NullString,
	outside, features *int,
	insertStmt *sql.Stmt,
) error {
	// id only serves as scan target for the returned row.
	var id int
	err := savepoint(func() error {
		return insertStmt.QueryRowContext(
			ctx,
			l.asWKB(),
			epsg,
			props.ObjNam,
			nobjnam,
		).Scan(&id)
	})
	switch {
	case err == sql.ErrNoRows:
		// ignore -> filtered by responsibility_areas
		*outside++
	case err != nil:
		// Use a constant format string: the error text comes from the
		// database and may itself contain '%' characters.
		feedback.Error("%s", pgxutils.ReadableError{Err: err}.Error())
	default:
		*features++
	}
	return nil
}