view pkg/imports/wfsjob.go @ 5560:f2204f91d286

Join the log lines of imports to the log exports to recover data from them. Used in SR export to extract information that was in the meta JSON but is now only found in the log.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Wed, 09 Feb 2022 18:34:40 +0100
parents 733f7136a30e
children ade07a3f2cfd
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2020 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Tom Gottfried <tom.gottfried@intevation.de>
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"time"

	"gemma.intevation.de/gemma/pkg/models"
	"gemma.intevation.de/gemma/pkg/wfs"
)

// Sentinel errors exchanged between a WFSFeatureConsumer and
// WFSFeatureJob.Do.
var (
	// ErrFeatureIgnored may be returned by Consume. Do skips such a
	// feature silently and does not count it.
	ErrFeatureIgnored     = errors.New("feature ignored")
	// ErrFeatureDuplicated may be returned by Consume. Do counts such
	// features and reports the total as an info message.
	ErrFeatureDuplicated  = errors.New("feature duplicated")
	// ErrFeaturesUnmodified may be returned by Commit. Do does not treat
	// it as a failure; if additionally no feature was stored, Do reports
	// an UnchangedError instead.
	ErrFeaturesUnmodified = errors.New("features unmodified")
)

type (
	// WFSFeatureConsumer receives the features downloaded by
	// WFSFeatureJob.Do.
	WFSFeatureConsumer interface {
		// Commit is called by Do once after all features were handed over.
		// It may return ErrFeaturesUnmodified, which Do does not treat
		// as a failure.
		Commit() error
		// Rollback is deferred by Do right after the consumer was created,
		// so it is also called after a Commit.
		Rollback() error

		// NewFeature returns the geometry kind the consumer expects and a
		// value into which the JSON properties of one feature are
		// unmarshalled.
		NewFeature() (kind string, properties interface{})

		// Consume handles one decoded feature. geom is the decoded
		// geometry (wrapped to the expected kind if necessary), properties
		// is the value obtained from NewFeature after unmarshalling and
		// epsg is the EPSG code in use for the current document.
		// ErrFeatureDuplicated and ErrFeatureIgnored are handled by Do;
		// any other error aborts the import.
		Consume(geom, properties interface{}, epsg int) error
	}

	// WFSFeatureJobCreator is the factory of WFSFeatureJobs. It carries
	// the parts that differ between the concrete WFS imports.
	WFSFeatureJobCreator struct {
		// description is returned by Description.
		description string
		// depends is returned by Depends.
		depends     [2][]string

		// newConsumer creates the consumer used by a single run of Do.
		newConsumer func(context.Context, *sql.Conn, Feedback) (WFSFeatureConsumer, error)

		// stageDone is optional. If it is nil, AutoAccept reports true
		// and StageDone is a NOP.
		stageDone func(context.Context, *sql.Tx, int64, Feedback) error
	}

	// WFSFeatureJob is a single WFS import. The embedded models.WFSImport
	// provides the configuration (e.g. URL, FeatureType, SortBy, User and
	// Password as used by Do); creator links back to the factory that
	// created the job.
	WFSFeatureJob struct {
		models.WFSImport
		creator *WFSFeatureJobCreator
	}
)

var (
	// kindToGeometry maps the geometry type name found in a feature to a
	// constructor returning a pointer to a fresh value into which the
	// coordinates of such a geometry are unmarshalled. Types without an
	// entry are counted as unsupported by WFSFeatureJob.Do.
	kindToGeometry = map[string]func() interface{}{
		// TODO: extend me!
		"Point":           func() interface{} { return new(pointSlice) },
		"LineString":      func() interface{} { return new(lineSlice) },
		"MultiLineString": func() interface{} { return new(multiLineSlice) },
	}

	// wrapGeomKind maps a pair of {geometry type found in the feature,
	// geometry kind expected by the consumer} to a function converting a
	// decoded geometry of the first type into the second one. The
	// argument has to be the pointer created by the matching
	// kindToGeometry constructor.
	wrapGeomKind = map[[2]string]func(interface{}) interface{}{
		// TODO: extend me!
		// A single line string becomes a multi line string built from
		// just that line.
		{"LineString", "MultiLineString"}: func(x interface{}) interface{} {
			return &multiLineSlice{*x.(*lineSlice)}
		},
	}
)

// Description returns the description text configured for this kind
// of WFS import.
func (wfjc *WFSFeatureJobCreator) Description() string {
	return wfjc.description
}

// Depends returns the dependencies configured for this kind of
// WFS import.
func (wfjc *WFSFeatureJobCreator) Depends() [2][]string {
	return wfjc.depends
}

// AutoAccept reports true if no stageDone callback is configured.
func (wfjc *WFSFeatureJobCreator) AutoAccept() bool {
	return wfjc.stageDone == nil
}

// StageDone forwards to the configured stageDone callback and returns
// its result. Without a configured callback it does nothing and
// returns nil.
func (wfjc *WFSFeatureJobCreator) StageDone(
	ctx context.Context,
	tx *sql.Tx,
	id int64,
	feedback Feedback,
) error {
	if hook := wfjc.stageDone; hook != nil {
		return hook(ctx, tx, id, feedback)
	}
	// Nothing configured -> nothing to do.
	return nil
}

// Create returns a new WFSFeatureJob which refers back to this creator.
func (wfjc *WFSFeatureJobCreator) Create() Job {
	return &WFSFeatureJob{creator: wfjc}
}

// Description gives a short info about relevant facts of this import:
// the URL and the feature type separated by a '|'.
// The returned error is always nil.
func (wfj *WFSFeatureJob) Description() (string, error) {
	return wfj.URL + "|" + wfj.FeatureType, nil
}

// CleanUp for WFS imports is a NOP. It always returns nil.
func (*WFSFeatureJob) CleanUp() error {
	return nil
}

// Do executes the WFS import.
//
// It loads the capabilities of the configured service, looks up the
// configured feature type, downloads the features and hands every
// usable feature to a consumer created by the job's creator. Finally
// the consumer is committed.
//
// ctx and conn are passed to the consumer factory, feedback receives
// the log messages. importID is not used by this implementation.
//
// The returned value is always nil. The returned error is
//   - the error of the failing step (capabilities, unknown feature type,
//     unsupported CRS, download, parsing, consuming or committing),
//   - an UnchangedError if no feature was stored and the commit reported
//     ErrFeaturesUnmodified,
//   - nil otherwise.
func (wfj *WFSFeatureJob) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	start := time.Now()

	feedback.Info("Import %s", wfj.creator.Description())

	feedback.Info("Loading capabilities from %s", wfj.URL)
	caps, err := wfs.GetCapabilities(wfj.URL)
	if err != nil {
		feedback.Error("Loading capabilities failed: %v", err)
		return nil, err
	}

	ft := caps.FindFeatureType(wfj.FeatureType)
	if ft == nil {
		return nil, fmt.Errorf("unknown feature type '%s'", wfj.FeatureType)
	}

	feedback.Info("Found feature type '%s'", wfj.FeatureType)

	// The default CRS of the feature type is used unless a downloaded
	// document announces its own CRS (see below).
	epsg, err := wfs.CRSToEPSG(ft.DefaultCRS)
	if err != nil {
		feedback.Error("Unsupported CRS: '%s'", ft.DefaultCRS)
		return nil, err
	}

	if nilString(wfj.SortBy) != "" {
		feedback.Info("Features will be sorted by '%s'", *wfj.SortBy)
	}

	dl, err := wfs.GetFeatures(caps, wfj.FeatureType, nilString(wfj.SortBy))
	if err != nil {
		feedback.Error("Cannot create GetFeature URLs. %v", err)
		return nil, err
	}

	// Statistics reported after the download.
	var (
		unsupported       = stringCounter{}
		missingProperties int
		badProperties     int
		dupes             int
		features          int
	)

	consumer, err := wfj.creator.newConsumer(ctx, conn, feedback)
	if err != nil {
		return nil, err
	}
	// Rollback is deferred unconditionally, so it also runs after the
	// Commit below.
	defer consumer.Rollback()

	// The callback receives the URL and the body reader of a downloaded
	// GetFeature document.
	if err := dl.Download(nilString(wfj.User), nilString(wfj.Password), func(url string, r io.Reader) error {
		feedback.Info("Get features from: '%s'", url)
		rfc, err := wfs.ParseRawFeatureCollection(r)
		if err != nil {
			return fmt.Errorf("parsing GetFeature document failed: %v", err)
		}
		if rfc.CRS != nil {
			// The document announces its own CRS: it overrides the EPSG
			// code determined so far.
			crsName := rfc.CRS.Properties.Name
			if epsg, err = wfs.CRSToEPSG(crsName); err != nil {
				// crsName is a name, not a number: format it as string.
				feedback.Error("Unsupported CRS: '%s'", crsName)
				return err
			}
		}

		// No features -> ignore.
		if rfc.Features == nil {
			return nil
		}

		feedback.Info("Using EPSG: %d", epsg)

		feedback.Info(
			"Found %d features in data source", len(rfc.Features))

		for _, feature := range rfc.Features {
			if feature.Properties == nil || feature.Geometry.Coordinates == nil {
				missingProperties++
				continue
			}

			kind, props := consumer.NewFeature()

			if err := json.Unmarshal(*feature.Properties, props); err != nil {
				badProperties++
				continue
			}

			// Look if we can deserialize given type
			makeGeom := kindToGeometry[feature.Geometry.Type]
			if makeGeom == nil {
				unsupported[feature.Geometry.Type]++
				continue
			}

			// Optional wrapping
			wrap := func(x interface{}) interface{} { return x }
			if feature.Geometry.Type != kind {
				// Look if we can wrap it
				if wrap = wrapGeomKind[[2]string{feature.Geometry.Type, kind}]; wrap == nil {
					unsupported[feature.Geometry.Type]++
					continue
				}
			}

			// Unlike broken properties broken coordinates abort the
			// whole import.
			geom := makeGeom()
			if err := json.Unmarshal(*feature.Geometry.Coordinates, geom); err != nil {
				return err
			}

			switch err := consumer.Consume(wrap(geom), props, epsg); {
			case err == ErrFeatureDuplicated:
				dupes++
			case err == ErrFeatureIgnored:
				// be silent
			case err != nil:
				return err
			default:
				features++
			}
		}
		return nil
	}); err != nil {
		return nil, err
	}

	if dupes > 0 {
		feedback.Info(
			"Features outside responsibility area, duplicates or unchanged: %d",
			dupes)
	}

	if badProperties > 0 {
		feedback.Warn("Bad properties: %d", badProperties)
	}

	if missingProperties > 0 {
		feedback.Warn("Missing properties: %d", missingProperties)
	}

	if len(unsupported) != 0 {
		feedback.Warn("Unsupported types found: %s", unsupported)
	}

	if err = consumer.Commit(); err == nil || err == ErrFeaturesUnmodified {
		feedback.Info("Storing %d features took %s",
			features, time.Since(start))
	}

	// Commit before eventually returning UnchangedError because we might
	// have updated last_found
	if features == 0 && err == ErrFeaturesUnmodified {
		return nil, UnchangedError("no valid new features found")
	}

	if err == ErrFeaturesUnmodified {
		// It's not really an error.
		err = nil
	}

	return nil, err
}

type (
	// SQLGeometryConsumer is a feature consumer working inside a single
	// SQL transaction. Its behaviour is configured with callbacks handed
	// to newSQLConsumer.
	SQLGeometryConsumer struct {
		// ctx is the context the transaction was started with. It is
		// reset to nil by Commit and Rollback.
		ctx        context.Context
		// tx is the running transaction. It is nil once Commit or
		// Rollback has been called.
		tx         *sql.Tx
		// feedback is the message sink handed to the factory.
		feedback   Feedback
		// consume is called by Consume and ConsumePolygon.
		consume    func(*SQLGeometryConsumer, interface{}, interface{}, int) error
		// newFeature is called by NewFeature.
		newFeature func() (string, interface{})
		// preCommit is optional and called by Commit before the
		// transaction is committed.
		preCommit  func(*SQLGeometryConsumer) error
		// savepoint is created with Savepoint(ctx, tx, "feature").
		// NOTE(review): presumably runs the given function inside a
		// savepoint of tx -- confirm against Savepoint.
		savepoint  func(func() error) error
		// stmts are the statements prepared on tx (see prepareStmnts).
		// They are closed by releaseStmts.
		stmts      []*sql.Stmt
	}
)

// Rollback closes the prepared statements and rolls back the
// transaction of the consumer. If the transaction is already gone
// (after a previous Commit or Rollback) it does nothing and
// returns nil.
func (sgc *SQLGeometryConsumer) Rollback() error {
	pending := sgc.tx
	if pending == nil {
		// Already finished.
		return nil
	}
	sgc.releaseStmts()
	sgc.tx, sgc.ctx = nil, nil
	return pending.Rollback()
}

// Commit runs the optional preCommit callback, closes the prepared
// statements and commits the transaction of the consumer.
//
// The transaction is committed regardless of the result of preCommit.
// If the commit itself fails its error is returned unless preCommit
// already failed with an error other than ErrFeaturesUnmodified; in all
// other cases the result of preCommit is returned. If the transaction
// is already gone Commit does nothing and returns nil.
func (sgc *SQLGeometryConsumer) Commit() error {
	pending := sgc.tx
	if pending == nil {
		// Already finished.
		return nil
	}

	var preErr error
	if hook := sgc.preCommit; hook != nil {
		preErr = hook(sgc)
	}

	sgc.releaseStmts()
	sgc.tx, sgc.ctx = nil, nil

	commitErr := pending.Commit()
	if commitErr == nil {
		return preErr
	}
	if preErr != nil && preErr != ErrFeaturesUnmodified {
		// The commit error is probably induced by the first error,
		// so the first one is reported.
		return preErr
	}
	// An error on commit that is not induced by the first
	// overrules the first.
	return commitErr
}

// NewFeature forwards to the configured newFeature callback and
// returns its two results unchanged.
func (sgc *SQLGeometryConsumer) NewFeature() (string, interface{}) {
	return sgc.newFeature()
}

// Consume forwards the given geometry, properties and EPSG code
// together with the consumer itself to the configured consume callback
// and returns its result.
func (sgc *SQLGeometryConsumer) Consume(
	geom, properties interface{},
	epsg int,
) error {
	return sgc.consume(sgc, geom, properties, epsg)
}

// ConsumePolygon forwards the given polygon to the configured consume
// callback the same way Consume does. The polygon is passed as a value
// (not as a pointer) in the geometry argument.
func (sgc *SQLGeometryConsumer) ConsumePolygon(
	polygon polygonSlice,
	properties interface{},
	epsg int,
) error {
	return sgc.consume(sgc, polygon, properties, epsg)
}

// newSQLConsumer builds a consumer factory from the given callbacks.
//
// When called, the factory starts a transaction on the given
// connection, creates a SQLGeometryConsumer bound to it and runs init
// on the new consumer. If init fails the transaction is rolled back
// and the error of init is returned.
//
// consume, preCommit and newFeature are stored in the consumer and back
// its Consume, Commit and NewFeature methods.
func newSQLConsumer(
	init func(*SQLGeometryConsumer) error,
	consume func(*SQLGeometryConsumer, interface{}, interface{}, int) error,
	preCommit func(*SQLGeometryConsumer) error,
	newFeature func() (string, interface{}),

) func(context.Context, *sql.Conn, Feedback) (WFSFeatureConsumer, error) {
	factory := func(
		ctx context.Context,
		conn *sql.Conn,
		feedback Feedback,
	) (WFSFeatureConsumer, error) {
		tx, beginErr := conn.BeginTx(ctx, nil)
		if beginErr != nil {
			return nil, beginErr
		}

		consumer := new(SQLGeometryConsumer)
		consumer.ctx = ctx
		consumer.tx = tx
		consumer.feedback = feedback
		consumer.consume = consume
		consumer.newFeature = newFeature
		consumer.preCommit = preCommit
		consumer.savepoint = Savepoint(ctx, tx, "feature")

		if initErr := init(consumer); initErr != nil {
			// Best effort: the error of init is the one to report.
			tx.Rollback()
			return nil, initErr
		}
		return consumer, nil
	}
	return factory
}

// releaseStmts closes all prepared statements of the consumer in
// reverse order of their registration and forgets them afterwards.
// Errors of closing are ignored.
func (sgc *SQLGeometryConsumer) releaseStmts() {
	prepared := sgc.stmts
	for idx := len(prepared) - 1; idx >= 0; idx-- {
		prepared[idx].Close()
		prepared[idx] = nil
	}
	sgc.stmts = nil
}

// prepareStmnts returns a function which prepares the given queries,
// in the given order, on the transaction of the consumer it is called
// with and appends the resulting statements to the consumer's stmts.
// The first failing preparation stops the process and its error is
// returned; statements prepared before stay registered.
func prepareStmnts(queries ...string) func(*SQLGeometryConsumer) error {
	prepareAll := func(sgc *SQLGeometryConsumer) error {
		for idx := range queries {
			prepared, err := sgc.tx.PrepareContext(sgc.ctx, queries[idx])
			if err != nil {
				return err
			}
			sgc.stmts = append(sgc.stmts, prepared)
		}
		return nil
	}
	return prepareAll
}