view pkg/imports/fd.go @ 2833:1b6840093eac

Prevent calculation of wrong UTM zones. Using geography as input data type will ensure only lon/lat coordinates are fed into the calculation.
author Tom Gottfried <tom@intevation.de>
date Wed, 27 Mar 2019 15:39:52 +0100
parents 7cb027be277d
children 93fa55bce126
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Raimund Renkert <raimund.renkert@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"io"
	"time"

	"gemma.intevation.de/gemma/pkg/misc"
	"gemma.intevation.de/gemma/pkg/wfs"
)

// FairwayDimension describes an import job that fetches fairway
// dimensions, i.e. polygon geometries plus attribute data, from a
// WFS service. The JSON tags define how a job configuration is
// (de)serialized.
type FairwayDimension struct {
	// URL is the GetCapabilities URL of the WFS service.
	URL string `json:"url"`
	// FeatureType names the feature type to fetch from the WFS service.
	FeatureType string `json:"feature-type"`
	// SortBy is handed to the WFS request builder together with
	// the feature type.
	SortBy string `json:"sort-by"`
	// LOS is written as level_of_service of every imported feature.
	LOS int `json:"los"`
	// MinWidth is written as min_width of every imported feature.
	MinWidth int `json:"min-width"`
	// MaxWidth is written as max_width of every imported feature.
	MaxWidth int `json:"max-width"`
	// Depth is written as min_depth of every imported feature.
	Depth int `json:"depth"`
	// SourceOrganization is written as source_organization of every
	// imported feature.
	SourceOrganization string `json:"source-organization"`
	// User is an optional username for Basic Auth.
	User string `json:"user,omitempty"`
	// Password is an optional password for Basic Auth.
	Password string `json:"password,omitempty"`
}

// fdTime embeds time.Time so that date values of feature properties
// can be given their own JSON decoding (see UnmarshalJSON).
type fdTime struct{ time.Time }

// guessFDTime parses a date string with a misc.TimeGuesser that is
// configured with the layouts below: a full date (YYYYMMDD), a year
// only, and the empty layout.
// NOTE(review): the empty layout presumably lets an empty string parse
// to the zero time instead of failing -- confirm against
// misc.TimeGuesser.
var guessFDTime = misc.TimeGuesser([]string{
	"20060102",
	"2006",
	"",
}).Guess

// UnmarshalJSON decodes a JSON string and parses its content with
// guessFDTime. The receiver is only overwritten when both steps
// succeed; otherwise the respective error is returned and the
// receiver is left untouched.
func (fdt *fdTime) UnmarshalJSON(data []byte) error {
	var raw string
	if decodeErr := json.Unmarshal(data, &raw); decodeErr != nil {
		return decodeErr
	}

	parsed, guessErr := guessFDTime(raw)
	if guessErr != nil {
		return guessErr
	}

	fdt.Time = parsed
	return nil
}

// FDJobKind is the import queue type identifier.
const FDJobKind JobKind = "fd"

// fdJobCreator is the stateless job creator that gets registered
// for FDJobKind.
type fdJobCreator struct{}

// init registers the fairway dimension job creator under FDJobKind
// when the package is loaded.
func init() {
	RegisterJobCreator(FDJobKind, fdJobCreator{})
}

// Description returns a short human-readable name for this kind
// of import.
func (fdJobCreator) Description() string {
	return "fairway dimension"
}

// AutoAccept always reports false for fairway dimension imports.
// NOTE(review): presumably this means the imported data stays staged
// until somebody reviews it -- confirm against the import queue.
func (fdJobCreator) AutoAccept() bool {
	return false
}

// Create returns a new, zero-valued fairway dimension import job.
func (fdJobCreator) Create() Job {
	return &FairwayDimension{}
}

// Depends lists what this import depends on, which is only
// "fairway_dimensions".
// NOTE(review): presumably these are names of database relations the
// import queue uses to serialize jobs -- confirm against the queue.
func (fdJobCreator) Depends() []string {
	return []string{"fairway_dimensions"}
}

// StageDone finalizes the import with the given id inside tx.
// It looks up the level of service of the imported features, deletes
// the already staged features of that level of service (see
// deleteFairwayDimensionSQL) and then marks the features of this
// import as staging done. The first failing step aborts with its error.
func (fdJobCreator) StageDone(
	ctx context.Context,
	tx *sql.Tx,
	id int64,
) error {
	// Only features with the same level of service as the imported
	// ones are to be replaced, so determine that value first.
	// A row should always be found here because the import exits as
	// unchanged if no new features were found while inserting.
	var levelOfService int64
	row := tx.QueryRowContext(ctx, fdFindLOSSQL, id)
	if err := row.Scan(&levelOfService); err != nil {
		return err
	}

	// Get rid of the old features.
	_, err := tx.ExecContext(ctx, deleteFairwayDimensionSQL, levelOfService)
	if err != nil {
		return err
	}

	// Flag the features of this import as done with staging.
	_, err = tx.ExecContext(ctx, fdStageDoneSQL, id)
	return err
}

// CleanUp has nothing to tidy up for fairway dimension imports
// and therefore always returns nil.
func (*FairwayDimension) CleanUp() error {
	return nil
}

// fairwayDimensionProperties holds the part of a feature's
// properties that the import evaluates.
type fairwayDimensionProperties struct {
	// HydroSorDat is decoded from the "hydro_sordat" property. If it
	// is missing or zero the import falls back to its start time as
	// the date info of the feature.
	HydroSorDat *fdTime `json:"hydro_sordat"`
}

// fdSummary is one entry of the list of stored features that is
// reported in the summary of an import.
type fdSummary struct {
	// Lat is one centroid coordinate of the stored area as returned
	// by the insert statement.
	Lat float64 `json:"lat"`
	// Lon is the other centroid coordinate of the stored area as
	// returned by the insert statement.
	Lon float64 `json:"lon"`
	// ID is the database id of the stored fairway dimension.
	ID int64 `json:"id"`
}

// SQL statements used by the fairway dimension import.
const (
	// fdFindLOSSQL fetches the level of service of one fairway
	// dimension that is tracked for the import with id $1.
	fdFindLOSSQL = `
SELECT level_of_service FROM waterway.fairway_dimensions
WHERE id IN (
  SELECT key FROM import.track_imports
  WHERE import_id = $1 AND
        relation = 'waterway.fairway_dimensions'::regclass)
LIMIT 1`

	// fdStageDoneSQL sets staging_done on all fairway dimensions that
	// are tracked for the import with id $1.
	fdStageDoneSQL = `
UPDATE waterway.fairway_dimensions SET staging_done = true
WHERE id IN (
  SELECT key FROM import.track_imports
  WHERE import_id = $1 AND
        relation = 'waterway.fairway_dimensions'::regclass)`

	// deleteFairwayDimensionSQL deletes the fairway dimensions with
	// staging_done set and level of service $1 that are covered by the
	// responsibility area of the current user's country. Both sides of
	// the comparison are transformed to the UTM zone that best_utm
	// yields for the responsibility area.
	deleteFairwayDimensionSQL = `
WITH resp AS (
  SELECT best_utm(area) AS t,
         ST_Transform(area::geometry, best_utm(area)) AS a
  FROM users.responsibility_areas
  WHERE country = users.current_user_country()
)
DELETE FROM waterway.fairway_dimensions
WHERE ST_Covers(
  (SELECT a FROM resp),
  ST_Transform(area::geometry, (SELECT t FROM resp)))
  AND staging_done
  AND level_of_service = $1::smallint`

	// insertFairwayDimensionSQL stores the geometry given as WKB ($1)
	// with EPSG code $2, clipped to the responsibility area of the
	// current user's country. $3 to $8 are level_of_service, min_width,
	// max_width, min_depth, date_info and source_organization.
	// The clipping result is split up with ST_Dump and each part becomes
	// a row of its own, so the statement can return no row, one row or
	// several rows. Each returned row carries the id and the X and Y
	// coordinate of the centroid of the stored area.
	//
	// The ST_MakeValid and ST_Buffer below are a workaround to
	// avoid errors due to reprojection.
	insertFairwayDimensionSQL = `
WITH resp AS (
  SELECT best_utm(area) AS t,
         ST_Transform(area::geometry, best_utm(area)) AS a
  FROM users.responsibility_areas
  WHERE country = users.current_user_country()
)
INSERT INTO waterway.fairway_dimensions (area, level_of_service, min_width, max_width, min_depth, date_info, source_organization)
SELECT ST_Transform(clipped.geom, 4326)::geography, $3, $4, $5, $6, $7, $8 FROM (
    SELECT (ST_Dump(
       ST_Intersection(
         (SELECT ST_Buffer(a, -0.0001) FROM resp),
         ST_MakeValid(ST_Transform(
           ST_GeomFromWKB($1, $2::integer),
           (SELECT t FROM resp)
         ))
       )
     )).geom AS geom
  ) AS clipped
  WHERE clipped.geom IS NOT NULL
RETURNING id,
  ST_X(ST_Centroid(area::geometry)),
  ST_Y(ST_Centroid(area::geometry))
 `
)

// Do executes the actual fairway dimension import.
func (fd *FairwayDimension) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	start := time.Now()

	feedback.Info("Import fairway dimensions")

	feedback.Info("Loading capabilities from %s", fd.URL)
	caps, err := wfs.GetCapabilities(fd.URL)
	if err != nil {
		feedback.Error("Loading capabilities failed: %v", err)
		return nil, err
	}

	ft := caps.FindFeatureType(fd.FeatureType)
	if ft == nil {
		return nil, fmt.Errorf("Unknown feature type '%s'", fd.FeatureType)
	}

	feedback.Info("Found feature type '%s'", fd.FeatureType)

	epsg, err := wfs.CRSToEPSG(ft.DefaultCRS)
	if err != nil {
		feedback.Error("Unsupported CRS name '%s'", ft.DefaultCRS)
		return nil, err
	}

	dl, err := wfs.GetFeatures(caps, fd.FeatureType, fd.SortBy)
	if err != nil {
		feedback.Error("Cannot create GetFeature URLs. %v", err)
		return nil, err
	}

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	defer tx.Rollback()

	insertStmt, err := tx.PrepareContext(ctx, insertFairwayDimensionSQL)
	if err != nil {
		return nil, err
	}
	defer insertStmt.Close()

	var (
		unsupported       = stringCounter{}
		missingProperties int
		badProperties     int
		features          int
		outside           int
		fds               []fdSummary
	)

	if err := dl.Download(fd.User, fd.Password, func(url string, r io.Reader) error {
		feedback.Info("Get features from: '%s'", url)
		rfc, err := wfs.ParseRawFeatureCollection(r)
		if err != nil {
			return fmt.Errorf("parsing GetFeature document failed: %v", err)
		}
		if rfc.CRS != nil {
			crsName := rfc.CRS.Properties.Name
			if epsg, err = wfs.CRSToEPSG(crsName); err != nil {
				feedback.Error("Unsupported CRS: %d", crsName)
				return err
			}
		}

		// No features -> ignore.
		if rfc.Features == nil {
			return nil
		}

		feedback.Info("Using EPSG: %d", epsg)

	features:
		for _, feature := range rfc.Features {
			if feature.Geometry.Coordinates == nil {
				missingProperties++
				continue
			}

			var props fairwayDimensionProperties

			if err := json.Unmarshal(*feature.Properties, &props); err != nil {
				feedback.Warn("bad property: %v", err)
				badProperties++
				continue
			}
			var dateInfo time.Time
			if props.HydroSorDat == nil || props.HydroSorDat.IsZero() {
				dateInfo = start
			} else {
				dateInfo = (*props.HydroSorDat).Time
			}
			switch feature.Geometry.Type {
			case "Polygon":
				var p polygonSlice
				if err := json.Unmarshal(*feature.Geometry.Coordinates, &p); err != nil {
					return err
				}
				var fdid int64
				var lat, lon float64
				err = insertStmt.QueryRowContext(
					ctx,
					p.asWKB(),
					epsg,
					fd.LOS,
					fd.MinWidth,
					fd.MaxWidth,
					fd.Depth,
					dateInfo,
					fd.SourceOrganization,
				).Scan(&fdid, &lat, &lon)
				switch {
				case err == sql.ErrNoRows:
					outside++
					// ignore -> filtered by responsibility_areas
					continue features
				case err != nil:
					return err
				}
				// Store for potential later removal.
				if err = track(ctx, tx, importID, "waterway.fairway_dimensions", fdid); err != nil {
					return err
				}
				fds = append(fds, fdSummary{ID: fdid, Lat: lat, Lon: lon})

				features++
			default:
				unsupported[feature.Geometry.Type]++
			}
		}
		return nil
	}); err != nil {
		return nil, err
	}

	if badProperties > 0 {
		feedback.Warn("Bad properties: %d", badProperties)
	}

	if missingProperties > 0 {
		feedback.Warn("Missing properties: %d", missingProperties)
	}

	if len(unsupported) != 0 {
		feedback.Warn("Unsupported types found: %s", unsupported)
	}

	if outside > 0 {
		feedback.Info("Features outside responsibility area: %d", outside)
	}

	if features == 0 {
		return nil, UnchangedError("No features found")
	}

	if err = tx.Commit(); err == nil {
		feedback.Info("Storing %d features took %s",
			features, time.Since(start))
	}

	summary := struct {
		Date               time.Time   `json:"date"`
		LOS                int         `json:"los"`
		SourceOrganization string      `json:"source-organization"`
		FdArea             []fdSummary `json:"fd-area"`
	}{
		Date:               time.Now(),
		LOS:                fd.LOS,
		SourceOrganization: fd.SourceOrganization,
		FdArea:             fds,
	}
	return &summary, err
}