Mercurial > gemma
view pkg/imports/wx.go @ 2130:f3aabc05f9b2
Fix constraints on waterway profiles
staging_done in the UNIQUE constraint had no effect, because the
exclusion constraint prevented two rows with equal location and
validity anyhow. Adding staging_done to the exclusion constraint
makes the UNIQUE constraint checking only a corner case of what
the exclusion constraint checks. Thus, remove the UNIQUE constraint.
Casting staging_done to int is needed because there is no appropriate
operator class for booleans. Casting to smallint or even bit would have
been better (i.e. should result in smaller index size), but that would
have required creating such a CAST, in addition.
author | Tom Gottfried <tom@intevation.de> |
---|---|
date | Wed, 06 Feb 2019 15:42:32 +0100 |
parents | 86c88fc0ff5e |
children | b868cb653c4d |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> package imports import ( "context" "database/sql" "encoding/json" "errors" "fmt" "io" "time" "gemma.intevation.de/gemma/pkg/common" "gemma.intevation.de/gemma/pkg/wfs" ) // WaterwayAxis is an import job to import // the waterway axes in form of line string geometries // and attribute data from a WFS service. type WaterwayAxis struct { // URL the GetCapabilities URL of the WFS service. URL string `json:"url"` // FeatureType selects the feature type of the WFS service. FeatureType string `json:"feature-type"` // SortBy works around misconfigured services to // establish a sort order to get the features. SortBy string `json:"sort-by"` } // WXJobKind is the import queue type identifier. const WXJobKind JobKind = "wx" type wxJobCreator struct{} func init() { RegisterJobCreator(WXJobKind, wxJobCreator{}) } func (wxJobCreator) Description() string { return "waterway axis" } func (wxJobCreator) AutoAccept() bool { return true } func (wxJobCreator) Create(_ JobKind, data string) (Job, error) { wx := new(WaterwayAxis) if err := common.FromJSONString(data, wx); err != nil { return nil, err } return wx, nil } func (wxJobCreator) Depends() []string { return []string{ "waterway_axis", } } // StageDone is a NOP for waterway axis imports. func (wxJobCreator) StageDone(context.Context, *sql.Tx, int64) error { return nil } // CleanUp for waterway axis imports is a NOP. func (*WaterwayAxis) CleanUp() error { return nil } type waterwayAxisProperties struct { ObjNam string `json:"hydro_objnam"` NObjNnm *string `json:"hydro_nobjnm"` } const ( deleteWaterwayAxisSQL = ` WITH resp AS ( SELECT best_utm(area::geometry) AS t, ST_Transform(area::geometry, best_utm(area::geometry)) AS a FROM users.responsibility_areas WHERE country = users.current_user_country() ) DELETE FROM waterway.waterway_axis WHERE ST_Covers( (SELECT a FROM resp), ST_Transform(wtwaxs::geometry, (SELECT t FROM resp))) ` insertWaterwayAxisSQL = ` WITH resp AS ( SELECT best_utm(area::geometry) AS t, ST_Transform(area::geometry, best_utm(area::geometry)) AS a FROM users.responsibility_areas WHERE country = users.current_user_country() ) INSERT INTO waterway.waterway_axis (wtwaxs, objnam, nobjnam) SELECT ST_Transform(clipped.geom, 4326)::geography, $3, $4 FROM ( SELECT (ST_Dump( ST_Intersection( (SELECT a FROM resp), ST_Transform( ST_GeomFromWKB($1, $2::integer), (SELECT t FROM resp) ) ) )).geom AS geom ) AS clipped WHERE clipped.geom IS NOT NULL ` ) // Do executes the actual waterway axis import. func (wx *WaterwayAxis) Do( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, ) (interface{}, error) { start := time.Now() feedback.Info("Import waterway axis") feedback.Info("Loading capabilities from %s", wx.URL) caps, err := wfs.GetCapabilities(wx.URL) if err != nil { feedback.Error("Loading capabilities failed: %v", err) return nil, err } ft := caps.FindFeatureType(wx.FeatureType) if ft == nil { return nil, fmt.Errorf("Unknown feature type '%s'", wx.FeatureType) } feedback.Info("Found feature type '%s", wx.FeatureType) epsg, err := wfs.CRSToEPSG(ft.DefaultCRS) if err != nil { feedback.Error("Unsupported CRS name '%s'", ft.DefaultCRS) return nil, err } if wx.SortBy != "" { feedback.Info("Features will be sorted by '%s'", wx.SortBy) } urls, err := wfs.GetFeaturesGET( caps, wx.FeatureType, "application/json", wx.SortBy) if err != nil { feedback.Error("Cannot create GetFeature URLs. %v", err) return nil, err } tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } defer tx.Rollback() insertStmt, err := tx.PrepareContext(ctx, insertWaterwayAxisSQL) if err != nil { return nil, err } defer insertStmt.Close() // Delete the old features. if _, err := tx.ExecContext(ctx, deleteWaterwayAxisSQL); err != nil { return nil, err } var ( unsupported = stringCounter{} missingProperties int badProperties int features int ) if err := wfs.DownloadURLs(urls, func(url string, r io.Reader) error { feedback.Info("Get features from: '%s'", url) rfc, err := wfs.ParseRawFeatureCollection(r) if err != nil { return fmt.Errorf("parsing GetFeature document failed: %v", err) } if rfc.CRS != nil { crsName := rfc.CRS.Properties.Name if epsg, err = wfs.CRSToEPSG(crsName); err != nil { feedback.Error("Unsupported CRS: %d", crsName) return err } } // No features -> ignore. if rfc.Features == nil { return nil } feedback.Info("Using EPSG: %d", epsg) for _, feature := range rfc.Features { if feature.Properties == nil || feature.Geometry.Coordinates == nil { missingProperties++ continue } var props waterwayAxisProperties if err := json.Unmarshal(*feature.Properties, &props); err != nil { badProperties++ continue } var nobjnam sql.NullString if props.NObjNnm != nil { nobjnam = sql.NullString{String: *props.NObjNnm, Valid: true} } switch feature.Geometry.Type { case "LineString": var l lineSlice if err := json.Unmarshal(*feature.Geometry.Coordinates, &l); err != nil { return err } if _, err := insertStmt.ExecContext( ctx, l.asWKB(), epsg, props.ObjNam, nobjnam, ); err != nil { return err } features++ case "MultiLineString": var ls []lineSlice if err := json.Unmarshal(*feature.Geometry.Coordinates, &ls); err != nil { return err } for _, l := range ls { if _, err := insertStmt.ExecContext( ctx, l.asWKB(), epsg, props.ObjNam, nobjnam, ); err != nil { return err } features++ } default: unsupported[feature.Geometry.Type]++ } } return nil }); err != nil { return nil, err } if badProperties > 0 { feedback.Warn("Bad properties: %d", badProperties) } if missingProperties > 0 { feedback.Warn("Missing properties: %d", missingProperties) } if len(unsupported) != 0 { feedback.Warn("Unsupported types found: %s", unsupported) } if features == 0 { err := errors.New("No features found") feedback.Error("%v", err) return nil, err } if err = tx.Commit(); err == nil { feedback.Info("Storing %d features took %s", features, time.Since(start)) } return nil, err }