diff pkg/imports/wa.go @ 1785:614c6c766691

Waterway area import: Implemented.
author Sascha L. Teichmann <teichmann@intevation.de>
date Sat, 12 Jan 2019 19:53:31 +0100
parents
children 09349ca27dd7
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pkg/imports/wa.go	Sat Jan 12 19:53:31 2019 +0100
@@ -0,0 +1,257 @@
+// This is Free Software under GNU Affero General Public License v >= 3.0
+// without warranty, see README.md and license for details.
+//
+// SPDX-License-Identifier: AGPL-3.0-or-later
+// License-Filename: LICENSES/AGPL-3.0.txt
+//
+// Copyright (C) 2018 by via donau
+//   – Österreichische Wasserstraßen-Gesellschaft mbH
+// Software engineering by Intevation GmbH
+//
+// Author(s):
+//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
+
+package imports
+
+import (
+	"context"
+	"database/sql"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io"
+	"strconv"
+	"time"
+
+	"gemma.intevation.de/gemma/pkg/common"
+	"gemma.intevation.de/gemma/pkg/wfs"
+)
+
+// WaterwayArea is an import job to import
+// the waterway area in form of polygon geometries
+// and attribute data from a WFS service.
+type WaterwayArea struct {
+	// URL the GetCapabilities URL of the WFS service.
+	URL string `json:"url"`
+	// FeatureType selects the feature type of the WFS service.
+	FeatureType string `json:"feature-type"`
+	// SortBy works around misconfigured services to
+	// establish a sort order to get the features.
+	SortBy string `json:"sort-by"`
+}
+
+// WAJobKind is the import queue type identifier.
+const WAJobKind JobKind = "wa"
+
+type waJobCreator struct{}
+
+func init() {
+	RegisterJobCreator(WAJobKind, waJobCreator{})
+}
+
+func (waJobCreator) Description() string { return "waterway area" }
+
+func (waJobCreator) AutoAccept() bool { return true }
+
+func (waJobCreator) Create(_ JobKind, data string) (Job, error) {
+	wa := new(WaterwayArea)
+	if err := common.FromJSONString(data, wa); err != nil {
+		return nil, err
+	}
+	return wa, nil
+}
+
+func (waJobCreator) Depends() []string {
+	return []string{
+		"waterway_area",
+	}
+}
+
+// StageDone is a NOP for waterway area imports.
+func (waJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
+	return nil
+}
+
+// CleanUp for waterway area imports is a NOP.
+func (*WaterwayArea) CleanUp() error { return nil }
+
+type waterwayAreaProperties struct {
+	Catccl *string `json:"ienc_catccl"`
+	Dirimp *string `json:"ienc_dirimp"`
+}
+
+const (
+	deleteWaterwayAreaSQL = `DELETE FROM waterway.waterway_area`
+	insertWaterwayAreaSQL = `
+INSERT INTO waterway.waterway_area (area, catccl, dirimp)
+VALUES (
+  ST_Transform(ST_GeomFromWKB($1, $2::integer), 4326)::geography,
+  $3,
+  $4
+)`
+)
+
+// Do executes the actual waterway axis import.
+func (wx *WaterwayArea) Do(
+	ctx context.Context,
+	importID int64,
+	conn *sql.Conn,
+	feedback Feedback,
+) (interface{}, error) {
+
+	start := time.Now()
+
+	feedback.Info("Import waterway area")
+
+	feedback.Info("Loading capabilities from %s", wx.URL)
+	caps, err := wfs.GetCapabilities(wx.URL)
+	if err != nil {
+		feedback.Error("Loading capabilities failed: %v", err)
+		return nil, err
+	}
+
+	ft := caps.FindFeatureType(wx.FeatureType)
+	if ft == nil {
+		return nil, fmt.Errorf("Unknown feature type '%s'", wx.FeatureType)
+	}
+
+	feedback.Info("Found feature type '%s", wx.FeatureType)
+
+	epsg, err := wfs.CRSToEPSG(ft.DefaultCRS)
+	if err != nil {
+		feedback.Error("Unsupported CRS name '%s'", ft.DefaultCRS)
+		return nil, err
+	}
+
+	if wx.SortBy != "" {
+		feedback.Info("Features will be sorted by '%s'", wx.SortBy)
+	}
+
+	urls, err := wfs.GetFeaturesGET(
+		caps, wx.FeatureType, "application/json", wx.SortBy)
+	if err != nil {
+		feedback.Error("Cannot create GetFeature URLs. %v", err)
+		return nil, err
+	}
+
+	tx, err := conn.BeginTx(ctx, nil)
+	if err != nil {
+		return nil, err
+	}
+	defer tx.Rollback()
+
+	insertStmt, err := tx.PrepareContext(ctx, insertWaterwayAreaSQL)
+	if err != nil {
+		return nil, err
+	}
+	defer insertStmt.Close()
+
+	// Delete the old features.
+	if _, err := tx.ExecContext(ctx, deleteWaterwayAreaSQL); err != nil {
+		return nil, err
+	}
+
+	var (
+		unsupported       = stringCounter{}
+		missingProperties int
+		badProperties     int
+		features          int
+	)
+
+	if err := wfs.DownloadURLs(urls, func(r io.Reader) error {
+		rfc, err := wfs.ParseRawFeatureCollection(r)
+		if err != nil {
+			return fmt.Errorf("parsing GetFeature document failed: %v", err)
+		}
+		if rfc.CRS != nil {
+			crsName := rfc.CRS.Properties.Name
+			if epsg, err = wfs.CRSToEPSG(crsName); err != nil {
+				feedback.Error("Unsupported CRS: %d", crsName)
+				return err
+			}
+		}
+
+		// No features -> ignore.
+		if rfc.Features == nil {
+			return nil
+		}
+
+		feedback.Info("Using EPSG: %d", epsg)
+
+		for _, feature := range rfc.Features {
+			if feature.Properties == nil || feature.Geometry.Coordinates == nil {
+				missingProperties++
+				continue
+			}
+
+			var props waterwayAreaProperties
+
+			if err := json.Unmarshal(*feature.Properties, &props); err != nil {
+				badProperties++
+				continue
+			}
+
+			var catccl sql.NullInt64
+			if props.Catccl != nil {
+				if value, err := strconv.ParseInt(*props.Catccl, 10, 64); err == nil {
+					catccl = sql.NullInt64{Int64: value, Valid: true}
+				}
+			}
+			var dirimp sql.NullInt64
+			if props.Dirimp != nil {
+				if value, err := strconv.ParseInt(*props.Dirimp, 10, 64); err == nil {
+					dirimp = sql.NullInt64{Int64: value, Valid: true}
+				}
+			}
+
+			switch feature.Geometry.Type {
+			case "Polygon":
+				var p polygon
+				if err := json.Unmarshal(*feature.Geometry.Coordinates, &p); err != nil {
+					return err
+				}
+				if _, err := insertStmt.ExecContext(
+					ctx,
+					p.asWKB(),
+					epsg,
+					catccl,
+					dirimp,
+				); err != nil {
+					return err
+				}
+				features++
+			default:
+				unsupported[feature.Geometry.Type]++
+			}
+		}
+		return nil
+	}); err != nil {
+		feedback.Error("Downloading features failed: %v", err)
+		return nil, err
+	}
+
+	if features == 0 {
+		err := errors.New("No features found")
+		feedback.Error("%v", err)
+		return nil, err
+	}
+
+	if badProperties > 0 {
+		feedback.Warn("Bad properties: %d", badProperties)
+	}
+
+	if missingProperties > 0 {
+		feedback.Warn("Missing properties: %d", missingProperties)
+	}
+
+	if len(unsupported) != 0 {
+		feedback.Warn("Unsupported types found: %s", unsupported)
+	}
+
+	if err = tx.Commit(); err == nil {
+		feedback.Info("Storing %d features took %s",
+			features, time.Since(start))
+	}
+
+	return nil, err
+}