Mercurial > gemma
view pkg/imports/wfsjob.go @ 5718:3d497077f888 uploadwg
Implemented direct file upload as alternative import method for WG.
For testing and data corrections it is useful to be able to import
waterway gauge data directly by uploading an XML file.
author | Sascha Wilde <wilde@sha-bang.de> |
---|---|
date | Thu, 18 Apr 2024 19:23:19 +0200 |
parents | 6270951dda28 |
children |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2020 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Tom Gottfried <tom.gottfried@intevation.de> // * Sascha L. Teichmann <sascha.teichmann@intevation.de> package imports import ( "context" "database/sql" "encoding/json" "errors" "fmt" "io" "time" "gemma.intevation.de/gemma/pkg/models" "gemma.intevation.de/gemma/pkg/wfs" ) var ( // ErrFeatureIgnored indicates if a feature is ignored. ErrFeatureIgnored = errors.New("feature ignored") // ErrFeatureDuplicated indicates if a feature is a duplicate. ErrFeatureDuplicated = errors.New("feature duplicated") // ErrFeaturesUnmodified indicates if a feature is unmodified. ErrFeaturesUnmodified = errors.New("features unmodified") ) type ( // WFSFeatureConsumer is an interface to model a transactional // handling to create new features. WFSFeatureConsumer interface { Commit() error Rollback() error NewFeature() (kind string, properties any) Consume(geom, properties any, epsg int) error } // WFSFeatureJobCreator is a factory to create feature consumers. WFSFeatureJobCreator struct { description string depends [2][]string newConsumer func(context.Context, *sql.Conn, Feedback) (WFSFeatureConsumer, error) stageDone func(context.Context, *sql.Tx, int64, Feedback) error } // WFSFeatureJob is job to do an WFS import. WFSFeatureJob struct { models.WFSImport creator *WFSFeatureJobCreator } ) var ( kindToGeometry = map[string]func() any{ // TODO: extend me! "Point": func() any { return new(pointSlice) }, "LineString": func() any { return new(lineSlice) }, "MultiLineString": func() any { return new(multiLineSlice) }, } wrapGeomKind = map[[2]string]func(any) any{ // TODO: extend me! 
{"LineString", "MultiLineString"}: func(x any) any { return &multiLineSlice{*x.(*lineSlice)} }, } ) // Description gives a short description of this creator. func (wfjc *WFSFeatureJobCreator) Description() string { return wfjc.description } // Depends lists the dependencies of this creator. func (wfjc *WFSFeatureJobCreator) Depends() [2][]string { return wfjc.depends } // AutoAccept auto accepts this job if there // is no stageDone implementation. func (wfjc *WFSFeatureJobCreator) AutoAccept() bool { return wfjc.stageDone == nil } // StageDone forwards the StageDone call. func (wfjc *WFSFeatureJobCreator) StageDone( ctx context.Context, tx *sql.Tx, id int64, feedback Feedback, ) error { if wfjc.stageDone == nil { return nil } return wfjc.stageDone(ctx, tx, id, feedback) } // Create creates the WFS job. func (wfjc *WFSFeatureJobCreator) Create() Job { return &WFSFeatureJob{creator: wfjc} } // Description gives a short info about relevant facts of this import. func (wfj *WFSFeatureJob) Description([]string) (string, error) { return wfj.URL + "|" + wfj.FeatureType, nil } // CleanUp for WFS imports is a NOP. func (*WFSFeatureJob) CleanUp() error { return nil } // Do implements the actual WFS import. 
func (wfj *WFSFeatureJob) Do( ctx context.Context, _ int64, conn *sql.Conn, feedback Feedback, ) (any, error) { start := time.Now() feedback.Info("Import %s", wfj.creator.Description()) feedback.Info("Loading capabilities from %s", wfj.URL) caps, err := wfs.GetCapabilities(wfj.URL) if err != nil { feedback.Error("Loading capabilities failed: %v", err) return nil, err } ft := caps.FindFeatureType(wfj.FeatureType) if ft == nil { return nil, fmt.Errorf("unknown feature type '%s'", wfj.FeatureType) } feedback.Info("Found feature type '%s'", wfj.FeatureType) epsg, err := wfs.CRSToEPSG(ft.DefaultCRS) if err != nil { feedback.Error("Unsupported CRS: '%s'", ft.DefaultCRS) return nil, err } if nilString(wfj.SortBy) != "" { feedback.Info("Features will be sorted by '%s'", *wfj.SortBy) } dl, err := wfs.GetFeatures(caps, wfj.FeatureType, nilString(wfj.SortBy)) if err != nil { feedback.Error("Cannot create GetFeature URLs. %v", err) return nil, err } var ( unsupported = stringCounter{} missingProperties int badProperties int dupes int features int ) consumer, err := wfj.creator.newConsumer(ctx, conn, feedback) if err != nil { return nil, err } defer consumer.Rollback() if err := dl.Download(nilString(wfj.User), nilString(wfj.Password), func(url string, r io.Reader) error { feedback.Info("Get features from: '%s'", url) rfc, err := wfs.ParseRawFeatureCollection(r) if err != nil { return fmt.Errorf("parsing GetFeature document failed: %v", err) } if rfc.CRS != nil { crsName := rfc.CRS.Properties.Name if epsg, err = wfs.CRSToEPSG(crsName); err != nil { feedback.Error("Unsupported CRS: %d", crsName) return err } } // No features -> ignore. 
if rfc.Features == nil { return nil } feedback.Info("Using EPSG: %d", epsg) feedback.Info( "Found %d features in data source", len(rfc.Features)) for _, feature := range rfc.Features { if feature.Properties == nil || feature.Geometry.Coordinates == nil { missingProperties++ continue } kind, props := consumer.NewFeature() if err := json.Unmarshal(*feature.Properties, props); err != nil { badProperties++ continue } // Look if we can deserialize given type makeGeom := kindToGeometry[feature.Geometry.Type] if makeGeom == nil { unsupported[feature.Geometry.Type]++ continue } // Optional wrapping wrap := func(x any) any { return x } if feature.Geometry.Type != kind { // Look if we can wrap it if wrap = wrapGeomKind[[2]string{feature.Geometry.Type, kind}]; wrap == nil { unsupported[feature.Geometry.Type]++ continue } } geom := makeGeom() if err := json.Unmarshal(*feature.Geometry.Coordinates, geom); err != nil { return err } switch err := consumer.Consume(wrap(geom), props, epsg); { case err == ErrFeatureDuplicated: dupes++ case err == ErrFeatureIgnored: // be silent case err != nil: return err default: features++ } } return nil }); err != nil { return nil, err } if dupes > 0 { feedback.Info( "Features outside responsibility area, duplicates or unchanged: %d", dupes) } if badProperties > 0 { feedback.Warn("Bad properties: %d", badProperties) } if missingProperties > 0 { feedback.Warn("Missing properties: %d", missingProperties) } if len(unsupported) != 0 { feedback.Warn("Unsupported types found: %s", unsupported) } if err = consumer.Commit(); err == nil || err == ErrFeaturesUnmodified { feedback.Info("Storing %d features took %s", features, time.Since(start)) } // Commit before eventually returning UnchangedError because we might // have updated last_found if features == 0 && err == ErrFeaturesUnmodified { return nil, UnchangedError("no valid new features found") } if err == ErrFeaturesUnmodified { // It's not really an error. 
err = nil } return nil, err } // SQLGeometryConsumer stores a WFS feature into the DB. type SQLGeometryConsumer struct { ctx context.Context tx *sql.Tx feedback Feedback consume func(*SQLGeometryConsumer, any, any, int) error newFeature func() (string, any) preCommit func(*SQLGeometryConsumer) error savepoint func(func() error) error stmts []*sql.Stmt } // Rollback rolls back the database state of this consumer. func (sgc *SQLGeometryConsumer) Rollback() error { if tx := sgc.tx; tx != nil { sgc.releaseStmts() sgc.tx = nil sgc.ctx = nil return tx.Rollback() } return nil } // Commit commits the database changes of this consumer to the database. func (sgc *SQLGeometryConsumer) Commit() error { var err error if tx := sgc.tx; tx != nil { if sgc.preCommit != nil { err = sgc.preCommit(sgc) } sgc.releaseStmts() sgc.tx = nil sgc.ctx = nil if err2 := tx.Commit(); err2 != nil && (err == nil || err == ErrFeaturesUnmodified) { // An error on commit that is not induced by the first // overrules the first. err = err2 } } return err } // NewFeature forwards the feature creation. func (sgc *SQLGeometryConsumer) NewFeature() (string, any) { return sgc.newFeature() } // Consume forwards the consumption of the given feature. func (sgc *SQLGeometryConsumer) Consume( geom, properties any, epsg int, ) error { return sgc.consume(sgc, geom, properties, epsg) } // ConsumePolygon forwards the consumption of a polygon. 
func (sgc *SQLGeometryConsumer) ConsumePolygon( polygon polygonSlice, properties any, epsg int, ) error { return sgc.consume(sgc, polygon, properties, epsg) } func newSQLConsumer( init func(*SQLGeometryConsumer) error, consume func(*SQLGeometryConsumer, any, any, int) error, preCommit func(*SQLGeometryConsumer) error, newFeature func() (string, any), ) func(context.Context, *sql.Conn, Feedback) (WFSFeatureConsumer, error) { return func(ctx context.Context, conn *sql.Conn, feedback Feedback) (WFSFeatureConsumer, error) { tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } sgc := &SQLGeometryConsumer{ ctx: ctx, tx: tx, feedback: feedback, consume: consume, newFeature: newFeature, preCommit: preCommit, savepoint: Savepoint(ctx, tx, "feature"), } if err := init(sgc); err != nil { tx.Rollback() return nil, err } return sgc, nil } } func (sgc *SQLGeometryConsumer) releaseStmts() { for i := len(sgc.stmts); i > 0; i-- { sgc.stmts[i-1].Close() sgc.stmts[i-1] = nil } sgc.stmts = nil } func prepareStmnts(queries ...string) func(*SQLGeometryConsumer) error { return func(sgc *SQLGeometryConsumer) error { for _, query := range queries { stmt, err := sgc.tx.PrepareContext(sgc.ctx, query) if err != nil { return err } sgc.stmts = append(sgc.stmts, stmt) } return nil } }