view pkg/imports/wp.go @ 5560:f2204f91d286

Join the log lines of imports to the log exports to recover data from them. Used in SR export to extract information that was in the meta JSON but is now only found in the log.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Wed, 09 Feb 2022 18:34:40 +0100
parents 41a67619c170
children ade07a3f2cfd
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"bufio"
	"context"
	"database/sql"
	"encoding/csv"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	"github.com/jackc/pgx/pgtype"

	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/models"
	"gemma.intevation.de/gemma/pkg/wfs"
)

// defaultPointToLinePrecision is the precision in meters used to
// match points (the virtual distance marks of the profiles) to lines
// (the geometries downloaded from the WFS) when
// WaterwayProfiles.Precision is not set.
const defaultPointToLinePrecision = 10

// WaterwayProfiles is the import job for waterway profiles.
// The profile geometries are fetched from a WFS service, all other
// attributes are read from an uploaded CSV file, and the combined
// records are stored in the database.
type WaterwayProfiles struct {
	// Dir is the working directory holding the uploaded files
	// (the CSV is read from "wp.csv" inside it).
	Dir string `json:"dir"`
	// URL is the GetCapabilities URL of the WFS service.
	URL string `json:"url"`
	// FeatureType names the WFS feature type to download.
	FeatureType string `json:"feature-type"`
	// SortBy names a property to sort the features by. It works
	// around misconfigured services that need an explicit order.
	SortBy string `json:"sort-by"`
	// Precision is the maximum distance for matching points to line
	// strings. When nil, a default is used; a negative value is
	// taken by its absolute value.
	Precision *float64 `json:"precision,omitempty"`
	// User is an optional user name for HTTP Basic Auth.
	User string `json:"user,omitempty"`
	// Password is an optional password for HTTP Basic Auth.
	Password string `json:"password,omitempty"`
}

// Description summarizes this import as the WFS URL and the feature
// type joined by a '|'. The error result is always nil.
func (wp *WaterwayProfiles) Description() (string, error) {
	const sep = "|"
	desc := wp.URL + sep + wp.FeatureType
	return desc, nil
}

// WPJobKind is the unique name of this import job type.
// It is the key under which this file's init registers wpJobCreator.
const WPJobKind JobKind = "wp"

// wpJobCreator builds and describes WaterwayProfiles import jobs.
type wpJobCreator struct{}

// init registers the waterway profile import under WPJobKind.
func init() {
	var creator wpJobCreator
	RegisterJobCreator(WPJobKind, creator)
}

// Create returns a new, zero-valued WaterwayProfiles job.
func (wpJobCreator) Create() Job {
	job := &WaterwayProfiles{}
	return job
}

// AutoAccept returns false: imports of this kind are not
// accepted automatically.
func (wpJobCreator) AutoAccept() bool {
	return false
}

// Description returns a human readable name of the job type.
func (wpJobCreator) Description() string { return "waterway profiles" }

// Depends lists the tables this job works on. Presumably the first
// entry holds the tables it modifies and the second those it only
// reads, which matches the SQL in this file — TODO confirm against the
// job creator interface.
func (wpJobCreator) Depends() [2][]string {
	var deps [2][]string
	deps[0] = []string{"waterway_profiles"}
	deps[1] = []string{"distance_marks_virtual"}
	return deps
}

// SQL statements used by the waterway profile import.
const (
	// createGeomTempTableSQL creates the temporary table wp_geoms that
	// holds the line geometries downloaded from the WFS. It is dropped
	// automatically when the transaction commits.
	createGeomTempTableSQL = `
CREATE TEMP TABLE wp_geoms (
  geom geography(linestring, 4326)
) ON COMMIT DROP`

	// createTempIndexSQL adds a GiST index on wp_geoms.geom to support
	// the spatial lookups done by insertWaterwayProfileSQL.
	createTempIndexSQL = `
CREATE INDEX ON wp_geoms USING GIST(geom)`

	// analyzeTempTableSQL refreshes the planner statistics of wp_geoms
	// once it has been filled.
	analyzeTempTableSQL = `ANALYZE wp_geoms`

	// insertGeomTmpTableSQL stores one line geometry, given as WKB ($1)
	// in the reference system with EPSG code $2, transformed to
	// EPSG:4326.
	insertGeomTmpTableSQL = `
INSERT INTO wp_geoms (geom) VALUES (
  ST_Transform(ST_GeomFromWKB($1, $2::int), 4326)
)`
	// hasDistanceMarkSQL yields a single true row if a virtual distance
	// mark exists for the ISRS location code given by its five parts
	// ($1..$5), and no row otherwise.
	hasDistanceMarkSQL = `
SELECT true FROM waterway.distance_marks_virtual
WHERE location_code =
  ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)
LIMIT 1`

	// insertWaterwayProfileSQL inserts one waterway profile.
	// Parameters: $1..$5 are the parts of the ISRS location code,
	// $6 the validity range, $7..$11 lnwl, mwl, hnwl, fe30 and fe100,
	// $12 date_info, $13 source_organization and $14 the matching
	// precision (in meters, see defaultPointToLinePrecision).
	// The geometry is the wp_geoms line nearest to the location's
	// virtual distance mark within that precision, or NULL if there is
	// none. It returns the new id and whether geom is NULL.
	insertWaterwayProfileSQL = `
INSERT INTO waterway.waterway_profiles (
  location,
  geom,
  validity,
  lnwl,
  mwl,
  hnwl,
  fe30,
  fe100,
  date_info,
  source_organization
) VALUES (
  ($1, $2, $3, $4, $5),
  ( SELECT wp_geoms.geom
    FROM wp_geoms, waterway.distance_marks_virtual AS dmv
    WHERE dmv.location_code =
        ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)
      AND ST_DWithin(dmv.geom, wp_geoms.geom, $14::float)
    ORDER BY ST_Distance(dmv.geom, wp_geoms.geom, true)
    LIMIT 1
  ),
  $6,
  $7,
  $8,
  $9,
  $10,
  $11,
  $12,
  $13
) RETURNING id, geom is NULL`

	// wpStageDoneSQL sets staging_done on all waterway profiles that
	// are recorded in import.track_imports for import $1.
	wpStageDoneSQL = `
UPDATE waterway.waterway_profiles SET staging_done = true
WHERE id IN (
  SELECT key FROM import.track_imports
  WHERE import_id = $1 AND
    relation = 'waterway.waterway_profiles'::regclass)`
)

// StageDone takes the waterway profiles of import id out of the staging
// area by setting their staging_done flag inside tx.
// The feedback argument is not used.
func (wpJobCreator) StageDone(
	ctx context.Context,
	tx *sql.Tx,
	id int64,
	_ Feedback,
) error {
	if _, err := tx.ExecContext(ctx, wpStageDoneSQL, id); err != nil {
		return err
	}
	return nil
}

// CleanUp removes the job's working directory wp.Dir, including every
// file inside it, from the filesystem.
func (wp *WaterwayProfiles) CleanUp() error {
	dir := wp.Dir
	return os.RemoveAll(dir)
}

// Do performs the actual import.
//
// It runs in a single transaction on conn. First the line geometries
// are downloaded from the WFS into a temporary table. Then the uploaded
// CSV file is read and one waterway profile is inserted per row. The
// transaction is committed only if both steps succeed.
//
// On success it returns a summary with the IDs of the inserted
// profiles. An UnchangedError reported by the CSV step is returned
// as-is (not wrapped), so callers can still recognize it by its type.
func (wp *WaterwayProfiles) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {
	start := time.Now()

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	// After a successful Commit this Rollback is a no-op (it only
	// returns sql.ErrTxDone), so it just undoes work on error paths.
	defer tx.Rollback()

	if err := wp.downloadGeometries(
		ctx, importID, tx, start, feedback); err != nil {
		return nil, fmt.Errorf("error downloading geometries: %w", err)
	}

	summary, err := wp.processCSV(
		ctx, importID, tx, start, feedback)
	if err != nil {
		// Wrapping would hide the concrete UnchangedError type from
		// callers that detect "nothing changed" by a type check.
		if _, ok := err.(UnchangedError); ok {
			return nil, err
		}
		return nil, fmt.Errorf("error processing CSV: %w", err)
	}

	if err := tx.Commit(); err != nil {
		return nil, fmt.Errorf(
			"importing waterway profiles failed after %s: %w",
			time.Since(start), err)
	}

	feedback.Info("Importing waterway profiles took %s",
		time.Since(start))
	return summary, nil
}

// downloadGeometries loads the profile line geometries from the WFS
// service configured in wp into the temporary table wp_geoms.
//
// The table is created inside tx with ON COMMIT DROP, so it only lives
// as long as the transaction. Geometries are transformed from the
// service's CRS to EPSG:4326 on insert. Each part of a MultiLineString
// becomes a row of its own. Features without coordinates, and features
// with other geometry types, are skipped and reported as warnings.
// An error is returned if no line geometry was stored at all.
//
// importID and start are not used here.
func (wp *WaterwayProfiles) downloadGeometries(
	ctx context.Context,
	importID int64,
	tx *sql.Tx,
	start time.Time,
	feedback Feedback,
) error {
	feedback.Info("Start downloading geometries from WFS.")

	feedback.Info("Loading capabilities from %s", wp.URL)
	caps, err := wfs.GetCapabilities(wp.URL)
	if err != nil {
		feedback.Error("Loading capabilities failed: %v", err)
		return err
	}

	ft := caps.FindFeatureType(wp.FeatureType)
	if ft == nil {
		return fmt.Errorf("unknown feature type '%s'", wp.FeatureType)
	}

	feedback.Info("Found feature type '%s'", wp.FeatureType)

	// The default CRS of the feature type applies until a downloaded
	// page announces its own.
	epsg, err := wfs.CRSToEPSG(ft.DefaultCRS)
	if err != nil {
		feedback.Error("Unsupported CRS: '%s'", ft.DefaultCRS)
		return err
	}

	if wp.SortBy != "" {
		feedback.Info("Features will be sorted by '%s'", wp.SortBy)
	}

	dl, err := wfs.GetFeatures(caps, wp.FeatureType, wp.SortBy)
	if err != nil {
		feedback.Error("Cannot create GetFeature URLs. %v", err)
		return err
	}

	if _, err := tx.ExecContext(ctx, createGeomTempTableSQL); err != nil {
		return err
	}

	if _, err := tx.ExecContext(ctx, createTempIndexSQL); err != nil {
		return err
	}

	insertStmt, err := tx.PrepareContext(ctx, insertGeomTmpTableSQL)
	if err != nil {
		return err
	}
	defer insertStmt.Close()

	// unsupported counts skipped features per geometry type,
	// missingProperties counts features without coordinates and
	// features counts the line strings inserted into wp_geoms.
	var (
		unsupported       = stringCounter{}
		missingProperties int
		features          int
	)

	if err := dl.Download(wp.User, wp.Password, func(url string, r io.Reader) error {
		feedback.Info("Get features from: '%s'", url)
		rfc, err := wfs.ParseRawFeatureCollection(r)
		if err != nil {
			return fmt.Errorf("parsing GetFeature document failed: %v", err)
		}
		// A CRS announced by a page replaces the current one for this
		// page and all following ones.
		if rfc.CRS != nil {
			crsName := rfc.CRS.Properties.Name
			if epsg, err = wfs.CRSToEPSG(crsName); err != nil {
				feedback.Error("Unsupported CRS: '%s'", crsName)
				return err
			}
		}

		// No features -> ignore.
		if rfc.Features == nil {
			return nil
		}

		for _, feature := range rfc.Features {
			if feature.Geometry.Coordinates == nil {
				missingProperties++
				continue
			}

			switch feature.Geometry.Type {
			case "LineString":
				var l lineSlice
				if err := json.Unmarshal(*feature.Geometry.Coordinates, &l); err != nil {
					return err
				}
				if _, err := insertStmt.ExecContext(
					ctx,
					l.asWKB(),
					epsg,
				); err != nil {
					return err
				}
				features++
			case "MultiLineString":
				var ml multiLineSlice
				if err := json.Unmarshal(*feature.Geometry.Coordinates, &ml); err != nil {
					return err
				}
				// Every part is matched independently later on.
				for i := range ml {
					if _, err := insertStmt.ExecContext(
						ctx,
						ml[i].asWKB(),
						epsg,
					); err != nil {
						return err
					}
					features++
				}
			default:
				unsupported[feature.Geometry.Type]++
			}
		}
		return nil
	}); err != nil {
		return err
	}

	if missingProperties > 0 {
		feedback.Warn("Missing properties: %d", missingProperties)
	}

	if len(unsupported) != 0 {
		feedback.Warn("Unsupported types found: %s", unsupported)
	}

	if features == 0 {
		return errors.New("no features found")
	}
	// Fresh statistics let the planner use the index when matching
	// the profiles against wp_geoms.
	if _, err := tx.ExecContext(ctx, analyzeTempTableSQL); err != nil {
		return err
	}
	return nil
}

// parseFloat64 parses a decimal number from a CSV cell.
// Both '.' and ',' are accepted as decimal separator, and surrounding
// white space is ignored. An empty or white-space-only cell yields an
// invalid (SQL NULL) value and no error.
func parseFloat64(s string) (sql.NullFloat64, error) {
	s = strings.TrimSpace(s)
	if s == "" {
		return sql.NullFloat64{}, nil
	}
	// Accept the decimal comma used in e.g. German locales.
	v, err := strconv.ParseFloat(strings.ReplaceAll(s, ",", "."), 64)
	if err != nil {
		return sql.NullFloat64{}, err
	}
	return sql.NullFloat64{Float64: v, Valid: true}, nil
}

// processCSV reads the uploaded file "wp.csv" from wp.Dir and inserts
// one row into waterway.waterway_profiles per data line. Each new row
// is recorded in import.track_imports under importID.
//
// The file must be ';'-separated and start with a header line.
// Columns are located by case-insensitive substring match of their
// header names against the names in fields below. Every expected
// column has to be present, and no name may match more than one column.
// Dates are parsed with the layout "02.01.2006". Numbers may use ','
// or '.' as decimal separator. An empty date_info defaults to start.
//
// Rows whose ISRS location has no virtual distance mark are skipped with
// a warning. Every inserted profile gets the nearest line of the
// temporary table wp_geoms within the configured precision, and a
// warning is issued when there is none.
//
// It returns a summary with the IDs of the inserted rows, or an
// UnchangedError if no row was inserted.
func (wp *WaterwayProfiles) processCSV(
	ctx context.Context,
	importID int64,
	tx *sql.Tx,
	start time.Time,
	feedback Feedback,
) (interface{}, error) {
	feedback.Info("Start processing CSV file.")

	f, err := os.Open(filepath.Join(wp.Dir, "wp.csv"))
	if err != nil {
		return nil, err
	}
	defer f.Close()

	r := csv.NewReader(bufio.NewReader(f))
	r.Comma = ';'
	// Read overwrites the slice it returned last time, so neither the
	// headers nor a row may be used after the following Read.
	r.ReuseRecord = true

	headers, err := r.Read()
	if err != nil {
		return nil, err
	}

	var (
		locationIdx  = -1
		validFromIdx = -1
		validToIdx   = -1
		lnwlIdx      = -1
		mwlIdx       = -1
		hnwlIdx      = -1
		fe30Idx      = -1
		fe100Idx     = -1
		dateInfoIdx  = -1
		sourceIdx    = -1
	)

	// Each expected column is identified by a substring of its
	// lower-cased header name.
	fields := []struct {
		idx    *int
		substr string
	}{
		{&locationIdx, "location"},
		{&validFromIdx, "valid_from"},
		{&validToIdx, "valid_to"},
		{&lnwlIdx, "lnwl"},
		{&mwlIdx, "mwl"},
		{&hnwlIdx, "hnwl"},
		{&fe30Idx, "fe30"},
		{&fe100Idx, "fe100"},
		{&dateInfoIdx, "date_info"},
		{&sourceIdx, "source"},
	}

nextHeader:
	for i, h := range headers {
		h = strings.ToLower(h)
		for j := range fields {
			if strings.Contains(h, fields[j].substr) {
				if *fields[j].idx != -1 {
					return nil, fmt.Errorf(
						"CSV has more than one column with name containing '%s'",
						fields[j].substr)
				}
				*fields[j].idx = i
				// A header is assigned to the first field it matches only.
				continue nextHeader
			}
		}
	}

	var missing []string
	for i := range fields {
		if *fields[i].idx == -1 {
			missing = append(missing, fields[i].substr)
		}
	}
	if len(missing) > 0 {
		return nil, fmt.Errorf(
			"CSV is missing columns: %s",
			strings.Join(missing, ", "))
	}

	// A negative precision is taken by its absolute value.
	var precision float64
	if wp.Precision != nil {
		if precision = *wp.Precision; precision < 0 {
			precision = -precision
		}
	} else {
		precision = defaultPointToLinePrecision
	}

	feedback.Info(
		"Matching points to lines with a precision of %.4fm.", precision)

	parseDate := common.TimeParser([]string{"02.01.2006"}).Parse

	insertStmt, err := tx.PrepareContext(ctx, insertWaterwayProfileSQL)
	if err != nil {
		return nil, err
	}
	defer insertStmt.Close()

	trackStmt, err := tx.PrepareContext(ctx, trackImportSQL)
	if err != nil {
		return nil, err
	}
	defer trackStmt.Close()

	hasDistanceMarkStmt, err := tx.PrepareContext(ctx, hasDistanceMarkSQL)
	if err != nil {
		return nil, err
	}
	defer hasDistanceMarkStmt.Close()

	var ids []int64

	// line counts data rows only: line 1 is the first row after the
	// header.
lines:
	for line := 1; ; line++ {

		row, err := r.Read()
		switch {
		case err == io.EOF:
			break lines
		case err != nil:
			// Errors must be checked before looking at the row: on a
			// syntax error the csv reader returns a nil record, which
			// would otherwise be mistaken for the end of the file and
			// silently truncate the import.
			return nil, fmt.Errorf("CSV parsing failed: %w", err)
		case len(row) == 0:
			break lines
		}

		location, err := models.IsrsFromString(row[locationIdx])
		if err != nil {
			return nil, fmt.Errorf(
				"invalid ISRS location code in line %d: %v",
				line, err)
		}

		// Profiles are only imported for locations that have a
		// virtual distance mark.
		var dummy bool
		err = hasDistanceMarkStmt.QueryRowContext(
			ctx,
			location.CountryCode,
			location.LoCode,
			location.FairwaySection,
			location.Orc,
			location.Hectometre,
		).Scan(&dummy)

		switch {
		case err == sql.ErrNoRows:
			feedback.Warn("No virtual distance mark found for %s.", location)
			continue lines
		case err != nil:
			return nil, err
		case !dummy:
			return nil, errors.New("unexpected result from database")
		}

		validFromTime, err := parseDate(row[validFromIdx])
		if err != nil {
			return nil, fmt.Errorf(
				"invalid 'valid_from' value in line %d: %v",
				line, err)
		}
		validToTime, err := parseDate(row[validToIdx])
		if err != nil {
			return nil, fmt.Errorf(
				"invalid 'valid_to' value in line %d: %v",
				line, err)
		}

		validFrom := pgtype.Timestamptz{
			Time:   validFromTime,
			Status: pgtype.Present,
		}

		validTo := pgtype.Timestamptz{
			Time:   validToTime,
			Status: pgtype.Present,
		}

		// Validity is the half-open range [valid_from, valid_to).
		validity := pgtype.Tstzrange{
			Lower:     validFrom,
			Upper:     validTo,
			LowerType: pgtype.Inclusive,
			UpperType: pgtype.Exclusive,
			Status:    pgtype.Present,
		}

		lnwl, err := parseFloat64(row[lnwlIdx])
		if err != nil {
			return nil, fmt.Errorf(
				"invalid 'lnwl' value in line %d: %v",
				line, err)
		}
		mwl, err := parseFloat64(row[mwlIdx])
		if err != nil {
			return nil, fmt.Errorf(
				"invalid 'mwl' value in line %d: %v",
				line, err)
		}
		hnwl, err := parseFloat64(row[hnwlIdx])
		if err != nil {
			return nil, fmt.Errorf(
				"invalid 'hnwl' value in line %d: %v",
				line, err)
		}
		fe30, err := parseFloat64(row[fe30Idx])
		if err != nil {
			return nil, fmt.Errorf(
				"invalid 'fe30' value in line %d: %v",
				line, err)
		}
		fe100, err := parseFloat64(row[fe100Idx])
		if err != nil {
			return nil, fmt.Errorf(
				"invalid 'fe100' value in line %d: %v",
				line, err)
		}

		// An empty date_info falls back to the start time of the import.
		var dateInfo time.Time

		if di := row[dateInfoIdx]; di == "" {
			dateInfo = start
		} else if dateInfo, err = parseDate(di); err != nil {
			return nil, fmt.Errorf(
				"invalid 'date_info' value in line %d: %v",
				line, err)
		}

		source := row[sourceIdx]

		var id int64
		var noGeom bool

		if err := insertStmt.QueryRowContext(
			ctx,
			location.CountryCode,
			location.LoCode,
			location.FairwaySection,
			location.Orc,
			location.Hectometre,
			&validity,
			lnwl,
			mwl,
			hnwl,
			fe30,
			fe100,
			dateInfo,
			source,
			precision,
		).Scan(&id, &noGeom); err != nil {
			return nil, err
		}

		if _, err := trackStmt.ExecContext(
			ctx, importID, "waterway.waterway_profiles", id); err != nil {
			return nil, err
		}

		if noGeom {
			feedback.Warn(
				"No profile geometry found for %s in line %d.", location, line)
		}

		ids = append(ids, id)
	}
	if len(ids) == 0 {
		return nil, UnchangedError("No new entries in waterway profiles.")
	}

	feedback.Info("%d new entries in waterway profiles.", len(ids))

	summary := struct {
		IDs []int64 `json:"ids"`
	}{
		IDs: ids,
	}
	return &summary, nil
}