view pkg/imports/wa.go @ 5591:0011f50cf216 surveysperbottleneckid

Removed the no longer used alternative API for the surveys/ endpoint. As bottlenecks in the summary for SR imports are now identified by their id and no longer by the (not guaranteed to be unique!) name, there is no longer the need to request survey data by the name+date tuple (which isn't reliable anyway). So the workaround was now reversed.
author Sascha Wilde <wilde@sha-bang.de>
date Wed, 06 Apr 2022 13:30:29 +0200
parents ade07a3f2cfd
children 2dd155cc95ec
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018, 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
//  * Tom Gottfried <tom.gottfried@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"strconv"
	"time"

	"gemma.intevation.de/gemma/pkg/pgxutils"
	"gemma.intevation.de/gemma/pkg/wfs"
)

// WaterwayArea describes an import job which fetches the
// waterway area as polygon geometries together with their
// attribute data from a WFS service.
type WaterwayArea struct {
	// URL is the GetCapabilities URL of the WFS service.
	URL string `json:"url"`
	// FeatureType names the feature type to request from the WFS service.
	FeatureType string `json:"feature-type"`
	// SortBy establishes a sort order for fetching the features;
	// it is a workaround for misconfigured services.
	SortBy string `json:"sort-by"`
	// User is an optional username for Basic Auth.
	User string `json:"user,omitempty"`
	// Password is an optional password for Basic Auth.
	Password string `json:"password,omitempty"`
}

// Description gives a short info about relevant facts of this import:
// the service URL and the feature type, joined by a '|'.
// The argument is ignored and the returned error is always nil.
func (wa *WaterwayArea) Description([]string) (string, error) {
	summary := wa.URL + "|" + wa.FeatureType
	return summary, nil
}

// WAJobKind is the import queue type identifier
// of waterway area imports.
const WAJobKind JobKind = "wa"

// waJobCreator is the stateless job creator registered for WAJobKind;
// its Create method hands out new WaterwayArea jobs.
type waJobCreator struct{}

func init() {
	RegisterJobCreator(WAJobKind, waJobCreator{})
}

// Description returns a short label for this kind of import.
func (waJobCreator) Description() string {
	return "waterway area"
}

// AutoAccept always reports true for waterway area imports.
// NOTE(review): presumably this means such imports need no
// manual review step; confirm against the JobCreator interface.
func (waJobCreator) AutoAccept() bool {
	return true
}

// Create returns a fresh, zero-valued WaterwayArea import job.
func (waJobCreator) Create() Job {
	return &WaterwayArea{}
}

// Depends returns two lists of names. Only the first one is filled,
// with "waterway_area"; the second one stays nil.
// NOTE(review): the meaning of the two slots is defined by the
// JobCreator interface, which is not visible here.
func (waJobCreator) Depends() [2][]string {
	var deps [2][]string
	deps[0] = []string{"waterway_area"}
	return deps
}

// StageDone does nothing for waterway area imports:
// all arguments are ignored and nil is returned.
func (waJobCreator) StageDone(
	_ context.Context,
	_ *sql.Tx,
	_ int64,
	_ Feedback,
) error {
	return nil
}

// CleanUp is a NOP for waterway area imports; it always returns nil.
func (*WaterwayArea) CleanUp() error {
	return nil
}

// waterwayAreaProperties holds the attributes decoded from the
// JSON properties of a feature. Both are optional: a nil pointer
// means the property was absent or null in the document.
type waterwayAreaProperties struct {
	// Catccl is the raw string value of the "ienc_catccl" property.
	Catccl *string `json:"ienc_catccl"`
	// Dirimp is the raw string value of the "ienc_dirimp" property.
	Dirimp *string `json:"ienc_dirimp"`
}

const (
	// deleteWaterwayAreaSQL removes rows from waterway.waterway_area:
	// every row if the current role is a member of 'sys_admin',
	// otherwise only the rows whose area is covered by the area
	// returned by users.current_user_area_utm().
	deleteWaterwayAreaSQL = `
WITH resp AS (
  SELECT users.current_user_area_utm() AS a
)
DELETE FROM waterway.waterway_area
WHERE pg_has_role('sys_admin', 'MEMBER')
  OR ST_Covers((SELECT a FROM resp),
    ST_Transform(area::geometry, (SELECT ST_SRID(a) FROM resp)))
`
	// insertWaterwayAreaSQL inserts a new area into
	// waterway.waterway_area and returns the id of the inserted row.
	//
	// Parameters:
	//   $1 the geometry as WKB
	//   $2 the SRID of the geometry
	//   $3 the value for catccl
	//   $4 the value for dirimp
	//
	// For members of 'sys_admin' the geometry is taken as is; for all
	// others it is intersected with users.current_user_area_utm().
	// The result is transformed to EPSG:4326, made valid, reduced to
	// its polygons (ST_CollectionExtract type 3) and split up by
	// ST_Dump, one inserted row per dumped geometry.
	// If nothing is left to insert no row is returned.
	insertWaterwayAreaSQL = `
WITH resp AS (
  SELECT users.current_user_area_utm() AS a
)
INSERT INTO waterway.waterway_area (area, catccl, dirimp)
SELECT dmp.geom, $3, $4
  FROM ST_GeomFromWKB($1, $2::integer) AS new_area (new_area),
    ST_Dump(ST_CollectionExtract(ST_MakeValid(ST_Transform(
      CASE WHEN pg_has_role('sys_admin', 'MEMBER')
        THEN new_area
        ELSE ST_Intersection((SELECT a FROM resp),
          ST_MakeValid(ST_Transform(new_area, (SELECT ST_SRID(a) FROM resp))))
        END,
      4326)), 3)) AS dmp
RETURNING id
`
)

// Do executes the actual waterway area import.
//
// It loads the capabilities of the WFS service at wa.URL, looks up
// wa.FeatureType, downloads the features and stores them inside a
// single transaction on conn: the old waterway areas are deleted
// first, then every feature with a "Polygon" geometry is inserted.
// Features lacking properties or coordinates, features with
// undecodable properties and features with other geometry types
// are only counted and reported via feedback. A failing insert of a
// single feature is reported via feedback but does not abort the import.
//
// importID is not used by this import. The first return value is
// always nil. An error is returned if the service cannot be queried,
// a downloaded document or a polygon cannot be parsed, beginning the
// transaction, preparing the insert statement, deleting the old
// features or committing fails, or if no feature was stored at all.
// Unless the commit succeeded the transaction is rolled back.
func (wa *WaterwayArea) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	start := time.Now()

	feedback.Info("Import waterway area")

	feedback.Info("Loading capabilities from %s", wa.URL)
	caps, err := wfs.GetCapabilities(wa.URL)
	if err != nil {
		feedback.Error("Loading capabilities failed: %v", err)
		return nil, err
	}

	ft := caps.FindFeatureType(wa.FeatureType)
	if ft == nil {
		return nil, fmt.Errorf("unknown feature type '%s'", wa.FeatureType)
	}

	feedback.Info("Found feature type '%s'", wa.FeatureType)

	// The default CRS of the feature type is used unless a
	// downloaded document announces its own CRS (see below).
	epsg, err := wfs.CRSToEPSG(ft.DefaultCRS)
	if err != nil {
		feedback.Error("Unsupported CRS: '%s'", ft.DefaultCRS)
		return nil, err
	}

	if wa.SortBy != "" {
		feedback.Info("Features will be sorted by '%s'", wa.SortBy)
	}

	dl, err := wfs.GetFeatures(caps, wa.FeatureType, wa.SortBy)
	if err != nil {
		feedback.Error("Cannot create GetFeature URLs. %v", err)
		return nil, err
	}

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	// Rolls back on every early return; after the commit it has no effect.
	defer tx.Rollback()

	insertStmt, err := tx.PrepareContext(ctx, insertWaterwayAreaSQL)
	if err != nil {
		return nil, err
	}
	defer insertStmt.Close()

	// Delete the old features.
	if _, err := tx.ExecContext(ctx, deleteWaterwayAreaSQL); err != nil {
		return nil, err
	}

	var (
		unsupported       = stringCounter{}
		missingProperties int
		badProperties     int
		outside           int
		features          int
	)

	// toNullInt64 converts an optional decimal string into a nullable
	// integer. Missing or non-numeric values end up as SQL NULL.
	toNullInt64 := func(s *string) sql.NullInt64 {
		if s == nil {
			return sql.NullInt64{}
		}
		value, err := strconv.ParseInt(*s, 10, 64)
		if err != nil {
			return sql.NullInt64{}
		}
		return sql.NullInt64{Int64: value, Valid: true}
	}

	if err := dl.Download(wa.User, wa.Password, func(url string, r io.Reader) error {
		feedback.Info("Get features from: '%s'", url)
		rfc, err := wfs.ParseRawFeatureCollection(r)
		if err != nil {
			return fmt.Errorf("parsing GetFeature document failed: %w", err)
		}
		// A CRS announced by the document overrides the one in use so far.
		if rfc.CRS != nil {
			crsName := rfc.CRS.Properties.Name
			if epsg, err = wfs.CRSToEPSG(crsName); err != nil {
				feedback.Error("Unsupported CRS: '%s'", crsName)
				return err
			}
		}

		// No features -> ignore.
		if rfc.Features == nil {
			return nil
		}

		feedback.Info("Using EPSG: %d", epsg)

		// Each insert is run through the savepoint helper, so that a
		// failing feature is reported below without ending the import.
		savepoint := Savepoint(ctx, tx, "feature")

		for _, feature := range rfc.Features {
			if feature.Properties == nil || feature.Geometry.Coordinates == nil {
				missingProperties++
				continue
			}

			var props waterwayAreaProperties

			if err := json.Unmarshal(*feature.Properties, &props); err != nil {
				badProperties++
				continue
			}

			catccl := toNullInt64(props.Catccl)
			dirimp := toNullInt64(props.Dirimp)

			switch feature.Geometry.Type {
			case "Polygon":
				var p polygonSlice
				if err := json.Unmarshal(*feature.Geometry.Coordinates, &p); err != nil {
					return err
				}
				var waid int64
				err := savepoint(func() error {
					return insertStmt.QueryRowContext(
						ctx,
						p.asWKB(),
						epsg,
						catccl,
						dirimp,
					).Scan(&waid)
				})
				switch {
				case errors.Is(err, sql.ErrNoRows):
					outside++
					// ignore -> filtered by responsibility_areas
				case err != nil:
					feedback.Error(pgxutils.ReadableError{Err: err}.Error())
				default:
					features++
				}
			default:
				unsupported[feature.Geometry.Type]++
			}
		}
		return nil
	}); err != nil {
		return nil, err
	}

	if badProperties > 0 {
		feedback.Warn("Bad properties: %d", badProperties)
	}

	if missingProperties > 0 {
		feedback.Warn("Missing properties: %d", missingProperties)
	}

	if len(unsupported) != 0 {
		feedback.Warn("Unsupported types found: %s", unsupported)
	}

	if outside > 0 {
		feedback.Info("Features outside responsibility area: %d", outside)
	}

	// Without any stored feature the deletion above must not be
	// committed: returning here triggers the deferred rollback.
	if features == 0 {
		err := errors.New("no features found")
		feedback.Error("%v", err)
		return nil, err
	}

	if err = tx.Commit(); err == nil {
		feedback.Info("Storing %d features took %s",
			features, time.Since(start))
	}

	return nil, err
}