Mercurial > gemma
view pkg/imports/pointwfs.go @ 4948:821ae20b6a20 fairway-marks-import
Re-added missing header lines.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Thu, 20 Feb 2020 17:41:36 +0100 |
parents | b0dbc0f2c748 |
children | dd83c2dfffc9 |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2020 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Tom Gottfried <tom.gottfried@intevation.de> // * Sascha L. Teichmann <sascha.teichmann@intevation.de> package imports import ( "context" "database/sql" "encoding/json" "errors" "fmt" "io" "time" "gemma.intevation.de/gemma/pkg/models" "gemma.intevation.de/gemma/pkg/wfs" ) var ( ErrFeatureIgnored = errors.New("feature ignored") ErrFeatureDuplicated = errors.New("feature duplicated") ) type ( WFSPointConsumer interface { Commit() error Rollback() error NewProperties() interface{} Consume(points pointSlice, properties interface{}, epsg int) error } PointWFSJobCreator struct { description string depends [2][]string newConsumer func(context.Context, *sql.Conn, Feedback) (WFSPointConsumer, error) } PointWFSJob struct { models.WFSImport creator *PointWFSJobCreator } ) func (pwjc *PointWFSJobCreator) Description() string { return pwjc.description } func (pwjc *PointWFSJobCreator) Depends() [2][]string { return pwjc.depends } func (*PointWFSJobCreator) AutoAccept() bool { return true } // StageDone is a NOP for WFS imports. func (*PointWFSJobCreator) StageDone(context.Context, *sql.Tx, int64) error { return nil } func (pwjc *PointWFSJobCreator) Create() Job { return &PointWFSJob{creator: pwjc} } // Description gives a short info about relevant facts of this import. func (pwj *PointWFSJob) Description() (string, error) { return pwj.URL + "|" + pwj.FeatureType, nil } // CleanUp for WFS imports is a NOP. func (*PointWFSJob) CleanUp() error { return nil } func (pwj *PointWFSJob) Do( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, ) (interface{}, error) { start := time.Now() feedback.Info("Import %s", pwj.creator.Description()) feedback.Info("Loading capabilities from %s", pwj.URL) caps, err := wfs.GetCapabilities(pwj.URL) if err != nil { feedback.Error("Loading capabilities failed: %v", err) return nil, err } ft := caps.FindFeatureType(pwj.FeatureType) if ft == nil { return nil, fmt.Errorf("unknown feature type '%s'", pwj.FeatureType) } feedback.Info("Found feature type '%s", pwj.FeatureType) epsg, err := wfs.CRSToEPSG(ft.DefaultCRS) if err != nil { feedback.Error("Unsupported CRS: '%s'", ft.DefaultCRS) return nil, err } if nilString(pwj.SortBy) != "" { feedback.Info("Features will be sorted by '%s'", pwj.SortBy) } dl, err := wfs.GetFeatures(caps, pwj.FeatureType, nilString(pwj.SortBy)) if err != nil { feedback.Error("Cannot create GetFeature URLs. %v", err) return nil, err } var ( unsupported = stringCounter{} missingProperties int badProperties int dupes int features int ) consumer, err := pwj.creator.newConsumer(ctx, conn, feedback) if err != nil { return nil, err } defer consumer.Rollback() if err := dl.Download(nilString(pwj.User), nilString(pwj.Password), func(url string, r io.Reader) error { feedback.Info("Get features from: '%s'", url) rfc, err := wfs.ParseRawFeatureCollection(r) if err != nil { return fmt.Errorf("parsing GetFeature document failed: %v", err) } if rfc.CRS != nil { crsName := rfc.CRS.Properties.Name if epsg, err = wfs.CRSToEPSG(crsName); err != nil { feedback.Error("Unsupported CRS: %d", crsName) return err } } // No features -> ignore. if rfc.Features == nil { return nil } feedback.Info("Using EPSG: %d", epsg) for _, feature := range rfc.Features { if feature.Properties == nil || feature.Geometry.Coordinates == nil { missingProperties++ continue } props := consumer.NewProperties() if err := json.Unmarshal(*feature.Properties, props); err != nil { badProperties++ continue } switch feature.Geometry.Type { case "Point": var p pointSlice if err := json.Unmarshal(*feature.Geometry.Coordinates, &p); err != nil { return err } err := consumer.Consume(p, props, epsg) switch { case err == ErrFeatureDuplicated: dupes++ case err == ErrFeatureIgnored: // be silent case err != nil: return err default: features++ } default: unsupported[feature.Geometry.Type]++ } } return nil }); err != nil { return nil, err } if dupes > 0 { feedback.Info( "Features outside responsibility area and duplicates: %d", dupes) } if badProperties > 0 { feedback.Warn("Bad properties: %d", badProperties) } if missingProperties > 0 { feedback.Warn("Missing properties: %d", missingProperties) } if len(unsupported) != 0 { feedback.Warn("Unsupported types found: %s", unsupported) } if features == 0 { return nil, UnchangedError("no valid new features found") } feedback.Info("Found %d usable features in data source", features) if err = consumer.Commit(); err == nil { feedback.Info("Storing %d features took %s", features, time.Since(start)) } return nil, nil } type ( SQLPointConsumer struct { ctx context.Context tx *sql.Tx feedback Feedback newProperties func() interface{} consume func(*SQLPointConsumer, pointSlice, interface{}, int) error savepoint func(func() error) error stmts []*sql.Stmt } ) func (spc *SQLPointConsumer) Rollback() error { if tx := spc.tx; tx != nil { spc.releaseStmts() spc.tx = nil spc.ctx = nil return tx.Rollback() } return nil } func (spc *SQLPointConsumer) Commit() error { if tx := spc.tx; tx != nil { spc.releaseStmts() spc.tx = nil spc.ctx = nil return tx.Commit() } return nil } func (spc *SQLPointConsumer) NewProperties() interface{} { return spc.newProperties() } func (spc *SQLPointConsumer) Consume( points pointSlice, properties interface{}, epsg int, ) error { return spc.consume(spc, points, properties, epsg) } func newSQLConsumer( init func(*SQLPointConsumer) error, consume func(*SQLPointConsumer, pointSlice, interface{}, int) error, newProperties func() interface{}, ) func(context.Context, *sql.Conn, Feedback) (WFSPointConsumer, error) { return func(ctx context.Context, conn *sql.Conn, feedback Feedback) (WFSPointConsumer, error) { tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } spc := &SQLPointConsumer{ ctx: ctx, tx: tx, feedback: feedback, newProperties: newProperties, consume: consume, savepoint: Savepoint(ctx, tx, "feature"), } if err := init(spc); err != nil { tx.Rollback() return nil, err } return spc, nil } } func (spc *SQLPointConsumer) releaseStmts() { for i := len(spc.stmts); i > 0; i-- { spc.stmts[i-1].Close() spc.stmts[i-1] = nil } spc.stmts = nil } func prepareStmnts(queries ...string) func(*SQLPointConsumer) error { return func(spc *SQLPointConsumer) error { for _, query := range queries { stmt, err := spc.tx.PrepareContext(spc.ctx, query) if err != nil { return err } spc.stmts = append(spc.stmts, stmt) } return nil } }