diff pkg/imports/wp.go @ 2086:6096ec4951f8

Waterway profiles imports: Download the geometries from WPS and join them with the CSV data.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Thu, 31 Jan 2019 18:42:57 +0100
parents ddbac0f22ffb
children 5d3d2e823314
line wrap: on
line diff
--- a/pkg/imports/wp.go	Thu Jan 31 17:22:19 2019 +0100
+++ b/pkg/imports/wp.go	Thu Jan 31 18:42:57 2019 +0100
@@ -18,6 +18,8 @@
 	"context"
 	"database/sql"
 	"encoding/csv"
+	"encoding/json"
+	"errors"
 	"fmt"
 	"io"
 	"os"
@@ -29,11 +31,19 @@
 	"gemma.intevation.de/gemma/pkg/common"
 	"gemma.intevation.de/gemma/pkg/misc"
 	"gemma.intevation.de/gemma/pkg/models"
+	"gemma.intevation.de/gemma/pkg/wfs"
 	"github.com/jackc/pgx/pgtype"
 )
 
 type WaterwayProfiles struct {
 	Dir string `json:"dir"`
+	// URL the GetCapabilities URL of the WFS service.
+	URL string `json:"url"`
+	// FeatureType selects the feature type of the WFS service.
+	FeatureType string `json:"feature-type"`
+	// SortBy works around misconfigured services to
+	// establish a sort order to get the features.
+	SortBy string `json:"sort-by"`
 }
 
 const WPJobKind JobKind = "wp"
@@ -65,9 +75,31 @@
 }
 
 const (
+	createGeomTempTableSQL = `
+CREATE TEMP TABLE wp_geoms (
+  geom geometry(linestring, %d)
+) ON COMMIT DROP`
+
+	createTempIndexSQL = `
+CREATE INDEX ON wp_geoms USING GIST(geom)`
+
+	analyzeTempTableSQL = `ANALYZE wp_geoms`
+
+	insertGeomTmpTableSQL = `
+INSERT INTO wp_geoms (geom) VALUES (
+  ST_Transform(ST_GeomFromWKB($1, $2::int), %d)
+)`
+
 	insertWaterwayProfileSQL = `
+WITH point AS (
+  SELECT ST_Transform(geom::geometry, $14::int) geom
+  FROM waterway.distance_marks_virtual
+  WHERE location_code =
+	($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)
+)
 INSERT INTO waterway.waterway_profiles (
   location,
+  geom,
   validity,
   lnwl,
   mwl,
@@ -78,6 +110,11 @@
   source_organization
 ) VALUES (
   ($1, $2, $3, $4, $5),
+  ( SELECT ST_Transform(geom, 4326)::geography
+    FROM wp_geoms
+    WHERE geom && ( SELECT geom from point ) AND
+	  ST_Intersects(geom, ( SELECT geom FROM point ))
+  ),
   $6,
   $7,
   $8,
@@ -135,9 +172,14 @@
 	}
 	defer tx.Rollback()
 
-	//  TODO: Download profile geometries from WFS.
+	epsg, err := wp.downloadGeometries(
+		ctx, importID, tx, start, feedback)
+	if err != nil {
+		return nil, fmt.Errorf("error downloading geometries: %v", err)
+	}
 
-	summary, err := wp.processCSV(ctx, importID, tx, start, feedback)
+	summary, err := wp.processCSV(
+		ctx, importID, tx, start, epsg, feedback)
 	if err != nil {
 		return nil, fmt.Errorf("error processing CVS: %v", err)
 	}
@@ -153,13 +195,159 @@
 	return summary, nil
 }
 
+func (wp *WaterwayProfiles) downloadGeometries(
+	ctx context.Context,
+	importID int64,
+	tx *sql.Tx,
+	start time.Time,
+	feedback Feedback,
+) (int, error) {
+	feedback.Info("Start downloading geometries from WFS.")
+
+	feedback.Info("Loading capabilities from %s", wp.URL)
+	caps, err := wfs.GetCapabilities(wp.URL)
+	if err != nil {
+		feedback.Error("Loading capabilities failed: %v", err)
+		return 0, err
+	}
+
+	ft := caps.FindFeatureType(wp.FeatureType)
+	if ft == nil {
+		return 0, fmt.Errorf("Unknown feature type '%s'", wp.FeatureType)
+	}
+
+	feedback.Info("Found feature type '%s", wp.FeatureType)
+
+	epsg, err := wfs.CRSToEPSG(ft.DefaultCRS)
+	if err != nil {
+		feedback.Error("Unsupported CRS name '%s'", ft.DefaultCRS)
+		return 0, err
+	}
+
+	if wp.SortBy != "" {
+		feedback.Info("Features will be sorted by '%s'", wp.SortBy)
+	}
+
+	urls, err := wfs.GetFeaturesGET(
+		caps, wp.FeatureType, "application/json", wp.SortBy)
+	if err != nil {
+		feedback.Error("Cannot create GetFeature URLs. %v", err)
+		return 0, err
+	}
+
+	var (
+		unsupported       = stringCounter{}
+		missingProperties int
+		features          int
+	)
+
+	var insertStmt *sql.Stmt
+	defer func() {
+		if insertStmt != nil {
+			insertStmt.Close()
+		}
+	}()
+
+	var usedEPGS int
+
+	setup := func(epsg int) error {
+		if insertStmt != nil {
+			return nil
+		}
+		feedback.Info("Using EPSG: %d", epsg)
+		usedEPGS = epsg
+
+		tblSQL := fmt.Sprintf(createGeomTempTableSQL, epsg)
+		if _, err := tx.ExecContext(ctx, tblSQL); err != nil {
+			return err
+		}
+		if _, err := tx.ExecContext(ctx, createTempIndexSQL); err != nil {
+			return err
+		}
+		insertSQL := fmt.Sprintf(insertGeomTmpTableSQL, epsg)
+		var err error
+		insertStmt, err = tx.PrepareContext(ctx, insertSQL)
+		return err
+	}
+
+	if err := wfs.DownloadURLs(urls, func(r io.Reader) error {
+		rfc, err := wfs.ParseRawFeatureCollection(r)
+		if err != nil {
+			return fmt.Errorf("parsing GetFeature document failed: %v", err)
+		}
+		if rfc.CRS != nil {
+			crsName := rfc.CRS.Properties.Name
+			if epsg, err = wfs.CRSToEPSG(crsName); err != nil {
+				feedback.Error("Unsupported CRS: %d", crsName)
+				return err
+			}
+		}
+
+		// No features -> ignore.
+		if rfc.Features == nil {
+			return nil
+		}
+
+		if err := setup(epsg); err != nil {
+			return err
+		}
+
+		for _, feature := range rfc.Features {
+			if feature.Geometry.Coordinates == nil {
+				missingProperties++
+				continue
+			}
+
+			switch feature.Geometry.Type {
+			case "LineString":
+				var l lineSlice
+				if err := json.Unmarshal(*feature.Geometry.Coordinates, &l); err != nil {
+					return err
+				}
+				if _, err := insertStmt.ExecContext(
+					ctx,
+					l.asWKB(),
+					epsg,
+				); err != nil {
+					return err
+				}
+				features++
+			default:
+				unsupported[feature.Geometry.Type]++
+			}
+		}
+		return nil
+	}); err != nil {
+		feedback.Error("Downloading features failed: %v", err)
+		return 0, err
+	}
+
+	if missingProperties > 0 {
+		feedback.Warn("Missing properties: %d", missingProperties)
+	}
+
+	if len(unsupported) != 0 {
+		feedback.Warn("Unsupported types found: %s", unsupported)
+	}
+
+	if features == 0 {
+		return 0, errors.New("No features found")
+	}
+	if _, err := tx.ExecContext(ctx, analyzeTempTableSQL); err != nil {
+		return 0, err
+	}
+	return usedEPGS, nil
+}
+
 func (wp *WaterwayProfiles) processCSV(
 	ctx context.Context,
 	importID int64,
 	tx *sql.Tx,
 	start time.Time,
+	epsg int,
 	feedback Feedback,
 ) (interface{}, error) {
+	feedback.Info("Start processing CSV file.")
 
 	f, err := os.Open(filepath.Join(wp.Dir, "wp.csv"))
 	if err != nil {
@@ -357,6 +545,7 @@
 			fe100,
 			dateInfo,
 			source,
+			epsg,
 		).Scan(&id); err != nil {
 			return nil, err
 		}