view pkg/imports/wa.go @ 2248:cce158db02b0

Input area as multipolygons to generate area from stretch Doing so is more resilient against invalid geometries and gives more plausible results if tributaries are involved (i.e. does not include the adjacent area of the tributary in the result).
author Tom Gottfried <tom@intevation.de>
date Wed, 13 Feb 2019 16:48:52 +0100
parents 7c83b5277c1c
children 02505fcff63c
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"strconv"
	"time"

	"gemma.intevation.de/gemma/pkg/wfs"
)

// WaterwayArea is an import job to import
// the waterway area in form of polygon geometries
// and attribute data from a WFS service.
type WaterwayArea struct {
	// URL the GetCapabilities URL of the WFS service.
	URL string `json:"url"`
	// FeatureType selects the feature type of the WFS service.
	FeatureType string `json:"feature-type"`
	// SortBy works around misconfigured services to
	// establish a sort order to get the features.
	SortBy string `json:"sort-by"`
}

// WAJobKind is the import queue type identifier.
const WAJobKind JobKind = "wa"

type waJobCreator struct{}

func init() {
	RegisterJobCreator(WAJobKind, waJobCreator{})
}

func (waJobCreator) Description() string { return "waterway area" }

func (waJobCreator) AutoAccept() bool { return true }

func (waJobCreator) Create() Job { return new(WaterwayArea) }

func (waJobCreator) Depends() []string {
	return []string{
		"waterway_area",
	}
}

// StageDone is a NOP for waterway area imports.
func (waJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
	return nil
}

// CleanUp for waterway area imports is a NOP.
func (*WaterwayArea) CleanUp() error { return nil }

type waterwayAreaProperties struct {
	Catccl *string `json:"ienc_catccl"`
	Dirimp *string `json:"ienc_dirimp"`
}

const (
	deleteWaterwayAreaSQL = `
WITH resp AS (
  SELECT best_utm(area::geometry) AS t,
         ST_Transform(area::geometry, best_utm(area::geometry)) AS a
  FROM users.responsibility_areas
  WHERE country = users.current_user_country()
)
DELETE FROM waterway.waterway_area
WHERE ST_Covers(
  (SELECT a FROM resp),
  ST_Transform(area::geometry, (SELECT t FROM resp)))
`
	insertWaterwayAreaSQL = `
WITH resp AS (
  SELECT best_utm(area::geometry) AS t,
         ST_Transform(area::geometry, best_utm(area::geometry)) AS a
  FROM users.responsibility_areas
  WHERE country = users.current_user_country()
)
INSERT INTO waterway.waterway_area (area, catccl, dirimp)
SELECT ST_Transform(clipped.geom, 4326)::geography, $3, $4 FROM (
    SELECT (ST_Dump(
       ST_Intersection(
         (SELECT ST_Buffer(a, 0.0001) FROM resp),
         ST_CollectionExtract(ST_MakeValid(ST_Transform(
           ST_GeomFromWKB($1, $2::integer),
           (SELECT t FROM resp)
         )),3)
       )
     )).geom AS geom
  ) AS clipped
  WHERE clipped.geom IS NOT NULL
`
)

// Do executes the actual waterway axis import.
func (wx *WaterwayArea) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	start := time.Now()

	feedback.Info("Import waterway area")

	feedback.Info("Loading capabilities from %s", wx.URL)
	caps, err := wfs.GetCapabilities(wx.URL)
	if err != nil {
		feedback.Error("Loading capabilities failed: %v", err)
		return nil, err
	}

	ft := caps.FindFeatureType(wx.FeatureType)
	if ft == nil {
		return nil, fmt.Errorf("Unknown feature type '%s'", wx.FeatureType)
	}

	feedback.Info("Found feature type '%s", wx.FeatureType)

	epsg, err := wfs.CRSToEPSG(ft.DefaultCRS)
	if err != nil {
		feedback.Error("Unsupported CRS name '%s'", ft.DefaultCRS)
		return nil, err
	}

	if wx.SortBy != "" {
		feedback.Info("Features will be sorted by '%s'", wx.SortBy)
	}

	urls, err := wfs.GetFeaturesGET(
		caps, wx.FeatureType, "application/json", wx.SortBy)
	if err != nil {
		feedback.Error("Cannot create GetFeature URLs. %v", err)
		return nil, err
	}

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	defer tx.Rollback()

	insertStmt, err := tx.PrepareContext(ctx, insertWaterwayAreaSQL)
	if err != nil {
		return nil, err
	}
	defer insertStmt.Close()

	// Delete the old features.
	if _, err := tx.ExecContext(ctx, deleteWaterwayAreaSQL); err != nil {
		return nil, err
	}

	var (
		unsupported       = stringCounter{}
		missingProperties int
		badProperties     int
		features          int
	)

	if err := wfs.DownloadURLs(urls, func(url string, r io.Reader) error {
		feedback.Info("Get features from: '%s'", url)
		rfc, err := wfs.ParseRawFeatureCollection(r)
		if err != nil {
			return fmt.Errorf("parsing GetFeature document failed: %v", err)
		}
		if rfc.CRS != nil {
			crsName := rfc.CRS.Properties.Name
			if epsg, err = wfs.CRSToEPSG(crsName); err != nil {
				feedback.Error("Unsupported CRS: %d", crsName)
				return err
			}
		}

		// No features -> ignore.
		if rfc.Features == nil {
			return nil
		}

		feedback.Info("Using EPSG: %d", epsg)

		for _, feature := range rfc.Features {
			if feature.Properties == nil || feature.Geometry.Coordinates == nil {
				missingProperties++
				continue
			}

			var props waterwayAreaProperties

			if err := json.Unmarshal(*feature.Properties, &props); err != nil {
				badProperties++
				continue
			}

			var catccl sql.NullInt64
			if props.Catccl != nil {
				if value, err := strconv.ParseInt(*props.Catccl, 10, 64); err == nil {
					catccl = sql.NullInt64{Int64: value, Valid: true}
				}
			}
			var dirimp sql.NullInt64
			if props.Dirimp != nil {
				if value, err := strconv.ParseInt(*props.Dirimp, 10, 64); err == nil {
					dirimp = sql.NullInt64{Int64: value, Valid: true}
				}
			}

			switch feature.Geometry.Type {
			case "Polygon":
				var p polygonSlice
				if err := json.Unmarshal(*feature.Geometry.Coordinates, &p); err != nil {
					return err
				}
				if _, err := insertStmt.ExecContext(
					ctx,
					p.asWKB(),
					epsg,
					catccl,
					dirimp,
				); err != nil {
					return err
				}
				features++
			default:
				unsupported[feature.Geometry.Type]++
			}
		}
		return nil
	}); err != nil {
		return nil, err
	}

	if badProperties > 0 {
		feedback.Warn("Bad properties: %d", badProperties)
	}

	if missingProperties > 0 {
		feedback.Warn("Missing properties: %d", missingProperties)
	}

	if len(unsupported) != 0 {
		feedback.Warn("Unsupported types found: %s", unsupported)
	}

	if features == 0 {
		err := errors.New("No features found")
		feedback.Error("%v", err)
		return nil, err
	}

	if err = tx.Commit(); err == nil {
		feedback.Info("Storing %d features took %s",
			features, time.Since(start))
	}

	return nil, err
}