diff pkg/imports/pointwfs.go @ 4946:b0dbc0f2c748 fairway-marks-import

Simplified importing of fairway marks.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Wed, 19 Feb 2020 15:08:35 +0100
parents
children 821ae20b6a20
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pkg/imports/pointwfs.go	Wed Feb 19 15:08:35 2020 +0100
@@ -0,0 +1,323 @@
+// SPDX-License-Identifier: AGPL-3.0-or-later
+// License-Filename: LICENSES/AGPL-3.0.txt
+//
+// Copyright (C) 2020 by via donau
+//   – Österreichische Wasserstraßen-Gesellschaft mbH
+// Software engineering by Intevation GmbH
+//
+// Author(s):
+//  * Tom Gottfried <tom.gottfried@intevation.de>
+//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
+
+package imports
+
+import (
+	"context"
+	"database/sql"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io"
+	"time"
+
+	"gemma.intevation.de/gemma/pkg/models"
+	"gemma.intevation.de/gemma/pkg/wfs"
+)
+
+var (
+	ErrFeatureIgnored    = errors.New("feature ignored")
+	ErrFeatureDuplicated = errors.New("feature duplicated")
+)
+
+type (
+	WFSPointConsumer interface {
+		Commit() error
+		Rollback() error
+
+		NewProperties() interface{}
+		Consume(points pointSlice, properties interface{}, epsg int) error
+	}
+
+	PointWFSJobCreator struct {
+		description string
+		depends     [2][]string
+
+		newConsumer func(context.Context, *sql.Conn, Feedback) (WFSPointConsumer, error)
+	}
+
+	PointWFSJob struct {
+		models.WFSImport
+		creator *PointWFSJobCreator
+	}
+)
+
+func (pwjc *PointWFSJobCreator) Description() string {
+	return pwjc.description
+}
+
+func (pwjc *PointWFSJobCreator) Depends() [2][]string {
+	return pwjc.depends
+}
+
+func (*PointWFSJobCreator) AutoAccept() bool {
+	return true
+}
+
+// StageDone is a NOP for WFS imports.
+func (*PointWFSJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
+	return nil
+}
+
+func (pwjc *PointWFSJobCreator) Create() Job {
+	return &PointWFSJob{creator: pwjc}
+}
+
+// Description gives a short info about relevant facts of this import.
+func (pwj *PointWFSJob) Description() (string, error) {
+	return pwj.URL + "|" + pwj.FeatureType, nil
+}
+
+// CleanUp for WFS imports is a NOP.
+func (*PointWFSJob) CleanUp() error {
+	return nil
+}
+
+func (pwj *PointWFSJob) Do(
+	ctx context.Context,
+	importID int64,
+	conn *sql.Conn,
+	feedback Feedback,
+) (interface{}, error) {
+
+	start := time.Now()
+
+	feedback.Info("Import %s", pwj.creator.Description())
+
+	feedback.Info("Loading capabilities from %s", pwj.URL)
+	caps, err := wfs.GetCapabilities(pwj.URL)
+	if err != nil {
+		feedback.Error("Loading capabilities failed: %v", err)
+		return nil, err
+	}
+
+	ft := caps.FindFeatureType(pwj.FeatureType)
+	if ft == nil {
+		return nil, fmt.Errorf("unknown feature type '%s'", pwj.FeatureType)
+	}
+
+	feedback.Info("Found feature type '%s", pwj.FeatureType)
+
+	epsg, err := wfs.CRSToEPSG(ft.DefaultCRS)
+	if err != nil {
+		feedback.Error("Unsupported CRS: '%s'", ft.DefaultCRS)
+		return nil, err
+	}
+
+	if nilString(pwj.SortBy) != "" {
+		feedback.Info("Features will be sorted by '%s'", pwj.SortBy)
+	}
+
+	dl, err := wfs.GetFeatures(caps, pwj.FeatureType, nilString(pwj.SortBy))
+	if err != nil {
+		feedback.Error("Cannot create GetFeature URLs. %v", err)
+		return nil, err
+	}
+
+	var (
+		unsupported       = stringCounter{}
+		missingProperties int
+		badProperties     int
+		dupes             int
+		features          int
+	)
+
+	consumer, err := pwj.creator.newConsumer(ctx, conn, feedback)
+	if err != nil {
+		return nil, err
+	}
+	defer consumer.Rollback()
+
+	if err := dl.Download(nilString(pwj.User), nilString(pwj.Password), func(url string, r io.Reader) error {
+		feedback.Info("Get features from: '%s'", url)
+		rfc, err := wfs.ParseRawFeatureCollection(r)
+		if err != nil {
+			return fmt.Errorf("parsing GetFeature document failed: %v", err)
+		}
+		if rfc.CRS != nil {
+			crsName := rfc.CRS.Properties.Name
+			if epsg, err = wfs.CRSToEPSG(crsName); err != nil {
+				feedback.Error("Unsupported CRS: %d", crsName)
+				return err
+			}
+		}
+
+		// No features -> ignore.
+		if rfc.Features == nil {
+			return nil
+		}
+
+		feedback.Info("Using EPSG: %d", epsg)
+
+		for _, feature := range rfc.Features {
+			if feature.Properties == nil || feature.Geometry.Coordinates == nil {
+				missingProperties++
+				continue
+			}
+
+			props := consumer.NewProperties()
+			if err := json.Unmarshal(*feature.Properties, props); err != nil {
+				badProperties++
+				continue
+			}
+
+			switch feature.Geometry.Type {
+			case "Point":
+				var p pointSlice
+				if err := json.Unmarshal(*feature.Geometry.Coordinates, &p); err != nil {
+					return err
+				}
+
+				err := consumer.Consume(p, props, epsg)
+				switch {
+				case err == ErrFeatureDuplicated:
+					dupes++
+				case err == ErrFeatureIgnored:
+					// be silent
+				case err != nil:
+					return err
+				default:
+					features++
+				}
+
+			default:
+				unsupported[feature.Geometry.Type]++
+			}
+		}
+		return nil
+	}); err != nil {
+		return nil, err
+	}
+
+	if dupes > 0 {
+		feedback.Info(
+			"Features outside responsibility area and duplicates: %d",
+			dupes)
+	}
+
+	if badProperties > 0 {
+		feedback.Warn("Bad properties: %d", badProperties)
+	}
+
+	if missingProperties > 0 {
+		feedback.Warn("Missing properties: %d", missingProperties)
+	}
+
+	if len(unsupported) != 0 {
+		feedback.Warn("Unsupported types found: %s", unsupported)
+	}
+
+	if features == 0 {
+		return nil, UnchangedError("no valid new features found")
+	}
+
+	feedback.Info("Found %d usable features in data source", features)
+
+	if err = consumer.Commit(); err == nil {
+		feedback.Info("Storing %d features took %s",
+			features, time.Since(start))
+	}
+
+	return nil, nil
+}
+
+type (
+	SQLPointConsumer struct {
+		ctx           context.Context
+		tx            *sql.Tx
+		feedback      Feedback
+		newProperties func() interface{}
+		consume       func(*SQLPointConsumer, pointSlice, interface{}, int) error
+		savepoint     func(func() error) error
+		stmts         []*sql.Stmt
+	}
+)
+
+func (spc *SQLPointConsumer) Rollback() error {
+	if tx := spc.tx; tx != nil {
+		spc.releaseStmts()
+		spc.tx = nil
+		spc.ctx = nil
+		return tx.Rollback()
+	}
+	return nil
+}
+
+func (spc *SQLPointConsumer) Commit() error {
+	if tx := spc.tx; tx != nil {
+		spc.releaseStmts()
+		spc.tx = nil
+		spc.ctx = nil
+		return tx.Commit()
+	}
+	return nil
+}
+
+func (spc *SQLPointConsumer) NewProperties() interface{} {
+	return spc.newProperties()
+}
+
+func (spc *SQLPointConsumer) Consume(
+	points pointSlice,
+	properties interface{},
+	epsg int,
+) error {
+	return spc.consume(spc, points, properties, epsg)
+}
+
+func newSQLConsumer(
+	init func(*SQLPointConsumer) error,
+	consume func(*SQLPointConsumer, pointSlice, interface{}, int) error,
+	newProperties func() interface{},
+
+) func(context.Context, *sql.Conn, Feedback) (WFSPointConsumer, error) {
+	return func(ctx context.Context, conn *sql.Conn, feedback Feedback) (WFSPointConsumer, error) {
+		tx, err := conn.BeginTx(ctx, nil)
+		if err != nil {
+			return nil, err
+		}
+		spc := &SQLPointConsumer{
+			ctx:           ctx,
+			tx:            tx,
+			feedback:      feedback,
+			newProperties: newProperties,
+			consume:       consume,
+			savepoint:     Savepoint(ctx, tx, "feature"),
+		}
+		if err := init(spc); err != nil {
+			tx.Rollback()
+			return nil, err
+		}
+		return spc, nil
+	}
+}
+
+func (spc *SQLPointConsumer) releaseStmts() {
+	for i := len(spc.stmts); i > 0; i-- {
+		spc.stmts[i-1].Close()
+		spc.stmts[i-1] = nil
+	}
+	spc.stmts = nil
+}
+
+func prepareStmnts(queries ...string) func(*SQLPointConsumer) error {
+	return func(spc *SQLPointConsumer) error {
+		for _, query := range queries {
+			stmt, err := spc.tx.PrepareContext(spc.ctx, query)
+			if err != nil {
+				return err
+			}
+			spc.stmts = append(spc.stmts, stmt)
+		}
+		return nil
+	}
+}