view pkg/imports/wa.go @ 5711:2dd155cc95ec revive-cleanup

Fix all revive issues (w/o machine generated stuff).
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Tue, 20 Feb 2024 22:22:57 +0100
parents ade07a3f2cfd
children 6270951dda28
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018, 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
//  * Tom Gottfried <tom.gottfried@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"strconv"
	"time"

	"gemma.intevation.de/gemma/pkg/pgxutils"
	"gemma.intevation.de/gemma/pkg/wfs"
)

// WaterwayArea is an import job to import
// the waterway area in form of polygon geometries
// and attribute data from a WFS service.
// The configuration is (de)serialized as JSON using the field tags below.
type WaterwayArea struct {
	// URL is the GetCapabilities URL of the WFS service.
	URL string `json:"url"`
	// FeatureType selects the feature type of the WFS service.
	FeatureType string `json:"feature-type"`
	// SortBy works around misconfigured services to
	// establish a sort order to get the features.
	SortBy string `json:"sort-by"`
	// User is an optional username for Basic Auth.
	// It is handed to the feature download together with Password.
	User string `json:"user,omitempty"`
	// Password is an optional password for Basic Auth.
	Password string `json:"password,omitempty"`
}

// Description returns a short summary of this import: the service
// URL and the feature type joined by a '|'.
// The string slice argument is ignored and the error is always nil.
func (wa *WaterwayArea) Description([]string) (string, error) {
	const separator = "|"
	summary := wa.URL + separator + wa.FeatureType
	return summary, nil
}

// WAJobKind identifies waterway area imports in the import queue.
const WAJobKind = JobKind("wa")

// waJobCreator is the stateless job creator for waterway area imports.
// It is registered for WAJobKind in init.
type waJobCreator struct{}

func init() {
	RegisterJobCreator(WAJobKind, waJobCreator{})
}

// Description returns a short label for this kind of import job.
func (waJobCreator) Description() string {
	const label = "waterway area"
	return label
}

// AutoAccept always reports true for waterway area imports.
// NOTE(review): presumably this lets the import framework accept the
// result without a manual review step; confirm against the framework.
func (waJobCreator) AutoAccept() bool {
	return true
}

// Create returns a new, zero-valued waterway area import job.
func (waJobCreator) Create() Job {
	return &WaterwayArea{}
}

// Depends returns the dependencies of waterway area imports:
// the first slot names "waterway_area", the second slot stays nil.
// NOTE(review): the meaning of the two slots is defined by the
// import framework and is not visible here.
func (waJobCreator) Depends() [2][]string {
	var deps [2][]string
	deps[0] = []string{"waterway_area"}
	return deps
}

// StageDone does nothing for waterway area imports:
// all arguments are ignored and the result is always nil.
func (waJobCreator) StageDone(
	_ context.Context,
	_ *sql.Tx,
	_ int64,
	_ Feedback,
) error {
	return nil
}

// CleanUp has nothing to release for waterway area imports
// and therefore always returns nil.
func (*WaterwayArea) CleanUp() error {
	return nil
}

// waterwayAreaProperties holds the attributes read from the
// JSON properties of a single feature. Both values arrive as
// optional strings; a nil pointer means the key was absent or null.
type waterwayAreaProperties struct {
	// Catccl is the raw "ienc_catccl" property. Do parses it as a
	// decimal integer before storing it in the catccl column.
	Catccl *string `json:"ienc_catccl"`
	// Dirimp is the raw "ienc_dirimp" property. Do parses it as a
	// decimal integer before storing it in the dirimp column.
	Dirimp *string `json:"ienc_dirimp"`
}

const (
	// deleteWaterwayAreaSQL removes the stored waterway areas before a
	// fresh import. Members of the sys_admin role delete every row;
	// for all other users only rows whose area is covered by
	// users.current_user_area_utm() are deleted.
	deleteWaterwayAreaSQL = `
WITH resp AS (
  SELECT users.current_user_area_utm() AS a
)
DELETE FROM waterway.waterway_area
WHERE pg_has_role('sys_admin', 'MEMBER')
  OR ST_Covers((SELECT a FROM resp),
    ST_Transform(area::geometry, (SELECT ST_SRID(a) FROM resp)))
`
	// insertWaterwayAreaSQL stores one feature geometry.
	// Parameters: $1 geometry as WKB, $2 its SRID, $3 catccl, $4 dirimp.
	// For members of sys_admin the geometry is used as is; otherwise it
	// is intersected with users.current_user_area_utm(). The result is
	// transformed to SRID 4326, made valid, reduced to its polygonal
	// parts (ST_CollectionExtract type 3) and dumped, so one row is
	// inserted per resulting part. The ids of the new rows are returned;
	// no row is returned if nothing remains after the intersection.
	insertWaterwayAreaSQL = `
WITH resp AS (
  SELECT users.current_user_area_utm() AS a
)
INSERT INTO waterway.waterway_area (area, catccl, dirimp)
SELECT dmp.geom, $3, $4
  FROM ST_GeomFromWKB($1, $2::integer) AS new_area (new_area),
    ST_Dump(ST_CollectionExtract(ST_MakeValid(ST_Transform(
      CASE WHEN pg_has_role('sys_admin', 'MEMBER')
        THEN new_area
        ELSE ST_Intersection((SELECT a FROM resp),
          ST_MakeValid(ST_Transform(new_area, (SELECT ST_SRID(a) FROM resp))))
        END,
      4326)), 3)) AS dmp
RETURNING id
`
)

// Do executes the actual waterway area import.
//
// It loads the capabilities of the WFS service at wa.URL, looks up
// the configured feature type and downloads its features. Inside a
// single transaction the previously stored waterway areas are removed
// with deleteWaterwayAreaSQL and every downloaded "Polygon" feature is
// stored with insertWaterwayAreaSQL. A failing insert of a single
// feature is reported via feedback and does not abort the import.
// Features without properties or geometry, with undecodable
// properties or with other geometry types are only counted and
// reported as warnings.
//
// The import id (second parameter) is not used and the returned
// summary is always nil. An error is returned if the service cannot
// be queried, a GetFeature document or polygon cannot be decoded,
// the transaction cannot be set up or committed, or if not a single
// feature was stored. On error the transaction is rolled back.
func (wa *WaterwayArea) Do(
	ctx context.Context,
	_ int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	start := time.Now()

	feedback.Info("Import waterway area")

	feedback.Info("Loading capabilities from %s", wa.URL)
	caps, err := wfs.GetCapabilities(wa.URL)
	if err != nil {
		feedback.Error("Loading capabilities failed: %v", err)
		return nil, err
	}

	ft := caps.FindFeatureType(wa.FeatureType)
	if ft == nil {
		return nil, fmt.Errorf("unknown feature type '%s'", wa.FeatureType)
	}

	feedback.Info("Found feature type '%s'", wa.FeatureType)

	// The default CRS of the feature type is used unless a downloaded
	// document announces its own CRS (see below).
	epsg, err := wfs.CRSToEPSG(ft.DefaultCRS)
	if err != nil {
		feedback.Error("Unsupported CRS: '%s'", ft.DefaultCRS)
		return nil, err
	}

	if wa.SortBy != "" {
		feedback.Info("Features will be sorted by '%s'", wa.SortBy)
	}

	dl, err := wfs.GetFeatures(caps, wa.FeatureType, wa.SortBy)
	if err != nil {
		feedback.Error("Cannot create GetFeature URLs. %v", err)
		return nil, err
	}

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	// After a successful commit the rollback fails harmlessly,
	// so its error is deliberately ignored.
	defer tx.Rollback()

	insertStmt, err := tx.PrepareContext(ctx, insertWaterwayAreaSQL)
	if err != nil {
		return nil, err
	}
	defer insertStmt.Close()

	// Delete the old features.
	if _, err := tx.ExecContext(ctx, deleteWaterwayAreaSQL); err != nil {
		return nil, err
	}

	var (
		unsupported       = stringCounter{}
		missingProperties int
		badProperties     int
		outside           int
		features          int
	)

	// optionalInt converts an optional decimal string into a nullable
	// integer. Missing or unparsable values are stored as SQL NULL.
	optionalInt := func(s *string) sql.NullInt64 {
		if s == nil {
			return sql.NullInt64{}
		}
		value, err := strconv.ParseInt(*s, 10, 64)
		if err != nil {
			return sql.NullInt64{}
		}
		return sql.NullInt64{Int64: value, Valid: true}
	}

	if err := dl.Download(wa.User, wa.Password, func(url string, r io.Reader) error {
		feedback.Info("Get features from: '%s'", url)
		rfc, err := wfs.ParseRawFeatureCollection(r)
		if err != nil {
			return fmt.Errorf("parsing GetFeature document failed: %w", err)
		}
		// A CRS given in the document overrides the current EPSG code.
		if rfc.CRS != nil {
			crsName := rfc.CRS.Properties.Name
			if epsg, err = wfs.CRSToEPSG(crsName); err != nil {
				feedback.Error("Unsupported CRS: '%s'", crsName)
				return err
			}
		}

		// No features -> ignore.
		if rfc.Features == nil {
			return nil
		}

		feedback.Info("Using EPSG: %d", epsg)

		// NOTE(review): Savepoint presumably wraps each call in a
		// database savepoint so a failed insert does not poison the
		// surrounding transaction; confirm against its definition.
		savepoint := Savepoint(ctx, tx, "feature")

		for _, feature := range rfc.Features {
			if feature.Properties == nil || feature.Geometry.Coordinates == nil {
				missingProperties++
				continue
			}

			var props waterwayAreaProperties

			if err := json.Unmarshal(*feature.Properties, &props); err != nil {
				badProperties++
				continue
			}

			catccl := optionalInt(props.Catccl)
			dirimp := optionalInt(props.Dirimp)

			switch feature.Geometry.Type {
			case "Polygon":
				var p polygonSlice
				if err := json.Unmarshal(*feature.Geometry.Coordinates, &p); err != nil {
					return err
				}
				var waid int64
				err := savepoint(func() error {
					return insertStmt.QueryRowContext(
						ctx,
						p.asWKB(),
						epsg,
						catccl,
						dirimp,
					).Scan(&waid)
				})
				switch {
				case errors.Is(err, sql.ErrNoRows):
					outside++
					// ignore -> filtered by responsibility_areas
				case err != nil:
					// Never use the message itself as format string:
					// it may contain '%' characters.
					feedback.Error("%s", pgxutils.ReadableError{Err: err}.Error())
				default:
					features++
				}
			default:
				unsupported[feature.Geometry.Type]++
			}
		}
		return nil
	}); err != nil {
		return nil, err
	}

	if badProperties > 0 {
		feedback.Warn("Bad properties: %d", badProperties)
	}

	if missingProperties > 0 {
		feedback.Warn("Missing properties: %d", missingProperties)
	}

	if len(unsupported) != 0 {
		feedback.Warn("Unsupported types found: %s", unsupported)
	}

	if outside > 0 {
		feedback.Info("Features outside responsibility area: %d", outside)
	}

	// Storing nothing is treated as a failure; returning here rolls the
	// transaction back, so the old features are kept.
	if features == 0 {
		err := errors.New("no features found")
		feedback.Error("%v", err)
		return nil, err
	}

	if err = tx.Commit(); err == nil {
		feedback.Info("Storing %d features took %s",
			features, time.Since(start))
	}

	return nil, err
}