Mercurial > gemma
view pkg/imports/wfsjob.go @ 5600:9967a78e43f4
Fix format issue.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Sat, 06 Aug 2022 00:46:21 +0200 |
parents | ade07a3f2cfd |
children | 1222b777f51f |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2020 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Tom Gottfried <tom.gottfried@intevation.de> // * Sascha L. Teichmann <sascha.teichmann@intevation.de> package imports import ( "context" "database/sql" "encoding/json" "errors" "fmt" "io" "time" "gemma.intevation.de/gemma/pkg/models" "gemma.intevation.de/gemma/pkg/wfs" ) var ( ErrFeatureIgnored = errors.New("feature ignored") ErrFeatureDuplicated = errors.New("feature duplicated") ErrFeaturesUnmodified = errors.New("features unmodified") ) type ( WFSFeatureConsumer interface { Commit() error Rollback() error NewFeature() (kind string, properties interface{}) Consume(geom, properties interface{}, epsg int) error } WFSFeatureJobCreator struct { description string depends [2][]string newConsumer func(context.Context, *sql.Conn, Feedback) (WFSFeatureConsumer, error) stageDone func(context.Context, *sql.Tx, int64, Feedback) error } WFSFeatureJob struct { models.WFSImport creator *WFSFeatureJobCreator } ) var ( kindToGeometry = map[string]func() interface{}{ // TODO: extend me! "Point": func() interface{} { return new(pointSlice) }, "LineString": func() interface{} { return new(lineSlice) }, "MultiLineString": func() interface{} { return new(multiLineSlice) }, } wrapGeomKind = map[[2]string]func(interface{}) interface{}{ // TODO: extend me! {"LineString", "MultiLineString"}: func(x interface{}) interface{} { return &multiLineSlice{*x.(*lineSlice)} }, } ) func (wfjc *WFSFeatureJobCreator) Description() string { return wfjc.description } func (wfjc *WFSFeatureJobCreator) Depends() [2][]string { return wfjc.depends } func (wfjc *WFSFeatureJobCreator) AutoAccept() bool { return wfjc.stageDone == nil } func (wfjc *WFSFeatureJobCreator) StageDone( ctx context.Context, tx *sql.Tx, id int64, feedback Feedback, ) error { if wfjc.stageDone == nil { return nil } return wfjc.stageDone(ctx, tx, id, feedback) } func (wfjc *WFSFeatureJobCreator) Create() Job { return &WFSFeatureJob{creator: wfjc} } // Description gives a short info about relevant facts of this import. func (wfj *WFSFeatureJob) Description([]string) (string, error) { return wfj.URL + "|" + wfj.FeatureType, nil } // CleanUp for WFS imports is a NOP. func (*WFSFeatureJob) CleanUp() error { return nil } func (wfj *WFSFeatureJob) Do( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, ) (interface{}, error) { start := time.Now() feedback.Info("Import %s", wfj.creator.Description()) feedback.Info("Loading capabilities from %s", wfj.URL) caps, err := wfs.GetCapabilities(wfj.URL) if err != nil { feedback.Error("Loading capabilities failed: %v", err) return nil, err } ft := caps.FindFeatureType(wfj.FeatureType) if ft == nil { return nil, fmt.Errorf("unknown feature type '%s'", wfj.FeatureType) } feedback.Info("Found feature type '%s'", wfj.FeatureType) epsg, err := wfs.CRSToEPSG(ft.DefaultCRS) if err != nil { feedback.Error("Unsupported CRS: '%s'", ft.DefaultCRS) return nil, err } if nilString(wfj.SortBy) != "" { feedback.Info("Features will be sorted by '%s'", *wfj.SortBy) } dl, err := wfs.GetFeatures(caps, wfj.FeatureType, nilString(wfj.SortBy)) if err != nil { feedback.Error("Cannot create GetFeature URLs. %v", err) return nil, err } var ( unsupported = stringCounter{} missingProperties int badProperties int dupes int features int ) consumer, err := wfj.creator.newConsumer(ctx, conn, feedback) if err != nil { return nil, err } defer consumer.Rollback() if err := dl.Download(nilString(wfj.User), nilString(wfj.Password), func(url string, r io.Reader) error { feedback.Info("Get features from: '%s'", url) rfc, err := wfs.ParseRawFeatureCollection(r) if err != nil { return fmt.Errorf("parsing GetFeature document failed: %v", err) } if rfc.CRS != nil { crsName := rfc.CRS.Properties.Name if epsg, err = wfs.CRSToEPSG(crsName); err != nil { feedback.Error("Unsupported CRS: %d", crsName) return err } } // No features -> ignore. if rfc.Features == nil { return nil } feedback.Info("Using EPSG: %d", epsg) feedback.Info( "Found %d features in data source", len(rfc.Features)) for _, feature := range rfc.Features { if feature.Properties == nil || feature.Geometry.Coordinates == nil { missingProperties++ continue } kind, props := consumer.NewFeature() if err := json.Unmarshal(*feature.Properties, props); err != nil { badProperties++ continue } // Look if we can deserialize given type makeGeom := kindToGeometry[feature.Geometry.Type] if makeGeom == nil { unsupported[feature.Geometry.Type]++ continue } // Optional wrapping wrap := func(x interface{}) interface{} { return x } if feature.Geometry.Type != kind { // Look if we can wrap it if wrap = wrapGeomKind[[2]string{feature.Geometry.Type, kind}]; wrap == nil { unsupported[feature.Geometry.Type]++ continue } } geom := makeGeom() if err := json.Unmarshal(*feature.Geometry.Coordinates, geom); err != nil { return err } switch err := consumer.Consume(wrap(geom), props, epsg); { case err == ErrFeatureDuplicated: dupes++ case err == ErrFeatureIgnored: // be silent case err != nil: return err default: features++ } } return nil }); err != nil { return nil, err } if dupes > 0 { feedback.Info( "Features outside responsibility area, duplicates or unchanged: %d", dupes) } if badProperties > 0 { feedback.Warn("Bad properties: %d", badProperties) } if missingProperties > 0 { feedback.Warn("Missing properties: %d", missingProperties) } if len(unsupported) != 0 { feedback.Warn("Unsupported types found: %s", unsupported) } if err = consumer.Commit(); err == nil || err == ErrFeaturesUnmodified { feedback.Info("Storing %d features took %s", features, time.Since(start)) } // Commit before eventually returning UnchangedError because we might // have updated last_found if features == 0 && err == ErrFeaturesUnmodified { return nil, UnchangedError("no valid new features found") } if err == ErrFeaturesUnmodified { // It's not really an error. err = nil } return nil, err } type ( SQLGeometryConsumer struct { ctx context.Context tx *sql.Tx feedback Feedback consume func(*SQLGeometryConsumer, interface{}, interface{}, int) error newFeature func() (string, interface{}) preCommit func(*SQLGeometryConsumer) error savepoint func(func() error) error stmts []*sql.Stmt } ) func (sgc *SQLGeometryConsumer) Rollback() error { if tx := sgc.tx; tx != nil { sgc.releaseStmts() sgc.tx = nil sgc.ctx = nil return tx.Rollback() } return nil } func (sgc *SQLGeometryConsumer) Commit() error { var err error if tx := sgc.tx; tx != nil { if sgc.preCommit != nil { err = sgc.preCommit(sgc) } sgc.releaseStmts() sgc.tx = nil sgc.ctx = nil if err2 := tx.Commit(); err2 != nil && (err == nil || err == ErrFeaturesUnmodified) { // An error on commit that is not induced by the first // overrules the first. err = err2 } } return err } func (sgc *SQLGeometryConsumer) NewFeature() (string, interface{}) { return sgc.newFeature() } func (sgc *SQLGeometryConsumer) Consume( geom, properties interface{}, epsg int, ) error { return sgc.consume(sgc, geom, properties, epsg) } func (sgc *SQLGeometryConsumer) ConsumePolygon( polygon polygonSlice, properties interface{}, epsg int, ) error { return sgc.consume(sgc, polygon, properties, epsg) } func newSQLConsumer( init func(*SQLGeometryConsumer) error, consume func(*SQLGeometryConsumer, interface{}, interface{}, int) error, preCommit func(*SQLGeometryConsumer) error, newFeature func() (string, interface{}), ) func(context.Context, *sql.Conn, Feedback) (WFSFeatureConsumer, error) { return func(ctx context.Context, conn *sql.Conn, feedback Feedback) (WFSFeatureConsumer, error) { tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } sgc := &SQLGeometryConsumer{ ctx: ctx, tx: tx, feedback: feedback, consume: consume, newFeature: newFeature, preCommit: preCommit, savepoint: Savepoint(ctx, tx, "feature"), } if err := init(sgc); err != nil { tx.Rollback() return nil, err } return sgc, nil } } func (sgc *SQLGeometryConsumer) releaseStmts() { for i := len(sgc.stmts); i > 0; i-- { sgc.stmts[i-1].Close() sgc.stmts[i-1] = nil } sgc.stmts = nil } func prepareStmnts(queries ...string) func(*SQLGeometryConsumer) error { return func(sgc *SQLGeometryConsumer) error { for _, query := range queries { stmt, err := sgc.tx.PrepareContext(sgc.ctx, query) if err != nil { return err } sgc.stmts = append(sgc.stmts, stmt) } return nil } }