view pkg/imports/fm.go @ 4961:67d78b74fe43 fairway-marks-import

Be more careful with unchanged errors if features were removed.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Wed, 26 Feb 2020 19:28:21 +0100
parents 2ab75c48e8e7
children 1b309a8e7673
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2020 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Tom Gottfried <tom.gottfried@intevation.de>
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"database/sql"
	"fmt"
	"strings"

	"gemma.intevation.de/gemma/pkg/pgxutils"
)

type (
	// Properties common to all types of fairway marks
	fairwayMarksProperties struct {
		Datsta *string `json:"hydro_datsta"`
		Datend *string `json:"hydro_datend"`
		Persta *string `json:"hydro_persta"`
		Perend *string `json:"hydro_perend"`
		Objnam *string `json:"hydro_objnam"`
		Nobjnm *string `json:"hydro_nobjnm"`
		Inform *string `json:"hydro_inform"`
		Ninfom *string `json:"hydro_ninfom"`
		Scamin *int    `json:"hydro_scamin"`
		Picrep *string `json:"hydro_picrep"`
		Txtdsc *string `json:"hydro_txtdsc"`
		Sordat *string `json:"hydro_sordat"`
		Sorind *string `json:"hydro_sorind"`
	}

	bcnlatProperties struct {
		fairwayMarksProperties
		Colour      *string `json:"hydro_colour"`
		Colpat      *string `json:"hydro_colpat"`
		Condtn      *int    `json:"hydro_condtn"`
		Bcnshp      *int    `json:"hydro_bcnshp"`
		HydroCatlam *int64  `json:"hydro_catlam,omitempty"`
		IENCCatlam  *int64  `json:"ienc_catlam,omitempty"`
		Dirimp      *string `json:"ienc_dirimp,omitempty"`
	}

	boylatProperties struct {
		fairwayMarksProperties
		Colour      *string `json:"hydro_colour"`
		Colpat      *string `json:"hydro_colpat"`
		Conrad      *int    `json:"hydro_conrad"`
		HydroMarsys *int64  `json:"hydro_marsys,omitempty"`
		IENCMarsys  *int64  `json:"ienc_marsys,omitempty"`
		Boyshp      *int    `json:"hydro_boyshp"`
		HydroCatlam *int64  `json:"hydro_catlam,omitempty"`
		IENCCatlam  *int64  `json:"ienc_catlam,omitempty"`
	}

	boycarProperties struct {
		fairwayMarksProperties
		Colour *string `json:"hydro_colour"`
		Colpat *string `json:"hydro_colpat"`
		Conrad *int    `json:"hydro_conrad"`
		Marsys *int    `json:"hydro_marsys"`
		Boyshp *int    `json:"hydro_boyshp"`
		Catcam *int    `json:"hydro_catcam"`
	}

	boysawProperties struct {
		fairwayMarksProperties
		Colour *string `json:"hydro_colour"`
		Colpat *string `json:"hydro_colpat"`
		Conrad *int    `json:"hydro_conrad"`
		Marsys *int64  `json:"hydro_marsys"`
		Boyshp *int    `json:"hydro_boyshp"`
	}

	boysppProperties struct {
		fairwayMarksProperties
		Colour *string `json:"hydro_colour"`
		Colpat *string `json:"hydro_colpat"`
		Conrad *int    `json:"hydro_conrad"`
		Marsys *int64  `json:"hydro_marsys"`
		Boyshp *int    `json:"hydro_boyshp"`
		Catspm *string `json:"hydro_catspm"`
	}

	daymarProperties struct {
		fairwayMarksProperties
		Colour *string  `json:"hydro_colour"`
		Colpat *string  `json:"hydro_colpat"`
		Condtn *int     `json:"hydro_condtn"`
		Dirimp *string  `json:"ienc_dirimp,omitempty"`
		Topshp *int     `json:"hydro_topshp"`
		Orient *float64 `json:"hydro_orient,omitempty"`
	}

	lightsProperties struct {
		fairwayMarksProperties
		Colour *string  `json:"hydro_colour"`
		Condtn *int     `json:"hydro_condtn"`
		Orient *float64 `json:"hydro_orient"`
		Catlit *string  `json:"hydro_catlit"`
		Exclit *int     `json:"hydro_exclit"`
		Litchr *int     `json:"hydro_litchr"`
		Litvis *string  `json:"hydro_litvis"`
		Mltylt *int     `json:"hydro_mltylt"`
		Sectr1 *float64 `json:"hydro_sectr1"`
		Sectr2 *float64 `json:"hydro_sectr2"`
		Siggrp *string  `json:"hydro_siggrp"`
		Sigper *float64 `json:"hydro_sigper"`
		Sigseq *string  `json:"hydro_sigseq"`
		Status *string  `json:"hydro_status"`
	}

	notmrkProperties struct {
		fairwayMarksProperties
		Condtn *int     `json:"hydro_condtn"`
		Marsys *int     `json:"hydro_bcnshp"`
		Dirimp *string  `json:"ienc_dirimp"`
		Orient *float64 `json:"hydro_orient"`
		Status *string  `json:"hydro_status"`
		Addmrk *string  `json:"ienc_addmrk"`
		Catnmk *int     `json:"ienc_catnmk"`
		Disipd *float64 `json:"ienc_disipd"`
		Disipu *float64 `json:"ienc_disipu"`
		Disbk1 *float64 `json:"ienc_disbk1"`
		Disbk2 *float64 `json:"ienc_disbk2"`
		Fnctnm *int     `json:"ienc_fnctnm"`
		Bnkwtw *int     `json:"ienc_bnkwtw"`
	}

	rtpbcnProperties struct {
		fairwayMarksProperties
		Condtn *int    `json:"hydro_condtn"`
		Siggrp *string `json:"hydro_siggrp"`
		Catrtb *int    `json:"hydro_catrtb"`
		Radwal *string `json:"hydro_radwal"`
	}

	topmarProperties struct {
		fairwayMarksProperties
		Colour *string `json:"hydro_colour"`
		Colpat *string `json:"hydro_colpat"`
		Condtn *int    `json:"hydro_condtn"`
		Topshp *int    `json:"hydro_topshp"`
	}
)

const (
	BCNLATJobKind JobKind = "fm_bcnlat"
	BOYLATJobKind JobKind = "fm_boylat"
	BOYCARJobKind JobKind = "fm_boycar"
	BOYSAWJobKind JobKind = "fm_boysaw"
	BOYSPPJobKind JobKind = "fm_boyspp"
	DAYMARJobKind JobKind = "fm_daymar"
	LIGHTSJobKind JobKind = "fm_lights"
	NOTMRKJobKind JobKind = "fm_notmrk"
	RTPBCNJobKind JobKind = "fm_rtpbcn"
	TOPMARJobKind JobKind = "fm_topmar"
)

func init() {
	RegisterJobCreator(BCNLATJobKind,
		&PointWFSJobCreator{
			description: "fairway marks bcnlat",
			depends:     [2][]string{{"fairway_marks_bcnlat"}, {}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("bcnlat",
						"colour", "colpat", "condtn", "bcnshp", "catlam"),
					insertBcnlatDirimpSQL,
				),
				consumeBCNLAT,
				createInvalidation("bcnlat"),
				func() interface{} { return new(bcnlatProperties) },
			),
		})

	RegisterJobCreator(BOYLATJobKind,
		&PointWFSJobCreator{
			description: "fairway marks boylat",
			depends:     [2][]string{{"fairway_marks_boylat"}, {}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("boylat",
						"colour", "colpat", "conrad",
						"marsys", "boyshp", "catlam"),
				),
				consumeBOYLAT,
				createInvalidation("boylat"),
				func() interface{} { return new(boylatProperties) },
			),
		})

	RegisterJobCreator(BOYCARJobKind,
		&PointWFSJobCreator{
			description: "fairway marks boycar",
			depends:     [2][]string{{"fairway_marks_boycar"}, {}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("boycar",
						"colour", "colpat", "conrad",
						"marsys", "boyshp", "catcam"),
				),
				consumeBOYCAR,
				createInvalidation("boycar"),
				func() interface{} { return new(boycarProperties) },
			),
		})

	RegisterJobCreator(BOYSAWJobKind,
		&PointWFSJobCreator{
			description: "fairway marks boysaw",
			depends:     [2][]string{{"fairway_marks_boysaw"}, {}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("boysaw",
						"colour", "colpat", "conrad", "marsys", "boyshp"),
				),
				consumeBOYSAW,
				createInvalidation("boysaw"),
				func() interface{} { return new(boysawProperties) },
			),
		})

	RegisterJobCreator(BOYSPPJobKind,
		&PointWFSJobCreator{
			description: "fairway marks boyspp",
			depends:     [2][]string{{"fairway_marks_boyspp"}, {}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("boyspp",
						"colour", "colpat", "conrad",
						"marsys", "boyshp", "catspm"),
				),
				consumeBOYSPP,
				createInvalidation("boyspp"),
				func() interface{} { return new(boysppProperties) },
			),
		})

	RegisterJobCreator(DAYMARJobKind,
		&PointWFSJobCreator{
			description: "fairway marks daymar",
			depends:     [2][]string{{"fairway_marks_daymar"}, {}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("daymar",
						"colour", "colpat", "condtn", "topshp", "orient"),
					insertDaymarDirimpSQL,
				),
				consumeDAYMAR,
				createInvalidation("daymar"),
				func() interface{} { return new(daymarProperties) },
			),
		})

	RegisterJobCreator(LIGHTSJobKind,
		&PointWFSJobCreator{
			description: "fairway marks lights",
			depends:     [2][]string{{"fairway_marks_lights"}, {}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("lights",
						"colour", "condtn", "orient",
						"catlit", "exclit", "litchr",
						"litvis", "mltylt", "sectr1",
						"sectr2", "siggrp", "sigper",
						"sigseq", "status"),
				),
				consumeLIGHTS,
				createInvalidation("lights"),
				func() interface{} { return new(lightsProperties) },
			),
		})

	RegisterJobCreator(NOTMRKJobKind,
		&PointWFSJobCreator{
			description: "fairway marks notmrk",
			depends:     [2][]string{{"fairway_marks_lights"}, {}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("notmrk",
						"condtn", "marsys", "orient",
						"status", "addmrk", "catnmk",
						"disipd", "disipu", "disbk1",
						"disbk2", "fnctnm", "bnkwtw"),
					insertNotmrkDirimpSQL,
				),
				consumeNOTMRK,
				createInvalidation("notmark"),
				func() interface{} { return new(notmrkProperties) },
			),
		})

	RegisterJobCreator(RTPBCNJobKind,
		&PointWFSJobCreator{
			description: "fairway marks rtpbcn",
			depends:     [2][]string{{"fairway_marks_rtpbcn"}, {}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("rtpbcn",
						"condtn", "siggrp", "catrtb", "radwal"),
				),
				consumeRTPBCN,
				createInvalidation("rtpbcn"),
				func() interface{} { return new(rtpbcnProperties) },
			),
		})

	RegisterJobCreator(TOPMARJobKind,
		&PointWFSJobCreator{
			description: "fairway marks topmar",
			depends:     [2][]string{{"fairway_marks_topmar"}, {}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("topmar",
						"colour", "colpat", "condtn", "topshp"),
				),
				consumeTOPMAR,
				createInvalidation("topmar"),
				func() interface{} { return new(topmarProperties) },
			),
		})
}

const (
	// Format string to be completed with type and additional attributes
	/* Instead of the row comparisons in the WHERE clauses
	of the CTE with the UPDATE and the INSERT ... SELECT, we could have
	used the row-based UNIQUE indexes as arbiter indexes
	in an INSERT ... ON CONFLICT ... DO UPDATE, but that turned out
	to be able to bypass the UNIQUE index in some cases.
	*/
	insertFMSQLtmpl = `
WITH a AS (
  SELECT users.current_user_area_utm() AS a
),
g AS (
  SELECT newfm
  FROM ST_Transform(ST_GeomFromWKB($1, $2::integer), 4326) AS newfm (newfm)
  WHERE pg_has_role('sys_admin', 'MEMBER')
    OR ST_Intersects((select a from a),
      ST_Transform(newfm, (select ST_SRID(a) from a)))
),
t AS (
  -- Currently valid and otherwise identical entry's validity.
  /* If there are no intermittent updates of validity,
     there will always be only one currently valid and
     otherwise identical entry. */
  UPDATE waterway.fairway_marks_%[1]s SET last_found = current_timestamp
  WHERE validity @> current_timestamp
    AND (geom,
        datsta, datend, persta, perend, objnam, nobjnm, inform, ninfom,
        scamin, picrep, txtdsc, sordat, sorind,
        %[2]s
      ) IS NOT DISTINCT FROM (
        (SELECT newfm FROM g),
        $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15,
        %[3]s)
  RETURNING validity
)
INSERT INTO waterway.fairway_marks_%[1]s (
  geom,
  datsta,
  datend,
  persta,
  perend,
  objnam,
  nobjnm,
  inform,
  ninfom,
  scamin,
  picrep,
  txtdsc,
  sordat,
  sorind,
  %[2]s
)
SELECT newfm, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15,
    %[3]s
  FROM g
  WHERE NOT EXISTS(SELECT 1 FROM waterway.fairway_marks_%[1]s
    WHERE (
        validity, geom,
        datsta, datend, persta, perend, objnam, nobjnm, inform, ninfom,
        scamin, picrep, txtdsc, sordat, sorind,
        %[2]s
      ) IS NOT DISTINCT FROM (
        (SELECT validity FROM t), newfm,
        $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15,
        %[3]s
      )
    )
RETURNING id
`

	// Assume validity ended for all entries not found in data source
	invalidateFairwayMarksSQLtmpl = `
WITH
  a AS (
    SELECT users.current_user_area_utm() AS a
  )
UPDATE waterway.fairway_marks_%s
  SET validity = tstzrange(lower(validity), current_timestamp)
  WHERE validity @> current_timestamp
    AND last_found < current_timestamp
    AND (pg_has_role('sys_admin', 'MEMBER')
      OR ST_Intersects((select a from a),
        ST_Transform(CAST(geom AS geometry), (select ST_SRID(a) from a))))
 `

	insertBcnlatDirimpSQL = `
INSERT INTO waterway.fairway_marks_bcnlat_dirimps (fm_bcnlat_id, dirimp)
  VALUES ($1, $2)
`

	insertDaymarDirimpSQL = `
INSERT INTO waterway.fairway_marks_daymar_dirimps (fm_daymar_id, dirimp)
  VALUES ($1, $2)
`

	insertNotmrkDirimpSQL = `
INSERT INTO waterway.fairway_marks_notmrk_dirimps (fm_notmrk_id, dirimp)
  VALUES ($1, $2)
`
)

func createInvalidation(fmType string) func(*SQLPointConsumer) error {

	invalidateFairwayMarksSQL := fmt.Sprintf(invalidateFairwayMarksSQLtmpl, fmType)

	return func(spc *SQLPointConsumer) error {
		res, err := spc.tx.ExecContext(spc.ctx, invalidateFairwayMarksSQL)
		if err != nil {
			return err
		}
		old, err := res.RowsAffected()
		if err != nil {
			return err
		}
		if old == 0 {
			return ErrFeaturesUnmodified
		}
		spc.feedback.Info("Number of features removed from data source: %d", old)
		return nil
	}
}

// Create INSERT statement for specific fairway marks type
func createInsertFMSQL(fmType string, attributes ...string) string {
	attNums := "$16"
	for i := 1; i < len(attributes); i++ {
		attNums += fmt.Sprintf(",$%d", 16+i)
	}

	return fmt.Sprintf(
		insertFMSQLtmpl,
		fmType,
		strings.Join(attributes, ","),
		attNums,
	)
}

func coalesceInt64(ints ...*int64) sql.NullInt64 {
	for _, i := range ints {
		if i != nil {
			return sql.NullInt64{Int64: *i, Valid: true}
		}
	}
	return sql.NullInt64{}
}

func storeAttribs(spc *SQLPointConsumer, id int64, attrs *string) {
	if attrs == nil || *attrs == "" {
		return
	}
	dirimps := strings.Split(*attrs, ",")
	for _, dirimp := range dirimps {
		if err := spc.savepoint(func() error {
			_, err := spc.stmts[1].ExecContext(
				spc.ctx, id, dirimp)
			return err
		}); err != nil {
			spc.feedback.Warn(
				pgxutils.ReadableError{Err: err}.Error())
			spc.feedback.Info(
				"Tried to import '%s' as dirimp value",
				dirimp)
		}
	}
}

func consumeBCNLAT(
	spc *SQLPointConsumer,
	points pointSlice,
	properties interface{},
	epsg int,
) error {
	props := properties.(*bcnlatProperties)

	catlam := coalesceInt64(props.HydroCatlam, props.IENCCatlam)

	var fmid int64
	err := spc.savepoint(func() error {
		return spc.stmts[0].QueryRowContext(
			spc.ctx,
			points.asWKB(),
			epsg,
			props.Datsta,
			props.Datend,
			props.Persta,
			props.Perend,
			props.Objnam,
			props.Nobjnm,
			props.Inform,
			props.Ninfom,
			props.Scamin,
			props.Picrep,
			props.Txtdsc,
			props.Sordat,
			props.Sorind,
			props.Colour,
			props.Colpat,
			props.Condtn,
			props.Bcnshp,
			catlam,
		).Scan(&fmid)
	})
	switch {
	case err == sql.ErrNoRows:
		return ErrFeatureDuplicated
		// ignore -> filtered by responsibility area or a duplicate
		// TODO: handle eventual changes to dirimp
	case err != nil:
		spc.feedback.Error(pgxutils.ReadableError{Err: err}.Error())
		return ErrFeatureIgnored
	default:
		storeAttribs(spc, fmid, props.Dirimp)
	}
	return nil
}

func consumeBOYLAT(
	spc *SQLPointConsumer,
	points pointSlice,
	properties interface{},
	epsg int,
) error {
	props := properties.(*boylatProperties)

	marsys := coalesceInt64(props.HydroMarsys, props.IENCMarsys)
	catlam := coalesceInt64(props.HydroCatlam, props.IENCCatlam)

	var fmid int64
	err := spc.savepoint(func() error {
		return spc.stmts[0].QueryRowContext(
			spc.ctx,
			points.asWKB(),
			epsg,
			props.Datsta,
			props.Datend,
			props.Persta,
			props.Perend,
			props.Objnam,
			props.Nobjnm,
			props.Inform,
			props.Ninfom,
			props.Scamin,
			props.Picrep,
			props.Txtdsc,
			props.Sordat,
			props.Sorind,
			props.Colour,
			props.Colpat,
			props.Conrad,
			marsys,
			props.Boyshp,
			catlam,
		).Scan(&fmid)
	})
	switch {
	case err == sql.ErrNoRows:
		return ErrFeatureDuplicated
		// ignore -> filtered by responsibility_areas
	case err != nil:
		spc.feedback.Error(pgxutils.ReadableError{Err: err}.Error())
		return ErrFeatureIgnored
	}
	return nil
}

func consumeBOYCAR(
	spc *SQLPointConsumer,
	points pointSlice,
	properties interface{},
	epsg int,
) error {
	props := properties.(*boycarProperties)
	var fmid int64
	err := spc.savepoint(func() error {
		return spc.stmts[0].QueryRowContext(
			spc.ctx,
			points.asWKB(),
			epsg,
			props.Datsta,
			props.Datend,
			props.Persta,
			props.Perend,
			props.Objnam,
			props.Nobjnm,
			props.Inform,
			props.Ninfom,
			props.Scamin,
			props.Picrep,
			props.Txtdsc,
			props.Sordat,
			props.Sorind,
			props.Colour,
			props.Colpat,
			props.Conrad,
			props.Marsys,
			props.Boyshp,
			props.Catcam,
		).Scan(&fmid)
	})
	switch {
	case err == sql.ErrNoRows:
		return ErrFeatureDuplicated
		// ignore -> filtered by responsibility_areas
	case err != nil:
		spc.feedback.Error(pgxutils.ReadableError{Err: err}.Error())
		return ErrFeatureIgnored
	}
	return nil
}

func consumeBOYSAW(
	spc *SQLPointConsumer,
	points pointSlice,
	properties interface{},
	epsg int,
) error {
	props := properties.(*boysawProperties)
	var fmid int64
	err := spc.savepoint(func() error {
		return spc.stmts[0].QueryRowContext(
			spc.ctx,
			points.asWKB(),
			epsg,
			props.Datsta,
			props.Datend,
			props.Persta,
			props.Perend,
			props.Objnam,
			props.Nobjnm,
			props.Inform,
			props.Ninfom,
			props.Scamin,
			props.Picrep,
			props.Txtdsc,
			props.Sordat,
			props.Sorind,
			props.Colour,
			props.Colpat,
			props.Conrad,
			props.Marsys,
			props.Boyshp,
		).Scan(&fmid)
	})
	switch {
	case err == sql.ErrNoRows:
		return ErrFeatureDuplicated
		// ignore -> filtered by responsibility_areas
	case err != nil:
		spc.feedback.Error(pgxutils.ReadableError{Err: err}.Error())
		return ErrFeatureIgnored
	}
	return nil
}

func consumeBOYSPP(
	spc *SQLPointConsumer,
	points pointSlice,
	properties interface{},
	epsg int,
) error {
	props := properties.(*boysppProperties)
	var fmid int64
	err := spc.savepoint(func() error {
		return spc.stmts[0].QueryRowContext(
			spc.ctx,
			points.asWKB(),
			epsg,
			props.Datsta,
			props.Datend,
			props.Persta,
			props.Perend,
			props.Objnam,
			props.Nobjnm,
			props.Inform,
			props.Ninfom,
			props.Scamin,
			props.Picrep,
			props.Txtdsc,
			props.Sordat,
			props.Sorind,
			props.Colour,
			props.Colpat,
			props.Conrad,
			props.Marsys,
			props.Boyshp,
			props.Catspm,
		).Scan(&fmid)
	})
	switch {
	case err == sql.ErrNoRows:
		return ErrFeatureDuplicated
		// ignore -> filtered by responsibility_areas
	case err != nil:
		spc.feedback.Error(pgxutils.ReadableError{Err: err}.Error())
		return ErrFeatureIgnored
	}
	return nil
}

func consumeDAYMAR(
	spc *SQLPointConsumer,
	points pointSlice,
	properties interface{},
	epsg int,
) error {
	props := properties.(*daymarProperties)

	var fmid int64
	err := spc.savepoint(func() error {
		return spc.stmts[0].QueryRowContext(
			spc.ctx,
			points.asWKB(),
			epsg,
			props.Datsta,
			props.Datend,
			props.Persta,
			props.Perend,
			props.Objnam,
			props.Nobjnm,
			props.Inform,
			props.Ninfom,
			props.Scamin,
			props.Picrep,
			props.Txtdsc,
			props.Sordat,
			props.Sorind,
			props.Colour,
			props.Colpat,
			props.Condtn,
			props.Topshp,
			props.Orient,
		).Scan(&fmid)
	})
	switch {
	case err == sql.ErrNoRows:
		return ErrFeatureDuplicated
		// ignore -> filtered by responsibility area or a duplicate
		// TODO: handle eventual changes to dirimp
	case err != nil:
		spc.feedback.Error(pgxutils.ReadableError{Err: err}.Error())
		return ErrFeatureIgnored
	default:
		storeAttribs(spc, fmid, props.Dirimp)
	}
	return nil
}

func consumeLIGHTS(
	spc *SQLPointConsumer,
	points pointSlice,
	properties interface{},
	epsg int,
) error {
	props := properties.(*lightsProperties)

	var fmid int64
	err := spc.savepoint(func() error {
		return spc.stmts[0].QueryRowContext(
			spc.ctx,
			points.asWKB(),
			epsg,
			props.Datsta,
			props.Datend,
			props.Persta,
			props.Perend,
			props.Objnam,
			props.Nobjnm,
			props.Inform,
			props.Ninfom,
			props.Scamin,
			props.Picrep,
			props.Txtdsc,
			props.Sordat,
			props.Sorind,
			props.Colour,
			props.Condtn,
			props.Orient,
			props.Catlit,
			props.Exclit,
			props.Litchr,
			props.Litvis,
			props.Mltylt,
			props.Sectr1,
			props.Sectr2,
			props.Siggrp,
			props.Sigper,
			props.Sigseq,
			props.Status,
		).Scan(&fmid)
	})
	switch {
	case err == sql.ErrNoRows:
		return ErrFeatureDuplicated
		// ignore -> filtered by responsibility area or a duplicate
	case err != nil:
		spc.feedback.Error(pgxutils.ReadableError{Err: err}.Error())
		return ErrFeatureIgnored
	}
	return nil
}

func consumeNOTMRK(
	spc *SQLPointConsumer,
	points pointSlice,
	properties interface{},
	epsg int,
) error {
	props := properties.(*notmrkProperties)
	var fmid int64
	err := spc.savepoint(func() error {
		return spc.stmts[0].QueryRowContext(
			spc.ctx,
			points.asWKB(),
			epsg,
			props.Datsta,
			props.Datend,
			props.Persta,
			props.Perend,
			props.Objnam,
			props.Nobjnm,
			props.Inform,
			props.Ninfom,
			props.Scamin,
			props.Picrep,
			props.Txtdsc,
			props.Sordat,
			props.Sorind,
			props.Condtn,
			props.Marsys,
			props.Orient,
			props.Status,
			props.Addmrk,
			props.Catnmk,
			props.Disipd,
			props.Disipu,
			props.Disbk1,
			props.Disbk2,
			props.Fnctnm,
			props.Bnkwtw,
		).Scan(&fmid)
	})
	switch {
	case err == sql.ErrNoRows:
		return ErrFeatureDuplicated
		// ignore -> filtered by responsibility area or a duplicate
		// TODO: handle eventual changes to dirimp
	case err != nil:
		spc.feedback.Error(pgxutils.ReadableError{Err: err}.Error())
		return ErrFeatureIgnored
	default:
		storeAttribs(spc, fmid, props.Dirimp)
	}
	return nil
}

func consumeRTPBCN(
	spc *SQLPointConsumer,
	points pointSlice,
	properties interface{},
	epsg int,
) error {
	props := properties.(*rtpbcnProperties)
	var fmid int64
	err := spc.savepoint(func() error {
		return spc.stmts[0].QueryRowContext(
			spc.ctx,
			points.asWKB(),
			epsg,
			props.Datsta,
			props.Datend,
			props.Persta,
			props.Perend,
			props.Objnam,
			props.Nobjnm,
			props.Inform,
			props.Ninfom,
			props.Scamin,
			props.Picrep,
			props.Txtdsc,
			props.Sordat,
			props.Sorind,
			props.Condtn,
			props.Siggrp,
			props.Catrtb,
			props.Radwal,
		).Scan(&fmid)
	})
	switch {
	case err == sql.ErrNoRows:
		return ErrFeatureDuplicated
		// ignore -> filtered by responsibility area or a duplicate
	case err != nil:
		spc.feedback.Error(pgxutils.ReadableError{Err: err}.Error())
		return ErrFeatureIgnored
	}
	return nil
}

func consumeTOPMAR(
	spc *SQLPointConsumer,
	points pointSlice,
	properties interface{},
	epsg int,
) error {
	props := properties.(*topmarProperties)
	var fmid int64
	err := spc.savepoint(func() error {
		return spc.stmts[0].QueryRowContext(
			spc.ctx,
			points.asWKB(),
			epsg,
			props.Datsta,
			props.Datend,
			props.Persta,
			props.Perend,
			props.Objnam,
			props.Nobjnm,
			props.Inform,
			props.Ninfom,
			props.Scamin,
			props.Picrep,
			props.Txtdsc,
			props.Sordat,
			props.Sorind,
			props.Colour,
			props.Colpat,
			props.Condtn,
			props.Topshp,
		).Scan(&fmid)
	})
	switch {
	case err == sql.ErrNoRows:
		return ErrFeatureDuplicated
		// ignore -> filtered by responsibility area or a duplicate
	case err != nil:
		spc.feedback.Error(pgxutils.ReadableError{Err: err}.Error())
		return ErrFeatureIgnored
	}
	return nil
}