view pkg/imports/fd.go @ 5670:b75d0b303328

Various fixes and improvements of gauges import: - Allow update of erased data (and thereby set erased to false) - Fix source_organization to work with ERDMS2 - Give ISRS of new and updated gauges in summary - Fixed reference of null pointers if revlevels are missing - Fixed reference of null pointer on update errors - Added ISRS to reference_code warning
author Sascha Wilde <wilde@sha-bang.de>
date Fri, 08 Dec 2023 17:29:56 +0100
parents f2204f91d286
children 6270951dda28
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018, 2019, 2020 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Raimund Renkert <raimund.renkert@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"io"
	"strings"
	"time"

	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/pgxutils"
	"gemma.intevation.de/gemma/pkg/wfs"
)

// FairwayDimension is an import job to import
// the fairway dimensions in form of polygon geometries
// and attribute data from a WFS service.
// FairwayDimension is an import job to import
// the fairway dimensions in form of polygon geometries
// and attribute data from a WFS service.
type FairwayDimension struct {
	// URL the GetCapabilities URL of the WFS service.
	URL string `json:"url"`
	// FeatureType selects the feature type of the WFS service.
	FeatureType        string `json:"feature-type"`
	// SortBy optionally names a feature property to sort the WFS
	// download by (passed through to wfs.GetFeatures).
	SortBy             string `json:"sort-by"`
	// LOS is the level of service the imported dimensions belong to.
	LOS                int    `json:"los"`
	// MinWidth is the minimum fairway width in this LOS.
	MinWidth           int    `json:"min-width"`
	// MaxWidth is the maximum fairway width in this LOS.
	MaxWidth           int    `json:"max-width"`
	// Depth is stored as min_depth of the fairway dimensions.
	Depth              int    `json:"depth"`
	// SourceOrganization names the organization the data stems from.
	SourceOrganization string `json:"source-organization"`
	// User is an optional username for Basic Auth.
	User string `json:"user,omitempty"`
	// Password is an optional password for Basic Auth.
	Password string `json:"password,omitempty"`
}

// Description gives a short info about relevant facts of this import.
func (fd *FairwayDimension) Description([]string) (string, error) {
	return strings.Join([]string{
		fd.URL,
		fd.FeatureType,
		fmt.Sprintf("LOS%d", fd.LOS),
	}, "|"), nil
}

// fdTime wraps time.Time to allow custom JSON unmarshalling of the
// date formats found in the WFS feature properties.
type fdTime struct{ time.Time }

// guessFDTime parses a date given as "YYYYMMDD" or just "YYYY";
// the empty layout is also accepted (presumably yielding the zero
// time for empty input — confirm with common.TimeParser).
var guessFDTime = common.TimeParser([]string{
	"20060102",
	"2006",
	"",
}).Parse

// UnmarshalJSON implements json.Unmarshaler. It expects a JSON
// string and parses it with guessFDTime. On parse failure fdt is
// left unmodified.
func (fdt *fdTime) UnmarshalJSON(data []byte) error {
	var raw string
	if err := json.Unmarshal(data, &raw); err != nil {
		return err
	}
	parsed, err := guessFDTime(raw)
	if err != nil {
		return err
	}
	fdt.Time = parsed
	return nil
}

// FDJobKind is the import queue type identifier.
const FDJobKind JobKind = "fd"

// fdJobCreator creates fairway dimension import jobs.
type fdJobCreator struct{}

// Register this job type with the import queue machinery.
func init() {
	RegisterJobCreator(FDJobKind, fdJobCreator{})
}

// Description returns a human readable name for this job type.
func (fdJobCreator) Description() string { return "fairway dimension" }

// AutoAccept is false: fairway dimension imports need manual review.
func (fdJobCreator) AutoAccept() bool { return false }

// Create returns a new empty fairway dimension import job.
func (fdJobCreator) Create() Job { return new(FairwayDimension) }

// Depends lists the relations this import touches — presumably
// (written, read) sets used by the scheduler; confirm against the
// JobCreator interface documentation.
func (fdJobCreator) Depends() [2][]string {
	return [2][]string{
		{"fairway_dimensions"},
		{"level_of_service"},
	}
}

// StageDone replaces fairway dimensions with those in the staging
// area: entries marked for deletion are removed first, then the
// staged entries of this import are flagged as done.
func (fdJobCreator) StageDone(
	ctx context.Context,
	tx *sql.Tx,
	id int64,
	_ Feedback,
) error {
	// Remove the old features this import marked for deletion.
	if _, err := tx.ExecContext(ctx, deleteFairwayDimensionSQL, id); err != nil {
		return err
	}
	// Promote the staged features of this import.
	if _, err := tx.ExecContext(ctx, fdStageDoneSQL, id); err != nil {
		return err
	}
	return nil
}

// CleanUp for fairway dimension imports is a NOP.
func (*FairwayDimension) CleanUp() error { return nil }

// fairwayDimensionProperties are the WFS feature properties of
// interest for this import.
type fairwayDimensionProperties struct {
	// HydroSorDat is presumably the source date of the hydrological
	// data; nil or zero values fall back to the import start time.
	HydroSorDat *fdTime `json:"hydro_sordat"`
}

// fdSummary identifies a stored or invalidated feature in the
// import summary: its database ID and the centroid coordinates.
type fdSummary struct {
	Lat float64 `json:"lat"`
	Lon float64 `json:"lon"`
	ID  int64   `json:"id"`
}

const (
	// fdStageDoneSQL flags the entries tracked for a given import
	// as no longer being in the staging area.
	fdStageDoneSQL = `
UPDATE waterway.fairway_dimensions SET staging_done = true
WHERE id IN (
  SELECT key FROM import.track_imports
  WHERE import_id = $1 AND
        relation = 'waterway.fairway_dimensions'::regclass)`

	// deleteFairwayDimensionSQL removes the entries of a given
	// import that were marked for deletion.
	deleteFairwayDimensionSQL = `
-- Delete entries to be replaced by those in staging area
DELETE FROM waterway.fairway_dimensions
  WHERE id IN (
    SELECT key FROM import.track_imports
      WHERE import_id = $1
        AND relation = 'waterway.fairway_dimensions'::regclass
        AND deletion)
`
	// Temporary table to collect IDs of unchanged entries;
	// dropped automatically at the end of the transaction.
	tmpTableSQL = `
CREATE TEMP TABLE unchanged (id int PRIMARY KEY) ON COMMIT DROP
`
	// insertFairwayDimensionSQL clips the new geometry to the
	// current user's responsibility area (sys_admin is exempt),
	// records unchanged duplicates in the "unchanged" temp table
	// and inserts genuinely new features, returning id and centroid.
	// The ST_MakeValid and ST_Buffer below are a workaround to
	// avoid errors due to reprojection.
	insertFairwayDimensionSQL = `
WITH resp AS (
  SELECT users.current_user_area_utm() AS a
),
g AS (
  SELECT ST_Multi(ST_CollectionExtract(ST_MakeValid(ST_Transform(
    CASE WHEN pg_has_role('sys_admin', 'MEMBER')
        OR ST_Covers((SELECT a FROM resp),
          ST_Transform(new_fd, (SELECT ST_SRID(a) FROM resp)))
      THEN new_fd
      ELSE ST_Intersection(
          (SELECT ST_Buffer(a, -0.0001) FROM resp),
          ST_MakeValid(ST_Transform(new_fd, (SELECT ST_SRID(a) FROM resp))))
      END,
    4326)), 3)) AS new_fd
  FROM ST_GeomFromWKB($1, $2::integer) AS new_fd (new_fd)
  WHERE pg_has_role('sys_admin', 'MEMBER')
    OR ST_Intersects((SELECT a FROM resp),
      ST_MakeValid(ST_Transform(new_fd, (SELECT ST_SRID(a) FROM resp))))
),
not_new AS (
  -- Collect IDs of unchanged entries in temp table
  INSERT INTO unchanged
    SELECT id
      FROM g, waterway.fairway_dimensions
      WHERE staging_done
        AND validity @> current_timestamp
        AND (area, level_of_service,
            min_width, max_width, min_depth, source_organization
          ) IS NOT DISTINCT FROM (
            new_fd, $3, $4, $5, $6, $8)
    -- Return something if a duplicate in the data source is encountered
    ON CONFLICT (id) DO UPDATE SET id = EXCLUDED.id
    RETURNING 1
)
INSERT INTO waterway.fairway_dimensions (
  area,
  level_of_service,
  min_width,
  max_width,
  min_depth,
  date_info,
  source_organization)
SELECT
    new_fd, $3, $4, $5, $6, $7, $8
  FROM g
  WHERE NOT EXISTS(SELECT 1 FROM not_new)
RETURNING id,
  ST_X(ST_Centroid(area::geometry)),
  ST_Y(ST_Centroid(area::geometry))
 `
	// Fetch IDs of entries removed from data source (limited to the
	// user's responsibility area unless sys_admin).
	selectOldSQL = `
WITH resp AS (
  SELECT users.current_user_area_utm() AS a
)
SELECT id FROM waterway.fairway_dimensions
  WHERE staging_done
    AND validity @> current_timestamp
    AND level_of_service = $1
    AND (pg_has_role('sys_admin', 'MEMBER')
      OR ST_Covers((SELECT a FROM resp),
        ST_Transform(CAST(area AS geometry), (SELECT ST_SRID(a) FROM resp))))
    AND id NOT IN (SELECT id FROM unchanged)
`
	// invalidateFairwayDimensionSQL tracks an entry removed from the
	// data source for deletion and inserts a historic version of it
	// whose validity ends now, returning id and centroid.
	invalidateFairwayDimensionSQL = `
WITH track AS (
  -- Mark entry for deletion that has been removed from the data source
  INSERT INTO import.track_imports (import_id, deletion, relation, key)
    VALUES($1, true, 'waterway.fairway_dimensions', $2)
)
-- Insert historic version with respective validity
INSERT INTO waterway.fairway_dimensions (
  area,
  validity,
  level_of_service,
  min_width,
  max_width,
  min_depth,
  date_info,
  source_organization)
SELECT
    area,
    tstzrange(lower(validity), current_timestamp),
    level_of_service,
    min_width,
    max_width,
    min_depth,
    date_info,
    source_organization
  FROM waterway.fairway_dimensions
  WHERE id = $2
RETURNING id,
  ST_X(ST_Centroid(area::geometry)),
  ST_Y(ST_Centroid(area::geometry))
`
)

// Do executes the actual fairway dimension import.
func (fd *FairwayDimension) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	start := time.Now()

	feedback.Info("Import fairway dimensions")

	feedback.Info("Loading capabilities from %s", fd.URL)
	caps, err := wfs.GetCapabilities(fd.URL)
	if err != nil {
		feedback.Error("Loading capabilities failed: %v", err)
		return nil, err
	}

	ft := caps.FindFeatureType(fd.FeatureType)
	if ft == nil {
		return nil, fmt.Errorf("unknown feature type '%s'", fd.FeatureType)
	}

	feedback.Info("Found feature type '%s'", fd.FeatureType)

	epsg, err := wfs.CRSToEPSG(ft.DefaultCRS)
	if err != nil {
		feedback.Error("Unsupported CRS: '%s'", ft.DefaultCRS)
		return nil, err
	}

	dl, err := wfs.GetFeatures(caps, fd.FeatureType, fd.SortBy)
	if err != nil {
		feedback.Error("Cannot create GetFeature URLs. %v", err)
		return nil, err
	}

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	defer tx.Rollback()

	if _, err := tx.ExecContext(ctx, tmpTableSQL); err != nil {
		return nil, err
	}

	insertStmt, err := tx.PrepareContext(ctx, insertFairwayDimensionSQL)
	if err != nil {
		return nil, err
	}
	defer insertStmt.Close()

	invalidateStmt, err := tx.PrepareContext(
		ctx, invalidateFairwayDimensionSQL)
	if err != nil {
		return nil, err
	}
	defer invalidateStmt.Close()

	savepoint := Savepoint(ctx, tx, "feature")

	var (
		unsupported       = stringCounter{}
		missingProperties int
		badProperties     int
		features          int
		outside           int
		fds               []fdSummary
	)

	if err := dl.Download(fd.User, fd.Password, func(url string, r io.Reader) error {
		feedback.Info("Get features from: '%s'", url)
		rfc, err := wfs.ParseRawFeatureCollection(r)
		if err != nil {
			return fmt.Errorf("parsing GetFeature document failed: %v", err)
		}
		if rfc.CRS != nil {
			crsName := rfc.CRS.Properties.Name
			if epsg, err = wfs.CRSToEPSG(crsName); err != nil {
				feedback.Error("Unsupported CRS: %d", crsName)
				return err
			}
		}

		// No features -> ignore.
		if rfc.Features == nil {
			return nil
		}

		feedback.Info("Using EPSG: %d", epsg)

		feedback.Info(
			"Found %d features in data source", len(rfc.Features))

	features:
		for _, feature := range rfc.Features {
			if feature.Geometry.Coordinates == nil {
				missingProperties++
				continue
			}

			var props fairwayDimensionProperties

			if err := json.Unmarshal(*feature.Properties, &props); err != nil {
				feedback.Warn("bad property: %v", err)
				badProperties++
				continue
			}
			var dateInfo time.Time
			if props.HydroSorDat == nil || props.HydroSorDat.IsZero() {
				dateInfo = start
			} else {
				dateInfo = (*props.HydroSorDat).Time
			}

			var polys multiPolygonSlice

			switch feature.Geometry.Type {
			case "Polygon":
				var p polygonSlice
				if err := json.Unmarshal(*feature.Geometry.Coordinates, &p); err != nil {
					return err
				}
				polys = multiPolygonSlice{p}

			case "MultiPolygon":
				if err := json.Unmarshal(*feature.Geometry.Coordinates, &polys); err != nil {
					return err
				}
			default:
				unsupported[feature.Geometry.Type]++
				continue features
			}

			// Store the features.
		storePolygons:
			for _, p := range polys {
				var fdid int64
				var lat, lon float64
				switch err := savepoint(func() error {
					return insertStmt.QueryRowContext(
						ctx,
						p.asWKB(),
						epsg,
						fd.LOS,
						fd.MinWidth,
						fd.MaxWidth,
						fd.Depth,
						dateInfo,
						fd.SourceOrganization,
					).Scan(&fdid, &lat, &lon)
				}); {
				case err == sql.ErrNoRows:
					outside++
					// ignore -> filtered by responsibility area (stretches)
					continue storePolygons
				case err != nil:
					feedback.Error(pgxutils.ReadableError{Err: err}.Error())
					continue storePolygons
				}
				// Store for potential later removal.
				if err := track(
					ctx, tx, importID, "waterway.fairway_dimensions", fdid,
				); err != nil {
					return err
				}
				fds = append(fds, fdSummary{ID: fdid, Lat: lat, Lon: lon})

				features++
			}
		}
		return nil
	}); err != nil {
		return nil, err
	}

	if badProperties > 0 {
		feedback.Warn("Bad properties: %d", badProperties)
	}

	if missingProperties > 0 {
		feedback.Warn("Missing properties: %d", missingProperties)
	}

	if len(unsupported) != 0 {
		feedback.Warn("Unsupported types found: %s", unsupported)
	}

	if outside > 0 {
		feedback.Info(
			"Features outside responsibility area or unchanged: %d", outside)
	}

	if features == 0 {
		feedback.Info("No new features found")
	} else {
		feedback.Info("Stored %d features", features)
	}

	// Invalidate features that have been removed from data source
	res, err := tx.QueryContext(ctx, selectOldSQL, fd.LOS)
	if err != nil {
		return nil, err
	}
	defer res.Close()
	var oldIDs []int64
	for res.Next() {
		var oldID int64
		if err := res.Scan(&oldID); err != nil {
			return nil, err
		}
		oldIDs = append(oldIDs, oldID)
	}
	if err := res.Err(); err != nil {
		return nil, err
	}

	if features == 0 && len(oldIDs) == 0 {
		return nil, UnchangedError("Nothing changed")
	}

	if len(oldIDs) > 0 {
		feedback.Info(
			"Number of features removed from data source: %d", len(oldIDs))

		var old int
		for _, oldID := range oldIDs {
			var fdid int64
			var lat, lon float64
			if err := savepoint(func() error {
				return invalidateStmt.QueryRowContext(
					ctx,
					importID,
					oldID,
				).Scan(&fdid, &lat, &lon)
			}); err != nil {
				feedback.Error(pgxutils.ReadableError{Err: err}.Error()+
					"- while tracking invalidation of: %d", oldID)
				continue
			}
			fds = append(fds, fdSummary{ID: fdid, Lat: lat, Lon: lon})

			if err := track(
				ctx, tx, importID, "waterway.fairway_dimensions", fdid,
			); err != nil {
				return nil, err
			}

			old++
		}

		// Do not fail if features > 0 because otherwise new features are lost
		if features == 0 && old == 0 {
			return nil, fmt.Errorf("invalidating features failed")
		}

		if old > 0 {
			feedback.Info("Number of features invalidated: %d", old)
		}
	}

	if err = tx.Commit(); err == nil {
		feedback.Info("Storing %d features took %s",
			features, time.Since(start))
	}

	summary := struct {
		Date               time.Time   `json:"date"`
		LOS                int         `json:"los"`
		SourceOrganization string      `json:"source-organization"`
		FdArea             []fdSummary `json:"fd-area"`
	}{
		Date:               time.Now(),
		LOS:                fd.LOS,
		SourceOrganization: fd.SourceOrganization,
		FdArea:             fds,
	}
	return &summary, err
}