Mercurial > gemma
view pkg/imports/wfsjob.go @ 5560:f2204f91d286
Join the log lines of imports to the log exports to recover data from them.
Used in SR export to extract information that was stored in the meta JSON
but is now only found in the log.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Wed, 09 Feb 2022 18:34:40 +0100 |
parents | 733f7136a30e |
children | ade07a3f2cfd |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2020 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Tom Gottfried <tom.gottfried@intevation.de> // * Sascha L. Teichmann <sascha.teichmann@intevation.de> package imports import ( "context" "database/sql" "encoding/json" "errors" "fmt" "io" "time" "gemma.intevation.de/gemma/pkg/models" "gemma.intevation.de/gemma/pkg/wfs" ) var ( ErrFeatureIgnored = errors.New("feature ignored") ErrFeatureDuplicated = errors.New("feature duplicated") ErrFeaturesUnmodified = errors.New("features unmodified") ) type ( WFSFeatureConsumer interface { Commit() error Rollback() error NewFeature() (kind string, properties interface{}) Consume(geom, properties interface{}, epsg int) error } WFSFeatureJobCreator struct { description string depends [2][]string newConsumer func(context.Context, *sql.Conn, Feedback) (WFSFeatureConsumer, error) stageDone func(context.Context, *sql.Tx, int64, Feedback) error } WFSFeatureJob struct { models.WFSImport creator *WFSFeatureJobCreator } ) var ( kindToGeometry = map[string]func() interface{}{ // TODO: extend me! "Point": func() interface{} { return new(pointSlice) }, "LineString": func() interface{} { return new(lineSlice) }, "MultiLineString": func() interface{} { return new(multiLineSlice) }, } wrapGeomKind = map[[2]string]func(interface{}) interface{}{ // TODO: extend me! 
{"LineString", "MultiLineString"}: func(x interface{}) interface{} { return &multiLineSlice{*x.(*lineSlice)} }, } ) func (wfjc *WFSFeatureJobCreator) Description() string { return wfjc.description } func (wfjc *WFSFeatureJobCreator) Depends() [2][]string { return wfjc.depends } func (wfjc *WFSFeatureJobCreator) AutoAccept() bool { return wfjc.stageDone == nil } func (wfjc *WFSFeatureJobCreator) StageDone( ctx context.Context, tx *sql.Tx, id int64, feedback Feedback, ) error { if wfjc.stageDone == nil { return nil } return wfjc.stageDone(ctx, tx, id, feedback) } func (wfjc *WFSFeatureJobCreator) Create() Job { return &WFSFeatureJob{creator: wfjc} } // Description gives a short info about relevant facts of this import. func (wfj *WFSFeatureJob) Description() (string, error) { return wfj.URL + "|" + wfj.FeatureType, nil } // CleanUp for WFS imports is a NOP. func (*WFSFeatureJob) CleanUp() error { return nil } func (wfj *WFSFeatureJob) Do( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, ) (interface{}, error) { start := time.Now() feedback.Info("Import %s", wfj.creator.Description()) feedback.Info("Loading capabilities from %s", wfj.URL) caps, err := wfs.GetCapabilities(wfj.URL) if err != nil { feedback.Error("Loading capabilities failed: %v", err) return nil, err } ft := caps.FindFeatureType(wfj.FeatureType) if ft == nil { return nil, fmt.Errorf("unknown feature type '%s'", wfj.FeatureType) } feedback.Info("Found feature type '%s'", wfj.FeatureType) epsg, err := wfs.CRSToEPSG(ft.DefaultCRS) if err != nil { feedback.Error("Unsupported CRS: '%s'", ft.DefaultCRS) return nil, err } if nilString(wfj.SortBy) != "" { feedback.Info("Features will be sorted by '%s'", *wfj.SortBy) } dl, err := wfs.GetFeatures(caps, wfj.FeatureType, nilString(wfj.SortBy)) if err != nil { feedback.Error("Cannot create GetFeature URLs. 
%v", err) return nil, err } var ( unsupported = stringCounter{} missingProperties int badProperties int dupes int features int ) consumer, err := wfj.creator.newConsumer(ctx, conn, feedback) if err != nil { return nil, err } defer consumer.Rollback() if err := dl.Download(nilString(wfj.User), nilString(wfj.Password), func(url string, r io.Reader) error { feedback.Info("Get features from: '%s'", url) rfc, err := wfs.ParseRawFeatureCollection(r) if err != nil { return fmt.Errorf("parsing GetFeature document failed: %v", err) } if rfc.CRS != nil { crsName := rfc.CRS.Properties.Name if epsg, err = wfs.CRSToEPSG(crsName); err != nil { feedback.Error("Unsupported CRS: %d", crsName) return err } } // No features -> ignore. if rfc.Features == nil { return nil } feedback.Info("Using EPSG: %d", epsg) feedback.Info( "Found %d features in data source", len(rfc.Features)) for _, feature := range rfc.Features { if feature.Properties == nil || feature.Geometry.Coordinates == nil { missingProperties++ continue } kind, props := consumer.NewFeature() if err := json.Unmarshal(*feature.Properties, props); err != nil { badProperties++ continue } // Look if we can deserialize given type makeGeom := kindToGeometry[feature.Geometry.Type] if makeGeom == nil { unsupported[feature.Geometry.Type]++ continue } // Optional wrapping wrap := func(x interface{}) interface{} { return x } if feature.Geometry.Type != kind { // Look if we can wrap it if wrap = wrapGeomKind[[2]string{feature.Geometry.Type, kind}]; wrap == nil { unsupported[feature.Geometry.Type]++ continue } } geom := makeGeom() if err := json.Unmarshal(*feature.Geometry.Coordinates, geom); err != nil { return err } switch err := consumer.Consume(wrap(geom), props, epsg); { case err == ErrFeatureDuplicated: dupes++ case err == ErrFeatureIgnored: // be silent case err != nil: return err default: features++ } } return nil }); err != nil { return nil, err } if dupes > 0 { feedback.Info( "Features outside responsibility area, duplicates or 
unchanged: %d", dupes) } if badProperties > 0 { feedback.Warn("Bad properties: %d", badProperties) } if missingProperties > 0 { feedback.Warn("Missing properties: %d", missingProperties) } if len(unsupported) != 0 { feedback.Warn("Unsupported types found: %s", unsupported) } if err = consumer.Commit(); err == nil || err == ErrFeaturesUnmodified { feedback.Info("Storing %d features took %s", features, time.Since(start)) } // Commit before eventually returning UnchangedError because we might // have updated last_found if features == 0 && err == ErrFeaturesUnmodified { return nil, UnchangedError("no valid new features found") } if err == ErrFeaturesUnmodified { // It's not really an error. err = nil } return nil, err } type ( SQLGeometryConsumer struct { ctx context.Context tx *sql.Tx feedback Feedback consume func(*SQLGeometryConsumer, interface{}, interface{}, int) error newFeature func() (string, interface{}) preCommit func(*SQLGeometryConsumer) error savepoint func(func() error) error stmts []*sql.Stmt } ) func (sgc *SQLGeometryConsumer) Rollback() error { if tx := sgc.tx; tx != nil { sgc.releaseStmts() sgc.tx = nil sgc.ctx = nil return tx.Rollback() } return nil } func (sgc *SQLGeometryConsumer) Commit() error { var err error if tx := sgc.tx; tx != nil { if sgc.preCommit != nil { err = sgc.preCommit(sgc) } sgc.releaseStmts() sgc.tx = nil sgc.ctx = nil if err2 := tx.Commit(); err2 != nil && (err == nil || err == ErrFeaturesUnmodified) { // An error on commit that is not induced by the first // overrules the first. 
err = err2 } } return err } func (sgc *SQLGeometryConsumer) NewFeature() (string, interface{}) { return sgc.newFeature() } func (sgc *SQLGeometryConsumer) Consume( geom, properties interface{}, epsg int, ) error { return sgc.consume(sgc, geom, properties, epsg) } func (sgc *SQLGeometryConsumer) ConsumePolygon( polygon polygonSlice, properties interface{}, epsg int, ) error { return sgc.consume(sgc, polygon, properties, epsg) } func newSQLConsumer( init func(*SQLGeometryConsumer) error, consume func(*SQLGeometryConsumer, interface{}, interface{}, int) error, preCommit func(*SQLGeometryConsumer) error, newFeature func() (string, interface{}), ) func(context.Context, *sql.Conn, Feedback) (WFSFeatureConsumer, error) { return func(ctx context.Context, conn *sql.Conn, feedback Feedback) (WFSFeatureConsumer, error) { tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } sgc := &SQLGeometryConsumer{ ctx: ctx, tx: tx, feedback: feedback, consume: consume, newFeature: newFeature, preCommit: preCommit, savepoint: Savepoint(ctx, tx, "feature"), } if err := init(sgc); err != nil { tx.Rollback() return nil, err } return sgc, nil } } func (sgc *SQLGeometryConsumer) releaseStmts() { for i := len(sgc.stmts); i > 0; i-- { sgc.stmts[i-1].Close() sgc.stmts[i-1] = nil } sgc.stmts = nil } func prepareStmnts(queries ...string) func(*SQLGeometryConsumer) error { return func(sgc *SQLGeometryConsumer) error { for _, query := range queries { stmt, err := sgc.tx.PrepareContext(sgc.ctx, query) if err != nil { return err } sgc.stmts = append(sgc.stmts, stmt) } return nil } }