Mercurial > gemma
view pkg/imports/wx.go @ 3326:98ce6d101e01
available_fairway_depth: omit unit
author | Thomas Junk <thomas.junk@intevation.de> |
---|---|
date | Mon, 20 May 2019 12:32:10 +0200 |
parents | 4acbee65275d |
children | 6c5c15b2fb64 |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018, 2019 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> // * Tom Gottfried <tom.gottfried@intevation.de> package imports import ( "context" "database/sql" "encoding/json" "errors" "fmt" "io" "time" "gemma.intevation.de/gemma/pkg/wfs" ) // WaterwayAxis is an import job to import // the waterway axes in form of line string geometries // and attribute data from a WFS service. type WaterwayAxis struct { // URL the GetCapabilities URL of the WFS service. URL string `json:"url"` // FeatureType selects the feature type of the WFS service. FeatureType string `json:"feature-type"` // SortBy works around misconfigured services to // establish a sort order to get the features. SortBy string `json:"sort-by"` // User is an optional username for Basic Auth. User string `json:"user,omitempty"` // Password is an optional password for Basic Auth. Password string `json:"password,omitempty"` } // WXJobKind is the import queue type identifier. const WXJobKind JobKind = "wx" type wxJobCreator struct{} func init() { RegisterJobCreator(WXJobKind, wxJobCreator{}) } func (wxJobCreator) Description() string { return "waterway axis" } func (wxJobCreator) AutoAccept() bool { return true } func (wxJobCreator) Create() Job { return new(WaterwayAxis) } func (wxJobCreator) Depends() [2][]string { return [2][]string{ {"waterway_axis"}, {}, } } // StageDone is a NOP for waterway axis imports. func (wxJobCreator) StageDone(context.Context, *sql.Tx, int64) error { return nil } // CleanUp for waterway axis imports is a NOP. func (*WaterwayAxis) CleanUp() error { return nil } type waterwayAxisProperties struct { ObjNam string `json:"hydro_objnam"` NObjNnm *string `json:"hydro_nobjnm"` } const ( deleteWaterwayAxisSQL = ` WITH resp AS ( SELECT users.current_user_area_utm() AS a ) DELETE FROM waterway.waterway_axis WHERE pg_has_role('sys_admin', 'MEMBER') OR ST_Covers((SELECT a FROM resp), ST_Transform(wtwaxs::geometry, (SELECT ST_SRID(a) FROM resp))) ` insertWaterwayAxisSQL = ` WITH resp AS ( SELECT users.current_user_area_utm() AS a ) INSERT INTO waterway.waterway_axis (wtwaxs, objnam, nobjnam) SELECT dmp.geom, $3, $4 FROM ST_GeomFromWKB($1, $2::integer) AS new_line (new_line), ST_Dump(ST_Transform(ST_CollectionExtract( CASE WHEN pg_has_role('sys_admin', 'MEMBER') THEN ST_Node(ST_Transform(new_line, best_utm(ST_Transform(new_line, 4326)))) ELSE ST_Intersection((SELECT a FROM resp), ST_Node(ST_Transform(new_line, (SELECT ST_SRID(a) FROM resp)))) END, 2), 4326)) AS dmp RETURNING id ` ) // Do executes the actual waterway axis import. func (wx *WaterwayAxis) Do( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, ) (interface{}, error) { start := time.Now() feedback.Info("Import waterway axis") feedback.Info("Loading capabilities from %s", wx.URL) caps, err := wfs.GetCapabilities(wx.URL) if err != nil { feedback.Error("Loading capabilities failed: %v", err) return nil, err } ft := caps.FindFeatureType(wx.FeatureType) if ft == nil { return nil, fmt.Errorf("Unknown feature type '%s'", wx.FeatureType) } feedback.Info("Found feature type '%s", wx.FeatureType) epsg, err := wfs.CRSToEPSG(ft.DefaultCRS) if err != nil { feedback.Error("Unsupported CRS name '%s'", ft.DefaultCRS) return nil, err } if wx.SortBy != "" { feedback.Info("Features will be sorted by '%s'", wx.SortBy) } dl, err := wfs.GetFeatures(caps, wx.FeatureType, wx.SortBy) if err != nil { feedback.Error("Cannot create GetFeature URLs. %v", err) return nil, err } tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } defer tx.Rollback() insertStmt, err := tx.PrepareContext(ctx, insertWaterwayAxisSQL) if err != nil { return nil, err } defer insertStmt.Close() // Delete the old features. if _, err := tx.ExecContext(ctx, deleteWaterwayAxisSQL); err != nil { return nil, err } var ( unsupported = stringCounter{} missingProperties int badProperties int outside int features int ) if err := dl.Download(wx.User, wx.Password, func(url string, r io.Reader) error { feedback.Info("Get features from: '%s'", url) rfc, err := wfs.ParseRawFeatureCollection(r) if err != nil { return fmt.Errorf("parsing GetFeature document failed: %v", err) } if rfc.CRS != nil { crsName := rfc.CRS.Properties.Name if epsg, err = wfs.CRSToEPSG(crsName); err != nil { feedback.Error("Unsupported CRS: %d", crsName) return err } } // No features -> ignore. if rfc.Features == nil { return nil } feedback.Info("Using EPSG: %d", epsg) savepoint := Savepoint(ctx, tx, "feature") for _, feature := range rfc.Features { if feature.Properties == nil || feature.Geometry.Coordinates == nil { missingProperties++ continue } var props waterwayAxisProperties if err := json.Unmarshal(*feature.Properties, &props); err != nil { badProperties++ continue } var nobjnam sql.NullString if props.NObjNnm != nil { nobjnam = sql.NullString{String: *props.NObjNnm, Valid: true} } switch feature.Geometry.Type { case "LineString": var l lineSlice if err := json.Unmarshal(*feature.Geometry.Coordinates, &l); err != nil { return err } if err := storeLinestring( ctx, savepoint, feedback, l, epsg, props, nobjnam, &outside, &features, insertStmt); err != nil { return err } case "MultiLineString": var ls []lineSlice if err := json.Unmarshal(*feature.Geometry.Coordinates, &ls); err != nil { return err } for _, l := range ls { if err := storeLinestring( ctx, savepoint, feedback, l, epsg, props, nobjnam, &outside, &features, insertStmt); err != nil { return err } } default: unsupported[feature.Geometry.Type]++ } } return nil }); err != nil { return nil, err } if badProperties > 0 { feedback.Warn("Bad properties: %d", badProperties) } if missingProperties > 0 { feedback.Warn("Missing properties: %d", missingProperties) } if len(unsupported) != 0 { feedback.Warn("Unsupported types found: %s", unsupported) } if outside > 0 { feedback.Info("Features outside responsibility area: %d", outside) } if features == 0 { return nil, errors.New("No features found") } if err = tx.Commit(); err == nil { feedback.Info("Storing %d features took %s", features, time.Since(start)) } return nil, err } func storeLinestring( ctx context.Context, savepoint func(func() error) error, feedback Feedback, l lineSlice, epsg int, props waterwayAxisProperties, nobjnam sql.NullString, outside, features *int, insertStmt *sql.Stmt, ) error { var id int err := savepoint(func() error { err := insertStmt.QueryRowContext( ctx, l.asWKB(), epsg, props.ObjNam, nobjnam, ).Scan(&id) return err }) switch { case err == sql.ErrNoRows: *outside++ // ignore -> filtered by responsibility_areas return nil case err != nil: feedback.Warn(handleError(err).Error()) default: *features++ } return nil }