changeset 1862:427f86518097

Added distance marks importer in backend.
author Raimund Renkert <raimund.renkert@intevation.de>
date Thu, 17 Jan 2019 13:16:23 +0100
parents 5083a1d19a4b
children 3bf2e5a91e50
files pkg/controllers/manualimports.go pkg/controllers/routes.go pkg/imports/dma.go pkg/imports/scheduled.go pkg/models/dma.go
diffstat 5 files changed, 327 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/controllers/manualimports.go	Thu Jan 17 13:15:22 2019 +0100
+++ b/pkg/controllers/manualimports.go	Thu Jan 17 13:16:23 2019 +0100
@@ -133,6 +133,17 @@
 	return fd, due, retries, fdi.SendEmail
 }
 
+func importDistanceMarksAshore(input interface{}) (interface{}, time.Time, int, bool) {
+	dmai := input.(*models.DistanceMarksAshoreImport)
+	dma := &imports.DistanceMarksAshore{
+		URL:         dmai.URL,
+		FeatureType: dmai.FeatureType,
+		SortBy:      dmai.SortBy,
+	}
+	due, retries := retry(dmai.Attributes)
+	return dma, due, retries, dmai.SendEmail
+}
+
 func manualImport(
 	kind imports.JobKind,
 	setup func(interface{}) (interface{}, time.Time, int, bool),
--- a/pkg/controllers/routes.go	Thu Jan 17 13:15:22 2019 +0100
+++ b/pkg/controllers/routes.go	Thu Jan 17 13:16:23 2019 +0100
@@ -223,6 +223,12 @@
 		NoConn: true,
 	})).Methods(http.MethodPost)
 
+	api.Handle("/imports/distancemarks", waterwayAdmin(&JSONHandler{
+		Input:  func() interface{} { return new(models.DistanceMarksAshoreImport) },
+		Handle: manualImport(imports.DMAJobKind, importDistanceMarksAshore),
+		NoConn: true,
+	})).Methods(http.MethodPost)
+
 	// Import scheduler configuration
 	api.Handle("/imports/config/{id:[0-9]+}/run",
 		waterwayAdmin(&JSONHandler{
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pkg/imports/dma.go	Thu Jan 17 13:16:23 2019 +0100
@@ -0,0 +1,261 @@
+// This is Free Software under GNU Affero General Public License v >= 3.0
+// without warranty, see README.md and license for details.
+//
+// SPDX-License-Identifier: AGPL-3.0-or-later
+// License-Filename: LICENSES/AGPL-3.0.txt
+//
+// Copyright (C) 2018 by via donau
+//   – Österreichische Wasserstraßen-Gesellschaft mbH
+// Software engineering by Intevation GmbH
+//
+// Author(s):
+//  * Raimund Renkert <raimund.renkert@intevation.de>
+
+package imports
+
+import (
+	"context"
+	"database/sql"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io"
+	"time"
+
+	"gemma.intevation.de/gemma/pkg/common"
+	"gemma.intevation.de/gemma/pkg/wfs"
+)
+
+// FairwayDimension is an import job to import
+// the fairway dimensions in form of polygon geometries
+// and attribute data from a WFS service.
+type DistanceMarksAshore struct {
+	// URL the GetCapabilities URL of the WFS service.
+	URL string `json:"url"`
+	// FeatureType selects the feature type of the WFS service.
+	FeatureType string `json:"feature-type"`
+	// SortBy sorts the feature by this key.
+	SortBy string `json:"sort-by"`
+}
+
+// DMAJobKind is the import queue type identifier.
+const DMAJobKind JobKind = "dma"
+
+type dmaJobCreator struct{}
+
+func init() {
+	RegisterJobCreator(DMAJobKind, dmaJobCreator{})
+}
+
+func (dmaJobCreator) Description() string { return "distance marks" }
+
+func (dmaJobCreator) AutoAccept() bool { return true }
+
+func (dmaJobCreator) Create(_ JobKind, data string) (Job, error) {
+	dma := new(DistanceMarksAshore)
+	if err := common.FromJSONString(data, dma); err != nil {
+		return nil, err
+	}
+	return dma, nil
+}
+
+func (dmaJobCreator) Depends() []string {
+	return []string{
+		"distance_marks",
+	}
+}
+
+// StageDone is a NOP for distance marks imports.
+func (dmaJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
+	return nil
+}
+
+// CleanUp for distance marks imports is a NOP.
+func (*DistanceMarksAshore) CleanUp() error { return nil }
+
+type distanceMarksAshoreProperties struct {
+	HydroCatdis int `json:"hydro_catdis"`
+}
+
+const (
+	deleteDistanceMarksSQL = `
+WITH resp AS (
+  SELECT best_utm(area::geometry) AS t,
+         ST_Transform(area::geometry, best_utm(area::geometry)) AS a
+  FROM users.responsibility_areas
+  WHERE country = users.current_user_country()
+)
+DELETE FROM waterway.distance_marks
+WHERE ST_Covers(
+  (SELECT a FROM resp),
+  ST_Transform(geom::geometry, (SELECT t FROM resp)))
+`
+	insertDistanceMarksSQL = `
+WITH resp AS (
+  SELECT best_utm(area::geometry) AS t,
+         ST_Transform(area::geometry, best_utm(area::geometry)) AS a
+  FROM users.responsibility_areas
+  WHERE country = users.current_user_country()
+)
+INSERT INTO waterway.distance_marks (geom, catdis)
+SELECT ST_Transform(clipped.geom, 4326)::geography, $3 FROM (
+    SELECT (ST_Dump(
+       ST_Intersection(
+         (SELECT a FROM resp),
+         ST_Transform(
+           ST_GeomFromWKB($1, $2::integer),
+           (SELECT t FROM resp)
+         )
+       )
+     )).geom AS geom
+  ) AS clipped
+  WHERE clipped.geom IS NOT NULL
+`
+)
+
+// Do executes the actual fairway dimension import.
+func (dma *DistanceMarksAshore) Do(
+	ctx context.Context,
+	importID int64,
+	conn *sql.Conn,
+	feedback Feedback,
+) (interface{}, error) {
+
+	start := time.Now()
+
+	feedback.Info("Import distance marks")
+
+	feedback.Info("Loading capabilities from %s", dma.URL)
+	caps, err := wfs.GetCapabilities(dma.URL)
+	if err != nil {
+		feedback.Error("Loading capabilities failed: %v", err)
+		return nil, err
+	}
+
+	ft := caps.FindFeatureType(dma.FeatureType)
+	if ft == nil {
+		return nil, fmt.Errorf("Unknown feature type '%s'", dma.FeatureType)
+	}
+
+	feedback.Info("Found feature type '%s'", dma.FeatureType)
+
+	epsg, err := wfs.CRSToEPSG(ft.DefaultCRS)
+	if err != nil {
+		feedback.Error("Unsupported CRS name '%s'", ft.DefaultCRS)
+		return nil, err
+	}
+
+	urls, err := wfs.GetFeaturesGET(
+		caps, dma.FeatureType, "application/json", dma.SortBy)
+	if err != nil {
+		feedback.Error("Cannot create GetFeature URLs. %v", err)
+		return nil, err
+	}
+
+	tx, err := conn.BeginTx(ctx, nil)
+	if err != nil {
+		return nil, err
+	}
+	defer tx.Rollback()
+
+	insertStmt, err := tx.PrepareContext(ctx, insertDistanceMarksSQL)
+	if err != nil {
+		return nil, err
+	}
+	defer insertStmt.Close()
+
+	// Delete the old features.
+	if _, err := tx.ExecContext(ctx, deleteDistanceMarksSQL); err != nil {
+		return nil, err
+	}
+
+	var (
+		unsupported       = stringCounter{}
+		missingProperties int
+		badProperties     int
+		features          int
+	)
+
+	if err := wfs.DownloadURLs(urls, func(r io.Reader) error {
+		rfc, err := wfs.ParseRawFeatureCollection(r)
+		if err != nil {
+			return fmt.Errorf("parsing GetFeature document failed: %v", err)
+		}
+		if rfc.CRS != nil {
+			crsName := rfc.CRS.Properties.Name
+			if epsg, err = wfs.CRSToEPSG(crsName); err != nil {
+				feedback.Error("Unsupported CRS: %d", crsName)
+				return err
+			}
+		}
+
+		// No features -> ignore.
+		if rfc.Features == nil {
+			return nil
+		}
+
+		feedback.Info("Using EPSG: %d", epsg)
+
+		for _, feature := range rfc.Features {
+			if feature.Geometry.Coordinates == nil {
+				missingProperties++
+				continue
+			}
+
+			var props distanceMarksAshoreProperties
+
+			if err := json.Unmarshal(*feature.Properties, &props); err != nil {
+				badProperties++
+				continue
+			}
+			switch feature.Geometry.Type {
+			case "Point":
+				var p pointSlice
+				if err := json.Unmarshal(*feature.Geometry.Coordinates, &p); err != nil {
+					return err
+				}
+				if _, err := insertStmt.ExecContext(
+					ctx,
+					p.asWKB(),
+					epsg,
+					props.HydroCatdis,
+				); err != nil {
+					feedback.Error("error: %s", err)
+					return err
+				}
+				features++
+			default:
+				unsupported[feature.Geometry.Type]++
+			}
+		}
+		return nil
+	}); err != nil {
+		feedback.Error("Downloading features failed: %v", err)
+		return nil, err
+	}
+
+	if badProperties > 0 {
+		feedback.Warn("Bad properties: %d", badProperties)
+	}
+
+	if missingProperties > 0 {
+		feedback.Warn("Missing properties: %d", missingProperties)
+	}
+
+	if len(unsupported) != 0 {
+		feedback.Warn("Unsupported types found: %s", unsupported)
+	}
+
+	if features == 0 {
+		err := errors.New("No features found")
+		feedback.Error("%v", err)
+		return nil, err
+	}
+
+	if err = tx.Commit(); err == nil {
+		feedback.Info("Storing %d features took %s",
+			features, time.Since(start))
+	}
+
+	return nil, err
+}
--- a/pkg/imports/scheduled.go	Thu Jan 17 13:15:22 2019 +0100
+++ b/pkg/imports/scheduled.go	Thu Jan 17 13:16:23 2019 +0100
@@ -156,6 +156,22 @@
 			Insecure: insecure,
 		}, nil
 	},
+	DMAJobKind: func(cfg *IDConfig) (interface{}, error) {
+		log.Println("info: schedule 'dma' import")
+		ft, found := cfg.Attributes.Get("feature-type")
+		if !found {
+			return nil, errors.New("cannot find 'feature-type' attribute")
+		}
+		sb, found := cfg.Attributes.Get("sort-by")
+		if !found {
+			return nil, errors.New("cannot find 'sort-by' attribute")
+		}
+		return &DistanceMarksAshore{
+			URL:         *cfg.URL,
+			FeatureType: ft,
+			SortBy:      sb,
+		}, nil
+	},
 }
 
 func init() {
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pkg/models/dma.go	Thu Jan 17 13:16:23 2019 +0100
@@ -0,0 +1,33 @@
+// This is Free Software under GNU Affero General Public License v >= 3.0
+// without warranty, see README.md and license for details.
+//
+// SPDX-License-Identifier: AGPL-3.0-or-later
+// License-Filename: LICENSES/AGPL-3.0.txt
+//
+// Copyright (C) 2018 by via donau
+//   – Österreichische Wasserstraßen-Gesellschaft mbH
+// Software engineering by Intevation GmbH
+//
+// Author(s):
+//  * Raimund Renkert <raimund.renkert@intevation.de>
+
+package models
+
+import "gemma.intevation.de/gemma/pkg/common"
+
+type (
+	// DistanceMarksAshoreImport specifies an import of the distance marks.
+	DistanceMarksAshoreImport struct {
+		// URL is the capabilities URL of the WFS.
+		URL string `json:"url"`
+		// FeatureType is the layer to use.
+		FeatureType string `json:"feature-type"`
+		// SortBy sorts the feature by this key.
+		SortBy string `json:"sort-by"`
+		// SendEmail is set to true if an email should be send after
+		// importing the axis.
+		SendEmail bool `json:"send-email"`
+		// Attributes are optional attributes.
+		Attributes common.Attributes `json:"attributes,omitempty"`
+	}
+)