view pkg/imports/dma.go @ 2006:35acb7f9ae0c

Do anything else before expectedly failing role creation. Creating roles during database setup expectedly fails in case there already is another gemma database in the cluster. Doing it at the end of the transaction ensures it does not hide errors in other commands in the script. In passing, add the default admin via the designated view to ensure it will become a correctly set up application user.
author Tom Gottfried <tom@intevation.de>
date Thu, 24 Jan 2019 17:23:43 +0100
parents 427f86518097
children 2b72f5e005aa
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Raimund Renkert <raimund.renkert@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"time"

	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/wfs"
)

// DistanceMarksAshore is an import job to import
// distance marks in form of point geometries
// and attribute data from a WFS service.
type DistanceMarksAshore struct {
	// URL the GetCapabilities URL of the WFS service.
	URL string `json:"url"`
	// FeatureType selects the feature type of the WFS service.
	FeatureType string `json:"feature-type"`
	// SortBy sorts the feature by this key.
	SortBy string `json:"sort-by"`
}

// DMAJobKind identifies distance marks imports in the import queue.
const DMAJobKind = JobKind("dma")

// dmaJobCreator is the stateless job creator for distance marks imports.
type dmaJobCreator struct{}

func init() {
	RegisterJobCreator(DMAJobKind, dmaJobCreator{})
}

// Description returns a short description of this import kind.
func (dmaJobCreator) Description() string {
	const description = "distance marks"
	return description
}

// AutoAccept always reports true for distance marks imports.
func (dmaJobCreator) AutoAccept() bool {
	return true
}

// Create decodes the JSON encoded configuration in data into a
// new DistanceMarksAshore job. The job kind argument is ignored.
func (dmaJobCreator) Create(_ JobKind, data string) (Job, error) {
	var job DistanceMarksAshore
	if err := common.FromJSONString(data, &job); err != nil {
		return nil, err
	}
	return &job, nil
}

// Depends lists the names this import kind depends on.
func (dmaJobCreator) Depends() []string {
	return []string{"distance_marks"}
}

// StageDone does nothing for distance marks imports and always returns nil.
func (dmaJobCreator) StageDone(_ context.Context, _ *sql.Tx, _ int64) error {
	return nil
}

// CleanUp does nothing for distance marks imports and always returns nil.
func (*DistanceMarksAshore) CleanUp() error {
	return nil
}

// distanceMarksAshoreProperties holds the attributes decoded
// from the JSON properties of a downloaded feature.
type distanceMarksAshoreProperties struct {
	// HydroCatdis is decoded from the "hydro_catdis" property.
	HydroCatdis int `json:"hydro_catdis"`
}

const (
	// deleteDistanceMarksSQL deletes the distance marks whose geometry,
	// transformed to the SRID returned by best_utm for the responsibility
	// area, is covered by the responsibility area of the current user's
	// country. It takes no parameters.
	deleteDistanceMarksSQL = `
WITH resp AS (
  SELECT best_utm(area::geometry) AS t,
         ST_Transform(area::geometry, best_utm(area::geometry)) AS a
  FROM users.responsibility_areas
  WHERE country = users.current_user_country()
)
DELETE FROM waterway.distance_marks
WHERE ST_Covers(
  (SELECT a FROM resp),
  ST_Transform(geom::geometry, (SELECT t FROM resp)))
`
	// insertDistanceMarksSQL stores a geometry given as WKB ($1) in the
	// SRID ($2) together with its catdis value ($3). The geometry is
	// intersected with the responsibility area of the current user's
	// country; every non-NULL part of the result is inserted as its own
	// row, transformed to EPSG:4326.
	insertDistanceMarksSQL = `
WITH resp AS (
  SELECT best_utm(area::geometry) AS t,
         ST_Transform(area::geometry, best_utm(area::geometry)) AS a
  FROM users.responsibility_areas
  WHERE country = users.current_user_country()
)
INSERT INTO waterway.distance_marks (geom, catdis)
SELECT ST_Transform(clipped.geom, 4326)::geography, $3 FROM (
    SELECT (ST_Dump(
       ST_Intersection(
         (SELECT a FROM resp),
         ST_Transform(
           ST_GeomFromWKB($1, $2::integer),
           (SELECT t FROM resp)
         )
       )
     )).geom AS geom
  ) AS clipped
  WHERE clipped.geom IS NOT NULL
`
)

// Do executes the actual distance marks import.
//
// It loads the capabilities of the WFS service at dma.URL, looks up the
// configured feature type and downloads its features as JSON. Inside a
// single transaction it first runs deleteDistanceMarksSQL and then stores
// every downloaded feature of geometry type "Point" via
// insertDistanceMarksSQL. Features without coordinates or without
// properties, with undecodable properties, or with other geometry types
// are skipped and only counted for the final warnings.
//
// The transaction is committed only if at least one feature was stored;
// on every error path the deferred rollback discards all changes.
//
// The first return value is always nil. The error is non-nil if any step
// of the import failed or if no feature was stored.
func (dma *DistanceMarksAshore) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	start := time.Now()

	feedback.Info("Import distance marks")

	feedback.Info("Loading capabilities from %s", dma.URL)
	caps, err := wfs.GetCapabilities(dma.URL)
	if err != nil {
		feedback.Error("Loading capabilities failed: %v", err)
		return nil, err
	}

	ft := caps.FindFeatureType(dma.FeatureType)
	if ft == nil {
		return nil, fmt.Errorf("Unknown feature type '%s'", dma.FeatureType)
	}

	feedback.Info("Found feature type '%s'", dma.FeatureType)

	// The default CRS of the feature type is used unless a downloaded
	// feature collection announces its own CRS (see below).
	epsg, err := wfs.CRSToEPSG(ft.DefaultCRS)
	if err != nil {
		feedback.Error("Unsupported CRS name '%s'", ft.DefaultCRS)
		return nil, err
	}

	urls, err := wfs.GetFeaturesGET(
		caps, dma.FeatureType, "application/json", dma.SortBy)
	if err != nil {
		feedback.Error("Cannot create GetFeature URLs. %v", err)
		return nil, err
	}

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	// After a successful Commit this Rollback has no effect.
	defer tx.Rollback()

	insertStmt, err := tx.PrepareContext(ctx, insertDistanceMarksSQL)
	if err != nil {
		return nil, err
	}
	defer insertStmt.Close()

	// Delete the old features.
	if _, err := tx.ExecContext(ctx, deleteDistanceMarksSQL); err != nil {
		return nil, err
	}

	var (
		unsupported       = stringCounter{}
		missingProperties int
		badProperties     int
		features          int
	)

	if err := wfs.DownloadURLs(urls, func(r io.Reader) error {
		rfc, err := wfs.ParseRawFeatureCollection(r)
		if err != nil {
			return fmt.Errorf("parsing GetFeature document failed: %v", err)
		}
		if rfc.CRS != nil {
			crsName := rfc.CRS.Properties.Name
			if epsg, err = wfs.CRSToEPSG(crsName); err != nil {
				// crsName is a string and must not be formatted with %d.
				feedback.Error("Unsupported CRS: '%s'", crsName)
				return err
			}
		}

		// No features -> ignore.
		if rfc.Features == nil {
			return nil
		}

		feedback.Info("Using EPSG: %d", epsg)

		for _, feature := range rfc.Features {
			if feature.Geometry.Coordinates == nil {
				missingProperties++
				continue
			}

			// A JSON null for the properties leaves the pointer nil.
			// Skip such features instead of dereferencing nil below.
			if feature.Properties == nil {
				missingProperties++
				continue
			}

			var props distanceMarksAshoreProperties

			if err := json.Unmarshal(*feature.Properties, &props); err != nil {
				badProperties++
				continue
			}
			switch feature.Geometry.Type {
			case "Point":
				var p pointSlice
				if err := json.Unmarshal(*feature.Geometry.Coordinates, &p); err != nil {
					return err
				}
				if _, err := insertStmt.ExecContext(
					ctx,
					p.asWKB(),
					epsg,
					props.HydroCatdis,
				); err != nil {
					feedback.Error("error: %s", err)
					return err
				}
				features++
			default:
				unsupported[feature.Geometry.Type]++
			}
		}
		return nil
	}); err != nil {
		feedback.Error("Downloading features failed: %v", err)
		return nil, err
	}

	if badProperties > 0 {
		feedback.Warn("Bad properties: %d", badProperties)
	}

	if missingProperties > 0 {
		feedback.Warn("Missing properties: %d", missingProperties)
	}

	if len(unsupported) != 0 {
		feedback.Warn("Unsupported types found: %s", unsupported)
	}

	// Returning here without a commit lets the deferred rollback undo
	// the delete of the old features, too.
	if features == 0 {
		err := errors.New("No features found")
		feedback.Error("%v", err)
		return nil, err
	}

	if err = tx.Commit(); err == nil {
		feedback.Info("Storing %d features took %s",
			features, time.Since(start))
	}

	return nil, err
}