Mercurial > gemma
diff pkg/imports/wa.go @ 1785:614c6c766691
Waterway area import: Implemented.
author | Sascha L. Teichmann <teichmann@intevation.de> |
---|---|
date | Sat, 12 Jan 2019 19:53:31 +0100 |
parents | |
children | 09349ca27dd7 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pkg/imports/wa.go Sat Jan 12 19:53:31 2019 +0100 @@ -0,0 +1,257 @@ +// This is Free Software under GNU Affero General Public License v >= 3.0 +// without warranty, see README.md and license for details. +// +// SPDX-License-Identifier: AGPL-3.0-or-later +// License-Filename: LICENSES/AGPL-3.0.txt +// +// Copyright (C) 2018 by via donau +// – Österreichische Wasserstraßen-Gesellschaft mbH +// Software engineering by Intevation GmbH +// +// Author(s): +// * Sascha L. Teichmann <sascha.teichmann@intevation.de> + +package imports + +import ( + "context" + "database/sql" + "encoding/json" + "errors" + "fmt" + "io" + "strconv" + "time" + + "gemma.intevation.de/gemma/pkg/common" + "gemma.intevation.de/gemma/pkg/wfs" +) + +// WaterwayArea is an import job to import +// the waterway area in form of polygon geometries +// and attribute data from a WFS service. +type WaterwayArea struct { + // URL the GetCapabilities URL of the WFS service. + URL string `json:"url"` + // FeatureType selects the feature type of the WFS service. + FeatureType string `json:"feature-type"` + // SortBy works around misconfigured services to + // establish a sort order to get the features. + SortBy string `json:"sort-by"` +} + +// WAJobKind is the import queue type identifier. +const WAJobKind JobKind = "wa" + +type waJobCreator struct{} + +func init() { + RegisterJobCreator(WAJobKind, waJobCreator{}) +} + +func (waJobCreator) Description() string { return "waterway area" } + +func (waJobCreator) AutoAccept() bool { return true } + +func (waJobCreator) Create(_ JobKind, data string) (Job, error) { + wa := new(WaterwayArea) + if err := common.FromJSONString(data, wa); err != nil { + return nil, err + } + return wa, nil +} + +func (waJobCreator) Depends() []string { + return []string{ + "waterway_area", + } +} + +// StageDone is a NOP for waterway area imports. +func (waJobCreator) StageDone(context.Context, *sql.Tx, int64) error { + return nil +} + +// CleanUp for waterway area imports is a NOP. +func (*WaterwayArea) CleanUp() error { return nil } + +type waterwayAreaProperties struct { + Catccl *string `json:"ienc_catccl"` + Dirimp *string `json:"ienc_dirimp"` +} + +const ( + deleteWaterwayAreaSQL = `DELETE FROM waterway.waterway_area` + insertWaterwayAreaSQL = ` +INSERT INTO waterway.waterway_area (area, catccl, dirimp) +VALUES ( + ST_Transform(ST_GeomFromWKB($1, $2::integer), 4326)::geography, + $3, + $4 +)` +) + +// Do executes the actual waterway axis import. +func (wx *WaterwayArea) Do( + ctx context.Context, + importID int64, + conn *sql.Conn, + feedback Feedback, +) (interface{}, error) { + + start := time.Now() + + feedback.Info("Import waterway area") + + feedback.Info("Loading capabilities from %s", wx.URL) + caps, err := wfs.GetCapabilities(wx.URL) + if err != nil { + feedback.Error("Loading capabilities failed: %v", err) + return nil, err + } + + ft := caps.FindFeatureType(wx.FeatureType) + if ft == nil { + return nil, fmt.Errorf("Unknown feature type '%s'", wx.FeatureType) + } + + feedback.Info("Found feature type '%s", wx.FeatureType) + + epsg, err := wfs.CRSToEPSG(ft.DefaultCRS) + if err != nil { + feedback.Error("Unsupported CRS name '%s'", ft.DefaultCRS) + return nil, err + } + + if wx.SortBy != "" { + feedback.Info("Features will be sorted by '%s'", wx.SortBy) + } + + urls, err := wfs.GetFeaturesGET( + caps, wx.FeatureType, "application/json", wx.SortBy) + if err != nil { + feedback.Error("Cannot create GetFeature URLs. %v", err) + return nil, err + } + + tx, err := conn.BeginTx(ctx, nil) + if err != nil { + return nil, err + } + defer tx.Rollback() + + insertStmt, err := tx.PrepareContext(ctx, insertWaterwayAreaSQL) + if err != nil { + return nil, err + } + defer insertStmt.Close() + + // Delete the old features. + if _, err := tx.ExecContext(ctx, deleteWaterwayAreaSQL); err != nil { + return nil, err + } + + var ( + unsupported = stringCounter{} + missingProperties int + badProperties int + features int + ) + + if err := wfs.DownloadURLs(urls, func(r io.Reader) error { + rfc, err := wfs.ParseRawFeatureCollection(r) + if err != nil { + return fmt.Errorf("parsing GetFeature document failed: %v", err) + } + if rfc.CRS != nil { + crsName := rfc.CRS.Properties.Name + if epsg, err = wfs.CRSToEPSG(crsName); err != nil { + feedback.Error("Unsupported CRS: %d", crsName) + return err + } + } + + // No features -> ignore. + if rfc.Features == nil { + return nil + } + + feedback.Info("Using EPSG: %d", epsg) + + for _, feature := range rfc.Features { + if feature.Properties == nil || feature.Geometry.Coordinates == nil { + missingProperties++ + continue + } + + var props waterwayAreaProperties + + if err := json.Unmarshal(*feature.Properties, &props); err != nil { + badProperties++ + continue + } + + var catccl sql.NullInt64 + if props.Catccl != nil { + if value, err := strconv.ParseInt(*props.Catccl, 10, 64); err == nil { + catccl = sql.NullInt64{Int64: value, Valid: true} + } + } + var dirimp sql.NullInt64 + if props.Dirimp != nil { + if value, err := strconv.ParseInt(*props.Dirimp, 10, 64); err == nil { + dirimp = sql.NullInt64{Int64: value, Valid: true} + } + } + + switch feature.Geometry.Type { + case "Polygon": + var p polygon + if err := json.Unmarshal(*feature.Geometry.Coordinates, &p); err != nil { + return err + } + if _, err := insertStmt.ExecContext( + ctx, + p.asWKB(), + epsg, + catccl, + dirimp, + ); err != nil { + return err + } + features++ + default: + unsupported[feature.Geometry.Type]++ + } + } + return nil + }); err != nil { + feedback.Error("Downloading features failed: %v", err) + return nil, err + } + + if features == 0 { + err := errors.New("No features found") + feedback.Error("%v", err) + return nil, err + } + + if badProperties > 0 { + feedback.Warn("Bad properties: %d", badProperties) + } + + if missingProperties > 0 { + feedback.Warn("Missing properties: %d", missingProperties) + } + + if len(unsupported) != 0 { + feedback.Warn("Unsupported types found: %s", unsupported) + } + + if err = tx.Commit(); err == nil { + feedback.Info("Storing %d features took %s", + features, time.Since(start)) + } + + return nil, err +}