view pkg/imports/wp.go @ 5711:2dd155cc95ec revive-cleanup

Fix all revive issues (w/o machine generated stuff).
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Tue, 20 Feb 2024 22:22:57 +0100
parents ade07a3f2cfd
children 6270951dda28

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"bufio"
	"context"
	"database/sql"
	"encoding/csv"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	"github.com/jackc/pgx/pgtype"

	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/models"
	"gemma.intevation.de/gemma/pkg/wfs"
)

// defaultPointToLinePrecision is the precision in meters
// used to match points to lines.
const defaultPointToLinePrecision = 10

// WaterwayProfiles is a Job that imports waterway profiles
// from a given WFS service together with an uploaded CSV file
// and stores them in the database.
type WaterwayProfiles struct {
	// Dir is the directory containing the uploaded CSV file (wp.csv).
	Dir string `json:"dir"`
	// URL is the GetCapabilities URL of the WFS service.
	URL string `json:"url"`
	// FeatureType selects the feature type of the WFS service.
	FeatureType string `json:"feature-type"`
	// SortBy establishes a sort order when fetching the features,
	// working around misconfigured services.
	SortBy string `json:"sort-by"`
	// Precision in meters used to match points to line strings.
	Precision *float64 `json:"precision,omitempty"`
	// User is an optional username for Basic Auth.
	User string `json:"user,omitempty"`
	// Password is an optional password for Basic Auth.
	Password string `json:"password,omitempty"`
}
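
// An illustrative JSON body for such a job (the keys follow the struct
// tags above; the URL and the feature type are made-up placeholders):
//
//	{
//	  "url": "https://example.com/ows?service=WFS&request=GetCapabilities",
//	  "feature-type": "ws:waterway_profiles",
//	  "sort-by": "id",
//	  "precision": 10
//	}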

// Description gives a short info about relevant facts of this import.
func (wp *WaterwayProfiles) Description([]string) (string, error) {
	return wp.URL + "|" + wp.FeatureType, nil
}

// WPJobKind is the unique name of this import job type.
const WPJobKind JobKind = "wp"

type wpJobCreator struct{}

func init() {
	RegisterJobCreator(WPJobKind, wpJobCreator{})
}

func (wpJobCreator) Create() Job { return new(WaterwayProfiles) }

func (wpJobCreator) AutoAccept() bool { return false }

func (wpJobCreator) Description() string {
	return "waterway profiles"
}

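// Depends lists the tables this import touches: waterway_profiles is
// written to, while distance_marks_virtual is only read to match the
// profile locations to geometries.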
func (wpJobCreator) Depends() [2][]string {
	return [2][]string{
		{"waterway_profiles"},
		{"distance_marks_virtual"},
	}
}

const (
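	// createGeomTempTableSQL holds the line geometries downloaded from
	// the WFS for the lifetime of the import transaction.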
	createGeomTempTableSQL = `
CREATE TEMP TABLE wp_geoms (
  geom geography(linestring, 4326)
) ON COMMIT DROP`

	createTempIndexSQL = `
CREATE INDEX ON wp_geoms USING GIST(geom)`

	analyzeTempTableSQL = `ANALYZE wp_geoms`

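	// insertGeomTmpTableSQL stores a single line string given as WKB in
	// its source CRS, transformed to WGS84 (EPSG:4326).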
	insertGeomTmpTableSQL = `
INSERT INTO wp_geoms (geom) VALUES (
  ST_Transform(ST_GeomFromWKB($1, $2::int), 4326)
)`
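
	// hasDistanceMarkSQL checks whether a virtual distance mark exists
	// for a given ISRS location code.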
	hasDistanceMarkSQL = `
SELECT true FROM waterway.distance_marks_virtual
WHERE location_code =
  ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)
LIMIT 1`

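	// insertWaterwayProfileSQL inserts a profile and resolves its geometry
	// as the nearest downloaded line within the given precision of the
	// location's virtual distance mark. It returns the new id and whether
	// no geometry could be matched.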
	insertWaterwayProfileSQL = `
INSERT INTO waterway.waterway_profiles (
  location,
  geom,
  validity,
  lnwl,
  mwl,
  hnwl,
  fe30,
  fe100,
  date_info,
  source_organization
) VALUES (
  ($1, $2, $3, $4, $5),
  ( SELECT wp_geoms.geom
    FROM wp_geoms, waterway.distance_marks_virtual AS dmv
    WHERE dmv.location_code =
        ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)
      AND ST_DWithin(dmv.geom, wp_geoms.geom, $14::float)
    ORDER BY ST_Distance(dmv.geom, wp_geoms.geom, true)
    LIMIT 1
  ),
  $6,
  $7,
  $8,
  $9,
  $10,
  $11,
  $12,
  $13
) RETURNING id, geom is NULL`

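	// wpStageDoneSQL marks the profiles tracked for this import as no
	// longer being part of the staging area.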
	wpStageDoneSQL = `
UPDATE waterway.waterway_profiles SET staging_done = true
WHERE id IN (
  SELECT key FROM import.track_imports
  WHERE import_id = $1 AND
    relation = 'waterway.waterway_profiles'::regclass)`
)

// StageDone moves the imported waterway profiles out of the staging area.
func (wpJobCreator) StageDone(
	ctx context.Context,
	tx *sql.Tx,
	id int64,
	_ Feedback,
) error {
	_, err := tx.ExecContext(ctx, wpStageDoneSQL, id)
	return err
}

// CleanUp deletes temporary files from the filesystem.
func (wp *WaterwayProfiles) CleanUp() error { return os.RemoveAll(wp.Dir) }

// Do performs the actual import.
func (wp *WaterwayProfiles) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {
	start := time.Now()

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	defer tx.Rollback()

	if err := wp.downloadGeometries(ctx, tx, feedback); err != nil {
		return nil, fmt.Errorf("error downloading geometries: %v", err)
	}

	summary, err := wp.processCSV(
		ctx, importID, tx, start, feedback)
	if err != nil {
		return nil, fmt.Errorf("error processing CSV: %v", err)
	}

	if err := tx.Commit(); err != nil {
		return nil, fmt.Errorf(
			"importing waterway profiles failed after %s: %v",
			time.Since(start), err)
	}

	feedback.Info("Importing waterway profiles took %s",
		time.Since(start))
	return summary, nil
}

func (wp *WaterwayProfiles) downloadGeometries(
	ctx context.Context,
	tx *sql.Tx,
	feedback Feedback,
) error {
	feedback.Info("Start downloading geometries from WFS.")

	feedback.Info("Loading capabilities from %s", wp.URL)
	caps, err := wfs.GetCapabilities(wp.URL)
	if err != nil {
		feedback.Error("Loading capabilities failed: %v", err)
		return err
	}

	ft := caps.FindFeatureType(wp.FeatureType)
	if ft == nil {
		return fmt.Errorf("unknown feature type '%s'", wp.FeatureType)
	}

	feedback.Info("Found feature type '%s'", wp.FeatureType)

	epsg, err := wfs.CRSToEPSG(ft.DefaultCRS)
	if err != nil {
		feedback.Error("Unsupported CRS: '%s'", ft.DefaultCRS)
		return err
	}

	if wp.SortBy != "" {
		feedback.Info("Features will be sorted by '%s'", wp.SortBy)
	}

	dl, err := wfs.GetFeatures(caps, wp.FeatureType, wp.SortBy)
	if err != nil {
		feedback.Error("Cannot create GetFeature URLs. %v", err)
		return err
	}

	if _, err := tx.ExecContext(ctx, createGeomTempTableSQL); err != nil {
		return err
	}

	if _, err := tx.ExecContext(ctx, createTempIndexSQL); err != nil {
		return err
	}

	insertStmt, err := tx.PrepareContext(ctx, insertGeomTmpTableSQL)
	if err != nil {
		return err
	}
	defer insertStmt.Close()

	var (
		unsupported       = stringCounter{}
		missingProperties int
		features          int
	)

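	// Every GetFeature response is parsed as GeoJSON. LineString and
	// MultiLineString geometries are stored in the temporary table;
	// other geometry types are only counted and reported afterwards.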
	if err := dl.Download(wp.User, wp.Password, func(url string, r io.Reader) error {
		feedback.Info("Get features from: '%s'", url)
		rfc, err := wfs.ParseRawFeatureCollection(r)
		if err != nil {
			return fmt.Errorf("parsing GetFeature document failed: %v", err)
		}
		if rfc.CRS != nil {
			crsName := rfc.CRS.Properties.Name
			if epsg, err = wfs.CRSToEPSG(crsName); err != nil {
				feedback.Error("Unsupported CRS: %d", crsName)
				return err
			}
		}

		// No features -> ignore.
		if rfc.Features == nil {
			return nil
		}

		for _, feature := range rfc.Features {
			if feature.Geometry.Coordinates == nil {
				missingProperties++
				continue
			}

			switch feature.Geometry.Type {
			case "LineString":
				var l lineSlice
				if err := json.Unmarshal(*feature.Geometry.Coordinates, &l); err != nil {
					return err
				}
				if _, err := insertStmt.ExecContext(
					ctx,
					l.asWKB(),
					epsg,
				); err != nil {
					return err
				}
				features++
			case "MultiLineString":
				var ml multiLineSlice
				if err := json.Unmarshal(*feature.Geometry.Coordinates, &ml); err != nil {
					return err
				}
				for i := range ml {
					if _, err := insertStmt.ExecContext(
						ctx,
						ml[i].asWKB(),
						epsg,
					); err != nil {
						return err
					}
					features++
				}
			default:
				unsupported[feature.Geometry.Type]++
			}
		}
		return nil
	}); err != nil {
		return err
	}

	if missingProperties > 0 {
		feedback.Warn("Missing properties: %d", missingProperties)
	}

	if len(unsupported) != 0 {
		feedback.Warn("Unsupported types found: %s", unsupported)
	}

	if features == 0 {
		return errors.New("no features found")
	}
	if _, err := tx.ExecContext(ctx, analyzeTempTableSQL); err != nil {
		return err
	}
	return nil
}

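// parseFloat64 parses a decimal value which may use a comma as the
// decimal separator. An empty string is returned as a NULL value.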
func parseFloat64(s string) (sql.NullFloat64, error) {
	if s == "" {
		return sql.NullFloat64{}, nil
	}
	s = strings.ReplaceAll(s, ",", ".")
	v, err := strconv.ParseFloat(s, 64)
	if err != nil {
		return sql.NullFloat64{}, err
	}
	return sql.NullFloat64{Float64: v, Valid: true}, nil
}

func (wp *WaterwayProfiles) processCSV(
	ctx context.Context,
	importID int64,
	tx *sql.Tx,
	start time.Time,
	feedback Feedback,
) (interface{}, error) {
	feedback.Info("Start processing CSV file.")

	f, err := os.Open(filepath.Join(wp.Dir, "wp.csv"))
	if err != nil {
		return nil, err
	}
	defer f.Close()

	r := csv.NewReader(bufio.NewReader(f))
	r.Comma = ';'
	r.ReuseRecord = true

	headers, err := r.Read()
	if err != nil {
		return nil, err
	}

	var (
		locationIdx  = -1
		validFromIdx = -1
		validToIdx   = -1
		lnwlIdx      = -1
		mwlIdx       = -1
		hnwlIdx      = -1
		fe30Idx      = -1
		fe100Idx     = -1
		dateInfoIdx  = -1
		sourceIdx    = -1
	)

	fields := []struct {
		idx    *int
		substr string
	}{
		{&locationIdx, "location"},
		{&validFromIdx, "valid_from"},
		{&validToIdx, "valid_to"},
		{&lnwlIdx, "lnwl"},
		{&mwlIdx, "mwl"},
		{&hnwlIdx, "hnwl"},
		{&fe30Idx, "fe30"},
		{&fe100Idx, "fe100"},
		{&dateInfoIdx, "date_info"},
		{&sourceIdx, "source"},
	}

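	// Identify the columns by a case-insensitive substring match against
	// the header names; more than one column matching the same substring
	// is an error.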
nextHeader:
	for i, h := range headers {
		h = strings.ToLower(h)
		for j := range fields {
			if strings.Contains(h, fields[j].substr) {
				if *fields[j].idx != -1 {
					return nil, fmt.Errorf(
						"CSV has more than one column with name containing '%s'",
						fields[j].substr)
				}
				*fields[j].idx = i
				continue nextHeader
			}
		}
	}

	var missing []string
	for i := range fields {
		if *fields[i].idx == -1 {
			missing = append(missing, fields[i].substr)
		}
	}
	if len(missing) > 0 {
		return nil, fmt.Errorf(
			"CSV is missing columns: %s",
			strings.Join(missing, ", "))
	}

	var precision float64
	if wp.Precision != nil {
		if precision = *wp.Precision; precision < 0 {
			precision = -precision
		}
	} else {
		precision = defaultPointToLinePrecision
	}

	feedback.Info(
		"Matching points to lines with a precision of %.4fm.", precision)

	parseDate := common.TimeParser([]string{"02.01.2006"}).Parse

	insertStmt, err := tx.PrepareContext(ctx, insertWaterwayProfileSQL)
	if err != nil {
		return nil, err
	}
	defer insertStmt.Close()

	trackStmt, err := tx.PrepareContext(ctx, trackImportSQL)
	if err != nil {
		return nil, err
	}
	defer trackStmt.Close()

	hasDistanceMarkStmt, err := tx.PrepareContext(ctx, hasDistanceMarkSQL)
	if err != nil {
		return nil, err
	}
	defer hasDistanceMarkStmt.Close()

	var ids []int64

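	// Process the CSV line by line. Rows without a matching virtual
	// distance mark are skipped with a warning; any other parsing or
	// database problem aborts the import.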
lines:
	for line := 1; ; line++ {

		row, err := r.Read()
		switch {
		case errors.Is(err, io.EOF) || len(row) == 0:
			break lines
		case err != nil:
			return nil, fmt.Errorf("CSV parsing failed: %v", err)
		}

		location, err := models.IsrsFromString(row[locationIdx])
		if err != nil {
			return nil, fmt.Errorf(
				"invalid ISRS location code in line %d: %v",
				line, err)
		}

		var dummy bool
		err = hasDistanceMarkStmt.QueryRowContext(
			ctx,
			location.CountryCode,
			location.LoCode,
			location.FairwaySection,
			location.Orc,
			location.Hectometre,
		).Scan(&dummy)

		switch {
		case errors.Is(err, sql.ErrNoRows):
			feedback.Warn("No virtual distance mark found for %s.", location)
			continue lines
		case err != nil:
			return nil, err
		case !dummy:
			return nil, errors.New("unexpected result form database")
		}

		validFromTime, err := parseDate(row[validFromIdx])
		if err != nil {
			return nil, fmt.Errorf(
				"invalid 'valid_from' value in line %d: %v",
				line, err)
		}
		validToTime, err := parseDate(row[validToIdx])
		if err != nil {
			return nil, fmt.Errorf(
				"invalid 'valid_to' value in line %d: %v",
				line, err)
		}

		validFrom := pgtype.Timestamptz{
			Time:   validFromTime,
			Status: pgtype.Present,
		}

		validTo := pgtype.Timestamptz{
			Time:   validToTime,
			Status: pgtype.Present,
		}

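		// The validity is stored as the half-open range [valid_from, valid_to).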
		validity := pgtype.Tstzrange{
			Lower:     validFrom,
			Upper:     validTo,
			LowerType: pgtype.Inclusive,
			UpperType: pgtype.Exclusive,
			Status:    pgtype.Present,
		}

		lnwl, err := parseFloat64(row[lnwlIdx])
		if err != nil {
			return nil, fmt.Errorf(
				"invalid 'lnwl' value in line %d: %v",
				line, err)
		}
		mwl, err := parseFloat64(row[mwlIdx])
		if err != nil {
			return nil, fmt.Errorf(
				"invalid 'mwl' value in line %d: %v",
				line, err)
		}
		hnwl, err := parseFloat64(row[hnwlIdx])
		if err != nil {
			return nil, fmt.Errorf(
				"invalid 'hnwl' value in line %d: %v",
				line, err)
		}
		fe30, err := parseFloat64(row[fe30Idx])
		if err != nil {
			return nil, fmt.Errorf(
				"invalid 'fe30' value in line %d: %v",
				line, err)
		}
		fe100, err := parseFloat64(row[fe100Idx])
		if err != nil {
			return nil, fmt.Errorf(
				"invalid 'fe100' value in line %d: %v",
				line, err)
		}

		var dateInfo time.Time

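		// An empty date_info column falls back to the time the import started.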
		if di := row[dateInfoIdx]; di == "" {
			dateInfo = start
		} else if dateInfo, err = parseDate(di); err != nil {
			return nil, fmt.Errorf(
				"invalid 'date_info' value in line %d: %v",
				line, err)
		}

		source := row[sourceIdx]

		var id int64
		var noGeom bool

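		// Insert the profile. noGeom reports whether the geometry subquery
		// found no line within the configured precision; the row is stored
		// anyway, just without a geometry.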
		if err := insertStmt.QueryRowContext(
			ctx,
			location.CountryCode,
			location.LoCode,
			location.FairwaySection,
			location.Orc,
			location.Hectometre,
			&validity,
			lnwl,
			mwl,
			hnwl,
			fe30,
			fe100,
			dateInfo,
			source,
			precision,
		).Scan(&id, &noGeom); err != nil {
			return nil, err
		}

		if _, err := trackStmt.ExecContext(
			ctx, importID, "waterway.waterway_profiles", id); err != nil {
			return nil, err
		}

		if noGeom {
			feedback.Warn(
				"No profile geometry found for %s in line %d.", location, line)
		}

		ids = append(ids, id)
	}
	if len(ids) == 0 {
		return nil, UnchangedError("No new entries in waterway profiles.")
	}

	feedback.Info("%d new entries in waterway profiles.", len(ids))

	summary := struct {
		IDs []int64 `json:"ids"`
	}{
		IDs: ids,
	}
	return &summary, nil
}