view pkg/imports/fm.go @ 4895:9f799077a3e6 fairway-marks-import

Prevent importing non-distinct fairway marks
author Tom Gottfried <tom@intevation.de>
date Tue, 14 Jan 2020 18:24:51 +0100
parents 8eb36d0d5bdf
children 53d929f658f3
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2020 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Tom Gottfried <tom.gottfried@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"strconv"
	"time"

	"gemma.intevation.de/gemma/pkg/pgxutils"
	"gemma.intevation.de/gemma/pkg/wfs"
)

// FairwayMarks describes an import job that fetches fairway marks
// (point geometries plus attribute data) from a WFS service.
type FairwayMarks struct {
	// URL points at the GetCapabilities document of the WFS service.
	URL string `json:"url"`
	// FeatureType names the feature type to request from the service.
	FeatureType string `json:"feature-type"`
	// SortBy optionally names a property to sort the features by;
	// a workaround for services that are misconfigured in this respect.
	SortBy string `json:"sort-by"`
	// User is the optional Basic Auth user name.
	User string `json:"user,omitempty"`
	// Password is the optional Basic Auth password.
	Password string `json:"password,omitempty"`
}

// Description summarises the import as "<URL>|<FeatureType>".
// The returned error is always nil.
func (fm *FairwayMarks) Description() (string, error) {
	const separator = "|"
	summary := fm.URL + separator + fm.FeatureType
	return summary, nil
}

// FMJobKind identifies fairway marks imports in the import queue.
const FMJobKind = JobKind("fm")

// fmJobCreator is the stateless job creator for fairway marks imports;
// it is registered under FMJobKind in init.
type fmJobCreator struct{}

func init() {
	RegisterJobCreator(FMJobKind, fmJobCreator{})
}

// Description returns the human readable name of this kind of import.
func (fmJobCreator) Description() string {
	const name = "fairway marks"
	return name
}

// AutoAccept always reports true for fairway marks imports.
// NOTE(review): presumably this means such imports need no manual
// review step -- confirm against the job creator interface.
func (fmJobCreator) AutoAccept() bool {
	return true
}

// Create returns a fresh, zero-valued fairway marks import job.
func (fmJobCreator) Create() Job {
	return &FairwayMarks{}
}

// Depends returns the two dependency lists of fairway marks imports:
// the first holds "fairway_marks", the second is empty.
// NOTE(review): presumably the first list names tables the import writes
// to and the second tables it only reads -- confirm against the
// job creator interface.
func (fmJobCreator) Depends() [2][]string {
	var deps [2][]string
	deps[0] = []string{"fairway_marks"}
	deps[1] = []string{}
	return deps
}

// StageDone does nothing for fairway marks imports:
// all arguments are ignored and nil is returned.
func (fmJobCreator) StageDone(
	_ context.Context,
	_ *sql.Tx,
	_ int64,
) error {
	return nil
}

// CleanUp has nothing to tidy up after a fairway marks import
// and therefore always returns nil.
func (*FairwayMarks) CleanUp() error {
	return nil
}

// fairwayMarksProperties holds the attribute data decoded from the
// JSON properties of a single WFS feature. All fields are pointers,
// so properties that are absent or null in the JSON stay nil. In Do
// the values are passed as parameters to the insert statement.
// NOTE(review): the field names look like S-57/IENC attribute
// acronyms -- confirm against the data source documentation.
type fairwayMarksProperties struct {
	Datsta      *string `json:"hydro_datsta"`
	Datend      *string `json:"hydro_datend"`
	Persta      *string `json:"hydro_persta"`
	Perend      *string `json:"hydro_perend"`
	Objnam      *string `json:"hydro_objnam"`
	Nobjnm      *string `json:"hydro_nobjnm"`
	Inform      *string `json:"hydro_inform"`
	Ninfom      *string `json:"hydro_ninfom"`
	Scamin      *int    `json:"hydro_scamin"`
	Picrep      *string `json:"hydro_picrep"`
	Txtdsc      *string `json:"hydro_txtdsc"`
	Sordat      *string `json:"hydro_sordat"`
	Sorind      *string `json:"hydro_sorind"`
	Colour      *string `json:"hydro_colour"`
	Colpat      *string `json:"hydro_colpat"`
	Condtn      *int    `json:"hydro_condtn"`
	Bcnshp      *int    `json:"hydro_bcnshp"`
	// HydroCatlam and IENCCatlam both feed the single catlam column;
	// Do prefers the hydro_ value and falls back to the ienc_ one.
	HydroCatlam *int64  `json:"hydro_catlam,omitempty"`
	IENCCatlam  *int64  `json:"ienc_catlam,omitempty"`
	// Dirimp arrives as a string; Do parses it as a base 10 integer
	// and stores NULL if it is missing or cannot be parsed.
	Dirimp      *string `json:"ienc_dirimp,omitempty"`
}

const (
	// insertFairwayMarksSQL inserts a single fairway mark into
	// waterway.fairway_marks and returns the id of the new row.
	//
	// Parameters: $1 is the geometry as WKB and $2 its SRID; the
	// geometry is transformed to EPSG:4326 before it is stored.
	// $3 to $21 are the attribute values in the order of the
	// column list (datsta ... dirimp).
	//
	// The row is only inserted if the current user is a member of
	// the 'sys_admin' role or the geometry intersects the area
	// returned by users.current_user_area_utm(). A row that
	// conflicts with an existing one on the geometry and all listed
	// attributes is silently skipped (ON CONFLICT ... DO NOTHING).
	// In both cases no row is returned, so the caller cannot tell
	// the two apart.
	// NOTE(review): the conflict target presumably matches a unique
	// index defined in the schema -- confirm.
	insertFairwayMarksSQL = `
with a as (
  select users.current_user_area_utm() AS a
)
INSERT INTO waterway.fairway_marks (
  geom,
  datsta,
  datend,
  persta,
  perend,
  objnam,
  nobjnm,
  inform,
  ninfom,
  scamin,
  picrep,
  txtdsc,
  sordat,
  sorind,
  colour,
  colpat,
  condtn,
  bcnshp,
  catlam,
  dirimp
)
SELECT newfm, $3, $4, $5, $6, $7, $8, $9,
    $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21
  FROM ST_Transform(ST_GeomFromWKB($1, $2::integer), 4326) AS newfm (newfm)
  WHERE pg_has_role('sys_admin', 'MEMBER')
    OR ST_Intersects((select a from a),
      ST_Transform(newfm, (select ST_SRID(a) from a)))
ON CONFLICT (
    CAST((0, geom,
      datsta, datend, persta, perend, objnam, nobjnm, inform, ninfom,
      scamin, picrep, txtdsc, sordat, sorind, colour, colpat, condtn,
      bcnshp, catlam, dirimp) AS waterway.fairway_marks)
  )
  DO NOTHING
RETURNING id
`
)

// Do executes the actual fairway marks import.
//
// It loads the capabilities of the WFS service at fm.URL, looks up
// fm.FeatureType, downloads the features (optionally sorted by
// fm.SortBy, authenticated with fm.User and fm.Password) and inserts
// every "Point" feature into waterway.fairway_marks. All inserts happen
// in one transaction on conn, which is committed only if at least one
// feature was stored.
//
// Features without properties or coordinates, with undecodable
// properties or with a geometry type other than "Point" are counted
// and reported via feedback but do not abort the import. Features for
// which the insert statement returns no row (outside the area of
// responsibility or duplicates) are counted as well.
//
// importID is not used by this import. The returned summary is always
// nil. An error is returned if the service cannot be queried, a
// GetFeature document or point geometry cannot be parsed, a database
// operation on the transaction fails, or no new feature was stored.
func (fm *FairwayMarks) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	start := time.Now()

	feedback.Info("Import fairway marks")

	feedback.Info("Loading capabilities from %s", fm.URL)
	caps, err := wfs.GetCapabilities(fm.URL)
	if err != nil {
		feedback.Error("Loading capabilities failed: %v", err)
		return nil, err
	}

	ft := caps.FindFeatureType(fm.FeatureType)
	if ft == nil {
		return nil, fmt.Errorf("unknown feature type '%s'", fm.FeatureType)
	}

	feedback.Info("Found feature type '%s'", fm.FeatureType)

	// Default EPSG code of the feature type; a downloaded document
	// that declares its own CRS overrides it below.
	epsg, err := wfs.CRSToEPSG(ft.DefaultCRS)
	if err != nil {
		feedback.Error("Unsupported CRS: '%s'", ft.DefaultCRS)
		return nil, err
	}

	if fm.SortBy != "" {
		feedback.Info("Features will be sorted by '%s'", fm.SortBy)
	}

	dl, err := wfs.GetFeatures(caps, fm.FeatureType, fm.SortBy)
	if err != nil {
		feedback.Error("Cannot create GetFeature URLs. %v", err)
		return nil, err
	}

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	// Rolls back on every early return; after a successful Commit
	// the rollback has no effect.
	defer tx.Rollback()

	insertStmt, err := tx.PrepareContext(ctx, insertFairwayMarksSQL)
	if err != nil {
		return nil, err
	}
	defer insertStmt.Close()

	var (
		unsupported       = stringCounter{}
		missingProperties int
		badProperties     int
		outsideOrDup      int
		features          int
	)

	if err := dl.Download(fm.User, fm.Password, func(url string, r io.Reader) error {
		feedback.Info("Get features from: '%s'", url)
		rfc, err := wfs.ParseRawFeatureCollection(r)
		if err != nil {
			return fmt.Errorf("parsing GetFeature document failed: %v", err)
		}
		if rfc.CRS != nil {
			crsName := rfc.CRS.Properties.Name
			if epsg, err = wfs.CRSToEPSG(crsName); err != nil {
				// crsName is a string: report it quoted like the
				// default CRS above.
				feedback.Error("Unsupported CRS: '%s'", crsName)
				return err
			}
		}

		// No features -> ignore.
		if rfc.Features == nil {
			return nil
		}

		feedback.Info("Using EPSG: %d", epsg)

		// Each insert is run through the Savepoint helper, presumably
		// so that a failing insert does not invalidate the whole
		// transaction.
		savepoint := Savepoint(ctx, tx, "feature")

		for _, feature := range rfc.Features {
			if feature.Properties == nil || feature.Geometry.Coordinates == nil {
				missingProperties++
				continue
			}

			var props fairwayMarksProperties

			if err := json.Unmarshal(*feature.Properties, &props); err != nil {
				badProperties++
				continue
			}

			// catlam may come from either namespace; hydro_ wins.
			var catlam sql.NullInt64
			if props.HydroCatlam != nil {
				catlam = sql.NullInt64{Int64: *props.HydroCatlam, Valid: true}
			} else if props.IENCCatlam != nil {
				catlam = sql.NullInt64{Int64: *props.IENCCatlam, Valid: true}
			}

			// dirimp is delivered as a string; values that are not
			// integers are stored as NULL.
			var dirimp sql.NullInt64
			if props.Dirimp != nil {
				if value, err := strconv.ParseInt(*props.Dirimp, 10, 64); err == nil {
					dirimp = sql.NullInt64{Int64: value, Valid: true}
				}
			}

			switch feature.Geometry.Type {
			case "Point":
				var p pointSlice
				if err := json.Unmarshal(*feature.Geometry.Coordinates, &p); err != nil {
					return err
				}
				var fmid int64
				err := savepoint(func() error {
					err := insertStmt.QueryRowContext(
						ctx,
						p.asWKB(),
						epsg,
						props.Datsta,
						props.Datend,
						props.Persta,
						props.Perend,
						props.Objnam,
						props.Nobjnm,
						props.Inform,
						props.Ninfom,
						props.Scamin,
						props.Picrep,
						props.Txtdsc,
						props.Sordat,
						props.Sorind,
						props.Colour,
						props.Colpat,
						props.Condtn,
						props.Bcnshp,
						catlam,
						dirimp,
					).Scan(&fmid)
					return err
				})
				switch {
				case err == sql.ErrNoRows:
					// Nothing inserted: the feature is outside the
					// area of responsibility or a duplicate.
					outsideOrDup++
				case err != nil:
					// Pass the message as an argument, never as the
					// format string: database errors may contain '%'.
					feedback.Error("%s", pgxutils.ReadableError{Err: err}.Error())
				default:
					features++
				}
			default:
				unsupported[feature.Geometry.Type]++
			}
		}
		return nil
	}); err != nil {
		return nil, err
	}

	if badProperties > 0 {
		feedback.Warn("Bad properties: %d", badProperties)
	}

	if missingProperties > 0 {
		feedback.Warn("Missing properties: %d", missingProperties)
	}

	if len(unsupported) != 0 {
		feedback.Warn("Unsupported types found: %s", unsupported)
	}

	if outsideOrDup > 0 {
		feedback.Info(
			"Features outside responsibility area and duplicates: %d",
			outsideOrDup)
	}

	if features == 0 {
		err := errors.New("no valid new features found")
		return nil, err
	}

	if err = tx.Commit(); err == nil {
		feedback.Info("Storing %d features took %s",
			features, time.Since(start))
	}

	return nil, err
}