view pkg/imports/wp.go @ 2130:f3aabc05f9b2

Fix constraints on waterway profiles. Having staging_done in the UNIQUE constraint had no effect, because the exclusion constraint already prevented two rows with equal location and validity. Adding staging_done to the exclusion constraint turns the UNIQUE constraint check into merely a corner case of what the exclusion constraint checks, so the UNIQUE constraint is removed. Casting staging_done to int is needed because there is no appropriate operator class for booleans. Casting to smallint or even bit would have been better (i.e. it should result in a smaller index), but that would additionally have required creating such a CAST.
author Tom Gottfried <>
date Wed, 06 Feb 2019 15:42:32 +0100
parents a1f2cfa624cf
children f0641b5ad065
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see and license for details.
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
// Author(s):
//  * Sascha L. Teichmann <>

package imports

import (



const (
	// defaultPointToLinePrecision is the distance in meters used to match
	// points to lines when a job does not configure its own precision.
	defaultPointToLinePrecision = 10
)

// WaterwayProfiles is the job description of a waterway profile import.
// It is decoded from the JSON job data by wpJobCreator.Create.
type WaterwayProfiles struct {
	// Dir is the working directory of the job. The CSV input file
	// wp.csv is read from it, and CleanUp removes it.
	Dir string `json:"dir"`
	// URL is the GetCapabilities URL of the WFS service.
	URL string `json:"url"`
	// FeatureType selects the feature type of the WFS service.
	FeatureType string `json:"feature-type"`
	// SortBy works around misconfigured services to
	// establish a sort order to get the features.
	SortBy string `json:"sort-by"`
	// Precision of matching points to line strings, in meters.
	// A negative value is used by its absolute value. If nil,
	// defaultPointToLinePrecision is used.
	Precision *float64 `json:"precision,omitempty"`

// WPJobKind identifies waterway profile ("wp") import jobs.
const WPJobKind = JobKind("wp")

// wpJobCreator creates and describes waterway profile import jobs.
type wpJobCreator struct{}

func init() {
	RegisterJobCreator(WPJobKind, wpJobCreator{})

func (wpJobCreator) Create(_ JobKind, data string) (Job, error) {
	wp := new(WaterwayProfiles)
	if err := common.FromJSONString(data, wp); err != nil {
		return nil, err
	return wp, nil

// AutoAccept reports whether imports of this kind are accepted
// automatically. For waterway profiles it is always false.
func (wpJobCreator) AutoAccept() bool {
	return false
}

func (wpJobCreator) Description() string {
	return "waterway profiles"

// Depends returns the list of dependency names of this job kind.
// NOTE(review): presumably these are database relations the import
// reads or writes — confirm against the JobCreator interface.
func (wpJobCreator) Depends() []string {
	return []string{

// SQL statements used by the waterway profile import.
//
//   - createGeomTempTableSQL, createTempIndexSQL and analyzeTempTableSQL
//     create, index and ANALYZE the helper table wp_geoms, which holds
//     the line geometries downloaded from the WFS.
//   - insertGeomTmpTableSQL stores one geometry given as WKB ($1) with
//     SRID $2 into wp_geoms, transformed to EPSG:4326.
//   - insertWaterwayProfileSQL inserts one waterway profile. Its geometry
//     is the wp_geoms line nearest to the distance mark whose location
//     code matches ($1..$5), restricted to distance $14 (ST_DWithin).
//     It returns the new id and whether no geometry was found.
//   - wpStageDoneSQL sets staging_done = true on all waterway profiles
//     tracked in import.track_imports for the import with id $1.
const (
	// Statements are executed in the order listed by the import job.
	createGeomTempTableSQL = `
  geom geography(linestring, 4326)

	createTempIndexSQL = `

	analyzeTempTableSQL = `ANALYZE wp_geoms`

	insertGeomTmpTableSQL = `
INSERT INTO wp_geoms (geom) VALUES (
  ST_Transform(ST_GeomFromWKB($1, $2::int), 4326)

	insertWaterwayProfileSQL = `
INSERT INTO waterway.waterway_profiles (
  ($1, $2, $3, $4, $5),
  ( SELECT wp_geoms.geom
    FROM wp_geoms, waterway.distance_marks_virtual AS dmv
    WHERE dmv.location_code =
        ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)
      AND ST_DWithin(dmv.geom, wp_geoms.geom, $14::float)
    ORDER BY ST_Distance(dmv.geom, wp_geoms.geom, true)
    LIMIT 1
) RETURNING id, geom is NULL`

	wpStageDoneSQL = `
UPDATE waterway.waterway_profiles SET staging_done = true
  SELECT key FROM import.track_imports
  WHERE import_id = $1 AND
    relation = 'waterway.waterway_profiles'::regclass)`

func (wpJobCreator) StageDone(
	ctx context.Context,
	tx *sql.Tx,
	id int64,
) error {
	_, err := tx.ExecContext(ctx, wpStageDoneSQL, id)
	return err

func (wp *WaterwayProfiles) CleanUp() error {
	return os.RemoveAll(wp.Dir)

// Do runs the waterway profile import in a single transaction begun on
// conn. It first downloads the profile geometries from the WFS
// (downloadGeometries), then imports the CSV rows (processCSV), and
// finally commits. On success it returns the summary produced by
// processCSV.
func (wp *WaterwayProfiles) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {
	// start is handed to both steps and used to report the total duration.
	start := time.Now()

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	// Roll back on every early error return below.
	defer tx.Rollback()

	if err := wp.downloadGeometries(
		ctx, importID, tx, start, feedback); err != nil {
		return nil, fmt.Errorf("error downloading geometries: %v", err)

	summary, err := wp.processCSV(
		ctx, importID, tx, start, feedback)
	if err != nil {
		// NOTE(review): "CVS" in this message is a typo for "CSV".
		return nil, fmt.Errorf("error processing CVS: %v", err)

	if err := tx.Commit(); err != nil {
		return nil, fmt.Errorf(
			"Importing waterway profiles failed after %s: %v",
			time.Since(start), err)

	feedback.Info("Importing waterway profiles took %s",
	return summary, nil

// downloadGeometries fetches the features of wp.FeatureType from the WFS
// whose capabilities are at wp.URL and inserts their LineString
// geometries into the wp_geoms table within tx. Missing properties and
// unsupported geometry types are reported as warnings. If no features
// were found at all, an error is returned. Afterwards wp_geoms is
// ANALYZEd so that processCSV can query it efficiently.
func (wp *WaterwayProfiles) downloadGeometries(
	ctx context.Context,
	importID int64,
	tx *sql.Tx,
	start time.Time,
	feedback Feedback,
) error {
	feedback.Info("Start downloading geometries from WFS.")

	feedback.Info("Loading capabilities from %s", wp.URL)
	caps, err := wfs.GetCapabilities(wp.URL)
	if err != nil {
		feedback.Error("Loading capabilities failed: %v", err)
		return err

	ft := caps.FindFeatureType(wp.FeatureType)
	if ft == nil {
		return fmt.Errorf("Unknown feature type '%s'", wp.FeatureType)

	// NOTE(review): the message below lacks the closing quote after %s.
	feedback.Info("Found feature type '%s", wp.FeatureType)

	// The feature type's default CRS is used unless a downloaded
	// document declares its own CRS (see the callback below).
	epsg, err := wfs.CRSToEPSG(ft.DefaultCRS)
	if err != nil {
		feedback.Error("Unsupported CRS name '%s'", ft.DefaultCRS)
		return err

	if wp.SortBy != "" {
		feedback.Info("Features will be sorted by '%s'", wp.SortBy)

	urls, err := wfs.GetFeaturesGET(
		caps, wp.FeatureType, "application/json", wp.SortBy)
	if err != nil {
		feedback.Error("Cannot create GetFeature URLs. %v", err)
		return err

	if _, err := tx.ExecContext(ctx, createGeomTempTableSQL); err != nil {
		return err

	if _, err := tx.ExecContext(ctx, createTempIndexSQL); err != nil {
		return err

	// One prepared statement is reused for every inserted geometry.
	insertStmt, err := tx.PrepareContext(ctx, insertGeomTmpTableSQL)
	if err != nil {
		return err
	defer insertStmt.Close()

	// Counters for the summary warnings and the "no features" check.
	var (
		unsupported       = stringCounter{}
		missingProperties int
		features          int

	// Each GetFeature URL is downloaded and its feature collection parsed.
	if err := wfs.DownloadURLs(urls, func(url string, r io.Reader) error {
		feedback.Info("Get features from: '%s'", url)
		rfc, err := wfs.ParseRawFeatureCollection(r)
		if err != nil {
			return fmt.Errorf("parsing GetFeature document failed: %v", err)
		// A CRS declared in the document overrides epsg.
		if rfc.CRS != nil {
			crsName := rfc.CRS.Properties.Name
			if epsg, err = wfs.CRSToEPSG(crsName); err != nil {
				// NOTE(review): crsName is a string; the %d verb is wrong
				// and should be %s.
				feedback.Error("Unsupported CRS: %d", crsName)
				return err

		// No features -> ignore.
		if rfc.Features == nil {
			return nil

		for _, feature := range rfc.Features {
			if feature.Geometry.Coordinates == nil {

			// Only LineString geometries are stored in wp_geoms.
			switch feature.Geometry.Type {
			case "LineString":
				var l lineSlice
				if err := json.Unmarshal(*feature.Geometry.Coordinates, &l); err != nil {
					return err
				if _, err := insertStmt.ExecContext(
				); err != nil {
					return err
		return nil
	}); err != nil {
		return err

	if missingProperties > 0 {
		feedback.Warn("Missing properties: %d", missingProperties)

	if len(unsupported) != 0 {
		feedback.Warn("Unsupported types found: %s", unsupported)

	if features == 0 {
		return errors.New("No features found")
	// Refresh planner statistics of wp_geoms before it is queried.
	if _, err := tx.ExecContext(ctx, analyzeTempTableSQL); err != nil {
		return err
	return nil

// parseFloat64 converts a CSV cell into a nullable float.
// An empty cell yields an invalid (NULL) value. A comma is accepted
// as the decimal separator and is treated like a dot. On a parse
// error the zero NullFloat64 is returned together with the error.
func parseFloat64(s string) (sql.NullFloat64, error) {
	var result sql.NullFloat64
	if len(s) == 0 {
		return result, nil
	}
	normalized := strings.Replace(s, ",", ".", -1)
	f, err := strconv.ParseFloat(normalized, 64)
	if err != nil {
		return sql.NullFloat64{}, err
	}
	result.Float64, result.Valid = f, true
	return result, nil
}

// processCSV reads the semicolon-separated file wp.csv from wp.Dir. Its
// first row is the header. For every data row it inserts one waterway
// profile within tx and records the new id in the import tracking table
// (trackImportSQL) for importID. It returns a summary holding the ids of
// the inserted rows. If no row was inserted, it returns UnchangedError.
func (wp *WaterwayProfiles) processCSV(
	ctx context.Context,
	importID int64,
	tx *sql.Tx,
	start time.Time,
	feedback Feedback,
) (interface{}, error) {
	feedback.Info("Start processing CSV file.")

	f, err := os.Open(filepath.Join(wp.Dir, "wp.csv"))
	if err != nil {
		return nil, err
	defer f.Close()

	r := csv.NewReader(bufio.NewReader(f))
	r.Comma = ';'
	// With ReuseRecord a row is only valid until the next Read; all
	// values are converted before that happens.
	r.ReuseRecord = true

	headers, err := r.Read()
	if err != nil {
		return nil, err

	// Column indices of the required fields; -1 means "not found yet".
	var (
		locationIdx  = -1
		validFromIdx = -1
		validToIdx   = -1
		lnwlIdx      = -1
		mwlIdx       = -1
		hnwlIdx      = -1
		fe30Idx      = -1
		fe100Idx     = -1
		dateInfoIdx  = -1
		sourceIdx    = -1

	// Each field is located by a case-insensitive substring match on
	// the header names and must be found in exactly one column.
	fields := []struct {
		idx    *int
		substr string
		{&locationIdx, "location"},
		{&validFromIdx, "valid_from"},
		{&validToIdx, "valid_to"},
		{&lnwlIdx, "lnwl"},
		{&mwlIdx, "mwl"},
		{&hnwlIdx, "hnwl"},
		{&fe30Idx, "fe30"},
		{&fe100Idx, "fe100"},
		{&dateInfoIdx, "date_info"},
		{&sourceIdx, "source"},

	for i, h := range headers {
		h = strings.ToLower(h)
		for j := range fields {
			if strings.Contains(h, fields[j].substr) {
				if *fields[j].idx != -1 {
					return nil, fmt.Errorf(
						"CSV has more than one column with name containing '%s'",
				*fields[j].idx = i
				// A header column is assigned to the first matching field only.
				continue nextHeader

	var missing []string
	for i := range fields {
		if *fields[i].idx == -1 {
			missing = append(missing, fields[i].substr)
	if len(missing) > 0 {
		return nil, fmt.Errorf(
			"CSV is missing columns: %s",
			strings.Join(missing, ", "))

	// A negative configured precision is used by its absolute value.
	var precision float64
	if wp.Precision != nil {
		if precision = *wp.Precision; precision < 0 {
			precision = -precision
	} else {
		precision = defaultPointToLinePrecision

		"Matching points to lines with a precision of %.4fm.", precision)

	// Dates are expected in DD.MM.YYYY format.
	parseDate := misc.TimeGuesser([]string{"02.01.2006"}).Guess

	insertStmt, err := tx.PrepareContext(ctx, insertWaterwayProfileSQL)
	if err != nil {
		return nil, err
	defer insertStmt.Close()

	trackStmt, err := tx.PrepareContext(ctx, trackImportSQL)
	if err != nil {
		return nil, err
	defer trackStmt.Close()

	var ids []int64

	// NOTE(review): line counts data rows starting at 1, because the
	// header was already read above. The line numbers in the error
	// messages are therefore one less than the file line numbers.
	for line := 1; ; line++ {

		row, err := r.Read()
		switch {
		// NOTE(review): the empty-row case is checked before err != nil.
		// If Read ever returned an empty record together with a real
		// error, that error would be swallowed. Consider testing
		// err != nil (other than io.EOF) first.
		case err == io.EOF || len(row) == 0:
			break lines
		case err != nil:
			return nil, fmt.Errorf("CSV parsing failed: %v", err)

		location, err := models.IsrsFromString(row[locationIdx])
		if err != nil {
			return nil, fmt.Errorf(
				"Invalid ISRS location code in line %d: %v",
				line, err)

		validFromTime, err := parseDate(row[validFromIdx])
		if err != nil {
			return nil, fmt.Errorf(
				"Invalid 'valid_from' value in line %d: %v",
				line, err)
		validToTime, err := parseDate(row[validToIdx])
		if err != nil {
			return nil, fmt.Errorf(
				"Invalid 'valid_to' value in line %d: %v",
				line, err)

		validFrom := pgtype.Timestamptz{
			Time:   validFromTime,
			Status: pgtype.Present,

		validTo := pgtype.Timestamptz{
			Time:   validToTime,
			Status: pgtype.Present,

		// Validity is the half-open range [valid_from, valid_to).
		validity := pgtype.Tstzrange{
			Lower:     validFrom,
			Upper:     validTo,
			LowerType: pgtype.Inclusive,
			UpperType: pgtype.Exclusive,
			Status:    pgtype.Present,

		// Empty numeric cells become NULL (see parseFloat64).
		lnwl, err := parseFloat64(row[lnwlIdx])
		if err != nil {
			return nil, fmt.Errorf(
				"Invalid 'lnwl' value in line %d: %v",
				line, err)
		mwl, err := parseFloat64(row[mwlIdx])
		if err != nil {
			return nil, fmt.Errorf(
				"Invalid 'mwl' value in line %d: %v",
				line, err)
		hnwl, err := parseFloat64(row[hnwlIdx])
		if err != nil {
			return nil, fmt.Errorf(
				"Invalid 'hnwl' value in line %d: %v",
				line, err)
		fe30, err := parseFloat64(row[fe30Idx])
		if err != nil {
			return nil, fmt.Errorf(
				"Invalid 'fe30' value in line %d: %v",
				line, err)
		fe100, err := parseFloat64(row[fe100Idx])
		if err != nil {
			return nil, fmt.Errorf(
				"Invalid 'fe100' value in line %d: %v",
				line, err)

		// An empty date_info defaults to the start time of the import.
		var dateInfo time.Time

		if di := row[dateInfoIdx]; di == "" {
			dateInfo = start
		} else if dateInfo, err = parseDate(di); err != nil {
			return nil, fmt.Errorf(
				"Invalid 'date_info' value in line %d: %v",
				line, err)

		source := row[sourceIdx]

		var id int64
		var noGeom bool

		if err := insertStmt.QueryRowContext(
		).Scan(&id, &noGeom); err != nil {
			return nil, err

		if _, err := trackStmt.ExecContext(
			ctx, importID, "waterway.waterway_profiles", id); err != nil {
			return nil, err

		// The row stays inserted and tracked even without a matching
		// geometry; the missing geometry is only reported.
		if noGeom {
				"No profile geometry found for %s in line %d.", location, line)

		ids = append(ids, id)
	if len(ids) == 0 {
		return nil, UnchangedError("No new entries in waterway profiles.")

	feedback.Info("%d new entries in waterway profiles.", len(ids))

	summary := struct {
		IDs []int64 `json:"ids"`
		IDs: ids,
	return &summary, nil