view pkg/imports/fm_bcnlat.go @ 4911:bcb8b69e4358 fairway-marks-import

Type specific names for fairway marks import
author Tom Gottfried <tom@intevation.de>
date Mon, 10 Feb 2020 18:02:22 +0100
parents 6f244b5eb716
children bfd8ef836998
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2020 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Tom Gottfried <tom.gottfried@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"strconv"
	"time"

	"gemma.intevation.de/gemma/pkg/pgxutils"
)

// Bcnlat is an import job to import
// fairway marks of type BCNLAT in form of point geometries
// and attribute data from a WFS service.
type Bcnlat struct {
	FairwayMarks
}

// Description gives a short info about relevant facts of this import.
func (bcnlat *Bcnlat) Description() (string, error) {
	return bcnlat.URL + "|" + bcnlat.FeatureType, nil
}

// BCNLATJobKind is the import queue type identifier.
const BCNLATJobKind JobKind = "fm_bcnlat"

type bcnlatJobCreator struct{}

func init() {
	RegisterJobCreator(BCNLATJobKind, bcnlatJobCreator{})
}

func (bcnlatJobCreator) Description() string { return "fairway marks bcnlat" }

func (bcnlatJobCreator) AutoAccept() bool { return true }

func (bcnlatJobCreator) Create() Job { return new(Bcnlat) }

func (bcnlatJobCreator) Depends() [2][]string {
	return [2][]string{
		{"fairway_marks_bcnlat"},
		{},
	}
}

// StageDone is a NOP for fairway marks imports.
func (bcnlatJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
	return nil
}

// CleanUp for fairway marks imports is a NOP.
func (*Bcnlat) CleanUp() error { return nil }

type bcnlatProperties struct {
	fairwayMarksProperties
	Colour      *string `json:"hydro_colour"`
	Colpat      *string `json:"hydro_colpat"`
	Condtn      *int    `json:"hydro_condtn"`
	Bcnshp      *int    `json:"hydro_bcnshp"`
	HydroCatlam *int64  `json:"hydro_catlam,omitempty"`
	IENCCatlam  *int64  `json:"ienc_catlam,omitempty"`
	Dirimp      *string `json:"ienc_dirimp,omitempty"`
}

type bcnlatFeaturetype struct {
	geom  pointSlice
	props *bcnlatProperties
}

const (
	insertBCNLATSQL = `
with a as (
  select users.current_user_area_utm() AS a
)
INSERT INTO waterway.fairway_marks_bcnlat (
  geom,
  datsta,
  datend,
  persta,
  perend,
  objnam,
  nobjnm,
  inform,
  ninfom,
  scamin,
  picrep,
  txtdsc,
  sordat,
  sorind,
  colour,
  colpat,
  condtn,
  bcnshp,
  catlam,
  dirimp
)
SELECT newfm, $3, $4, $5, $6, $7, $8, $9,
    $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21
  FROM ST_Transform(ST_GeomFromWKB($1, $2::integer), 4326) AS newfm (newfm)
  WHERE pg_has_role('sys_admin', 'MEMBER')
    OR ST_Intersects((select a from a),
      ST_Transform(newfm, (select ST_SRID(a) from a)))
ON CONFLICT (
  CAST((geom,
      datsta, datend, persta, perend, objnam, nobjnm, inform, ninfom,
      scamin, picrep, txtdsc, sordat, sorind,
      0, colour, colpat, condtn, bcnshp, catlam, dirimp
    ) AS waterway.fairway_marks_bcnlat)
  )
  DO NOTHING
RETURNING id
`
)

// Do executes the actual import.
func (fm *Bcnlat) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	start := time.Now()

	feedback.Info("Import fairway marks of type BCNLAT/bcnlat")

	fms, epsg, err := getFMFeatures(
		feedback,
		fm.FairwayMarks,
		func() interface{} { return new(bcnlatProperties) },
		func(p pointSlice, props interface{}) interface{} {
			return &bcnlatFeaturetype{p, props.(*bcnlatProperties)}
		},
	)
	if err != nil {
		return nil, err
	}

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	defer tx.Rollback()

	insertStmt, err := tx.PrepareContext(ctx, insertBCNLATSQL)
	if err != nil {
		return nil, err
	}
	defer insertStmt.Close()

	savepoint := Savepoint(ctx, tx, "feature")

	var (
		outsideOrDup int
		features     int
	)
	for _, fm := range fms {

		f := fm.(*bcnlatFeaturetype)

		var catlam sql.NullInt64
		if f.props.HydroCatlam != nil {
			catlam = sql.NullInt64{Int64: *f.props.HydroCatlam, Valid: true}
		} else if f.props.IENCCatlam != nil {
			catlam = sql.NullInt64{Int64: *f.props.IENCCatlam, Valid: true}
		}

		var dirimp sql.NullInt64
		if f.props.Dirimp != nil {
			if value, err := strconv.ParseInt(*f.props.Dirimp, 10, 64); err == nil {
				dirimp = sql.NullInt64{Int64: value, Valid: true}
			}
		}

		var fmid int64
		err := savepoint(func() error {
			err := insertStmt.QueryRowContext(
				ctx,
				f.geom.asWKB(),
				epsg,
				f.props.Datsta,
				f.props.Datend,
				f.props.Persta,
				f.props.Perend,
				f.props.Objnam,
				f.props.Nobjnm,
				f.props.Inform,
				f.props.Ninfom,
				f.props.Scamin,
				f.props.Picrep,
				f.props.Txtdsc,
				f.props.Sordat,
				f.props.Sorind,
				f.props.Colour,
				f.props.Colpat,
				f.props.Condtn,
				f.props.Bcnshp,
				catlam,
				dirimp,
			).Scan(&fmid)
			return err
		})
		switch {
		case err == sql.ErrNoRows:
			outsideOrDup++
			// ignore -> filtered by responsibility_areas
		case err != nil:
			feedback.Error(pgxutils.ReadableError{Err: err}.Error())
		default:
			features++
		}
	}

	if outsideOrDup > 0 {
		feedback.Info(
			"Features outside responsibility area and duplicates: %d",
			outsideOrDup)
	}

	if features == 0 {
		err := UnchangedError("no valid new features found")
		return nil, err
	}

	if err = tx.Commit(); err == nil {
		feedback.Info("Storing %d features took %s",
			features, time.Since(start))
	}

	return nil, err
}