view pkg/imports/pointwfs.go @ 4948:821ae20b6a20 fairway-marks-import

Re-added missing header lines.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Thu, 20 Feb 2020 17:41:36 +0100
parents b0dbc0f2c748
children dd83c2dfffc9
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2020 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Tom Gottfried <tom.gottfried@intevation.de>
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"time"

	"gemma.intevation.de/gemma/pkg/models"
	"gemma.intevation.de/gemma/pkg/wfs"
)

// Sentinel errors a WFSPointConsumer may return from Consume to tell
// PointWFSJob.Do that a feature was not stored without aborting the
// whole import.
var (
	// ErrFeatureIgnored marks a feature that was skipped. Do neither
	// counts nor reports such features.
	ErrFeatureIgnored    = errors.New("feature ignored")
	// ErrFeatureDuplicated marks a feature that Do counts and reports
	// as "outside responsibility area and duplicates".
	ErrFeatureDuplicated = errors.New("feature duplicated")
)

// WFSPointConsumer is the sink for point features downloaded by a
// PointWFSJob.
type WFSPointConsumer interface {
	// Commit finalizes the work done by the consumer.
	Commit() error
	// Rollback discards the work done by the consumer.
	Rollback() error

	// NewProperties returns a fresh value the JSON properties of a
	// single feature are unmarshaled into.
	NewProperties() interface{}
	// Consume handles one feature given by its coordinates, its decoded
	// properties and the EPSG code of the coordinates. It may return
	// ErrFeatureIgnored or ErrFeatureDuplicated to skip a feature
	// without failing the import.
	Consume(points pointSlice, properties interface{}, epsg int) error
}

// PointWFSJobCreator creates PointWFSJobs which share a description,
// a dependency declaration and a factory for their consumers.
type PointWFSJobCreator struct {
	description string
	depends     [2][]string

	// newConsumer builds the consumer used during a single run of Do.
	newConsumer func(context.Context, *sql.Conn, Feedback) (WFSPointConsumer, error)
}

// PointWFSJob is an import job which loads point features from a WFS
// and feeds them to a consumer built by its creator.
type PointWFSJob struct {
	models.WFSImport
	creator *PointWFSJobCreator
}

// Description returns the description string this creator was
// configured with.
func (pwjc *PointWFSJobCreator) Description() string {
	return pwjc.description
}

// Depends returns the dependency declaration this creator was
// configured with.
// NOTE(review): the meaning of the two string slices is not visible in
// this file -- confirm against the job creator interface.
func (pwjc *PointWFSJobCreator) Depends() [2][]string {
	return pwjc.depends
}

// AutoAccept always returns true for point WFS imports.
// NOTE(review): presumably this tells the import queue that no manual
// review step is needed -- confirm against the job creator interface.
func (*PointWFSJobCreator) AutoAccept() bool {
	return true
}

// StageDone is a NOP for WFS imports. It ignores all of its arguments
// and always returns nil.
func (*PointWFSJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
	return nil
}

// Create returns a new, otherwise empty PointWFSJob which refers back
// to this creator.
func (pwjc *PointWFSJobCreator) Create() Job {
	return &PointWFSJob{creator: pwjc}
}

// Description gives a short info about relevant facts of this import.
// The result is the URL of the WFS and the feature type joined by a
// '|'. The returned error is always nil.
func (pwj *PointWFSJob) Description() (string, error) {
	return pwj.URL + "|" + pwj.FeatureType, nil
}

// CleanUp for WFS imports is a NOP. It always returns nil.
func (*PointWFSJob) CleanUp() error {
	return nil
}

// Do executes the import. It loads the capabilities of the WFS at the
// configured URL, looks up the requested feature type and its default
// CRS, downloads the features and passes every point feature to a
// consumer built by the job's creator.
//
// Features lacking properties or coordinates, features with
// undecodable properties and features with geometry types other than
// "Point" are counted and reported via feedback but do not abort the
// import. If the consumer accepted no feature at all an UnchangedError
// is returned. Otherwise the consumer is committed; a failing commit is
// returned as the error of the import.
//
// The first return value is always nil.
func (pwj *PointWFSJob) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	start := time.Now()

	feedback.Info("Import %s", pwj.creator.Description())

	feedback.Info("Loading capabilities from %s", pwj.URL)
	caps, err := wfs.GetCapabilities(pwj.URL)
	if err != nil {
		feedback.Error("Loading capabilities failed: %v", err)
		return nil, err
	}

	ft := caps.FindFeatureType(pwj.FeatureType)
	if ft == nil {
		return nil, fmt.Errorf("unknown feature type '%s'", pwj.FeatureType)
	}

	feedback.Info("Found feature type '%s'", pwj.FeatureType)

	// The default CRS of the feature type is used unless a downloaded
	// document announces its own CRS (see below).
	epsg, err := wfs.CRSToEPSG(ft.DefaultCRS)
	if err != nil {
		feedback.Error("Unsupported CRS: '%s'", ft.DefaultCRS)
		return nil, err
	}

	if nilString(pwj.SortBy) != "" {
		feedback.Info("Features will be sorted by '%s'", pwj.SortBy)
	}

	dl, err := wfs.GetFeatures(caps, pwj.FeatureType, nilString(pwj.SortBy))
	if err != nil {
		feedback.Error("Cannot create GetFeature URLs. %v", err)
		return nil, err
	}

	var (
		unsupported       = stringCounter{}
		missingProperties int
		badProperties     int
		dupes             int
		features          int
	)

	consumer, err := pwj.creator.newConsumer(ctx, conn, feedback)
	if err != nil {
		return nil, err
	}
	// Make sure nothing is left open on any early return. For the
	// SQLPointConsumer a Rollback after a Commit is a no-op.
	defer consumer.Rollback()

	if err := dl.Download(nilString(pwj.User), nilString(pwj.Password), func(url string, r io.Reader) error {
		feedback.Info("Get features from: '%s'", url)
		rfc, err := wfs.ParseRawFeatureCollection(r)
		if err != nil {
			return fmt.Errorf("parsing GetFeature document failed: %v", err)
		}
		// A CRS given in the document overrides the EPSG code found so
		// far, also for the documents downloaded after this one.
		if rfc.CRS != nil {
			crsName := rfc.CRS.Properties.Name
			if epsg, err = wfs.CRSToEPSG(crsName); err != nil {
				feedback.Error("Unsupported CRS: '%s'", crsName)
				return err
			}
		}

		// No features -> ignore.
		if rfc.Features == nil {
			return nil
		}

		feedback.Info("Using EPSG: %d", epsg)

		for _, feature := range rfc.Features {
			if feature.Properties == nil || feature.Geometry.Coordinates == nil {
				missingProperties++
				continue
			}

			props := consumer.NewProperties()
			if err := json.Unmarshal(*feature.Properties, props); err != nil {
				badProperties++
				continue
			}

			switch feature.Geometry.Type {
			case "Point":
				var p pointSlice
				if err := json.Unmarshal(*feature.Geometry.Coordinates, &p); err != nil {
					return err
				}

				err := consumer.Consume(p, props, epsg)
				switch {
				case err == ErrFeatureDuplicated:
					dupes++
				case err == ErrFeatureIgnored:
					// be silent
				case err != nil:
					return err
				default:
					features++
				}

			default:
				unsupported[feature.Geometry.Type]++
			}
		}
		return nil
	}); err != nil {
		return nil, err
	}

	if dupes > 0 {
		feedback.Info(
			"Features outside responsibility area and duplicates: %d",
			dupes)
	}

	if badProperties > 0 {
		feedback.Warn("Bad properties: %d", badProperties)
	}

	if missingProperties > 0 {
		feedback.Warn("Missing properties: %d", missingProperties)
	}

	if len(unsupported) != 0 {
		feedback.Warn("Unsupported types found: %s", unsupported)
	}

	if features == 0 {
		return nil, UnchangedError("no valid new features found")
	}

	feedback.Info("Found %d usable features in data source", features)

	// A failed commit means nothing was stored, so it must fail the
	// import instead of being reported as a success.
	if err := consumer.Commit(); err != nil {
		return nil, err
	}

	feedback.Info("Storing %d features took %s",
		features, time.Since(start))

	return nil, nil
}

// SQLPointConsumer is a WFSPointConsumer which does its work inside a
// single SQL transaction. The handling of the features is delegated to
// the functions it was configured with.
type SQLPointConsumer struct {
	// ctx is the context the transaction was started with.
	ctx context.Context
	// tx is the open transaction; nil after Commit or Rollback.
	tx       *sql.Tx
	feedback Feedback
	// newProperties backs NewProperties.
	newProperties func() interface{}
	// consume backs Consume and receives the consumer itself as its
	// first argument.
	consume func(*SQLPointConsumer, pointSlice, interface{}, int) error
	// savepoint is built by Savepoint(ctx, tx, "feature").
	// NOTE(review): presumably it runs the given function inside an SQL
	// savepoint -- confirm against the Savepoint helper.
	savepoint func(func() error) error
	// stmts are the prepared statements closed by releaseStmts.
	stmts []*sql.Stmt
}

// Rollback aborts the transaction held by the consumer. The prepared
// statements are closed first and the references to the transaction
// and the context are dropped, so any later call to Rollback or Commit
// does nothing and returns nil.
func (spc *SQLPointConsumer) Rollback() error {
	tx := spc.tx
	if tx == nil {
		// Already committed or rolled back.
		return nil
	}
	spc.releaseStmts()
	spc.tx, spc.ctx = nil, nil
	return tx.Rollback()
}

// Commit commits the transaction held by the consumer. The prepared
// statements are closed first and the references to the transaction
// and the context are dropped, so any later call to Commit or Rollback
// does nothing and returns nil.
func (spc *SQLPointConsumer) Commit() error {
	tx := spc.tx
	if tx == nil {
		// Already committed or rolled back.
		return nil
	}
	spc.releaseStmts()
	spc.tx, spc.ctx = nil, nil
	return tx.Commit()
}

// NewProperties returns the result of the newProperties function the
// consumer was configured with.
func (spc *SQLPointConsumer) NewProperties() interface{} {
	return spc.newProperties()
}

// Consume forwards the feature to the consume function the consumer
// was configured with, passing the consumer itself as the first
// argument, and returns that function's error unchanged.
func (spc *SQLPointConsumer) Consume(
	points pointSlice,
	properties interface{},
	epsg int,
) error {
	return spc.consume(spc, points, properties, epsg)
}

// newSQLConsumer returns a factory for SQLPointConsumers suitable for
// PointWFSJobCreator.newConsumer.
//
// Each call of the factory begins a new transaction on the given
// connection and wires the consume and newProperties functions into a
// fresh consumer. init is then run once on that consumer (e.g. to
// prepare statements, see prepareStmnts); if it fails the transaction
// is rolled back and the error is returned.
func newSQLConsumer(
	init func(*SQLPointConsumer) error,
	consume func(*SQLPointConsumer, pointSlice, interface{}, int) error,
	newProperties func() interface{},

) func(context.Context, *sql.Conn, Feedback) (WFSPointConsumer, error) {
	factory := func(
		ctx context.Context,
		conn *sql.Conn,
		feedback Feedback,
	) (WFSPointConsumer, error) {
		tx, err := conn.BeginTx(ctx, nil)
		if err != nil {
			return nil, err
		}

		spc := new(SQLPointConsumer)
		spc.ctx = ctx
		spc.tx = tx
		spc.feedback = feedback
		spc.newProperties = newProperties
		spc.consume = consume
		spc.savepoint = Savepoint(ctx, tx, "feature")

		if err = init(spc); err != nil {
			// Initialisation failed: do not leave the transaction open.
			tx.Rollback()
			return nil, err
		}
		return spc, nil
	}
	return factory
}

// releaseStmts closes the prepared statements of the consumer in
// reverse order of their registration and forgets them afterwards.
// Errors from closing are not reported.
func (spc *SQLPointConsumer) releaseStmts() {
	for idx := len(spc.stmts) - 1; idx >= 0; idx-- {
		spc.stmts[idx].Close()
		spc.stmts[idx] = nil
	}
	spc.stmts = nil
}

// prepareStmnts returns an init function for newSQLConsumer. When run
// it prepares the given queries in order on the transaction of the
// consumer and appends the resulting statements to its stmts. It stops
// at the first query that cannot be prepared and returns that error.
func prepareStmnts(queries ...string) func(*SQLPointConsumer) error {
	return func(spc *SQLPointConsumer) error {
		for i := range queries {
			prepared, err := spc.tx.PrepareContext(spc.ctx, queries[i])
			if err != nil {
				return err
			}
			spc.stmts = append(spc.stmts, prepared)
		}
		return nil
	}
}