view pkg/imports/wfsjob.go @ 5711:2dd155cc95ec revive-cleanup

Fix all revive issues (w/o machine-generated stuff).
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Tue, 20 Feb 2024 22:22:57 +0100
parents 1222b777f51f
children 6270951dda28
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2020 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Tom Gottfried <tom.gottfried@intevation.de>
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"time"

	"gemma.intevation.de/gemma/pkg/models"
	"gemma.intevation.de/gemma/pkg/wfs"
)

// ErrFeatureIgnored may be returned by a WFSFeatureConsumer's Consume
// method to signal that a feature was deliberately skipped. The import
// carries on silently: the feature is neither counted nor reported.
var ErrFeatureIgnored = errors.New("feature ignored")

// ErrFeatureDuplicated may be returned by a WFSFeatureConsumer's Consume
// method to signal that a feature was not stored. Such features are
// counted and summarized at the end of the import under
// "Features outside responsibility area, duplicates or unchanged".
var ErrFeatureDuplicated = errors.New("feature duplicated")

// ErrFeaturesUnmodified may be returned by a WFSFeatureConsumer's Commit
// method to signal that the stored features were not modified. The WFS
// import does not fail with it; if no feature was stored at all it
// reports an UnchangedError instead.
var ErrFeaturesUnmodified = errors.New("features unmodified")

type (
	// WFSFeatureConsumer is an interface to model a transactional
	// handling to create new features.
	//
	// (*WFSFeatureJob).Do obtains one consumer per import run, calls
	// NewFeature and Consume for every usable feature, calls Commit once
	// at the end and calls Rollback via defer on every exit path.
	WFSFeatureConsumer interface {
		// Commit is expected to make the consumed features persistent.
		// It may return ErrFeaturesUnmodified, which the import handles
		// specially instead of failing with it.
		Commit() error
		// Rollback is expected to discard the consumed features. It is
		// also called after a successful Commit (deferred in Do).
		Rollback() error

		// NewFeature returns the geometry kind the consumer wants (a
		// geometry type name such as "Point", "LineString" or
		// "MultiLineString") and a fresh value into which the
		// feature's JSON properties are decoded via json.Unmarshal.
		NewFeature() (kind string, properties interface{})

		// Consume handles a single feature: its decoded geometry
		// (already wrapped into the wanted kind), the properties value
		// obtained from NewFeature and the EPSG code of the geometry's
		// CRS. ErrFeatureDuplicated and ErrFeatureIgnored are recognized
		// by the import and do not abort it; any other error does.
		Consume(geom, properties interface{}, epsg int) error
	}

	// WFSFeatureJobCreator is a factory to create feature consumers.
	WFSFeatureJobCreator struct {
		// description and depends are returned verbatim by the
		// Description and Depends methods.
		// NOTE(review): the meaning of the two dependency lists is
		// defined by the import scheduler, not in this file.
		description string
		depends     [2][]string

		// newConsumer creates the consumer that receives the features
		// of one import run; called by (*WFSFeatureJob).Do.
		newConsumer func(context.Context, *sql.Conn, Feedback) (WFSFeatureConsumer, error)

		// stageDone is forwarded by StageDone. If it is nil, AutoAccept
		// reports true and StageDone does nothing.
		stageDone func(context.Context, *sql.Tx, int64, Feedback) error
	}

	// WFSFeatureJob is a job that performs a WFS import.
	WFSFeatureJob struct {
		// WFSImport carries the import parameters read by Do
		// (URL, FeatureType, SortBy, User, Password).
		models.WFSImport
		// creator is the factory this job was created by (see Create).
		creator *WFSFeatureJobCreator
	}
)

// kindToGeometry maps a geometry type name, as found in a downloaded
// feature's Geometry.Type, to a constructor that returns a fresh pointer
// into which the feature's JSON coordinates can be decoded. Features of
// types missing here are counted as unsupported.
// TODO: extend me!
var kindToGeometry = map[string]func() interface{}{
	"Point": func() interface{} {
		return new(pointSlice)
	},
	"LineString": func() interface{} {
		return new(lineSlice)
	},
	"MultiLineString": func() interface{} {
		return new(multiLineSlice)
	},
}

// wrapGeomKind maps a pair {decoded kind, wanted kind} to a converter
// turning a decoded geometry of the first kind into the second kind.
// It is consulted when a consumer asks for a different kind than the
// one delivered by the WFS.
// TODO: extend me!
var wrapGeomKind = map[[2]string]func(interface{}) interface{}{
	{"LineString", "MultiLineString"}: func(x interface{}) interface{} {
		// A single line becomes a multi line with exactly one member.
		line := x.(*lineSlice)
		return &multiLineSlice{*line}
	},
}

// Description returns the short description this creator was built with.
func (wfjc *WFSFeatureJobCreator) Description() string {
	desc := wfjc.description
	return desc
}

// Depends returns the dependency lists this creator was built with.
func (wfjc *WFSFeatureJobCreator) Depends() [2][]string {
	deps := wfjc.depends
	return deps
}

// AutoAccept reports whether jobs of this creator are accepted
// automatically, which is the case exactly when no stageDone hook
// is configured.
func (wfjc *WFSFeatureJobCreator) AutoAccept() bool {
	hasHook := wfjc.stageDone != nil
	return !hasHook
}

// StageDone calls the configured stageDone hook, if any, with the
// given arguments and returns its error. Without a hook it returns nil.
func (wfjc *WFSFeatureJobCreator) StageDone(
	ctx context.Context,
	tx *sql.Tx,
	id int64,
	feedback Feedback,
) error {
	if done := wfjc.stageDone; done != nil {
		return done(ctx, tx, id, feedback)
	}
	return nil
}

// Create returns a new WFS import job bound to this creator.
func (wfjc *WFSFeatureJobCreator) Create() Job {
	job := new(WFSFeatureJob)
	job.creator = wfjc
	return job
}

// Description identifies the import by its source: the WFS URL and the
// feature type, joined by a "|". It never fails.
func (wfj *WFSFeatureJob) Description([]string) (string, error) {
	desc := wfj.URL + "|" + wfj.FeatureType
	return desc, nil
}

// CleanUp does nothing: a WFS import leaves no temporary resources
// behind.
func (*WFSFeatureJob) CleanUp() error { return nil }

// Do implements the actual WFS import.
//
// It loads the capabilities from wfj.URL, resolves wfj.FeatureType and
// the EPSG code of its default CRS, then downloads the features (sorted
// by wfj.SortBy if set, passing wfj.User and wfj.Password to the
// download). Every usable feature is handed to a consumer created by the
// job's creator. A downloaded feature collection that names its own CRS
// replaces the EPSG code for itself and for all later documents of the
// download.
//
// Features are counted and skipped if they lack properties or
// coordinates, if their properties cannot be decoded, or if their
// geometry type is neither directly supported nor wrappable into the
// kind the consumer asks for. An undecodable geometry or an unexpected
// Consume error aborts the import.
//
// The consumer is committed at the end. If it reports
// ErrFeaturesUnmodified and no feature was stored, an UnchangedError is
// returned; otherwise ErrFeaturesUnmodified is not treated as an error.
// The first return value is always nil.
func (wfj *WFSFeatureJob) Do(
	ctx context.Context,
	_ int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	start := time.Now()

	feedback.Info("Import %s", wfj.creator.Description())

	feedback.Info("Loading capabilities from %s", wfj.URL)
	caps, err := wfs.GetCapabilities(wfj.URL)
	if err != nil {
		feedback.Error("Loading capabilities failed: %v", err)
		return nil, err
	}

	ft := caps.FindFeatureType(wfj.FeatureType)
	if ft == nil {
		return nil, fmt.Errorf("unknown feature type '%s'", wfj.FeatureType)
	}

	feedback.Info("Found feature type '%s'", wfj.FeatureType)

	// EPSG code of the feature type's default CRS; a downloaded
	// collection naming its own CRS overrides it (see below).
	epsg, err := wfs.CRSToEPSG(ft.DefaultCRS)
	if err != nil {
		feedback.Error("Unsupported CRS: '%s'", ft.DefaultCRS)
		return nil, err
	}

	if nilString(wfj.SortBy) != "" {
		feedback.Info("Features will be sorted by '%s'", *wfj.SortBy)
	}

	dl, err := wfs.GetFeatures(caps, wfj.FeatureType, nilString(wfj.SortBy))
	if err != nil {
		feedback.Error("Cannot create GetFeature URLs. %v", err)
		return nil, err
	}

	var (
		unsupported       = stringCounter{}
		missingProperties int
		badProperties     int
		dupes             int
		features          int
	)

	consumer, err := wfj.creator.newConsumer(ctx, conn, feedback)
	if err != nil {
		return nil, err
	}
	// Discards everything on early exit. For SQLGeometryConsumer this is
	// a no-op once Commit has run.
	defer consumer.Rollback()

	if err := dl.Download(nilString(wfj.User), nilString(wfj.Password), func(url string, r io.Reader) error {
		feedback.Info("Get features from: '%s'", url)
		rfc, err := wfs.ParseRawFeatureCollection(r)
		if err != nil {
			return fmt.Errorf("parsing GetFeature document failed: %w", err)
		}
		if rfc.CRS != nil {
			crsName := rfc.CRS.Properties.Name
			// Assigns the outer epsg, so this CRS also applies to later
			// documents that do not name one.
			if epsg, err = wfs.CRSToEPSG(crsName); err != nil {
				feedback.Error("Unsupported CRS: '%s'", crsName)
				return err
			}
		}

		// No features -> ignore.
		if rfc.Features == nil {
			return nil
		}

		feedback.Info("Using EPSG: %d", epsg)

		feedback.Info(
			"Found %d features in data source", len(rfc.Features))

		for _, feature := range rfc.Features {
			if feature.Properties == nil || feature.Geometry.Coordinates == nil {
				missingProperties++
				continue
			}

			kind, props := consumer.NewFeature()

			if err := json.Unmarshal(*feature.Properties, props); err != nil {
				badProperties++
				continue
			}

			// Look if we can deserialize given type
			makeGeom := kindToGeometry[feature.Geometry.Type]
			if makeGeom == nil {
				unsupported[feature.Geometry.Type]++
				continue
			}

			// Optional wrapping into the kind the consumer wants.
			wrap := func(x interface{}) interface{} { return x }
			if feature.Geometry.Type != kind {
				// Look if we can wrap it
				if wrap = wrapGeomKind[[2]string{feature.Geometry.Type, kind}]; wrap == nil {
					unsupported[feature.Geometry.Type]++
					continue
				}
			}

			geom := makeGeom()
			if err := json.Unmarshal(*feature.Geometry.Coordinates, geom); err != nil {
				return err
			}

			// errors.Is also recognizes sentinels wrapped by the consumer.
			switch err := consumer.Consume(wrap(geom), props, epsg); {
			case errors.Is(err, ErrFeatureDuplicated):
				dupes++
			case errors.Is(err, ErrFeatureIgnored):
				// be silent
			case err != nil:
				return err
			default:
				features++
			}
		}
		return nil
	}); err != nil {
		return nil, err
	}

	if dupes > 0 {
		feedback.Info(
			"Features outside responsibility area, duplicates or unchanged: %d",
			dupes)
	}

	if badProperties > 0 {
		feedback.Warn("Bad properties: %d", badProperties)
	}

	if missingProperties > 0 {
		feedback.Warn("Missing properties: %d", missingProperties)
	}

	if len(unsupported) != 0 {
		feedback.Warn("Unsupported types found: %s", unsupported)
	}

	err = consumer.Commit()
	unmodified := errors.Is(err, ErrFeaturesUnmodified)
	if err == nil || unmodified {
		feedback.Info("Storing %d features took %s",
			features, time.Since(start))
	}

	if unmodified {
		// Commit before eventually returning UnchangedError because we
		// might have updated last_found.
		if features == 0 {
			return nil, UnchangedError("no valid new features found")
		}
		// It's not really an error.
		return nil, nil
	}

	return nil, err
}

// SQLGeometryConsumer stores a WFS feature into the DB.
//
// It implements WFSFeatureConsumer on top of a single database
// transaction. Instances are built by the factory returned from
// newSQLConsumer; the behavior specific to a feature type is supplied
// through the function-valued fields.
type SQLGeometryConsumer struct {
	// ctx and tx are the context and transaction all work happens in;
	// Commit and Rollback reset both to nil, which makes further
	// Commit/Rollback calls no-ops.
	// feedback is the import's feedback channel, handed to the factory.
	// consume handles one feature (used by Consume and ConsumePolygon).
	// newFeature backs NewFeature.
	// preCommit, if non-nil, runs inside the transaction right before
	// it is committed.
	// savepoint is created via Savepoint(ctx, tx, "feature");
	// NOTE(review): presumably runs a function inside a SQL savepoint —
	// confirm against Savepoint.
	// stmts holds prepared statements (see prepareStmnts); they are
	// closed by releaseStmts on Commit and Rollback.
	ctx        context.Context
	tx         *sql.Tx
	feedback   Feedback
	consume    func(*SQLGeometryConsumer, interface{}, interface{}, int) error
	newFeature func() (string, interface{})
	preCommit  func(*SQLGeometryConsumer) error
	savepoint  func(func() error) error
	stmts      []*sql.Stmt
}

// Rollback closes all prepared statements and rolls back the
// consumer's transaction, returning the rollback error. If the consumer
// was already committed or rolled back it does nothing and returns nil.
func (sgc *SQLGeometryConsumer) Rollback() error {
	tx := sgc.tx
	if tx == nil {
		return nil
	}
	sgc.releaseStmts()
	sgc.tx, sgc.ctx = nil, nil
	return tx.Rollback()
}

// Commit commits the database changes of this consumer to the database.
//
// It first runs the optional preCommit hook, then closes all prepared
// statements and commits the transaction. The error of preCommit is
// returned unless the commit itself fails while preCommit succeeded or
// only reported ErrFeaturesUnmodified; in that case the commit error
// wins. Calling Commit on an already finished consumer returns nil.
//
// NOTE(review): the transaction is committed even if preCommit failed
// with an error other than ErrFeaturesUnmodified. For SQL errors the
// database typically refuses that commit anyway; for non-SQL errors
// returned by a hook the changes would be persisted. Confirm this is
// intended.
func (sgc *SQLGeometryConsumer) Commit() error {
	tx := sgc.tx
	if tx == nil {
		return nil
	}
	var err error
	if sgc.preCommit != nil {
		err = sgc.preCommit(sgc)
	}
	sgc.releaseStmts()
	sgc.tx = nil
	sgc.ctx = nil
	// errors.Is also recognizes an ErrFeaturesUnmodified wrapped by the
	// preCommit hook.
	if err2 := tx.Commit(); err2 != nil &&
		(err == nil || errors.Is(err, ErrFeaturesUnmodified)) {
		// An error on commit that is not induced by the first
		// overrules the first.
		err = err2
	}
	return err
}

// NewFeature delegates to the configured newFeature hook. It returns the
// geometry kind the consumer expects and a fresh value for decoding the
// feature's properties into.
func (sgc *SQLGeometryConsumer) NewFeature() (kind string, properties interface{}) {
	kind, properties = sgc.newFeature()
	return kind, properties
}

// Consume passes a decoded geometry, its properties and the EPSG code of
// its CRS to the configured consume hook.
func (sgc *SQLGeometryConsumer) Consume(geom, properties interface{}, epsg int) error {
	return sgc.consume(sgc, geom, properties, epsg)
}

// ConsumePolygon passes a polygon, its properties and the EPSG code of
// its CRS to the configured consume hook. The polygon is handed over as
// a polygonSlice value (not a pointer).
func (sgc *SQLGeometryConsumer) ConsumePolygon(polygon polygonSlice, properties interface{}, epsg int) error {
	var geom interface{} = polygon
	return sgc.consume(sgc, geom, properties, epsg)
}

// newSQLConsumer returns a consumer factory suitable for
// WFSFeatureJobCreator.newConsumer.
//
// Each call of the factory begins a transaction on conn and wraps it in
// a SQLGeometryConsumer that uses the given hooks:
//   - init runs once right after creation, e.g. the function returned by
//     prepareStmnts. A nil init means no initialisation is needed.
//   - consume handles each feature.
//   - preCommit (may be nil) runs right before the commit.
//   - newFeature provides the wanted geometry kind and a properties value.
//
// If init fails, the consumer is rolled back (closing any statements
// init already prepared) and init's error is returned.
func newSQLConsumer(
	init func(*SQLGeometryConsumer) error,
	consume func(*SQLGeometryConsumer, interface{}, interface{}, int) error,
	preCommit func(*SQLGeometryConsumer) error,
	newFeature func() (string, interface{}),
) func(context.Context, *sql.Conn, Feedback) (WFSFeatureConsumer, error) {
	return func(ctx context.Context, conn *sql.Conn, feedback Feedback) (WFSFeatureConsumer, error) {
		tx, err := conn.BeginTx(ctx, nil)
		if err != nil {
			return nil, err
		}
		sgc := &SQLGeometryConsumer{
			ctx:        ctx,
			tx:         tx,
			feedback:   feedback,
			consume:    consume,
			newFeature: newFeature,
			preCommit:  preCommit,
			savepoint:  Savepoint(ctx, tx, "feature"),
		}
		if init == nil {
			return sgc, nil
		}
		if err := init(sgc); err != nil {
			// Rollback also releases statements init may have prepared
			// before failing. Its own error is secondary to err and is
			// deliberately dropped.
			_ = sgc.Rollback()
			return nil, err
		}
		return sgc, nil
	}
}

// releaseStmts closes all prepared statements in reverse order of
// preparation and forgets them. Close errors are ignored. The slots are
// nil'ed before the slice is dropped.
func (sgc *SQLGeometryConsumer) releaseStmts() {
	stmts := sgc.stmts
	for i := len(stmts) - 1; i >= 0; i-- {
		stmts[i].Close()
		stmts[i] = nil
	}
	sgc.stmts = nil
}

// prepareStmnts returns an init hook for newSQLConsumer that prepares
// the given queries, in order, within the consumer's transaction. Each
// prepared statement is appended to the consumer's stmts. The first
// preparation error is returned as is. Statements prepared before the
// error remain in stmts, so they are released with the consumer.
func prepareStmnts(queries ...string) func(*SQLGeometryConsumer) error {
	return func(sgc *SQLGeometryConsumer) error {
		tx, ctx := sgc.tx, sgc.ctx
		for i := 0; i < len(queries); i++ {
			stmt, err := tx.PrepareContext(ctx, queries[i])
			if err != nil {
				return err
			}
			sgc.stmts = append(sgc.stmts, stmt)
		}
		return nil
	}
}