changeset 4979:ee12730acd73

Moved generalized WFS feature import to a better named file.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Tue, 03 Mar 2020 18:13:57 +0100
parents 35a3dc12050f
children 21d2acc080c9
files pkg/imports/pointwfs.go pkg/imports/wfsjob.go
diffstat 2 files changed, 351 insertions(+), 351 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/imports/pointwfs.go	Tue Mar 03 18:12:18 2020 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,351 +0,0 @@
-// This is Free Software under GNU Affero General Public License v >= 3.0
-// without warranty, see README.md and license for details.
-//
-// SPDX-License-Identifier: AGPL-3.0-or-later
-// License-Filename: LICENSES/AGPL-3.0.txt
-//
-// Copyright (C) 2020 by via donau
-//   – Österreichische Wasserstraßen-Gesellschaft mbH
-// Software engineering by Intevation GmbH
-//
-// Author(s):
-//  * Tom Gottfried <tom.gottfried@intevation.de>
-//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
-
-package imports
-
-import (
-	"context"
-	"database/sql"
-	"encoding/json"
-	"errors"
-	"fmt"
-	"io"
-	"time"
-
-	"gemma.intevation.de/gemma/pkg/models"
-	"gemma.intevation.de/gemma/pkg/wfs"
-)
-
-var (
-	ErrFeatureIgnored     = errors.New("feature ignored")
-	ErrFeatureDuplicated  = errors.New("feature duplicated")
-	ErrFeaturesUnmodified = errors.New("features unmodified")
-)
-
-type (
-	WFSFeatureConsumer interface {
-		Commit() error
-		Rollback() error
-
-		NewFeature() (kind string, geom interface{}, properties interface{})
-
-		Consume(geom, properties interface{}, epsg int) error
-	}
-
-	WFSFeatureJobCreator struct {
-		description string
-		depends     [2][]string
-
-		newConsumer func(context.Context, *sql.Conn, Feedback) (WFSFeatureConsumer, error)
-	}
-
-	WFSFeatureJob struct {
-		models.WFSImport
-		creator *WFSFeatureJobCreator
-	}
-)
-
-func (wfjc *WFSFeatureJobCreator) Description() string {
-	return wfjc.description
-}
-
-func (wfjc *WFSFeatureJobCreator) Depends() [2][]string {
-	return wfjc.depends
-}
-
-func (*WFSFeatureJobCreator) AutoAccept() bool {
-	return true
-}
-
-// StageDone is a NOP for WFS imports.
-func (*WFSFeatureJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
-	return nil
-}
-
-func (wfjc *WFSFeatureJobCreator) Create() Job {
-	return &WFSFeatureJob{creator: wfjc}
-}
-
-// Description gives a short info about relevant facts of this import.
-func (wfj *WFSFeatureJob) Description() (string, error) {
-	return wfj.URL + "|" + wfj.FeatureType, nil
-}
-
-// CleanUp for WFS imports is a NOP.
-func (*WFSFeatureJob) CleanUp() error {
-	return nil
-}
-
-func (wfj *WFSFeatureJob) Do(
-	ctx context.Context,
-	importID int64,
-	conn *sql.Conn,
-	feedback Feedback,
-) (interface{}, error) {
-
-	start := time.Now()
-
-	feedback.Info("Import %s", wfj.creator.Description())
-
-	feedback.Info("Loading capabilities from %s", wfj.URL)
-	caps, err := wfs.GetCapabilities(wfj.URL)
-	if err != nil {
-		feedback.Error("Loading capabilities failed: %v", err)
-		return nil, err
-	}
-
-	ft := caps.FindFeatureType(wfj.FeatureType)
-	if ft == nil {
-		return nil, fmt.Errorf("unknown feature type '%s'", wfj.FeatureType)
-	}
-
-	feedback.Info("Found feature type '%s'", wfj.FeatureType)
-
-	epsg, err := wfs.CRSToEPSG(ft.DefaultCRS)
-	if err != nil {
-		feedback.Error("Unsupported CRS: '%s'", ft.DefaultCRS)
-		return nil, err
-	}
-
-	if nilString(wfj.SortBy) != "" {
-		feedback.Info("Features will be sorted by '%s'", *wfj.SortBy)
-	}
-
-	dl, err := wfs.GetFeatures(caps, wfj.FeatureType, nilString(wfj.SortBy))
-	if err != nil {
-		feedback.Error("Cannot create GetFeature URLs. %v", err)
-		return nil, err
-	}
-
-	var (
-		unsupported       = stringCounter{}
-		missingProperties int
-		badProperties     int
-		dupes             int
-		features          int
-	)
-
-	consumer, err := wfj.creator.newConsumer(ctx, conn, feedback)
-	if err != nil {
-		return nil, err
-	}
-	defer consumer.Rollback()
-
-	if err := dl.Download(nilString(wfj.User), nilString(wfj.Password), func(url string, r io.Reader) error {
-		feedback.Info("Get features from: '%s'", url)
-		rfc, err := wfs.ParseRawFeatureCollection(r)
-		if err != nil {
-			return fmt.Errorf("parsing GetFeature document failed: %v", err)
-		}
-		if rfc.CRS != nil {
-			crsName := rfc.CRS.Properties.Name
-			if epsg, err = wfs.CRSToEPSG(crsName); err != nil {
-				feedback.Error("Unsupported CRS: %d", crsName)
-				return err
-			}
-		}
-
-		// No features -> ignore.
-		if rfc.Features == nil {
-			return nil
-		}
-
-		feedback.Info("Using EPSG: %d", epsg)
-
-		feedback.Info(
-			"Found %d features in data source", len(rfc.Features))
-
-		for _, feature := range rfc.Features {
-			if feature.Properties == nil || feature.Geometry.Coordinates == nil {
-				missingProperties++
-				continue
-			}
-
-			kind, geom, props := consumer.NewFeature()
-
-			if err := json.Unmarshal(*feature.Properties, props); err != nil {
-				badProperties++
-				continue
-			}
-
-			if feature.Geometry.Type == kind {
-				if err := json.Unmarshal(*feature.Geometry.Coordinates, geom); err != nil {
-					return err
-				}
-
-				err := consumer.Consume(geom, props, epsg)
-				switch {
-				case err == ErrFeatureDuplicated:
-					dupes++
-				case err == ErrFeatureIgnored:
-					// be silent
-				case err != nil:
-					return err
-				default:
-					features++
-				}
-			} else {
-				unsupported[feature.Geometry.Type]++
-			}
-		}
-		return nil
-	}); err != nil {
-		return nil, err
-	}
-
-	if dupes > 0 {
-		feedback.Info(
-			"Features outside responsibility area, duplicates or unchanged: %d",
-			dupes)
-	}
-
-	if badProperties > 0 {
-		feedback.Warn("Bad properties: %d", badProperties)
-	}
-
-	if missingProperties > 0 {
-		feedback.Warn("Missing properties: %d", missingProperties)
-	}
-
-	if len(unsupported) != 0 {
-		feedback.Warn("Unsupported types found: %s", unsupported)
-	}
-
-	if err = consumer.Commit(); err == nil || err == ErrFeaturesUnmodified {
-		feedback.Info("Storing %d features took %s",
-			features, time.Since(start))
-	}
-
-	// Commit before eventually returning UnchangedError because we might
-	// have updated last_found
-	if features == 0 && err == ErrFeaturesUnmodified {
-		return nil, UnchangedError("no valid new features found")
-	}
-
-	if err == ErrFeaturesUnmodified {
-		// It's not really an error.
-		err = nil
-	}
-
-	return nil, err
-}
-
-type (
-	SQLGeometryConsumer struct {
-		ctx        context.Context
-		tx         *sql.Tx
-		feedback   Feedback
-		consume    func(*SQLGeometryConsumer, interface{}, interface{}, int) error
-		newFeature func() (string, interface{}, interface{})
-		preCommit  func(*SQLGeometryConsumer) error
-		savepoint  func(func() error) error
-		stmts      []*sql.Stmt
-	}
-)
-
-func (sgc *SQLGeometryConsumer) Rollback() error {
-	if tx := sgc.tx; tx != nil {
-		sgc.releaseStmts()
-		sgc.tx = nil
-		sgc.ctx = nil
-		return tx.Rollback()
-	}
-	return nil
-}
-
-func (sgc *SQLGeometryConsumer) Commit() error {
-	var err error
-	if tx := sgc.tx; tx != nil {
-		if sgc.preCommit != nil {
-			err = sgc.preCommit(sgc)
-		}
-		sgc.releaseStmts()
-		sgc.tx = nil
-		sgc.ctx = nil
-		if err2 := tx.Commit(); err2 != nil {
-			// A real error on commit overrules the first.
-			err = err2
-		}
-	}
-	return err
-}
-
-func (sgc *SQLGeometryConsumer) NewFeature() (string, interface{}, interface{}) {
-	return sgc.newFeature()
-}
-
-func (sgc *SQLGeometryConsumer) Consume(
-	geom, properties interface{},
-	epsg int,
-) error {
-	return sgc.consume(sgc, geom, properties, epsg)
-}
-
-func (sgc *SQLGeometryConsumer) ConsumePolygon(
-	polygon polygonSlice,
-	properties interface{},
-	epsg int,
-) error {
-	return sgc.consume(sgc, polygon, properties, epsg)
-}
-
-func newSQLConsumer(
-	init func(*SQLGeometryConsumer) error,
-	consume func(*SQLGeometryConsumer, interface{}, interface{}, int) error,
-	preCommit func(*SQLGeometryConsumer) error,
-	newFeature func() (string, interface{}, interface{}),
-
-) func(context.Context, *sql.Conn, Feedback) (WFSFeatureConsumer, error) {
-	return func(ctx context.Context, conn *sql.Conn, feedback Feedback) (WFSFeatureConsumer, error) {
-		tx, err := conn.BeginTx(ctx, nil)
-		if err != nil {
-			return nil, err
-		}
-		sgc := &SQLGeometryConsumer{
-			ctx:        ctx,
-			tx:         tx,
-			feedback:   feedback,
-			consume:    consume,
-			newFeature: newFeature,
-			preCommit:  preCommit,
-			savepoint:  Savepoint(ctx, tx, "feature"),
-		}
-		if err := init(sgc); err != nil {
-			tx.Rollback()
-			return nil, err
-		}
-		return sgc, nil
-	}
-}
-
-func (sgc *SQLGeometryConsumer) releaseStmts() {
-	for i := len(sgc.stmts); i > 0; i-- {
-		sgc.stmts[i-1].Close()
-		sgc.stmts[i-1] = nil
-	}
-	sgc.stmts = nil
-}
-
-func prepareStmnts(queries ...string) func(*SQLGeometryConsumer) error {
-	return func(sgc *SQLGeometryConsumer) error {
-		for _, query := range queries {
-			stmt, err := sgc.tx.PrepareContext(sgc.ctx, query)
-			if err != nil {
-				return err
-			}
-			sgc.stmts = append(sgc.stmts, stmt)
-		}
-		return nil
-	}
-}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pkg/imports/wfsjob.go	Tue Mar 03 18:13:57 2020 +0100
@@ -0,0 +1,351 @@
+// This is Free Software under GNU Affero General Public License v >= 3.0
+// without warranty, see README.md and license for details.
+//
+// SPDX-License-Identifier: AGPL-3.0-or-later
+// License-Filename: LICENSES/AGPL-3.0.txt
+//
+// Copyright (C) 2020 by via donau
+//   – Österreichische Wasserstraßen-Gesellschaft mbH
+// Software engineering by Intevation GmbH
+//
+// Author(s):
+//  * Tom Gottfried <tom.gottfried@intevation.de>
+//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
+
+package imports
+
+import (
+	"context"
+	"database/sql"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io"
+	"time"
+
+	"gemma.intevation.de/gemma/pkg/models"
+	"gemma.intevation.de/gemma/pkg/wfs"
+)
+
+var (
+	ErrFeatureIgnored     = errors.New("feature ignored")
+	ErrFeatureDuplicated  = errors.New("feature duplicated")
+	ErrFeaturesUnmodified = errors.New("features unmodified")
+)
+
+type (
+	WFSFeatureConsumer interface {
+		Commit() error
+		Rollback() error
+
+		NewFeature() (kind string, geom interface{}, properties interface{})
+
+		Consume(geom, properties interface{}, epsg int) error
+	}
+
+	WFSFeatureJobCreator struct {
+		description string
+		depends     [2][]string
+
+		newConsumer func(context.Context, *sql.Conn, Feedback) (WFSFeatureConsumer, error)
+	}
+
+	WFSFeatureJob struct {
+		models.WFSImport
+		creator *WFSFeatureJobCreator
+	}
+)
+
+func (wfjc *WFSFeatureJobCreator) Description() string {
+	return wfjc.description
+}
+
+func (wfjc *WFSFeatureJobCreator) Depends() [2][]string {
+	return wfjc.depends
+}
+
+func (*WFSFeatureJobCreator) AutoAccept() bool {
+	return true
+}
+
+// StageDone is a NOP for WFS imports.
+func (*WFSFeatureJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
+	return nil
+}
+
+func (wfjc *WFSFeatureJobCreator) Create() Job {
+	return &WFSFeatureJob{creator: wfjc}
+}
+
+// Description gives a short info about relevant facts of this import.
+func (wfj *WFSFeatureJob) Description() (string, error) {
+	return wfj.URL + "|" + wfj.FeatureType, nil
+}
+
+// CleanUp for WFS imports is a NOP.
+func (*WFSFeatureJob) CleanUp() error {
+	return nil
+}
+
+func (wfj *WFSFeatureJob) Do(
+	ctx context.Context,
+	importID int64,
+	conn *sql.Conn,
+	feedback Feedback,
+) (interface{}, error) {
+
+	start := time.Now()
+
+	feedback.Info("Import %s", wfj.creator.Description())
+
+	feedback.Info("Loading capabilities from %s", wfj.URL)
+	caps, err := wfs.GetCapabilities(wfj.URL)
+	if err != nil {
+		feedback.Error("Loading capabilities failed: %v", err)
+		return nil, err
+	}
+
+	ft := caps.FindFeatureType(wfj.FeatureType)
+	if ft == nil {
+		return nil, fmt.Errorf("unknown feature type '%s'", wfj.FeatureType)
+	}
+
+	feedback.Info("Found feature type '%s'", wfj.FeatureType)
+
+	epsg, err := wfs.CRSToEPSG(ft.DefaultCRS)
+	if err != nil {
+		feedback.Error("Unsupported CRS: '%s'", ft.DefaultCRS)
+		return nil, err
+	}
+
+	if nilString(wfj.SortBy) != "" {
+		feedback.Info("Features will be sorted by '%s'", *wfj.SortBy)
+	}
+
+	dl, err := wfs.GetFeatures(caps, wfj.FeatureType, nilString(wfj.SortBy))
+	if err != nil {
+		feedback.Error("Cannot create GetFeature URLs. %v", err)
+		return nil, err
+	}
+
+	var (
+		unsupported       = stringCounter{}
+		missingProperties int
+		badProperties     int
+		dupes             int
+		features          int
+	)
+
+	consumer, err := wfj.creator.newConsumer(ctx, conn, feedback)
+	if err != nil {
+		return nil, err
+	}
+	defer consumer.Rollback()
+
+	if err := dl.Download(nilString(wfj.User), nilString(wfj.Password), func(url string, r io.Reader) error {
+		feedback.Info("Get features from: '%s'", url)
+		rfc, err := wfs.ParseRawFeatureCollection(r)
+		if err != nil {
+			return fmt.Errorf("parsing GetFeature document failed: %v", err)
+		}
+		if rfc.CRS != nil {
+			crsName := rfc.CRS.Properties.Name
+			if epsg, err = wfs.CRSToEPSG(crsName); err != nil {
+				feedback.Error("Unsupported CRS: %d", crsName)
+				return err
+			}
+		}
+
+		// No features -> ignore.
+		if rfc.Features == nil {
+			return nil
+		}
+
+		feedback.Info("Using EPSG: %d", epsg)
+
+		feedback.Info(
+			"Found %d features in data source", len(rfc.Features))
+
+		for _, feature := range rfc.Features {
+			if feature.Properties == nil || feature.Geometry.Coordinates == nil {
+				missingProperties++
+				continue
+			}
+
+			kind, geom, props := consumer.NewFeature()
+
+			if err := json.Unmarshal(*feature.Properties, props); err != nil {
+				badProperties++
+				continue
+			}
+
+			if feature.Geometry.Type == kind {
+				if err := json.Unmarshal(*feature.Geometry.Coordinates, geom); err != nil {
+					return err
+				}
+
+				err := consumer.Consume(geom, props, epsg)
+				switch {
+				case err == ErrFeatureDuplicated:
+					dupes++
+				case err == ErrFeatureIgnored:
+					// be silent
+				case err != nil:
+					return err
+				default:
+					features++
+				}
+			} else {
+				unsupported[feature.Geometry.Type]++
+			}
+		}
+		return nil
+	}); err != nil {
+		return nil, err
+	}
+
+	if dupes > 0 {
+		feedback.Info(
+			"Features outside responsibility area, duplicates or unchanged: %d",
+			dupes)
+	}
+
+	if badProperties > 0 {
+		feedback.Warn("Bad properties: %d", badProperties)
+	}
+
+	if missingProperties > 0 {
+		feedback.Warn("Missing properties: %d", missingProperties)
+	}
+
+	if len(unsupported) != 0 {
+		feedback.Warn("Unsupported types found: %s", unsupported)
+	}
+
+	if err = consumer.Commit(); err == nil || err == ErrFeaturesUnmodified {
+		feedback.Info("Storing %d features took %s",
+			features, time.Since(start))
+	}
+
+	// Commit before eventually returning UnchangedError because we might
+	// have updated last_found
+	if features == 0 && err == ErrFeaturesUnmodified {
+		return nil, UnchangedError("no valid new features found")
+	}
+
+	if err == ErrFeaturesUnmodified {
+		// It's not really an error.
+		err = nil
+	}
+
+	return nil, err
+}
+
+type (
+	SQLGeometryConsumer struct {
+		ctx        context.Context
+		tx         *sql.Tx
+		feedback   Feedback
+		consume    func(*SQLGeometryConsumer, interface{}, interface{}, int) error
+		newFeature func() (string, interface{}, interface{})
+		preCommit  func(*SQLGeometryConsumer) error
+		savepoint  func(func() error) error
+		stmts      []*sql.Stmt
+	}
+)
+
+func (sgc *SQLGeometryConsumer) Rollback() error {
+	if tx := sgc.tx; tx != nil {
+		sgc.releaseStmts()
+		sgc.tx = nil
+		sgc.ctx = nil
+		return tx.Rollback()
+	}
+	return nil
+}
+
+func (sgc *SQLGeometryConsumer) Commit() error {
+	var err error
+	if tx := sgc.tx; tx != nil {
+		if sgc.preCommit != nil {
+			err = sgc.preCommit(sgc)
+		}
+		sgc.releaseStmts()
+		sgc.tx = nil
+		sgc.ctx = nil
+		if err2 := tx.Commit(); err2 != nil {
+			// A real error on commit overrules the first.
+			err = err2
+		}
+	}
+	return err
+}
+
+func (sgc *SQLGeometryConsumer) NewFeature() (string, interface{}, interface{}) {
+	return sgc.newFeature()
+}
+
+func (sgc *SQLGeometryConsumer) Consume(
+	geom, properties interface{},
+	epsg int,
+) error {
+	return sgc.consume(sgc, geom, properties, epsg)
+}
+
+func (sgc *SQLGeometryConsumer) ConsumePolygon(
+	polygon polygonSlice,
+	properties interface{},
+	epsg int,
+) error {
+	return sgc.consume(sgc, polygon, properties, epsg)
+}
+
+func newSQLConsumer(
+	init func(*SQLGeometryConsumer) error,
+	consume func(*SQLGeometryConsumer, interface{}, interface{}, int) error,
+	preCommit func(*SQLGeometryConsumer) error,
+	newFeature func() (string, interface{}, interface{}),
+
+) func(context.Context, *sql.Conn, Feedback) (WFSFeatureConsumer, error) {
+	return func(ctx context.Context, conn *sql.Conn, feedback Feedback) (WFSFeatureConsumer, error) {
+		tx, err := conn.BeginTx(ctx, nil)
+		if err != nil {
+			return nil, err
+		}
+		sgc := &SQLGeometryConsumer{
+			ctx:        ctx,
+			tx:         tx,
+			feedback:   feedback,
+			consume:    consume,
+			newFeature: newFeature,
+			preCommit:  preCommit,
+			savepoint:  Savepoint(ctx, tx, "feature"),
+		}
+		if err := init(sgc); err != nil {
+			tx.Rollback()
+			return nil, err
+		}
+		return sgc, nil
+	}
+}
+
+func (sgc *SQLGeometryConsumer) releaseStmts() {
+	for i := len(sgc.stmts); i > 0; i-- {
+		sgc.stmts[i-1].Close()
+		sgc.stmts[i-1] = nil
+	}
+	sgc.stmts = nil
+}
+
+func prepareStmnts(queries ...string) func(*SQLGeometryConsumer) error {
+	return func(sgc *SQLGeometryConsumer) error {
+		for _, query := range queries {
+			stmt, err := sgc.tx.PrepareContext(sgc.ctx, query)
+			if err != nil {
+				return err
+			}
+			sgc.stmts = append(sgc.stmts, stmt)
+		}
+		return nil
+	}
+}