view pkg/imports/wp.go @ 3705:7006b92c0334

Handle updates (vs. historized and new versions) separately. We need this distinction as updated data currently cannot be reviewed. More precisely: it cannot be declined after review, as the old data is updated in place. The current exclusion from the review is a workaround and not meant to be the final solution. Note that there are additional minor problems, like the fact that the updated data is not counted as changed data for the import.
author Sascha Wilde <wilde@intevation.de>
date Wed, 19 Jun 2019 17:00:08 +0200
parents 4acbee65275d
children eb2f949ddfa2
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"bufio"
	"context"
	"database/sql"
	"encoding/csv"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	"github.com/jackc/pgx/pgtype"

	"gemma.intevation.de/gemma/pkg/misc"
	"gemma.intevation.de/gemma/pkg/models"
	"gemma.intevation.de/gemma/pkg/wfs"
)

// defaultPointToLinePrecision is the precision in meters
// to match from points to lines. It is used by processCSV
// when WaterwayProfiles.Precision is not set.
const defaultPointToLinePrecision = 10

// WaterwayProfiles is the import job for waterway profiles.
// Its Do method downloads line geometries from a WFS service and
// matches them to the records of the file wp.csv found in Dir.
type WaterwayProfiles struct {
	// Dir is the directory containing the wp.csv file to import.
	// It is removed by CleanUp.
	Dir string `json:"dir"`
	// URL the GetCapabilities URL of the WFS service.
	URL string `json:"url"`
	// FeatureType selects the feature type of the WFS service.
	FeatureType string `json:"feature-type"`
	// SortBy works around misconfigured services to
	// establish a sort order to get the features.
	SortBy string `json:"sort-by"`
	// Precision of matching points to line strings in meters.
	// If nil, defaultPointToLinePrecision is used. A negative
	// value is replaced by its absolute value.
	Precision *float64 `json:"precision,omitempty"`
	// User is an optional username for Basic Auth.
	User string `json:"user,omitempty"`
	// Password is an optional password for Basic Auth.
	Password string `json:"password,omitempty"`
}

// WPJobKind is the import type for waterway profiles.
const WPJobKind JobKind = "wp"

// wpJobCreator is the job creator that init registers for WPJobKind.
type wpJobCreator struct{}

func init() {
	RegisterJobCreator(WPJobKind, wpJobCreator{})
}

// Create returns a new, zero-valued waterway profiles import job.
func (wpJobCreator) Create() Job {
	return &WaterwayProfiles{}
}

// AutoAccept always returns false for waterway profile imports.
// NOTE(review): presumably this means such imports are not accepted
// automatically but have to be reviewed; confirm against the
// job creator interface.
func (wpJobCreator) AutoAccept() bool {
	const autoAccept = false
	return autoAccept
}

// Description returns the textual name of this kind of import.
func (wpJobCreator) Description() string {
	const description = "waterway profiles"
	return description
}

// Depends returns the relations this import depends on.
// NOTE(review): the first slot presumably lists the relations written
// by the import (waterway_profiles) and the second the relations only
// read (distance_marks_virtual); confirm against the job creator
// interface.
func (wpJobCreator) Depends() [2][]string {
	var deps [2][]string
	deps[0] = []string{"waterway_profiles"}
	deps[1] = []string{"distance_marks_virtual"}
	return deps
}

const (
	// createGeomTempTableSQL creates the temporary table that holds
	// the line geometries downloaded from the WFS service. The table
	// is dropped when the transaction commits.
	createGeomTempTableSQL = `
CREATE TEMP TABLE wp_geoms (
  geom geography(linestring, 4326)
) ON COMMIT DROP`

	// createTempIndexSQL adds a spatial (GiST) index to the
	// temporary geometry table.
	createTempIndexSQL = `
CREATE INDEX ON wp_geoms USING GIST(geom)`

	// analyzeTempTableSQL updates the planner statistics of the
	// temporary geometry table. It is run after the geometries
	// are inserted.
	analyzeTempTableSQL = `ANALYZE wp_geoms`

	// insertGeomTmpTableSQL stores one geometry in the temporary table.
	// $1 is the geometry as WKB, $2 the EPSG code of its coordinates.
	// The geometry is transformed to EPSG:4326.
	insertGeomTmpTableSQL = `
INSERT INTO wp_geoms (geom) VALUES (
  ST_Transform(ST_GeomFromWKB($1, $2::int), 4326)
)`
	// hasDistanceMarkSQL yields a single row containing true if a
	// virtual distance mark exists for the location code given by
	// its five components $1 to $5, and no row otherwise.
	hasDistanceMarkSQL = `
SELECT true FROM waterway.distance_marks_virtual
WHERE location_code =
  ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)
LIMIT 1`

	// insertWaterwayProfileSQL inserts one waterway profile.
	// $1 to $5 are the components of the location code, $6 the
	// validity range, $7 to $11 the values lnwl, mwl, hnwl, fe30 and
	// fe100, $12 the date info and $13 the source organization.
	// The geometry is the line from wp_geoms nearest to the virtual
	// distance mark of the location that lies within the distance $14;
	// it is NULL if there is no such line.
	// Returns the id of the new row and whether its geometry is NULL.
	insertWaterwayProfileSQL = `
INSERT INTO waterway.waterway_profiles (
  location,
  geom,
  validity,
  lnwl,
  mwl,
  hnwl,
  fe30,
  fe100,
  date_info,
  source_organization
) VALUES (
  ($1, $2, $3, $4, $5),
  ( SELECT wp_geoms.geom
    FROM wp_geoms, waterway.distance_marks_virtual AS dmv
    WHERE dmv.location_code =
        ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)
      AND ST_DWithin(dmv.geom, wp_geoms.geom, $14::float)
    ORDER BY ST_Distance(dmv.geom, wp_geoms.geom, true)
    LIMIT 1
  ),
  $6,
  $7,
  $8,
  $9,
  $10,
  $11,
  $12,
  $13
) RETURNING id, geom is NULL`

	// wpStageDoneSQL sets staging_done on all waterway profiles that
	// are tracked in import.track_imports for the import with id $1.
	wpStageDoneSQL = `
UPDATE waterway.waterway_profiles SET staging_done = true
WHERE id IN (
  SELECT key FROM import.track_imports
  WHERE import_id = $1 AND
    relation = 'waterway.waterway_profiles'::regclass)`
)

// StageDone sets staging_done on all waterway profiles which are
// tracked for the import with the given id. The update runs inside
// the given transaction; its error, if any, is returned unchanged.
func (wpJobCreator) StageDone(ctx context.Context, tx *sql.Tx, id int64) error {
	if _, err := tx.ExecContext(ctx, wpStageDoneSQL, id); err != nil {
		return err
	}
	return nil
}

// CleanUp removes the directory wp.Dir together with everything
// it contains.
func (wp *WaterwayProfiles) CleanUp() error {
	dir := wp.Dir
	if err := os.RemoveAll(dir); err != nil {
		return err
	}
	return nil
}

// Do executes the import of the waterway profiles.
//
// Inside a single transaction it first downloads the profile
// geometries from the WFS service and then processes the CSV file.
// The transaction is committed only if both steps succeed; otherwise
// the deferred rollback discards all changes.
// On success the summary produced by processCSV is returned.
//
// NOTE(review): wrapping the error of processCSV with %v produces a
// plain error value. The UnchangedError that processCSV returns when
// nothing was inserted is therefore not visible to callers by its
// type; confirm whether callers rely on that type.
func (wp *WaterwayProfiles) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {
	began := time.Now()

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	// A rollback after a successful commit has no effect on the data.
	defer tx.Rollback()

	err = wp.downloadGeometries(ctx, importID, tx, began, feedback)
	if err != nil {
		return nil, fmt.Errorf("error downloading geometries: %v", err)
	}

	result, err := wp.processCSV(ctx, importID, tx, began, feedback)
	if err != nil {
		return nil, fmt.Errorf("error processing CSV: %v", err)
	}

	if err = tx.Commit(); err != nil {
		return nil, fmt.Errorf(
			"Importing waterway profiles failed after %s: %v",
			time.Since(began), err)
	}

	feedback.Info("Importing waterway profiles took %s", time.Since(began))
	return result, nil
}

// downloadGeometries loads the line geometries of the configured WFS
// feature type and stores them in the temporary table wp_geoms, which
// is created inside the given transaction.
//
// The capabilities are loaded from wp.URL and the feature type
// wp.FeatureType is looked up in them. The EPSG code used to interpret
// the coordinates is taken from the default CRS of the feature type
// and is replaced by the CRS of a downloaded feature collection if
// the collection declares one.
//
// Only features with geometry type "LineString" are stored. Features
// without coordinates and features with other geometry types are
// counted and reported as warnings. An error is returned if no
// feature was stored at all.
//
// The parameters importID and start are not used by this function.
func (wp *WaterwayProfiles) downloadGeometries(
	ctx context.Context,
	importID int64,
	tx *sql.Tx,
	start time.Time,
	feedback Feedback,
) error {
	feedback.Info("Start downloading geometries from WFS.")

	feedback.Info("Loading capabilities from %s", wp.URL)
	caps, err := wfs.GetCapabilities(wp.URL)
	if err != nil {
		feedback.Error("Loading capabilities failed: %v", err)
		return err
	}

	ft := caps.FindFeatureType(wp.FeatureType)
	if ft == nil {
		return fmt.Errorf("Unknown feature type '%s'", wp.FeatureType)
	}

	feedback.Info("Found feature type '%s'", wp.FeatureType)

	epsg, err := wfs.CRSToEPSG(ft.DefaultCRS)
	if err != nil {
		feedback.Error("Unsupported CRS name '%s'", ft.DefaultCRS)
		return err
	}

	if wp.SortBy != "" {
		feedback.Info("Features will be sorted by '%s'", wp.SortBy)
	}

	dl, err := wfs.GetFeatures(caps, wp.FeatureType, wp.SortBy)
	if err != nil {
		feedback.Error("Cannot create GetFeature URLs. %v", err)
		return err
	}

	if _, err := tx.ExecContext(ctx, createGeomTempTableSQL); err != nil {
		return err
	}

	if _, err := tx.ExecContext(ctx, createTempIndexSQL); err != nil {
		return err
	}

	insertStmt, err := tx.PrepareContext(ctx, insertGeomTmpTableSQL)
	if err != nil {
		return err
	}
	defer insertStmt.Close()

	var (
		// unsupported counts the features per unsupported geometry type.
		unsupported = stringCounter{}
		// missingProperties counts the features without coordinates.
		missingProperties int
		// features counts the geometries stored in the temp table.
		features int
	)

	if err := dl.Download(wp.User, wp.Password, func(url string, r io.Reader) error {
		feedback.Info("Get features from: '%s'", url)
		rfc, err := wfs.ParseRawFeatureCollection(r)
		if err != nil {
			return fmt.Errorf("parsing GetFeature document failed: %v", err)
		}
		if rfc.CRS != nil {
			// The CRS declared by the collection overrides the
			// EPSG code determined so far.
			crsName := rfc.CRS.Properties.Name
			if epsg, err = wfs.CRSToEPSG(crsName); err != nil {
				// The CRS name is text, so it has to be
				// formatted with %s and not with %d.
				feedback.Error("Unsupported CRS: %s", crsName)
				return err
			}
		}

		// No features -> ignore.
		if rfc.Features == nil {
			return nil
		}

		for _, feature := range rfc.Features {
			if feature.Geometry.Coordinates == nil {
				missingProperties++
				continue
			}

			switch feature.Geometry.Type {
			case "LineString":
				var l lineSlice
				if err := json.Unmarshal(*feature.Geometry.Coordinates, &l); err != nil {
					return err
				}
				if _, err := insertStmt.ExecContext(
					ctx,
					l.asWKB(),
					epsg,
				); err != nil {
					return err
				}
				features++
			default:
				unsupported[feature.Geometry.Type]++
			}
		}
		return nil
	}); err != nil {
		return err
	}

	if missingProperties > 0 {
		feedback.Warn("Missing properties: %d", missingProperties)
	}

	if len(unsupported) != 0 {
		feedback.Warn("Unsupported types found: %s", unsupported)
	}

	if features == 0 {
		return errors.New("No features found")
	}
	// Refresh the statistics now that the table is filled.
	if _, err := tx.ExecContext(ctx, analyzeTempTableSQL); err != nil {
		return err
	}
	return nil
}

// parseFloat64 converts the textual number s to a nullable float.
// The empty string yields an invalid (NULL) value without an error.
// Commas in s are treated as decimal points before parsing.
// If parsing fails the error is returned together with an
// invalid value.
func parseFloat64(s string) (sql.NullFloat64, error) {
	var result sql.NullFloat64
	if len(s) == 0 {
		return result, nil
	}
	normalized := strings.Replace(s, ",", ".", -1)
	value, err := strconv.ParseFloat(normalized, 64)
	if err == nil {
		result.Float64, result.Valid = value, true
	}
	return result, err
}

// processCSV reads the file wp.csv from wp.Dir and inserts one
// waterway profile per record into the database using the given
// transaction. Every inserted row is tracked for the import importID.
//
// The file is expected to be separated by semicolons. Its first record
// is the header. A column is assigned to a field if its lower-cased
// name contains the name of the field (location, valid_from, valid_to,
// lnwl, mwl, hnwl, fe30, fe100, date_info, source). All fields must
// be present exactly once.
//
// Records whose location has no virtual distance mark are skipped
// with a warning. For the other records the geometry is taken from
// the temporary table wp_geoms by matching within the configured
// precision; a missing geometry only results in a warning.
// An empty date_info is replaced by start.
//
// Line numbers in messages count the data records, starting with 1
// for the first record after the header.
//
// It returns a summary with the ids of the inserted rows. If no row
// was inserted an UnchangedError is returned. Invalid values, read
// errors and database errors abort the processing.
func (wp *WaterwayProfiles) processCSV(
	ctx context.Context,
	importID int64,
	tx *sql.Tx,
	start time.Time,
	feedback Feedback,
) (interface{}, error) {
	feedback.Info("Start processing CSV file.")

	f, err := os.Open(filepath.Join(wp.Dir, "wp.csv"))
	if err != nil {
		return nil, err
	}
	defer f.Close()

	r := csv.NewReader(bufio.NewReader(f))
	r.Comma = ';'
	r.ReuseRecord = true

	headers, err := r.Read()
	if err != nil {
		return nil, err
	}

	// Column indices of the fields; -1 means not found (yet).
	var (
		locationIdx  = -1
		validFromIdx = -1
		validToIdx   = -1
		lnwlIdx      = -1
		mwlIdx       = -1
		hnwlIdx      = -1
		fe30Idx      = -1
		fe100Idx     = -1
		dateInfoIdx  = -1
		sourceIdx    = -1
	)

	fields := []struct {
		idx    *int
		substr string
	}{
		{&locationIdx, "location"},
		{&validFromIdx, "valid_from"},
		{&validToIdx, "valid_to"},
		{&lnwlIdx, "lnwl"},
		{&mwlIdx, "mwl"},
		{&hnwlIdx, "hnwl"},
		{&fe30Idx, "fe30"},
		{&fe100Idx, "fe100"},
		{&dateInfoIdx, "date_info"},
		{&sourceIdx, "source"},
	}

	// Assign each header to the first field whose name it contains.
nextHeader:
	for i, h := range headers {
		h = strings.ToLower(h)
		for j := range fields {
			if strings.Contains(h, fields[j].substr) {
				if *fields[j].idx != -1 {
					return nil, fmt.Errorf(
						"CSV has more than one column with name containing '%s'",
						fields[j].substr)
				}
				*fields[j].idx = i
				continue nextHeader
			}
		}
	}

	var missing []string
	for i := range fields {
		if *fields[i].idx == -1 {
			missing = append(missing, fields[i].substr)
		}
	}
	if len(missing) > 0 {
		return nil, fmt.Errorf(
			"CSV is missing columns: %s",
			strings.Join(missing, ", "))
	}

	// A negative precision is used with its absolute value.
	var precision float64
	if wp.Precision != nil {
		if precision = *wp.Precision; precision < 0 {
			precision = -precision
		}
	} else {
		precision = defaultPointToLinePrecision
	}

	feedback.Info(
		"Matching points to lines with a precision of %.4fm.", precision)

	parseDate := misc.TimeGuesser([]string{"02.01.2006"}).Guess

	insertStmt, err := tx.PrepareContext(ctx, insertWaterwayProfileSQL)
	if err != nil {
		return nil, err
	}
	defer insertStmt.Close()

	trackStmt, err := tx.PrepareContext(ctx, trackImportSQL)
	if err != nil {
		return nil, err
	}
	defer trackStmt.Close()

	hasDistanceMarkStmt, err := tx.PrepareContext(ctx, hasDistanceMarkSQL)
	if err != nil {
		return nil, err
	}
	defer hasDistanceMarkStmt.Close()

	var ids []int64

lines:
	for line := 1; ; line++ {

		row, err := r.Read()
		// The error has to be checked before the empty record test:
		// a parse error comes with a nil record and would otherwise
		// be taken for the end of the file, silently dropping the
		// rest of the data.
		switch {
		case err == io.EOF:
			break lines
		case err != nil:
			return nil, fmt.Errorf("CSV parsing failed: %v", err)
		case len(row) == 0:
			break lines
		}

		location, err := models.IsrsFromString(row[locationIdx])
		if err != nil {
			return nil, fmt.Errorf(
				"Invalid ISRS location code in line %d: %v",
				line, err)
		}

		// Skip records for which no virtual distance mark exists.
		var dummy bool
		err = hasDistanceMarkStmt.QueryRowContext(
			ctx,
			location.CountryCode,
			location.LoCode,
			location.FairwaySection,
			location.Orc,
			location.Hectometre,
		).Scan(&dummy)

		switch {
		case err == sql.ErrNoRows:
			feedback.Warn("No virtual distance mark found for %s.", location)
			continue lines
		case err != nil:
			return nil, err
		case !dummy:
			return nil, errors.New("unexpected result form database")
		}

		validFromTime, err := parseDate(row[validFromIdx])
		if err != nil {
			return nil, fmt.Errorf(
				"Invalid 'valid_from' value in line %d: %v",
				line, err)
		}
		validToTime, err := parseDate(row[validToIdx])
		if err != nil {
			return nil, fmt.Errorf(
				"Invalid 'valid_to' value in line %d: %v",
				line, err)
		}

		validFrom := pgtype.Timestamptz{
			Time:   validFromTime,
			Status: pgtype.Present,
		}

		validTo := pgtype.Timestamptz{
			Time:   validToTime,
			Status: pgtype.Present,
		}

		// The validity is the half-open range [valid_from, valid_to).
		validity := pgtype.Tstzrange{
			Lower:     validFrom,
			Upper:     validTo,
			LowerType: pgtype.Inclusive,
			UpperType: pgtype.Exclusive,
			Status:    pgtype.Present,
		}

		lnwl, err := parseFloat64(row[lnwlIdx])
		if err != nil {
			return nil, fmt.Errorf(
				"Invalid 'lnwl' value in line %d: %v",
				line, err)
		}
		mwl, err := parseFloat64(row[mwlIdx])
		if err != nil {
			return nil, fmt.Errorf(
				"Invalid 'mwl' value in line %d: %v",
				line, err)
		}
		hnwl, err := parseFloat64(row[hnwlIdx])
		if err != nil {
			return nil, fmt.Errorf(
				"Invalid 'hnwl' value in line %d: %v",
				line, err)
		}
		fe30, err := parseFloat64(row[fe30Idx])
		if err != nil {
			return nil, fmt.Errorf(
				"Invalid 'fe30' value in line %d: %v",
				line, err)
		}
		fe100, err := parseFloat64(row[fe100Idx])
		if err != nil {
			return nil, fmt.Errorf(
				"Invalid 'fe100' value in line %d: %v",
				line, err)
		}

		var dateInfo time.Time

		// Without a date info the start time of the import is used.
		if di := row[dateInfoIdx]; di == "" {
			dateInfo = start
		} else if dateInfo, err = parseDate(di); err != nil {
			return nil, fmt.Errorf(
				"Invalid 'date_info' value in line %d: %v",
				line, err)
		}

		source := row[sourceIdx]

		var id int64
		var noGeom bool

		if err := insertStmt.QueryRowContext(
			ctx,
			location.CountryCode,
			location.LoCode,
			location.FairwaySection,
			location.Orc,
			location.Hectometre,
			&validity,
			lnwl,
			mwl,
			hnwl,
			fe30,
			fe100,
			dateInfo,
			source,
			precision,
		).Scan(&id, &noGeom); err != nil {
			return nil, err
		}

		if _, err := trackStmt.ExecContext(
			ctx, importID, "waterway.waterway_profiles", id); err != nil {
			return nil, err
		}

		// The row is kept even if no geometry could be matched.
		if noGeom {
			feedback.Warn(
				"No profile geometry found for %s in line %d.", location, line)
		}

		ids = append(ids, id)
	}
	if len(ids) == 0 {
		return nil, UnchangedError("No new entries in waterway profiles.")
	}

	feedback.Info("%d new entries in waterway profiles.", len(ids))

	summary := struct {
		IDs []int64 `json:"ids"`
	}{
		IDs: ids,
	}
	return &summary, nil
}