view pkg/imports/dma.go @ 2928:074f2bb85584

Do not count skipped features as stored
author Tom Gottfried <tom@intevation.de>
date Wed, 03 Apr 2019 18:47:37 +0200
parents 899b591493b4
children a5642ee4c6d0
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Raimund Renkert <raimund.renkert@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"time"

	"gemma.intevation.de/gemma/pkg/wfs"
)

// FairwayDimension is an import job to import
// the fairway dimensions in form of polygon geometries
// and attribute data from a WFS service.
type DistanceMarksAshore struct {
	// URL the GetCapabilities URL of the WFS service.
	URL string `json:"url"`
	// FeatureType selects the feature type of the WFS service.
	FeatureType string `json:"feature-type"`
	// SortBy sorts the feature by this key.
	SortBy string `json:"sort-by"`
	// User is an optional username for Basic Auth.
	User string `json:"user,omitempty"`
	// Password is an optional password for Basic Auth.
	Password string `json:"password,omitempty"`
}

// DMAJobKind is the import queue type identifier.
const DMAJobKind JobKind = "dma"

type dmaJobCreator struct{}

func init() {
	RegisterJobCreator(DMAJobKind, dmaJobCreator{})
}

func (dmaJobCreator) Description() string { return "distance marks" }

func (dmaJobCreator) AutoAccept() bool { return true }

func (dmaJobCreator) Create() Job { return new(DistanceMarksAshore) }

func (dmaJobCreator) Depends() []string {
	return []string{
		"distance_marks",
	}
}

// StageDone is a NOP for distance marks imports.
func (dmaJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
	return nil
}

// CleanUp for distance marks imports is a NOP.
func (*DistanceMarksAshore) CleanUp() error { return nil }

type distanceMarksAshoreProperties struct {
	HydroCatdis int `json:"hydro_catdis"`
}

const (
	deleteDistanceMarksSQL = `
WITH resp AS (
  SELECT users.current_user_area_utm() AS a
)
DELETE FROM waterway.distance_marks
WHERE ST_Covers(
  (SELECT a FROM resp),
  ST_Transform(geom::geometry, (SELECT ST_SRID(a) FROM resp)))
`
	insertDistanceMarksSQL = `
WITH resp AS (
  SELECT users.current_user_area_utm() AS a
)
INSERT INTO waterway.distance_marks (geom, catdis)
SELECT ST_Transform(clipped.geom, 4326)::geography, $3 FROM (
    SELECT (ST_Dump(
       ST_Intersection(
         (SELECT a FROM resp),
         ST_Transform(
           ST_GeomFromWKB($1, $2::integer),
           (SELECT ST_SRID(a) FROM resp)
         )
       )
     )).geom AS geom
  ) AS clipped
  WHERE clipped.geom IS NOT NULL
RETURNING id
`
)

// Do executes the actual fairway dimension import.
func (dma *DistanceMarksAshore) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	start := time.Now()

	feedback.Info("Import distance marks")

	feedback.Info("Loading capabilities from %s", dma.URL)
	caps, err := wfs.GetCapabilities(dma.URL)
	if err != nil {
		feedback.Error("Loading capabilities failed: %v", err)
		return nil, err
	}

	ft := caps.FindFeatureType(dma.FeatureType)
	if ft == nil {
		return nil, fmt.Errorf("Unknown feature type '%s'", dma.FeatureType)
	}

	feedback.Info("Found feature type '%s'", dma.FeatureType)

	epsg, err := wfs.CRSToEPSG(ft.DefaultCRS)
	if err != nil {
		feedback.Error("Unsupported CRS name '%s'", ft.DefaultCRS)
		return nil, err
	}

	dl, err := wfs.GetFeatures(caps, dma.FeatureType, dma.SortBy)
	if err != nil {
		feedback.Error("Cannot create GetFeature URLs. %v", err)
		return nil, err
	}

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	defer tx.Rollback()

	insertStmt, err := tx.PrepareContext(ctx, insertDistanceMarksSQL)
	if err != nil {
		return nil, err
	}
	defer insertStmt.Close()

	// Delete the old features.
	if _, err := tx.ExecContext(ctx, deleteDistanceMarksSQL); err != nil {
		return nil, err
	}

	var (
		unsupported       = stringCounter{}
		missingProperties int
		badProperties     int
		outside           int
		features          int
	)

	if err := dl.Download(dma.User, dma.Password, func(url string, r io.Reader) error {
		feedback.Info("Get features from: '%s'", url)
		rfc, err := wfs.ParseRawFeatureCollection(r)
		if err != nil {
			return fmt.Errorf("parsing GetFeature document failed: %v", err)
		}
		if rfc.CRS != nil {
			crsName := rfc.CRS.Properties.Name
			if epsg, err = wfs.CRSToEPSG(crsName); err != nil {
				feedback.Error("Unsupported CRS: %d", crsName)
				return err
			}
		}

		// No features -> ignore.
		if rfc.Features == nil {
			return nil
		}

		feedback.Info("Using EPSG: %d", epsg)

		for _, feature := range rfc.Features {
			if feature.Geometry.Coordinates == nil {
				missingProperties++
				continue
			}

			var props distanceMarksAshoreProperties

			if err := json.Unmarshal(*feature.Properties, &props); err != nil {
				badProperties++
				continue
			}
			switch feature.Geometry.Type {
			case "Point":
				var p pointSlice
				if err := json.Unmarshal(*feature.Geometry.Coordinates, &p); err != nil {
					return err
				}
				var dmaid int64
				err := insertStmt.QueryRowContext(
					ctx,
					p.asWKB(),
					epsg,
					props.HydroCatdis,
				).Scan(&dmaid)
				switch {
				case err == sql.ErrNoRows:
					outside++
					// ignore -> filtered by responsibility area
					continue
				case err != nil:
					return err
				}
				features++
			default:
				unsupported[feature.Geometry.Type]++
			}
		}
		return nil
	}); err != nil {
		return nil, err
	}

	if badProperties > 0 {
		feedback.Warn("Bad properties: %d", badProperties)
	}

	if missingProperties > 0 {
		feedback.Warn("Missing properties: %d", missingProperties)
	}

	if len(unsupported) != 0 {
		feedback.Warn("Unsupported types found: %s", unsupported)
	}

	if outside > 0 {
		feedback.Info("Features outside responsibility area: %d", outside)
	}

	if features == 0 {
		err := errors.New("No features found")
		feedback.Error("%v", err)
		return nil, err
	}

	if err = tx.Commit(); err == nil {
		feedback.Info("Storing %d features took %s",
			features, time.Since(start))
	}

	return nil, err
}