Mercurial > gemma
changeset 2086:6096ec4951f8
Waterway profiles imports: Download the geometries from WPS and join them with the CSV data.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Thu, 31 Jan 2019 18:42:57 +0100 |
parents | bca8bda0b805 |
children | 5d3d2e823314 |
files | pkg/controllers/wpimports.go pkg/imports/wp.go |
diffstat | 2 files changed, 211 insertions(+), 3 deletions(-) [+] |
line wrap: on
line diff
--- a/pkg/controllers/wpimports.go Thu Jan 31 17:22:19 2019 +0100 +++ b/pkg/controllers/wpimports.go Thu Jan 31 18:42:57 2019 +0100 @@ -72,6 +72,18 @@ } func importWaterwayProfiles(rw http.ResponseWriter, req *http.Request) { + url := req.FormValue("url") + if url == "" { + http.Error(rw, "missing 'url' parameter", http.StatusBadRequest) + return + } + + featureType := req.FormValue("feature-type") + if featureType == "" { + http.Error(rw, "missing 'feature-type' parameter", http.StatusBadRequest) + return + } + dir, err := storeWaterwayProfiles(req) if err != nil { log.Printf("error: %v\n", err) @@ -79,7 +91,14 @@ return } - wp := &imports.WaterwayProfiles{Dir: dir} + sortBy := req.FormValue("sort-by") + + wp := &imports.WaterwayProfiles{ + Dir: dir, + URL: url, + FeatureType: featureType, + SortBy: sortBy, + } serialized, err := common.ToJSONString(wp) if err != nil {
--- a/pkg/imports/wp.go Thu Jan 31 17:22:19 2019 +0100 +++ b/pkg/imports/wp.go Thu Jan 31 18:42:57 2019 +0100 @@ -18,6 +18,8 @@ "context" "database/sql" "encoding/csv" + "encoding/json" + "errors" "fmt" "io" "os" @@ -29,11 +31,19 @@ "gemma.intevation.de/gemma/pkg/common" "gemma.intevation.de/gemma/pkg/misc" "gemma.intevation.de/gemma/pkg/models" + "gemma.intevation.de/gemma/pkg/wfs" "github.com/jackc/pgx/pgtype" ) type WaterwayProfiles struct { Dir string `json:"dir"` + // URL the GetCapabilities URL of the WFS service. + URL string `json:"url"` + // FeatureType selects the feature type of the WFS service. + FeatureType string `json:"feature-type"` + // SortBy works around misconfigured services to + // establish a sort order to get the features. + SortBy string `json:"sort-by"` } const WPJobKind JobKind = "wp" @@ -65,9 +75,31 @@ } const ( + createGeomTempTableSQL = ` +CREATE TEMP TABLE wp_geoms ( + geom geometry(linestring, %d) +) ON COMMIT DROP` + + createTempIndexSQL = ` +CREATE INDEX ON wp_geoms USING GIST(geom)` + + analyzeTempTableSQL = `ANALYZE wp_geoms` + + insertGeomTmpTableSQL = ` +INSERT INTO wp_geoms (geom) VALUES ( + ST_Transform(ST_GeomFromWKB($1, $2::int), %d) +)` + insertWaterwayProfileSQL = ` +WITH point AS ( + SELECT ST_Transform(geom::geometry, $14::int) geom + FROM waterway.distance_marks_virtual + WHERE location_code = + ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int) +) INSERT INTO waterway.waterway_profiles ( location, + geom, validity, lnwl, mwl, @@ -78,6 +110,11 @@ source_organization ) VALUES ( ($1, $2, $3, $4, $5), + ( SELECT ST_Transform(geom, 4326)::geography + FROM wp_geoms + WHERE geom && ( SELECT geom from point ) AND + ST_Intersects(geom, ( SELECT geom FROM point )) + ), $6, $7, $8, @@ -135,9 +172,14 @@ } defer tx.Rollback() - // TODO: Download profile geometries from WFS. + epsg, err := wp.downloadGeometries( + ctx, importID, tx, start, feedback) + if err != nil { + return nil, fmt.Errorf("error downloading geometries: %v", err) + } - summary, err := wp.processCSV(ctx, importID, tx, start, feedback) + summary, err := wp.processCSV( + ctx, importID, tx, start, epsg, feedback) if err != nil { return nil, fmt.Errorf("error processing CVS: %v", err) } @@ -153,13 +195,159 @@ return summary, nil } +func (wp *WaterwayProfiles) downloadGeometries( + ctx context.Context, + importID int64, + tx *sql.Tx, + start time.Time, + feedback Feedback, +) (int, error) { + feedback.Info("Start downloading geometries from WFS.") + + feedback.Info("Loading capabilities from %s", wp.URL) + caps, err := wfs.GetCapabilities(wp.URL) + if err != nil { + feedback.Error("Loading capabilities failed: %v", err) + return 0, err + } + + ft := caps.FindFeatureType(wp.FeatureType) + if ft == nil { + return 0, fmt.Errorf("Unknown feature type '%s'", wp.FeatureType) + } + + feedback.Info("Found feature type '%s", wp.FeatureType) + + epsg, err := wfs.CRSToEPSG(ft.DefaultCRS) + if err != nil { + feedback.Error("Unsupported CRS name '%s'", ft.DefaultCRS) + return 0, err + } + + if wp.SortBy != "" { + feedback.Info("Features will be sorted by '%s'", wp.SortBy) + } + + urls, err := wfs.GetFeaturesGET( + caps, wp.FeatureType, "application/json", wp.SortBy) + if err != nil { + feedback.Error("Cannot create GetFeature URLs. %v", err) + return 0, err + } + + var ( + unsupported = stringCounter{} + missingProperties int + features int + ) + + var insertStmt *sql.Stmt + defer func() { + if insertStmt != nil { + insertStmt.Close() + } + }() + + var usedEPGS int + + setup := func(epsg int) error { + if insertStmt != nil { + return nil + } + feedback.Info("Using EPSG: %d", epsg) + usedEPGS = epsg + + tblSQL := fmt.Sprintf(createGeomTempTableSQL, epsg) + if _, err := tx.ExecContext(ctx, tblSQL); err != nil { + return err + } + if _, err := tx.ExecContext(ctx, createTempIndexSQL); err != nil { + return err + } + insertSQL := fmt.Sprintf(insertGeomTmpTableSQL, epsg) + var err error + insertStmt, err = tx.PrepareContext(ctx, insertSQL) + return err + } + + if err := wfs.DownloadURLs(urls, func(r io.Reader) error { + rfc, err := wfs.ParseRawFeatureCollection(r) + if err != nil { + return fmt.Errorf("parsing GetFeature document failed: %v", err) + } + if rfc.CRS != nil { + crsName := rfc.CRS.Properties.Name + if epsg, err = wfs.CRSToEPSG(crsName); err != nil { + feedback.Error("Unsupported CRS: %d", crsName) + return err + } + } + + // No features -> ignore. + if rfc.Features == nil { + return nil + } + + if err := setup(epsg); err != nil { + return err + } + + for _, feature := range rfc.Features { + if feature.Geometry.Coordinates == nil { + missingProperties++ + continue + } + + switch feature.Geometry.Type { + case "LineString": + var l lineSlice + if err := json.Unmarshal(*feature.Geometry.Coordinates, &l); err != nil { + return err + } + if _, err := insertStmt.ExecContext( + ctx, + l.asWKB(), + epsg, + ); err != nil { + return err + } + features++ + default: + unsupported[feature.Geometry.Type]++ + } + } + return nil + }); err != nil { + feedback.Error("Downloading features failed: %v", err) + return 0, err + } + + if missingProperties > 0 { + feedback.Warn("Missing properties: %d", missingProperties) + } + + if len(unsupported) != 0 { + feedback.Warn("Unsupported types found: %s", unsupported) + } + + if features == 0 { + return 0, errors.New("No features found") + } + if _, err := tx.ExecContext(ctx, analyzeTempTableSQL); err != nil { + return 0, err + } + return usedEPGS, nil +} + func (wp *WaterwayProfiles) processCSV( ctx context.Context, importID int64, tx *sql.Tx, start time.Time, + epsg int, feedback Feedback, ) (interface{}, error) { + feedback.Info("Start processing CSV file.") f, err := os.Open(filepath.Join(wp.Dir, "wp.csv")) if err != nil { @@ -357,6 +545,7 @@ fe100, dateInfo, source, + epsg, ).Scan(&id); err != nil { return nil, err }