view pkg/imports/wp.go @ 2455:54c9fe587fe6

Subdivide SQL function to prepare for improved error handling. The context of an error (e.g. the function in which it occurred) can be inferred by the database client. Not doing everything in one statement will render the context more meaningful.
author Tom Gottfried <tom@intevation.de>
date Fri, 01 Mar 2019 18:38:02 +0100
parents 7c83b5277c1c
children 02505fcff63c
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"bufio"
	"context"
	"database/sql"
	"encoding/csv"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	"github.com/jackc/pgx/pgtype"

	"gemma.intevation.de/gemma/pkg/misc"
	"gemma.intevation.de/gemma/pkg/models"
	"gemma.intevation.de/gemma/pkg/wfs"
)

// defaultPointToLinePrecision is the precision in meters
// to match from points to lines. It is used by processCSV
// when the job configuration carries no explicit precision.
const defaultPointToLinePrecision = 10

// WaterwayProfiles is the Job that imports waterway profiles.
// The profile attributes are read from a CSV file and the profile
// geometries are downloaded from a WFS service.
type WaterwayProfiles struct {
	// Dir is the directory in which the "wp.csv" file to import
	// is looked up. The whole directory is removed by CleanUp.
	Dir string `json:"dir"`
	// URL the GetCapabilities URL of the WFS service.
	URL string `json:"url"`
	// FeatureType selects the feature type of the WFS service.
	FeatureType string `json:"feature-type"`
	// SortBy works around misconfigured services to
	// establish a sort order to get the features.
	SortBy string `json:"sort-by"`
	// Precision of matching points to line strings.
	// If nil, defaultPointToLinePrecision is used; the sign of
	// a negative value is dropped.
	Precision *float64 `json:"precision,omitempty"`
}

// WPJobKind is the JobKind under which the waterway profiles
// import is registered.
const WPJobKind JobKind = "wp"

// wpJobCreator is the stateless job creator that is registered
// for WPJobKind.
type wpJobCreator struct{}

func init() {
	RegisterJobCreator(WPJobKind, wpJobCreator{})
}

// Create returns a new, zero-valued WaterwayProfiles job.
func (wpJobCreator) Create() Job {
	return &WaterwayProfiles{}
}

// AutoAccept always reports false for waterway profiles imports.
// NOTE(review): presumably this means the import has to be
// reviewed before it is accepted; confirm against the job framework.
func (wpJobCreator) AutoAccept() bool {
	return false
}

// Description returns a short human readable name of this
// kind of import.
func (wpJobCreator) Description() string {
	const description = "waterway profiles"
	return description
}

// Depends returns the names this import depends on. For waterway
// profiles this is the single entry "waterway_profiles".
func (wpJobCreator) Depends() []string {
	return []string{"waterway_profiles"}
}

const (
	// createGeomTempTableSQL creates the temporary table wp_geoms
	// holding the downloaded line strings as geographies in
	// EPSG:4326. The table is dropped when the transaction commits.
	createGeomTempTableSQL = `
CREATE TEMP TABLE wp_geoms (
  geom geography(linestring, 4326)
) ON COMMIT DROP`

	// createTempIndexSQL puts a GiST index on the geometries
	// of the temporary table.
	createTempIndexSQL = `
CREATE INDEX ON wp_geoms USING GIST(geom)`

	// analyzeTempTableSQL runs ANALYZE on the temporary table.
	analyzeTempTableSQL = `ANALYZE wp_geoms`

	// insertGeomTmpTableSQL stores one geometry in the temporary
	// table. $1 is the geometry as WKB, $2 the SRID it is given in.
	// The geometry is transformed to EPSG:4326.
	insertGeomTmpTableSQL = `
INSERT INTO wp_geoms (geom) VALUES (
  ST_Transform(ST_GeomFromWKB($1, $2::int), 4326)
)`
	// hasDistanceMarkSQL yields a single row with value true if a
	// virtual distance mark with the location code given by the
	// five components $1 to $5 exists and no row otherwise.
	hasDistanceMarkSQL = `
SELECT true FROM waterway.distance_marks_virtual
WHERE location_code =
  ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)
LIMIT 1`

	// insertWaterwayProfileSQL inserts one waterway profile.
	// $1 to $5 are the components of the location code, $6 to $13
	// the remaining column values in the order of the column list
	// and $14 is the maximum distance used to match the virtual
	// distance mark of the location against the line strings in
	// wp_geoms. The closest line string within that distance becomes
	// the geometry of the profile; if there is none the geometry is
	// NULL. Returned are the id of the new row and a flag telling
	// if the geometry is NULL.
	insertWaterwayProfileSQL = `
INSERT INTO waterway.waterway_profiles (
  location,
  geom,
  validity,
  lnwl,
  mwl,
  hnwl,
  fe30,
  fe100,
  date_info,
  source_organization
) VALUES (
  ($1, $2, $3, $4, $5),
  ( SELECT wp_geoms.geom
    FROM wp_geoms, waterway.distance_marks_virtual AS dmv
    WHERE dmv.location_code =
        ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)
      AND ST_DWithin(dmv.geom, wp_geoms.geom, $14::float)
    ORDER BY ST_Distance(dmv.geom, wp_geoms.geom, true)
    LIMIT 1
  ),
  $6,
  $7,
  $8,
  $9,
  $10,
  $11,
  $12,
  $13
) RETURNING id, geom is NULL`

	// wpStageDoneSQL sets staging_done to true for all waterway
	// profiles which are tracked for the import with id $1.
	wpStageDoneSQL = `
UPDATE waterway.waterway_profiles SET staging_done = true
WHERE id IN (
  SELECT key FROM import.track_imports
  WHERE import_id = $1 AND
    relation = 'waterway.waterway_profiles'::regclass)`
)

// StageDone marks all waterway profiles tracked for the import
// with the given id as staging done. The update is executed inside
// the given transaction; the error of the execution is returned.
func (wpJobCreator) StageDone(
	ctx context.Context,
	tx *sql.Tx,
	id int64,
) error {
	if _, err := tx.ExecContext(ctx, wpStageDoneSQL, id); err != nil {
		return err
	}
	return nil
}

// CleanUp removes the import directory of the job together
// with everything it contains.
func (wp *WaterwayProfiles) CleanUp() error {
	dir := wp.Dir
	return os.RemoveAll(dir)
}

// Do executes the actual waterway profiles import. Inside of a
// single transaction on conn it first downloads the geometries
// from the WFS service and then processes the CSV file.
// On success the transaction is committed and the summary
// produced by the CSV processing is returned. Progress and
// problems are reported via feedback.
func (wp *WaterwayProfiles) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {
	begin := time.Now()

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	// Undo everything on each path that does not reach the commit.
	defer tx.Rollback()

	err = wp.downloadGeometries(ctx, importID, tx, begin, feedback)
	if err != nil {
		return nil, fmt.Errorf("error downloading geometries: %v", err)
	}

	result, err := wp.processCSV(ctx, importID, tx, begin, feedback)
	if err != nil {
		return nil, fmt.Errorf("error processing CSV: %v", err)
	}

	if err = tx.Commit(); err != nil {
		return nil, fmt.Errorf(
			"Importing waterway profiles failed after %s: %v",
			time.Since(begin), err)
	}

	feedback.Info("Importing waterway profiles took %s",
		time.Since(begin))
	return result, nil
}

// downloadGeometries fetches the line strings of the configured
// feature type from the WFS service and stores them in the
// temporary table wp_geoms inside of the transaction tx.
//
// The steps are: load the capabilities from wp.URL, look up the
// feature type and its default CRS, build the GetFeature URLs
// (optionally sorted by wp.SortBy), create and index the temporary
// table, download and insert all features of type "LineString"
// and finally ANALYZE the temporary table.
//
// Features without coordinates and features with other geometry
// types are counted and reported as warnings via feedback.
// An error is returned if any step fails or if no line string
// was stored at all. importID and start are not used here.
func (wp *WaterwayProfiles) downloadGeometries(
	ctx context.Context,
	importID int64,
	tx *sql.Tx,
	start time.Time,
	feedback Feedback,
) error {
	feedback.Info("Start downloading geometries from WFS.")

	feedback.Info("Loading capabilities from %s", wp.URL)
	caps, err := wfs.GetCapabilities(wp.URL)
	if err != nil {
		feedback.Error("Loading capabilities failed: %v", err)
		return err
	}

	ft := caps.FindFeatureType(wp.FeatureType)
	if ft == nil {
		return fmt.Errorf("Unknown feature type '%s'", wp.FeatureType)
	}

	feedback.Info("Found feature type '%s'", wp.FeatureType)

	// The default CRS of the feature type is used for all
	// documents which do not state a CRS of their own.
	epsg, err := wfs.CRSToEPSG(ft.DefaultCRS)
	if err != nil {
		feedback.Error("Unsupported CRS name '%s'", ft.DefaultCRS)
		return err
	}

	if wp.SortBy != "" {
		feedback.Info("Features will be sorted by '%s'", wp.SortBy)
	}

	urls, err := wfs.GetFeaturesGET(
		caps, wp.FeatureType, "application/json", wp.SortBy)
	if err != nil {
		feedback.Error("Cannot create GetFeature URLs. %v", err)
		return err
	}

	if _, err := tx.ExecContext(ctx, createGeomTempTableSQL); err != nil {
		return err
	}

	if _, err := tx.ExecContext(ctx, createTempIndexSQL); err != nil {
		return err
	}

	insertStmt, err := tx.PrepareContext(ctx, insertGeomTmpTableSQL)
	if err != nil {
		return err
	}
	defer insertStmt.Close()

	var (
		unsupported       = stringCounter{}
		missingProperties int
		features          int
	)

	if err := wfs.DownloadURLs(urls, func(url string, r io.Reader) error {
		feedback.Info("Get features from: '%s'", url)
		rfc, err := wfs.ParseRawFeatureCollection(r)
		if err != nil {
			return fmt.Errorf("parsing GetFeature document failed: %v", err)
		}
		// A CRS stated by the document overrides the one
		// used so far. It stays in effect for later documents.
		if rfc.CRS != nil {
			crsName := rfc.CRS.Properties.Name
			if epsg, err = wfs.CRSToEPSG(crsName); err != nil {
				feedback.Error("Unsupported CRS: %s", crsName)
				return err
			}
		}

		// No features -> ignore.
		if rfc.Features == nil {
			return nil
		}

		for _, feature := range rfc.Features {
			if feature.Geometry.Coordinates == nil {
				missingProperties++
				continue
			}

			switch feature.Geometry.Type {
			case "LineString":
				var l lineSlice
				if err := json.Unmarshal(*feature.Geometry.Coordinates, &l); err != nil {
					return err
				}
				if _, err := insertStmt.ExecContext(
					ctx,
					l.asWKB(),
					epsg,
				); err != nil {
					return err
				}
				features++
			default:
				unsupported[feature.Geometry.Type]++
			}
		}
		return nil
	}); err != nil {
		return err
	}

	if missingProperties > 0 {
		feedback.Warn("Missing properties: %d", missingProperties)
	}

	if len(unsupported) != 0 {
		feedback.Warn("Unsupported types found: %s", unsupported)
	}

	if features == 0 {
		return errors.New("No features found")
	}
	// Refresh the statistics of the freshly filled table
	// before it is queried by the CSV processing.
	if _, err := tx.ExecContext(ctx, analyzeTempTableSQL); err != nil {
		return err
	}
	return nil
}

// parseFloat64 converts the textual content of a CSV cell to a
// nullable float. An empty string results in an invalid (NULL)
// value without an error. Commas are accepted as decimal
// separators by replacing them with dots before parsing.
// If the text is not a number the parse error is returned.
func parseFloat64(s string) (sql.NullFloat64, error) {
	var result sql.NullFloat64
	if len(s) == 0 {
		return result, nil
	}
	normalized := strings.Replace(s, ",", ".", -1)
	value, err := strconv.ParseFloat(normalized, 64)
	if err != nil {
		return sql.NullFloat64{}, err
	}
	result.Float64, result.Valid = value, true
	return result, nil
}

// processCSV reads the file "wp.csv" from wp.Dir and inserts one
// waterway profile per data record inside of the transaction tx.
//
// The file is separated by semicolons and its first record is
// the header. The needed columns are identified by substrings of
// the lower-cased header names, so the order of the columns is
// free. It is an error if a needed column is missing or if two
// headers match the same column.
//
// Records whose location has no virtual distance mark are skipped
// with a warning. For records for which no line string is close
// enough to the distance mark a profile without a geometry is
// stored and a warning is issued. Every inserted profile is
// tracked for the import with id importID. start is used as the
// date info of records which do not carry one.
//
// On success a pointer to a struct holding the ids of the new
// profiles is returned. If nothing was inserted an UnchangedError
// is returned. Errors of reading or parsing the file, invalid
// cell values and database errors abort the processing.
func (wp *WaterwayProfiles) processCSV(
	ctx context.Context,
	importID int64,
	tx *sql.Tx,
	start time.Time,
	feedback Feedback,
) (interface{}, error) {
	feedback.Info("Start processing CSV file.")

	f, err := os.Open(filepath.Join(wp.Dir, "wp.csv"))
	if err != nil {
		return nil, err
	}
	defer f.Close()

	r := csv.NewReader(bufio.NewReader(f))
	r.Comma = ';'
	// Records are consumed before the next one is read, so the
	// reader may reuse the backing slice.
	r.ReuseRecord = true

	headers, err := r.Read()
	if err != nil {
		return nil, err
	}

	// Column indices of the needed fields, -1 means not found yet.
	var (
		locationIdx  = -1
		validFromIdx = -1
		validToIdx   = -1
		lnwlIdx      = -1
		mwlIdx       = -1
		hnwlIdx      = -1
		fe30Idx      = -1
		fe100Idx     = -1
		dateInfoIdx  = -1
		sourceIdx    = -1
	)

	fields := []struct {
		idx    *int
		substr string
	}{
		{&locationIdx, "location"},
		{&validFromIdx, "valid_from"},
		{&validToIdx, "valid_to"},
		{&lnwlIdx, "lnwl"},
		{&mwlIdx, "mwl"},
		{&hnwlIdx, "hnwl"},
		{&fe30Idx, "fe30"},
		{&fe100Idx, "fe100"},
		{&dateInfoIdx, "date_info"},
		{&sourceIdx, "source"},
	}

	// Each header is assigned to the first field whose
	// substring it contains.
nextHeader:
	for i, h := range headers {
		h = strings.ToLower(h)
		for j := range fields {
			if strings.Contains(h, fields[j].substr) {
				if *fields[j].idx != -1 {
					return nil, fmt.Errorf(
						"CSV has more than one column with name containing '%s'",
						fields[j].substr)
				}
				*fields[j].idx = i
				continue nextHeader
			}
		}
	}

	var missing []string
	for i := range fields {
		if *fields[i].idx == -1 {
			missing = append(missing, fields[i].substr)
		}
	}
	if len(missing) > 0 {
		return nil, fmt.Errorf(
			"CSV is missing columns: %s",
			strings.Join(missing, ", "))
	}

	// A configured precision is used by its absolute value.
	var precision float64
	if wp.Precision != nil {
		if precision = *wp.Precision; precision < 0 {
			precision = -precision
		}
	} else {
		precision = defaultPointToLinePrecision
	}

	feedback.Info(
		"Matching points to lines with a precision of %.4fm.", precision)

	parseDate := misc.TimeGuesser([]string{"02.01.2006"}).Guess

	insertStmt, err := tx.PrepareContext(ctx, insertWaterwayProfileSQL)
	if err != nil {
		return nil, err
	}
	defer insertStmt.Close()

	trackStmt, err := tx.PrepareContext(ctx, trackImportSQL)
	if err != nil {
		return nil, err
	}
	defer trackStmt.Close()

	hasDistanceMarkStmt, err := tx.PrepareContext(ctx, hasDistanceMarkSQL)
	if err != nil {
		return nil, err
	}
	defer hasDistanceMarkStmt.Close()

	var ids []int64

	// line counts the data records, starting with 1 for the
	// first record after the header.
lines:
	for line := 1; ; line++ {

		row, err := r.Read()
		// The error has to be checked before the length of the
		// record: a failed read may come with an empty record and
		// must not be mistaken for the end of the file.
		switch {
		case err == io.EOF:
			break lines
		case err != nil:
			return nil, fmt.Errorf("CSV parsing failed: %v", err)
		case len(row) == 0:
			break lines
		}

		location, err := models.IsrsFromString(row[locationIdx])
		if err != nil {
			return nil, fmt.Errorf(
				"Invalid ISRS location code in line %d: %v",
				line, err)
		}

		// Skip records for locations without a virtual
		// distance mark.
		var dummy bool
		err = hasDistanceMarkStmt.QueryRowContext(
			ctx,
			location.CountryCode,
			location.LoCode,
			location.FairwaySection,
			location.Orc,
			location.Hectometre,
		).Scan(&dummy)

		switch {
		case err == sql.ErrNoRows:
			feedback.Warn("No virtual distance mark found for %s.", location)
			continue lines
		case err != nil:
			return nil, err
		case !dummy:
			return nil, errors.New("unexpected result from database")
		}

		validFromTime, err := parseDate(row[validFromIdx])
		if err != nil {
			return nil, fmt.Errorf(
				"Invalid 'valid_from' value in line %d: %v",
				line, err)
		}
		validToTime, err := parseDate(row[validToIdx])
		if err != nil {
			return nil, fmt.Errorf(
				"Invalid 'valid_to' value in line %d: %v",
				line, err)
		}

		validFrom := pgtype.Timestamptz{
			Time:   validFromTime,
			Status: pgtype.Present,
		}

		validTo := pgtype.Timestamptz{
			Time:   validToTime,
			Status: pgtype.Present,
		}

		// The validity is the half-open range [valid_from, valid_to).
		validity := pgtype.Tstzrange{
			Lower:     validFrom,
			Upper:     validTo,
			LowerType: pgtype.Inclusive,
			UpperType: pgtype.Exclusive,
			Status:    pgtype.Present,
		}

		lnwl, err := parseFloat64(row[lnwlIdx])
		if err != nil {
			return nil, fmt.Errorf(
				"Invalid 'lnwl' value in line %d: %v",
				line, err)
		}
		mwl, err := parseFloat64(row[mwlIdx])
		if err != nil {
			return nil, fmt.Errorf(
				"Invalid 'mwl' value in line %d: %v",
				line, err)
		}
		hnwl, err := parseFloat64(row[hnwlIdx])
		if err != nil {
			return nil, fmt.Errorf(
				"Invalid 'hnwl' value in line %d: %v",
				line, err)
		}
		fe30, err := parseFloat64(row[fe30Idx])
		if err != nil {
			return nil, fmt.Errorf(
				"Invalid 'fe30' value in line %d: %v",
				line, err)
		}
		fe100, err := parseFloat64(row[fe100Idx])
		if err != nil {
			return nil, fmt.Errorf(
				"Invalid 'fe100' value in line %d: %v",
				line, err)
		}

		// An empty date info falls back to the start time
		// of the import.
		var dateInfo time.Time

		if di := row[dateInfoIdx]; di == "" {
			dateInfo = start
		} else if dateInfo, err = parseDate(di); err != nil {
			return nil, fmt.Errorf(
				"Invalid 'date_info' value in line %d: %v",
				line, err)
		}

		source := row[sourceIdx]

		var id int64
		var noGeom bool

		if err := insertStmt.QueryRowContext(
			ctx,
			location.CountryCode,
			location.LoCode,
			location.FairwaySection,
			location.Orc,
			location.Hectometre,
			&validity,
			lnwl,
			mwl,
			hnwl,
			fe30,
			fe100,
			dateInfo,
			source,
			precision,
		).Scan(&id, &noGeom); err != nil {
			return nil, err
		}

		if _, err := trackStmt.ExecContext(
			ctx, importID, "waterway.waterway_profiles", id); err != nil {
			return nil, err
		}

		if noGeom {
			feedback.Warn(
				"No profile geometry found for %s in line %d.", location, line)
		}

		ids = append(ids, id)
	}
	if len(ids) == 0 {
		return nil, UnchangedError("No new entries in waterway profiles.")
	}

	feedback.Info("%d new entries in waterway profiles.", len(ids))

	summary := struct {
		IDs []int64 `json:"ids"`
	}{
		IDs: ids,
	}
	return &summary, nil
}