view pkg/imports/wx.go @ 2855:8a8a929182f9

Enable import of axis and area independently from areas of responsibility In preparation of generating the areas of responsibility from data previously imported by a sys_admin! Having the CTE as an item in the FROM-list prevents the SELECT from returning any rows if the CTE returns no rows. Thus, use the CTE only in scalar subqueries that are relevant only if the statement is executed by somebody without sys_admin role. Also introduced an ST_CollectionExtract() in the axis INSERT because here too, the intersection is not guaranteed to have the right dimensionality.
author Tom Gottfried <tom@intevation.de>
date Thu, 28 Mar 2019 17:19:27 +0100
parents 1b6840093eac
children 813671873536
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018, 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
//  * Tom Gottfried <tom.gottfried@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"time"

	"gemma.intevation.de/gemma/pkg/wfs"
)

// WaterwayAxis is an import job to import
// the waterway axes in form of line string geometries
// and attribute data from a WFS service.
type WaterwayAxis struct {
	// URL the GetCapabilities URL of the WFS service.
	URL string `json:"url"`
	// FeatureType selects the feature type of the WFS service.
	FeatureType string `json:"feature-type"`
	// SortBy works around misconfigured services to
	// establish a sort order to get the features.
	SortBy string `json:"sort-by"`
	// User is an optional username for Basic Auth.
	User string `json:"user,omitempty"`
	// Password is an optional password for Basic Auth.
	Password string `json:"password,omitempty"`
}

// WXJobKind is the import queue type identifier.
const WXJobKind JobKind = "wx"

type wxJobCreator struct{}

func init() {
	RegisterJobCreator(WXJobKind, wxJobCreator{})
}

func (wxJobCreator) Description() string { return "waterway axis" }

func (wxJobCreator) AutoAccept() bool { return true }

func (wxJobCreator) Create() Job { return new(WaterwayAxis) }

func (wxJobCreator) Depends() []string {
	return []string{
		"waterway_axis",
	}
}

// StageDone is a NOP for waterway axis imports.
func (wxJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
	return nil
}

// CleanUp for waterway axis imports is a NOP.
func (*WaterwayAxis) CleanUp() error { return nil }

type waterwayAxisProperties struct {
	ObjNam  string  `json:"hydro_objnam"`
	NObjNnm *string `json:"hydro_nobjnm"`
}

const (
	deleteWaterwayAxisSQL = `
WITH resp AS (
  SELECT best_utm(area) AS t,
         ST_Transform(area::geometry, best_utm(area)) AS a
  FROM users.responsibility_areas
  WHERE country = users.current_user_country()
)
DELETE FROM waterway.waterway_axis
WHERE pg_has_role('sys_admin', 'MEMBER')
  OR ST_Covers((SELECT a FROM resp),
    ST_Transform(wtwaxs::geometry, (SELECT t FROM resp)))
`

	checkCrossingAxisSQL = `
SELECT ST_AsText(ST_Intersection(new_line.wtwaxs, axis.wtwaxs))
  FROM waterway.waterway_axis AS axis, waterway.waterway_axis AS new_line
  WHERE new_line.id = $1 AND axis.id <> $1
    AND ST_Crosses(new_line.wtwaxs::geometry, axis.wtwaxs::geometry)
`

	insertWaterwayAxisSQL = `
WITH resp AS (
  SELECT best_utm(area) AS t,
         ST_Transform(area::geometry, best_utm(area)) AS a
  FROM users.responsibility_areas
  WHERE country = users.current_user_country()
)
INSERT INTO waterway.waterway_axis (wtwaxs, objnam, nobjnam)
SELECT dmp.geom, $3, $4
  FROM ST_GeomFromWKB($1, $2::integer) AS new_line (new_line),
    ST_Dump(ST_Transform(ST_CollectionExtract(
      CASE WHEN pg_has_role('sys_admin', 'MEMBER')
        THEN ST_Node(ST_Transform(new_line,
          best_utm(ST_Transform(new_line, 4326))))
        ELSE ST_Intersection((SELECT a FROM resp),
          ST_Node(ST_Transform(new_line, (SELECT t FROM resp))))
        END,
      2), 4326)) AS dmp
RETURNING id
`
)

// Do executes the actual waterway axis import.
func (wx *WaterwayAxis) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	start := time.Now()

	feedback.Info("Import waterway axis")

	feedback.Info("Loading capabilities from %s", wx.URL)
	caps, err := wfs.GetCapabilities(wx.URL)
	if err != nil {
		feedback.Error("Loading capabilities failed: %v", err)
		return nil, err
	}

	ft := caps.FindFeatureType(wx.FeatureType)
	if ft == nil {
		return nil, fmt.Errorf("Unknown feature type '%s'", wx.FeatureType)
	}

	feedback.Info("Found feature type '%s", wx.FeatureType)

	epsg, err := wfs.CRSToEPSG(ft.DefaultCRS)
	if err != nil {
		feedback.Error("Unsupported CRS name '%s'", ft.DefaultCRS)
		return nil, err
	}

	if wx.SortBy != "" {
		feedback.Info("Features will be sorted by '%s'", wx.SortBy)
	}

	dl, err := wfs.GetFeatures(caps, wx.FeatureType, wx.SortBy)
	if err != nil {
		feedback.Error("Cannot create GetFeature URLs. %v", err)
		return nil, err
	}

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	defer tx.Rollback()

	insertStmt, err := tx.PrepareContext(ctx, insertWaterwayAxisSQL)
	if err != nil {
		return nil, err
	}
	defer insertStmt.Close()

	checkCrossingStmt, err := tx.PrepareContext(ctx, checkCrossingAxisSQL)
	if err != nil {
		return nil, err
	}
	defer checkCrossingStmt.Close()

	// Delete the old features.
	if _, err := tx.ExecContext(ctx, deleteWaterwayAxisSQL); err != nil {
		return nil, err
	}

	var (
		unsupported       = stringCounter{}
		missingProperties int
		badProperties     int
		features          int
	)

	if err := dl.Download(wx.User, wx.Password, func(url string, r io.Reader) error {
		feedback.Info("Get features from: '%s'", url)
		rfc, err := wfs.ParseRawFeatureCollection(r)
		if err != nil {
			return fmt.Errorf("parsing GetFeature document failed: %v", err)
		}
		if rfc.CRS != nil {
			crsName := rfc.CRS.Properties.Name
			if epsg, err = wfs.CRSToEPSG(crsName); err != nil {
				feedback.Error("Unsupported CRS: %d", crsName)
				return err
			}
		}

		// No features -> ignore.
		if rfc.Features == nil {
			return nil
		}

		feedback.Info("Using EPSG: %d", epsg)

		savepoint := Savepoint(ctx, tx, "feature")

		for _, feature := range rfc.Features {
			if feature.Properties == nil || feature.Geometry.Coordinates == nil {
				missingProperties++
				continue
			}

			var props waterwayAxisProperties

			if err := json.Unmarshal(*feature.Properties, &props); err != nil {
				badProperties++
				continue
			}

			var nobjnam sql.NullString
			if props.NObjNnm != nil {
				nobjnam = sql.NullString{String: *props.NObjNnm, Valid: true}
			}

			switch feature.Geometry.Type {
			case "LineString":
				var l lineSlice
				if err := json.Unmarshal(*feature.Geometry.Coordinates, &l); err != nil {
					return err
				}
				if err := storeLinestring(
					ctx,
					savepoint,
					feedback,
					l,
					epsg,
					props,
					nobjnam,
					checkCrossingStmt,
					insertStmt); err != nil {
					return err
				}
				features++
			case "MultiLineString":
				var ls []lineSlice
				if err := json.Unmarshal(*feature.Geometry.Coordinates, &ls); err != nil {
					return err
				}
				for _, l := range ls {
					if err := storeLinestring(
						ctx,
						savepoint,
						feedback,
						l,
						epsg,
						props,
						nobjnam,
						checkCrossingStmt,
						insertStmt); err != nil {
						return err
					}
					features++
				}
			default:
				unsupported[feature.Geometry.Type]++
			}
		}
		return nil
	}); err != nil {
		return nil, err
	}

	if badProperties > 0 {
		feedback.Warn("Bad properties: %d", badProperties)
	}

	if missingProperties > 0 {
		feedback.Warn("Missing properties: %d", missingProperties)
	}

	if len(unsupported) != 0 {
		feedback.Warn("Unsupported types found: %s", unsupported)
	}

	if features == 0 {
		err := errors.New("No features found")
		feedback.Error("%v", err)
		return nil, err
	}

	if err = tx.Commit(); err == nil {
		feedback.Info("Storing %d features took %s",
			features, time.Since(start))
	}

	return nil, err
}

func storeLinestring(
	ctx context.Context,
	savepoint func(func() error) error,
	feedback Feedback,
	l lineSlice,
	epsg int,
	props waterwayAxisProperties,
	nobjnam sql.NullString,
	checkCrossingStmt, insertStmt *sql.Stmt,
) error {
	var id int
	if err := savepoint(func() error {
		err := insertStmt.QueryRowContext(
			ctx,
			l.asWKB(),
			epsg,
			props.ObjNam,
			nobjnam,
		).Scan(&id)
		return err
	}); err != nil {
		feedback.Warn(handleError(err).Error())
	}

	var crossing string
	switch err := checkCrossingStmt.QueryRowContext(
		ctx,
		id,
	).Scan(&crossing); {
	case err != nil && err != sql.ErrNoRows:
		return err
	case err == nil:
		feedback.Warn(
			"Linestring %d crosses previously imported linestring near %s. "+
				"Finding a contiguous axis may not work here",
			id, crossing)
	}
	return nil
}