view pkg/imports/wfsjob.go @ 5718:3d497077f888 uploadwg

Implemented direct file upload as an alternative import method for WG. For testing and data corrections it is useful to be able to import waterway gauge data directly by uploading an XML file.
author Sascha Wilde <wilde@sha-bang.de>
date Thu, 18 Apr 2024 19:23:19 +0200
parents 6270951dda28
children
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2020 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Tom Gottfried <tom.gottfried@intevation.de>
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"time"

	"gemma.intevation.de/gemma/pkg/models"
	"gemma.intevation.de/gemma/pkg/wfs"
)

var (
	// ErrFeatureIgnored indicates that a feature was ignored.
	// The import loop of WFSFeatureJob.Do skips such features silently.
	ErrFeatureIgnored = errors.New("feature ignored")
	// ErrFeatureDuplicated indicates that a feature is a duplicate.
	// The import loop of WFSFeatureJob.Do only counts such features.
	ErrFeatureDuplicated = errors.New("feature duplicated")
	// ErrFeaturesUnmodified indicates that the features are unmodified.
	// WFSFeatureJob.Do does not treat it as a failure when it is
	// returned from committing the consumer.
	ErrFeaturesUnmodified = errors.New("features unmodified")
)

type (
	// WFSFeatureConsumer is an interface to model the transactional
	// handling of creating new features.
	WFSFeatureConsumer interface {
		// Commit finishes the transaction of the consumer.
		Commit() error
		// Rollback discards the transaction of the consumer.
		Rollback() error

		// NewFeature returns the geometry kind the consumer expects
		// and a value the JSON properties of a feature are
		// unmarshaled into.
		NewFeature() (kind string, properties any)

		// Consume handles a single feature given by its geometry,
		// its decoded properties and the EPSG code of its
		// coordinates.
		Consume(geom, properties any, epsg int) error
	}

	// WFSFeatureJobCreator is a factory to create feature consumers.
	WFSFeatureJobCreator struct {
		// description is returned by Description.
		description string
		// depends is returned by Depends.
		// NOTE(review): the meaning of the two slices is defined by
		// the job framework, not in this file — confirm there.
		depends     [2][]string

		// newConsumer creates the consumer used by WFSFeatureJob.Do.
		newConsumer func(context.Context, *sql.Conn, Feedback) (WFSFeatureConsumer, error)

		// stageDone is the optional handler StageDone forwards to.
		// If it is nil the job is auto accepted (see AutoAccept).
		stageDone func(context.Context, *sql.Tx, int64, Feedback) error
	}

	// WFSFeatureJob is a job to do a WFS import.
	WFSFeatureJob struct {
		models.WFSImport
		// creator is the factory which created this job.
		creator *WFSFeatureJobCreator
	}
)

var (
	// kindToGeometry maps the geometry type name of a feature to a
	// constructor of the value its coordinates are unmarshaled into.
	// Geometry types without an entry are reported as unsupported
	// by WFSFeatureJob.Do.
	kindToGeometry = map[string]func() any{
		// TODO: extend me!
		"Point":           func() any { return new(pointSlice) },
		"LineString":      func() any { return new(lineSlice) },
		"MultiLineString": func() any { return new(multiLineSlice) },
	}

	// wrapGeomKind maps a pair of (geometry type found in the feature,
	// geometry kind expected by the consumer) to a function which
	// converts a decoded geometry of the first type into one of the
	// second type. It is consulted only if both types differ.
	wrapGeomKind = map[[2]string]func(any) any{
		// TODO: extend me!
		// A single line string becomes a multi line string with
		// exactly one member.
		{"LineString", "MultiLineString"}: func(x any) any {
			return &multiLineSlice{*x.(*lineSlice)}
		},
	}
)

// Description returns the short description text stored in this creator.
func (wfjc *WFSFeatureJobCreator) Description() string {
	text := wfjc.description
	return text
}

// Depends returns the dependencies configured for this creator.
func (wfjc *WFSFeatureJobCreator) Depends() [2][]string {
	deps := wfjc.depends
	return deps
}

// AutoAccept reports whether jobs of this creator are accepted
// automatically. This is the case exactly when no stageDone
// handler is configured.
func (wfjc *WFSFeatureJobCreator) AutoAccept() bool {
	hasStageDone := wfjc.stageDone != nil
	return !hasStageDone
}

// StageDone delegates to the configured stageDone handler.
// Without a handler it does nothing and returns nil.
func (wfjc *WFSFeatureJobCreator) StageDone(
	ctx context.Context,
	tx *sql.Tx,
	id int64,
	feedback Feedback,
) error {
	if done := wfjc.stageDone; done != nil {
		return done(ctx, tx, id, feedback)
	}
	return nil
}

// Create builds a new WFS job which is bound to this creator.
func (wfjc *WFSFeatureJobCreator) Create() Job {
	job := new(WFSFeatureJob)
	job.creator = wfjc
	return job
}

// Description summarizes this import as "<URL>|<feature type>".
// The string slice argument is ignored and the error is always nil.
func (wfj *WFSFeatureJob) Description([]string) (string, error) {
	summary := wfj.URL + "|" + wfj.FeatureType
	return summary, nil
}

// CleanUp has nothing to do for WFS imports and always returns nil.
func (*WFSFeatureJob) CleanUp() error { return nil }

// Do implements the actual WFS import.
//
// It loads the capabilities of the configured WFS, looks up the
// requested feature type and its default CRS, and downloads the
// features. Each downloaded document is parsed and its features are
// handed to a consumer created by the job creator.
//
// Features without properties or coordinates, with undecodable
// properties or with geometry types that cannot be handled are only
// counted and reported as warnings. Undecodable coordinates and
// unexpected consumer errors abort the import.
//
// The returned value is always nil. If no feature was stored and
// committing the consumer reports ErrFeaturesUnmodified an
// UnchangedError is returned. ErrFeaturesUnmodified on its own
// is not treated as an error.
func (wfj *WFSFeatureJob) Do(
	ctx context.Context,
	_ int64,
	conn *sql.Conn,
	feedback Feedback,
) (any, error) {
	start := time.Now()

	feedback.Info("Import %s", wfj.creator.Description())

	feedback.Info("Loading capabilities from %s", wfj.URL)
	caps, err := wfs.GetCapabilities(wfj.URL)
	if err != nil {
		feedback.Error("Loading capabilities failed: %v", err)
		return nil, err
	}

	ft := caps.FindFeatureType(wfj.FeatureType)
	if ft == nil {
		return nil, fmt.Errorf("unknown feature type '%s'", wfj.FeatureType)
	}

	feedback.Info("Found feature type '%s'", wfj.FeatureType)

	// The default CRS of the feature type is used unless a downloaded
	// document announces its own CRS (see below).
	epsg, err := wfs.CRSToEPSG(ft.DefaultCRS)
	if err != nil {
		feedback.Error("Unsupported CRS: '%s'", ft.DefaultCRS)
		return nil, err
	}

	if nilString(wfj.SortBy) != "" {
		feedback.Info("Features will be sorted by '%s'", *wfj.SortBy)
	}

	dl, err := wfs.GetFeatures(caps, wfj.FeatureType, nilString(wfj.SortBy))
	if err != nil {
		feedback.Error("Cannot create GetFeature URLs. %v", err)
		return nil, err
	}

	var (
		unsupported       = stringCounter{}
		missingProperties int
		badProperties     int
		dupes             int
		features          int
	)

	consumer, err := wfj.creator.newConsumer(ctx, conn, feedback)
	if err != nil {
		return nil, err
	}
	// Covers all early-return paths below.
	defer consumer.Rollback()

	if err := dl.Download(nilString(wfj.User), nilString(wfj.Password), func(url string, r io.Reader) error {
		feedback.Info("Get features from: '%s'", url)
		rfc, err := wfs.ParseRawFeatureCollection(r)
		if err != nil {
			return fmt.Errorf("parsing GetFeature document failed: %w", err)
		}
		if rfc.CRS != nil {
			// The document carries its own CRS which overrules
			// the one used so far.
			crsName := rfc.CRS.Properties.Name
			if epsg, err = wfs.CRSToEPSG(crsName); err != nil {
				feedback.Error("Unsupported CRS: '%s'", crsName)
				return err
			}
		}

		// No features -> ignore.
		if rfc.Features == nil {
			return nil
		}

		feedback.Info("Using EPSG: %d", epsg)

		feedback.Info(
			"Found %d features in data source", len(rfc.Features))

		for _, feature := range rfc.Features {
			if feature.Properties == nil || feature.Geometry.Coordinates == nil {
				missingProperties++
				continue
			}

			kind, props := consumer.NewFeature()

			if err := json.Unmarshal(*feature.Properties, props); err != nil {
				badProperties++
				continue
			}

			// Look if we can deserialize given type
			makeGeom := kindToGeometry[feature.Geometry.Type]
			if makeGeom == nil {
				unsupported[feature.Geometry.Type]++
				continue
			}

			// Optional wrapping
			wrap := func(x any) any { return x }
			if feature.Geometry.Type != kind {
				// Look if we can wrap it
				if wrap = wrapGeomKind[[2]string{feature.Geometry.Type, kind}]; wrap == nil {
					unsupported[feature.Geometry.Type]++
					continue
				}
			}

			geom := makeGeom()
			if err := json.Unmarshal(*feature.Geometry.Coordinates, geom); err != nil {
				return err
			}

			switch err := consumer.Consume(wrap(geom), props, epsg); {
			case errors.Is(err, ErrFeatureDuplicated):
				dupes++
			case errors.Is(err, ErrFeatureIgnored):
				// be silent
			case err != nil:
				return err
			default:
				features++
			}
		}
		return nil
	}); err != nil {
		return nil, err
	}

	if dupes > 0 {
		feedback.Info(
			"Features outside responsibility area, duplicates or unchanged: %d",
			dupes)
	}

	if badProperties > 0 {
		feedback.Warn("Bad properties: %d", badProperties)
	}

	if missingProperties > 0 {
		feedback.Warn("Missing properties: %d", missingProperties)
	}

	if len(unsupported) != 0 {
		feedback.Warn("Unsupported types found: %s", unsupported)
	}

	err = consumer.Commit()
	unmodified := errors.Is(err, ErrFeaturesUnmodified)

	if err == nil || unmodified {
		feedback.Info("Storing %d features took %s",
			features, time.Since(start))
	}

	// Commit before eventually returning UnchangedError because we might
	// have updated last_found
	if features == 0 && unmodified {
		return nil, UnchangedError("no valid new features found")
	}

	if unmodified {
		// It's not really an error.
		err = nil
	}

	return nil, err
}

// SQLGeometryConsumer stores a WFS feature into the DB.
// It bundles a database transaction with the callbacks which
// implement the feature specific parts of the import.
type SQLGeometryConsumer struct {
	// ctx is the context the transaction was started with.
	// It is reset to nil by Commit and Rollback.
	ctx        context.Context
	// tx is the running transaction. It is nil after Commit or
	// Rollback, which turns further calls of them into no-ops.
	tx         *sql.Tx
	// feedback is the feedback channel handed in at creation.
	feedback   Feedback
	// consume is the handler Consume and ConsumePolygon forward to.
	consume    func(*SQLGeometryConsumer, any, any, int) error
	// newFeature is the handler NewFeature forwards to.
	newFeature func() (string, any)
	// preCommit is an optional hook called by Commit before the
	// transaction is committed.
	preCommit  func(*SQLGeometryConsumer) error
	// savepoint is created with Savepoint(ctx, tx, "feature").
	// NOTE(review): presumably runs a function inside a savepoint
	// of the transaction — confirm against Savepoint.
	savepoint  func(func() error) error
	// stmts are the statements prepared for this consumer. They are
	// closed by releaseStmts.
	stmts      []*sql.Stmt
}

// Rollback discards the database changes of this consumer.
// The prepared statements are released and the consumer is detached
// from its transaction first. If there is no transaction any more
// (already committed or rolled back) nothing happens and nil is
// returned.
func (sgc *SQLGeometryConsumer) Rollback() error {
	tx := sgc.tx
	if tx == nil {
		return nil
	}
	sgc.releaseStmts()
	sgc.tx, sgc.ctx = nil, nil
	return tx.Rollback()
}

// Commit writes the database changes of this consumer to the database.
// An optional preCommit hook runs first. The transaction is committed
// regardless of the outcome of the hook. If there is no transaction
// any more nothing happens and nil is returned.
//
// The result is the error of the hook unless the hook succeeded or
// reported ErrFeaturesUnmodified and the commit itself failed. In
// that case the commit error is returned.
func (sgc *SQLGeometryConsumer) Commit() error {
	tx := sgc.tx
	if tx == nil {
		return nil
	}

	var hookErr error
	if sgc.preCommit != nil {
		hookErr = sgc.preCommit(sgc)
	}

	sgc.releaseStmts()
	sgc.tx, sgc.ctx = nil, nil

	commitErr := tx.Commit()
	if commitErr == nil {
		return hookErr
	}
	if hookErr == nil || hookErr == ErrFeaturesUnmodified {
		// An error on commit that is not induced by the hook
		// overrules the result of the hook.
		return commitErr
	}
	return hookErr
}

// NewFeature delegates the feature creation to the configured
// newFeature handler and passes its results through.
func (sgc *SQLGeometryConsumer) NewFeature() (string, any) {
	kind, properties := sgc.newFeature()
	return kind, properties
}

// Consume delegates the handling of the given feature to the
// configured consume handler. The consumer itself is passed
// as the first argument.
func (sgc *SQLGeometryConsumer) Consume(
	geom, properties any,
	epsg int,
) error {
	handle := sgc.consume
	return handle(sgc, geom, properties, epsg)
}

// ConsumePolygon is the typed variant of Consume for polygons.
// The polygon is handed to the consume handler as the geometry.
func (sgc *SQLGeometryConsumer) ConsumePolygon(
	polygon polygonSlice,
	properties any,
	epsg int,
) error {
	var geom any = polygon
	return sgc.consume(sgc, geom, properties, epsg)
}

// newSQLConsumer returns a factory for SQL based feature consumers.
//
// Each call of the factory starts a new transaction on the given
// connection and builds a SQLGeometryConsumer around it using the
// consume, preCommit and newFeature callbacks. Afterwards init is
// run on the consumer. If init fails the transaction is rolled back
// and the error of init is returned.
func newSQLConsumer(
	init func(*SQLGeometryConsumer) error,
	consume func(*SQLGeometryConsumer, any, any, int) error,
	preCommit func(*SQLGeometryConsumer) error,
	newFeature func() (string, any),

) func(context.Context, *sql.Conn, Feedback) (WFSFeatureConsumer, error) {
	factory := func(ctx context.Context, conn *sql.Conn, feedback Feedback) (WFSFeatureConsumer, error) {
		tx, err := conn.BeginTx(ctx, nil)
		if err != nil {
			return nil, err
		}

		consumer := new(SQLGeometryConsumer)
		consumer.ctx = ctx
		consumer.tx = tx
		consumer.feedback = feedback
		consumer.consume = consume
		consumer.newFeature = newFeature
		consumer.preCommit = preCommit
		consumer.savepoint = Savepoint(ctx, tx, "feature")

		if initErr := init(consumer); initErr != nil {
			tx.Rollback()
			return nil, initErr
		}
		return consumer, nil
	}
	return factory
}

// releaseStmts closes the prepared statements of this consumer in
// reverse order of their creation and forgets them afterwards.
// Errors from closing are ignored.
func (sgc *SQLGeometryConsumer) releaseStmts() {
	for idx := len(sgc.stmts) - 1; idx >= 0; idx-- {
		sgc.stmts[idx].Close()
		sgc.stmts[idx] = nil
	}
	sgc.stmts = nil
}

// prepareStmnts returns an init function for a SQLGeometryConsumer
// which prepares the given queries inside the transaction of the
// consumer. The statements are appended to the statements of the
// consumer in the order of the queries. Preparing stops at the first
// failing query and its error is returned.
func prepareStmnts(queries ...string) func(*SQLGeometryConsumer) error {
	return func(sgc *SQLGeometryConsumer) error {
		for i := range queries {
			prepared, err := sgc.tx.PrepareContext(sgc.ctx, queries[i])
			if err != nil {
				return err
			}
			sgc.stmts = append(sgc.stmts, prepared)
		}
		return nil
	}
}