view pkg/imports/fm.go @ 4969:5621586e7a54 fairway-marks-import

Avoid checking the same condition twice. An entry should either be UPDATEd or, if it's not already in the database, INSERTed. Thus simply INSERT if there was no UPDATE.
author Tom Gottfried <tom@intevation.de>
date Fri, 28 Feb 2020 17:55:55 +0100
parents 1ba11ade2cf3
children 5890e62e6d52
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2020 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Tom Gottfried <tom.gottfried@intevation.de>
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"database/sql"
	"fmt"
	"strings"

	"gemma.intevation.de/gemma/pkg/pgxutils"
)

// Property sets decoded from the JSON properties of fairway mark features.
// All fields are pointers so that attributes missing from the source
// stay nil. The JSON tags carry a "hydro_" or "ienc_" prefix naming the
// attribute as delivered by the data source.
type (
	// fairwayMarksProperties holds the properties common to all types
	// of fairway marks. It is embedded in every type specific struct.
	fairwayMarksProperties struct {
		Datsta *string `json:"hydro_datsta"`
		Datend *string `json:"hydro_datend"`
		Persta *string `json:"hydro_persta"`
		Perend *string `json:"hydro_perend"`
		Objnam *string `json:"hydro_objnam"`
		Nobjnm *string `json:"hydro_nobjnm"`
		Inform *string `json:"hydro_inform"`
		Ninfom *string `json:"hydro_ninfom"`
		Scamin *int    `json:"hydro_scamin"`
		Picrep *string `json:"hydro_picrep"`
		Txtdsc *string `json:"hydro_txtdsc"`
		Sordat *string `json:"hydro_sordat"`
		Sorind *string `json:"hydro_sorind"`
	}

	// bcnlatProperties holds the properties shared by the HYDRO and
	// IENC variants of bcnlat features.
	bcnlatProperties struct {
		fairwayMarksProperties
		Colour *string `json:"hydro_colour"`
		Colpat *string `json:"hydro_colpat"`
		Condtn *int    `json:"hydro_condtn"`
		Bcnshp *int    `json:"hydro_bcnshp"`
	}

	// bcnlatHydroProperties are the properties of bcnlat (HYDRO) features.
	bcnlatHydroProperties struct {
		bcnlatProperties
		Catlam *int64 `json:"hydro_catlam"`
	}

	// bcnlatIencProperties are the properties of bcnlat (IENC) features.
	// Dirimp is not part of the main INSERT; the consumer stores it
	// separately.
	bcnlatIencProperties struct {
		bcnlatProperties
		Catlam *int64  `json:"ienc_catlam"`
		Dirimp *string `json:"ienc_dirimp"`
	}

	// boylatProperties holds the properties shared by the HYDRO and
	// IENC variants of boylat features.
	boylatProperties struct {
		fairwayMarksProperties
		Colour *string `json:"hydro_colour"`
		Colpat *string `json:"hydro_colpat"`
		Conrad *int    `json:"hydro_conrad"`
		Boyshp *int    `json:"hydro_boyshp"`
	}

	// boylatHydroProperties are the properties of boylat (HYDRO) features.
	boylatHydroProperties struct {
		boylatProperties
		Marsys *int64 `json:"hydro_marsys"`
		Catlam *int64 `json:"hydro_catlam"`
	}

	// boylatIencProperties are the properties of boylat (IENC) features.
	boylatIencProperties struct {
		boylatProperties
		Marsys *int64 `json:"ienc_marsys"`
		Catlam *int64 `json:"ienc_catlam"`
	}

	// boycarProperties are the properties of boycar features.
	boycarProperties struct {
		fairwayMarksProperties
		Colour *string `json:"hydro_colour"`
		Colpat *string `json:"hydro_colpat"`
		Conrad *int    `json:"hydro_conrad"`
		Marsys *int    `json:"hydro_marsys"`
		Boyshp *int    `json:"hydro_boyshp"`
		Catcam *int    `json:"hydro_catcam"`
	}

	// boysawProperties are the properties of boysaw features.
	boysawProperties struct {
		fairwayMarksProperties
		Colour *string `json:"hydro_colour"`
		Colpat *string `json:"hydro_colpat"`
		Conrad *int    `json:"hydro_conrad"`
		Marsys *int64  `json:"hydro_marsys"`
		Boyshp *int    `json:"hydro_boyshp"`
	}

	// boysppProperties are the properties of boyspp features.
	boysppProperties struct {
		fairwayMarksProperties
		Colour *string `json:"hydro_colour"`
		Colpat *string `json:"hydro_colpat"`
		Conrad *int    `json:"hydro_conrad"`
		Marsys *int64  `json:"hydro_marsys"`
		Boyshp *int    `json:"hydro_boyshp"`
		Catspm *string `json:"hydro_catspm"`
	}

	// daymarHydroProperties are the properties of daymar (HYDRO) features.
	daymarHydroProperties struct {
		fairwayMarksProperties
		Colour *string `json:"hydro_colour"`
		Colpat *string `json:"hydro_colpat"`
		Condtn *int    `json:"hydro_condtn"`
		Topshp *int    `json:"hydro_topshp"`
	}

	// daymarIencProperties are the properties of daymar (IENC) features.
	// They extend the HYDRO variant. Dirimp is not part of the main
	// INSERT; the consumer stores it separately.
	daymarIencProperties struct {
		daymarHydroProperties
		Dirimp *string  `json:"ienc_dirimp"`
		Orient *float64 `json:"hydro_orient"`
	}

	// lightsProperties are the properties of lights features.
	lightsProperties struct {
		fairwayMarksProperties
		Colour *string  `json:"hydro_colour"`
		Condtn *int     `json:"hydro_condtn"`
		Orient *float64 `json:"hydro_orient"`
		Catlit *string  `json:"hydro_catlit"`
		Exclit *int     `json:"hydro_exclit"`
		Litchr *int     `json:"hydro_litchr"`
		Litvis *string  `json:"hydro_litvis"`
		Mltylt *int     `json:"hydro_mltylt"`
		Sectr1 *float64 `json:"hydro_sectr1"`
		Sectr2 *float64 `json:"hydro_sectr2"`
		Siggrp *string  `json:"hydro_siggrp"`
		Sigper *float64 `json:"hydro_sigper"`
		Sigseq *string  `json:"hydro_sigseq"`
		Status *string  `json:"hydro_status"`
	}

	// notmrkProperties are the properties of notmrk features.
	// Dirimp is not part of the main INSERT; the consumer stores it
	// separately.
	//
	// Marsys is decoded from "hydro_marsys". It used to be tagged
	// "hydro_bcnshp", which looked like a copy-and-paste slip from
	// bcnlatProperties: the value is written to the marsys column.
	// NOTE(review): confirm the attribute name against the WFS schema.
	notmrkProperties struct {
		fairwayMarksProperties
		Condtn *int     `json:"hydro_condtn"`
		Marsys *int     `json:"hydro_marsys"`
		Dirimp *string  `json:"ienc_dirimp"`
		Orient *float64 `json:"hydro_orient"`
		Status *string  `json:"hydro_status"`
		Addmrk *string  `json:"ienc_addmrk"`
		Catnmk *int     `json:"ienc_catnmk"`
		Disipd *float64 `json:"ienc_disipd"`
		Disipu *float64 `json:"ienc_disipu"`
		Disbk1 *float64 `json:"ienc_disbk1"`
		Disbk2 *float64 `json:"ienc_disbk2"`
		Fnctnm *int     `json:"ienc_fnctnm"`
		Bnkwtw *int     `json:"ienc_bnkwtw"`
	}

	// rtpbcnProperties are the properties of rtpbcn features.
	rtpbcnProperties struct {
		fairwayMarksProperties
		Condtn *int    `json:"hydro_condtn"`
		Siggrp *string `json:"hydro_siggrp"`
		Catrtb *int    `json:"hydro_catrtb"`
		Radwal *string `json:"hydro_radwal"`
	}

	// topmarProperties are the properties of topmar features.
	topmarProperties struct {
		fairwayMarksProperties
		Colour *string `json:"hydro_colour"`
		Colpat *string `json:"hydro_colpat"`
		Condtn *int    `json:"hydro_condtn"`
		Topshp *int    `json:"hydro_topshp"`
	}
)

// Job kinds of the fairway marks imports. There is one kind per fairway
// mark type; types available from two sources have a separate kind for
// the HYDRO and the IENC variant. Each kind is registered with its job
// creator in init.
const (
	// BCNLATHYDROJobKind is the import type for bcnlat (HYDRO) fairway marks.
	BCNLATHYDROJobKind JobKind = "fm_bcnlat_hydro"
	// BCNLATIENCJobKind is the import type for bcnlat (IENC) fairway marks.
	BCNLATIENCJobKind  JobKind = "fm_bcnlat_ienc"
	// BOYLATHYDROJobKind is the import type for boylat (HYDRO) fairway marks.
	BOYLATHYDROJobKind JobKind = "fm_boylat_hydro"
	// BOYLATIENCJobKind is the import type for boylat (IENC) fairway marks.
	BOYLATIENCJobKind  JobKind = "fm_boylat_ienc"
	// BOYCARJobKind is the import type for boycar fairway marks.
	BOYCARJobKind      JobKind = "fm_boycar"
	// BOYSAWJobKind is the import type for boysaw fairway marks.
	BOYSAWJobKind      JobKind = "fm_boysaw"
	// BOYSPPJobKind is the import type for boyspp fairway marks.
	BOYSPPJobKind      JobKind = "fm_boyspp"
	// DAYMARHYDROJobKind is the import type for daymar (HYDRO) fairway marks.
	DAYMARHYDROJobKind JobKind = "fm_daymar_hydro"
	// DAYMARIENCJobKind is the import type for daymar (IENC) fairway marks.
	DAYMARIENCJobKind  JobKind = "fm_daymar_ienc"
	// LIGHTSJobKind is the import type for lights fairway marks.
	LIGHTSJobKind      JobKind = "fm_lights"
	// NOTMRKJobKind is the import type for notmrk fairway marks.
	NOTMRKJobKind      JobKind = "fm_notmrk"
	// RTPBCNJobKind is the import type for rtpbcn fairway marks.
	RTPBCNJobKind      JobKind = "fm_rtpbcn"
	// TOPMARJobKind is the import type for topmar fairway marks.
	TOPMARJobKind      JobKind = "fm_topmar"
)

// init registers one job creator per fairway mark job kind.
//
// Every registration follows the same pattern:
//   - depends names the table the import writes to,
//   - the first prepared statement is the type specific INSERT built by
//     createInsertFMSQL; its attribute list must match the order of the
//     extra arguments passed by the corresponding consume function,
//   - types with a dirimp attribute prepare a second statement which
//     storeAttribs uses to store the dirimp values,
//   - createInvalidation must be given the same type name as
//     createInsertFMSQL so that both address the same table.
func init() {
	RegisterJobCreator(BCNLATHYDROJobKind,
		&PointWFSJobCreator{
			description: "fairway marks bcnlat (HYDRO)",
			depends:     [2][]string{{"fairway_marks_bcnlat_hydro"}, {}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("bcnlat_hydro",
						"colour", "colpat", "condtn", "bcnshp", "catlam"),
				),
				consumeBCNLATHydro,
				createInvalidation("bcnlat_hydro"),
				func() interface{} { return new(bcnlatHydroProperties) },
			),
		})

	RegisterJobCreator(BCNLATIENCJobKind,
		&PointWFSJobCreator{
			description: "fairway marks bcnlat (IENC)",
			depends:     [2][]string{{"fairway_marks_bcnlat_ienc"}, {}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("bcnlat_ienc",
						"colour", "colpat", "condtn", "bcnshp", "catlam"),
					insertBcnlatDirimpSQL,
				),
				consumeBCNLATIenc,
				createInvalidation("bcnlat_ienc"),
				func() interface{} { return new(bcnlatIencProperties) },
			),
		})

	RegisterJobCreator(BOYLATHYDROJobKind,
		&PointWFSJobCreator{
			description: "fairway marks boylat (HYDRO)",
			depends:     [2][]string{{"fairway_marks_boylat_hydro"}, {}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("boylat_hydro",
						"colour", "colpat", "conrad",
						"marsys", "boyshp", "catlam"),
				),
				consumeBOYLATHydro,
				createInvalidation("boylat_hydro"),
				func() interface{} { return new(boylatHydroProperties) },
			),
		})

	RegisterJobCreator(BOYLATIENCJobKind,
		&PointWFSJobCreator{
			description: "fairway marks boylat (IENC)",
			depends:     [2][]string{{"fairway_marks_boylat_ienc"}, {}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("boylat_ienc",
						"colour", "colpat", "conrad",
						"marsys", "boyshp", "catlam"),
				),
				consumeBOYLATIenc,
				createInvalidation("boylat_ienc"),
				func() interface{} { return new(boylatIencProperties) },
			),
		})

	RegisterJobCreator(BOYCARJobKind,
		&PointWFSJobCreator{
			description: "fairway marks boycar",
			depends:     [2][]string{{"fairway_marks_boycar"}, {}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("boycar",
						"colour", "colpat", "conrad",
						"marsys", "boyshp", "catcam"),
				),
				consumeBOYCAR,
				createInvalidation("boycar"),
				func() interface{} { return new(boycarProperties) },
			),
		})

	RegisterJobCreator(BOYSAWJobKind,
		&PointWFSJobCreator{
			description: "fairway marks boysaw",
			depends:     [2][]string{{"fairway_marks_boysaw"}, {}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("boysaw",
						"colour", "colpat", "conrad", "marsys", "boyshp"),
				),
				consumeBOYSAW,
				createInvalidation("boysaw"),
				func() interface{} { return new(boysawProperties) },
			),
		})

	RegisterJobCreator(BOYSPPJobKind,
		&PointWFSJobCreator{
			description: "fairway marks boyspp",
			depends:     [2][]string{{"fairway_marks_boyspp"}, {}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("boyspp",
						"colour", "colpat", "conrad",
						"marsys", "boyshp", "catspm"),
				),
				consumeBOYSPP,
				createInvalidation("boyspp"),
				func() interface{} { return new(boysppProperties) },
			),
		})

	RegisterJobCreator(DAYMARHYDROJobKind,
		&PointWFSJobCreator{
			description: "fairway marks daymar (HYDRO)",
			depends:     [2][]string{{"fairway_marks_daymar_hydro"}, {}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("daymar_hydro",
						"colour", "colpat", "condtn", "topshp"),
				),
				consumeDAYMARHydro,
				createInvalidation("daymar_hydro"),
				func() interface{} { return new(daymarHydroProperties) },
			),
		})

	RegisterJobCreator(DAYMARIENCJobKind,
		&PointWFSJobCreator{
			description: "fairway marks daymar (IENC)",
			depends:     [2][]string{{"fairway_marks_daymar_ienc"}, {}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("daymar_ienc",
						"colour", "colpat", "condtn", "topshp", "orient"),
					insertDaymarDirimpSQL,
				),
				consumeDAYMARIenc,
				createInvalidation("daymar_ienc"),
				func() interface{} { return new(daymarIencProperties) },
			),
		})

	RegisterJobCreator(LIGHTSJobKind,
		&PointWFSJobCreator{
			description: "fairway marks lights",
			depends:     [2][]string{{"fairway_marks_lights"}, {}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("lights",
						"colour", "condtn", "orient",
						"catlit", "exclit", "litchr",
						"litvis", "mltylt", "sectr1",
						"sectr2", "siggrp", "sigper",
						"sigseq", "status"),
				),
				consumeLIGHTS,
				createInvalidation("lights"),
				func() interface{} { return new(lightsProperties) },
			),
		})

	RegisterJobCreator(NOTMRKJobKind,
		&PointWFSJobCreator{
			description: "fairway marks notmrk",
			// The notmrk import writes to its own table, not to the
			// lights table.
			depends: [2][]string{{"fairway_marks_notmrk"}, {}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("notmrk",
						"condtn", "marsys", "orient",
						"status", "addmrk", "catnmk",
						"disipd", "disipu", "disbk1",
						"disbk2", "fnctnm", "bnkwtw"),
					insertNotmrkDirimpSQL,
				),
				consumeNOTMRK,
				// Must name the same table suffix as the INSERT above.
				createInvalidation("notmrk"),
				func() interface{} { return new(notmrkProperties) },
			),
		})

	RegisterJobCreator(RTPBCNJobKind,
		&PointWFSJobCreator{
			description: "fairway marks rtpbcn",
			depends:     [2][]string{{"fairway_marks_rtpbcn"}, {}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("rtpbcn",
						"condtn", "siggrp", "catrtb", "radwal"),
				),
				consumeRTPBCN,
				createInvalidation("rtpbcn"),
				func() interface{} { return new(rtpbcnProperties) },
			),
		})

	RegisterJobCreator(TOPMARJobKind,
		&PointWFSJobCreator{
			description: "fairway marks topmar",
			depends:     [2][]string{{"fairway_marks_topmar"}, {}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("topmar",
						"colour", "colpat", "condtn", "topshp"),
				),
				consumeTOPMAR,
				createInvalidation("topmar"),
				func() interface{} { return new(topmarProperties) },
			),
		})
}

const (
	// insertFMSQLtmpl is the format string for the statement storing one
	// fairway mark. createInsertFMSQL completes it with
	//   %[1]s  the type specific table name suffix,
	//   %[2]s  the comma separated type specific column names,
	//   %[3]s  the placeholders ($16, ...) for these columns.
	// $1 and $2 are the geometry as WKB and its SRID, $3 to $15 the
	// properties common to all fairway marks.
	//
	// The statement first tries to UPDATE last_found of a currently
	// valid entry that is identical in all compared columns. Only if no
	// such entry was updated, and the geometry passed the area check in
	// CTE g, a new row is INSERTed and its id returned. Hence the
	// statement yields no row for duplicates and for filtered features.
	//
	// Instead of the row comparisons in the WHERE clause of the CTE
	// with the UPDATE, we could have used the row-based UNIQUE indexes
	// as arbiter indexes in an INSERT ... ON CONFLICT ... DO UPDATE,
	// but that turned out to be able to bypass the UNIQUE index in
	// some cases.
	insertFMSQLtmpl = `
WITH a AS (
  SELECT users.current_user_area_utm() AS a
),
g AS (
  SELECT newfm
  FROM ST_Transform(ST_GeomFromWKB($1, $2::integer), 4326) AS newfm (newfm)
  WHERE pg_has_role('sys_admin', 'MEMBER')
    OR ST_Intersects((select a from a),
      ST_Transform(newfm, (select ST_SRID(a) from a)))
),
t AS (
  -- Currently valid and otherwise identical entry's validity.
  /* If there are no intermittent updates of validity,
     there will always be only one currently valid and
     otherwise identical entry. */
  UPDATE waterway.fairway_marks_%[1]s SET last_found = current_timestamp
  WHERE validity @> current_timestamp
    AND (geom,
        datsta, datend, persta, perend, objnam, nobjnm, inform, ninfom,
        scamin, picrep, txtdsc, sordat, sorind,
        %[2]s
      ) IS NOT DISTINCT FROM (
        (SELECT newfm FROM g),
        $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15,
        %[3]s)
  RETURNING 1
)
INSERT INTO waterway.fairway_marks_%[1]s (
  geom,
  datsta,
  datend,
  persta,
  perend,
  objnam,
  nobjnm,
  inform,
  ninfom,
  scamin,
  picrep,
  txtdsc,
  sordat,
  sorind,
  %[2]s
)
SELECT newfm, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15,
    %[3]s
  FROM g
  WHERE NOT EXISTS(SELECT 1 FROM t)
RETURNING id
`

	// invalidateFairwayMarksSQLtmpl is the format string for the
	// statement ending the validity of entries. The single %s is the
	// type specific table name suffix.
	// Assume validity ended for all entries not found in data source:
	// these are the currently valid entries whose last_found is older
	// than the current transaction's timestamp. Users without the
	// sys_admin role only touch entries intersecting their area.
	invalidateFairwayMarksSQLtmpl = `
WITH
  a AS (
    SELECT users.current_user_area_utm() AS a
  )
UPDATE waterway.fairway_marks_%s
  SET validity = tstzrange(lower(validity), current_timestamp)
  WHERE validity @> current_timestamp
    AND last_found < current_timestamp
    AND (pg_has_role('sys_admin', 'MEMBER')
      OR ST_Intersects((select a from a),
        ST_Transform(CAST(geom AS geometry), (select ST_SRID(a) from a))))
 `

	// insertBcnlatDirimpSQL stores one dirimp value ($2) for the
	// bcnlat fairway mark with id $1.
	insertBcnlatDirimpSQL = `
INSERT INTO waterway.fairway_marks_bcnlat_dirimps (fm_bcnlat_id, dirimp)
  VALUES ($1, $2)
`

	// insertDaymarDirimpSQL stores one dirimp value ($2) for the
	// daymar fairway mark with id $1.
	insertDaymarDirimpSQL = `
INSERT INTO waterway.fairway_marks_daymar_dirimps (fm_daymar_id, dirimp)
  VALUES ($1, $2)
`

	// insertNotmrkDirimpSQL stores one dirimp value ($2) for the
	// notmrk fairway mark with id $1.
	insertNotmrkDirimpSQL = `
INSERT INTO waterway.fairway_marks_notmrk_dirimps (fm_notmrk_id, dirimp)
  VALUES ($1, $2)
`
)

// createInvalidation builds the invalidation step for the fairway marks
// table with the given type suffix.
//
// The returned function executes the statement derived from
// invalidateFairwayMarksSQLtmpl inside the consumer's transaction. It
// returns ErrFeaturesUnmodified if no row was affected and reports the
// number of affected rows to the feedback otherwise. Database errors
// are passed on unchanged.
func createInvalidation(fmType string) func(*SQLPointConsumer) error {
	// Complete the statement once, not on every invocation.
	stmt := fmt.Sprintf(invalidateFairwayMarksSQLtmpl, fmType)

	invalidate := func(spc *SQLPointConsumer) error {
		result, err := spc.tx.ExecContext(spc.ctx, stmt)
		if err != nil {
			return err
		}

		affected, err := result.RowsAffected()
		switch {
		case err != nil:
			return err
		case affected == 0:
			return ErrFeaturesUnmodified
		}

		spc.feedback.Info(
			"Number of features removed from data source: %d", affected)
		return nil
	}

	return invalidate
}

// createInsertFMSQL completes insertFMSQLtmpl for a specific type of
// fairway marks.
//
// fmType is the table name suffix, attributes are the names of the type
// specific columns. The columns are bound to the placeholders $16, $17,
// ... in the given order; $1 to $15 are taken by the geometry and the
// common properties.
func createInsertFMSQL(fmType string, attributes ...string) string {
	// The first placeholder is emitted unconditionally.
	placeholders := []string{"$16"}
	for n := 17; n < 16+len(attributes); n++ {
		placeholders = append(placeholders, fmt.Sprintf("$%d", n))
	}

	columns := strings.Join(attributes, ",")
	values := strings.Join(placeholders, ",")

	return fmt.Sprintf(insertFMSQLtmpl, fmType, columns, values)
}

// storeAttribs stores the comma separated dirimp values in attrs for the
// fairway mark with the given id.
//
// Each value is inserted with the consumer's second prepared statement
// inside its own savepoint. A failing value is reported to the feedback
// and skipped; the remaining values are still processed. A nil or
// empty attrs is a no-op.
func storeAttribs(spc *SQLPointConsumer, id int64, attrs *string) {
	if attrs == nil || len(*attrs) == 0 {
		return
	}

	for _, value := range strings.Split(*attrs, ",") {
		insert := func() error {
			_, err := spc.stmts[1].ExecContext(spc.ctx, id, value)
			return err
		}

		err := spc.savepoint(insert)
		if err == nil {
			continue
		}

		spc.feedback.Warn(pgxutils.ReadableError{Err: err}.Error())
		spc.feedback.Info("Tried to import '%s' as dirimp value", value)
	}
}

// consumeBCNLATHydro stores one bcnlat (HYDRO) feature.
//
// properties must be a *bcnlatHydroProperties. The feature is written
// with the consumer's first prepared statement inside a savepoint.
// It returns ErrFeatureDuplicated if the statement yields no row and
// ErrFeatureIgnored, after reporting to the feedback, on any other error.
func consumeBCNLATHydro(
	spc *SQLPointConsumer,
	points pointSlice,
	properties interface{},
	epsg int,
) error {
	p := properties.(*bcnlatHydroProperties)

	var id int64
	insert := func() error {
		row := spc.stmts[0].QueryRowContext(
			spc.ctx,
			// geometry
			points.asWKB(), epsg,
			// common properties
			p.Datsta, p.Datend, p.Persta, p.Perend,
			p.Objnam, p.Nobjnm, p.Inform, p.Ninfom,
			p.Scamin, p.Picrep, p.Txtdsc, p.Sordat, p.Sorind,
			// type specific properties
			p.Colour, p.Colpat, p.Condtn, p.Bcnshp, p.Catlam,
		)
		return row.Scan(&id)
	}

	err := spc.savepoint(insert)
	if err == sql.ErrNoRows {
		// Nothing inserted: filtered by responsibility area
		// or a duplicate.
		return ErrFeatureDuplicated
	}
	if err != nil {
		spc.feedback.Error(pgxutils.ReadableError{Err: err}.Error())
		return ErrFeatureIgnored
	}
	return nil
}

// consumeBCNLATIenc stores one bcnlat (IENC) feature and its dirimp
// values.
//
// properties must be a *bcnlatIencProperties. The feature is written
// with the consumer's first prepared statement inside a savepoint; on
// success the dirimp values are stored via storeAttribs.
// It returns ErrFeatureDuplicated if the statement yields no row and
// ErrFeatureIgnored, after reporting to the feedback, on any other error.
func consumeBCNLATIenc(
	spc *SQLPointConsumer,
	points pointSlice,
	properties interface{},
	epsg int,
) error {
	p := properties.(*bcnlatIencProperties)

	var id int64
	insert := func() error {
		row := spc.stmts[0].QueryRowContext(
			spc.ctx,
			// geometry
			points.asWKB(), epsg,
			// common properties
			p.Datsta, p.Datend, p.Persta, p.Perend,
			p.Objnam, p.Nobjnm, p.Inform, p.Ninfom,
			p.Scamin, p.Picrep, p.Txtdsc, p.Sordat, p.Sorind,
			// type specific properties
			p.Colour, p.Colpat, p.Condtn, p.Bcnshp, p.Catlam,
		)
		return row.Scan(&id)
	}

	err := spc.savepoint(insert)
	if err == sql.ErrNoRows {
		// Nothing inserted: filtered by responsibility area
		// or a duplicate.
		// TODO: handle eventual changes to dirimp
		return ErrFeatureDuplicated
	}
	if err != nil {
		spc.feedback.Error(pgxutils.ReadableError{Err: err}.Error())
		return ErrFeatureIgnored
	}

	// A new entry was created; attach its dirimp values.
	storeAttribs(spc, id, p.Dirimp)
	return nil
}

// consumeBOYLATHydro stores one boylat (HYDRO) feature.
//
// properties must be a *boylatHydroProperties. The feature is written
// with the consumer's first prepared statement inside a savepoint.
// It returns ErrFeatureDuplicated if the statement yields no row and
// ErrFeatureIgnored, after reporting to the feedback, on any other error.
func consumeBOYLATHydro(
	spc *SQLPointConsumer,
	points pointSlice,
	properties interface{},
	epsg int,
) error {
	p := properties.(*boylatHydroProperties)

	var id int64
	insert := func() error {
		row := spc.stmts[0].QueryRowContext(
			spc.ctx,
			// geometry
			points.asWKB(), epsg,
			// common properties
			p.Datsta, p.Datend, p.Persta, p.Perend,
			p.Objnam, p.Nobjnm, p.Inform, p.Ninfom,
			p.Scamin, p.Picrep, p.Txtdsc, p.Sordat, p.Sorind,
			// type specific properties
			p.Colour, p.Colpat, p.Conrad,
			p.Marsys, p.Boyshp, p.Catlam,
		)
		return row.Scan(&id)
	}

	err := spc.savepoint(insert)
	if err == sql.ErrNoRows {
		// Nothing inserted: filtered by responsibility_areas.
		return ErrFeatureDuplicated
	}
	if err != nil {
		spc.feedback.Error(pgxutils.ReadableError{Err: err}.Error())
		return ErrFeatureIgnored
	}
	return nil
}

// consumeBOYLATIenc stores one boylat (IENC) feature.
//
// properties must be a *boylatIencProperties. The feature is written
// with the consumer's first prepared statement inside a savepoint.
// It returns ErrFeatureDuplicated if the statement yields no row and
// ErrFeatureIgnored, after reporting to the feedback, on any other error.
func consumeBOYLATIenc(
	spc *SQLPointConsumer,
	points pointSlice,
	properties interface{},
	epsg int,
) error {
	p := properties.(*boylatIencProperties)

	var id int64
	insert := func() error {
		row := spc.stmts[0].QueryRowContext(
			spc.ctx,
			// geometry
			points.asWKB(), epsg,
			// common properties
			p.Datsta, p.Datend, p.Persta, p.Perend,
			p.Objnam, p.Nobjnm, p.Inform, p.Ninfom,
			p.Scamin, p.Picrep, p.Txtdsc, p.Sordat, p.Sorind,
			// type specific properties
			p.Colour, p.Colpat, p.Conrad,
			p.Marsys, p.Boyshp, p.Catlam,
		)
		return row.Scan(&id)
	}

	err := spc.savepoint(insert)
	if err == sql.ErrNoRows {
		// Nothing inserted: filtered by responsibility_areas.
		return ErrFeatureDuplicated
	}
	if err != nil {
		spc.feedback.Error(pgxutils.ReadableError{Err: err}.Error())
		return ErrFeatureIgnored
	}
	return nil
}

// consumeBOYCAR stores one boycar feature.
//
// properties must be a *boycarProperties. The feature is written with
// the consumer's first prepared statement inside a savepoint.
// It returns ErrFeatureDuplicated if the statement yields no row and
// ErrFeatureIgnored, after reporting to the feedback, on any other error.
func consumeBOYCAR(
	spc *SQLPointConsumer,
	points pointSlice,
	properties interface{},
	epsg int,
) error {
	p := properties.(*boycarProperties)

	var id int64
	insert := func() error {
		row := spc.stmts[0].QueryRowContext(
			spc.ctx,
			// geometry
			points.asWKB(), epsg,
			// common properties
			p.Datsta, p.Datend, p.Persta, p.Perend,
			p.Objnam, p.Nobjnm, p.Inform, p.Ninfom,
			p.Scamin, p.Picrep, p.Txtdsc, p.Sordat, p.Sorind,
			// type specific properties
			p.Colour, p.Colpat, p.Conrad,
			p.Marsys, p.Boyshp, p.Catcam,
		)
		return row.Scan(&id)
	}

	err := spc.savepoint(insert)
	if err == sql.ErrNoRows {
		// Nothing inserted: filtered by responsibility_areas.
		return ErrFeatureDuplicated
	}
	if err != nil {
		spc.feedback.Error(pgxutils.ReadableError{Err: err}.Error())
		return ErrFeatureIgnored
	}
	return nil
}

// consumeBOYSAW stores one boysaw feature.
//
// properties must be a *boysawProperties. The feature is written with
// the consumer's first prepared statement inside a savepoint.
// It returns ErrFeatureDuplicated if the statement yields no row and
// ErrFeatureIgnored, after reporting to the feedback, on any other error.
func consumeBOYSAW(
	spc *SQLPointConsumer,
	points pointSlice,
	properties interface{},
	epsg int,
) error {
	p := properties.(*boysawProperties)

	var id int64
	insert := func() error {
		row := spc.stmts[0].QueryRowContext(
			spc.ctx,
			// geometry
			points.asWKB(), epsg,
			// common properties
			p.Datsta, p.Datend, p.Persta, p.Perend,
			p.Objnam, p.Nobjnm, p.Inform, p.Ninfom,
			p.Scamin, p.Picrep, p.Txtdsc, p.Sordat, p.Sorind,
			// type specific properties
			p.Colour, p.Colpat, p.Conrad, p.Marsys, p.Boyshp,
		)
		return row.Scan(&id)
	}

	err := spc.savepoint(insert)
	if err == sql.ErrNoRows {
		// Nothing inserted: filtered by responsibility_areas.
		return ErrFeatureDuplicated
	}
	if err != nil {
		spc.feedback.Error(pgxutils.ReadableError{Err: err}.Error())
		return ErrFeatureIgnored
	}
	return nil
}

// consumeBOYSPP stores one boyspp feature.
//
// properties must be a *boysppProperties. The feature is written with
// the consumer's first prepared statement inside a savepoint.
// It returns ErrFeatureDuplicated if the statement yields no row and
// ErrFeatureIgnored, after reporting to the feedback, on any other error.
func consumeBOYSPP(
	spc *SQLPointConsumer,
	points pointSlice,
	properties interface{},
	epsg int,
) error {
	p := properties.(*boysppProperties)

	var id int64
	insert := func() error {
		row := spc.stmts[0].QueryRowContext(
			spc.ctx,
			// geometry
			points.asWKB(), epsg,
			// common properties
			p.Datsta, p.Datend, p.Persta, p.Perend,
			p.Objnam, p.Nobjnm, p.Inform, p.Ninfom,
			p.Scamin, p.Picrep, p.Txtdsc, p.Sordat, p.Sorind,
			// type specific properties
			p.Colour, p.Colpat, p.Conrad,
			p.Marsys, p.Boyshp, p.Catspm,
		)
		return row.Scan(&id)
	}

	err := spc.savepoint(insert)
	if err == sql.ErrNoRows {
		// Nothing inserted: filtered by responsibility_areas.
		return ErrFeatureDuplicated
	}
	if err != nil {
		spc.feedback.Error(pgxutils.ReadableError{Err: err}.Error())
		return ErrFeatureIgnored
	}
	return nil
}

// consumeDAYMARHydro stores one daymar (HYDRO) feature.
//
// properties must be a *daymarHydroProperties. The feature is written
// with the consumer's first prepared statement inside a savepoint.
// It returns ErrFeatureDuplicated if the statement yields no row and
// ErrFeatureIgnored, after reporting to the feedback, on any other error.
func consumeDAYMARHydro(
	spc *SQLPointConsumer,
	points pointSlice,
	properties interface{},
	epsg int,
) error {
	p := properties.(*daymarHydroProperties)

	var id int64
	insert := func() error {
		row := spc.stmts[0].QueryRowContext(
			spc.ctx,
			// geometry
			points.asWKB(), epsg,
			// common properties
			p.Datsta, p.Datend, p.Persta, p.Perend,
			p.Objnam, p.Nobjnm, p.Inform, p.Ninfom,
			p.Scamin, p.Picrep, p.Txtdsc, p.Sordat, p.Sorind,
			// type specific properties
			p.Colour, p.Colpat, p.Condtn, p.Topshp,
		)
		return row.Scan(&id)
	}

	err := spc.savepoint(insert)
	if err == sql.ErrNoRows {
		// Nothing inserted: filtered by responsibility area
		// or a duplicate.
		return ErrFeatureDuplicated
	}
	if err != nil {
		spc.feedback.Error(pgxutils.ReadableError{Err: err}.Error())
		return ErrFeatureIgnored
	}
	return nil
}

// consumeDAYMARIenc stores one daymar (IENC) feature and its dirimp
// values.
//
// properties must be a *daymarIencProperties. The feature is written
// with the consumer's first prepared statement inside a savepoint; on
// success the dirimp values are stored via storeAttribs.
// It returns ErrFeatureDuplicated if the statement yields no row and
// ErrFeatureIgnored, after reporting to the feedback, on any other error.
func consumeDAYMARIenc(
	spc *SQLPointConsumer,
	points pointSlice,
	properties interface{},
	epsg int,
) error {
	p := properties.(*daymarIencProperties)

	var id int64
	insert := func() error {
		row := spc.stmts[0].QueryRowContext(
			spc.ctx,
			// geometry
			points.asWKB(), epsg,
			// common properties
			p.Datsta, p.Datend, p.Persta, p.Perend,
			p.Objnam, p.Nobjnm, p.Inform, p.Ninfom,
			p.Scamin, p.Picrep, p.Txtdsc, p.Sordat, p.Sorind,
			// type specific properties
			p.Colour, p.Colpat, p.Condtn, p.Topshp, p.Orient,
		)
		return row.Scan(&id)
	}

	err := spc.savepoint(insert)
	if err == sql.ErrNoRows {
		// Nothing inserted: filtered by responsibility area
		// or a duplicate.
		// TODO: handle eventual changes to dirimp
		return ErrFeatureDuplicated
	}
	if err != nil {
		spc.feedback.Error(pgxutils.ReadableError{Err: err}.Error())
		return ErrFeatureIgnored
	}

	// A new entry was created; attach its dirimp values.
	storeAttribs(spc, id, p.Dirimp)
	return nil
}

// consumeLIGHTS stores one lights feature.
//
// properties must be a *lightsProperties. The feature is written with
// the consumer's first prepared statement inside a savepoint.
// It returns ErrFeatureDuplicated if the statement yields no row and
// ErrFeatureIgnored, after reporting to the feedback, on any other error.
func consumeLIGHTS(
	spc *SQLPointConsumer,
	points pointSlice,
	properties interface{},
	epsg int,
) error {
	p := properties.(*lightsProperties)

	var id int64
	insert := func() error {
		row := spc.stmts[0].QueryRowContext(
			spc.ctx,
			// geometry
			points.asWKB(), epsg,
			// common properties
			p.Datsta, p.Datend, p.Persta, p.Perend,
			p.Objnam, p.Nobjnm, p.Inform, p.Ninfom,
			p.Scamin, p.Picrep, p.Txtdsc, p.Sordat, p.Sorind,
			// type specific properties
			p.Colour, p.Condtn, p.Orient,
			p.Catlit, p.Exclit, p.Litchr,
			p.Litvis, p.Mltylt, p.Sectr1,
			p.Sectr2, p.Siggrp, p.Sigper,
			p.Sigseq, p.Status,
		)
		return row.Scan(&id)
	}

	err := spc.savepoint(insert)
	if err == sql.ErrNoRows {
		// Nothing inserted: filtered by responsibility area
		// or a duplicate.
		return ErrFeatureDuplicated
	}
	if err != nil {
		spc.feedback.Error(pgxutils.ReadableError{Err: err}.Error())
		return ErrFeatureIgnored
	}
	return nil
}

// consumeNOTMRK stores one notmrk feature and its dirimp values.
//
// properties must be a *notmrkProperties. The feature is written with
// the consumer's first prepared statement inside a savepoint; on
// success the dirimp values are stored via storeAttribs.
// It returns ErrFeatureDuplicated if the statement yields no row and
// ErrFeatureIgnored, after reporting to the feedback, on any other error.
func consumeNOTMRK(
	spc *SQLPointConsumer,
	points pointSlice,
	properties interface{},
	epsg int,
) error {
	p := properties.(*notmrkProperties)

	var id int64
	insert := func() error {
		row := spc.stmts[0].QueryRowContext(
			spc.ctx,
			// geometry
			points.asWKB(), epsg,
			// common properties
			p.Datsta, p.Datend, p.Persta, p.Perend,
			p.Objnam, p.Nobjnm, p.Inform, p.Ninfom,
			p.Scamin, p.Picrep, p.Txtdsc, p.Sordat, p.Sorind,
			// type specific properties
			p.Condtn, p.Marsys, p.Orient,
			p.Status, p.Addmrk, p.Catnmk,
			p.Disipd, p.Disipu, p.Disbk1,
			p.Disbk2, p.Fnctnm, p.Bnkwtw,
		)
		return row.Scan(&id)
	}

	err := spc.savepoint(insert)
	if err == sql.ErrNoRows {
		// Nothing inserted: filtered by responsibility area
		// or a duplicate.
		// TODO: handle eventual changes to dirimp
		return ErrFeatureDuplicated
	}
	if err != nil {
		spc.feedback.Error(pgxutils.ReadableError{Err: err}.Error())
		return ErrFeatureIgnored
	}

	// A new entry was created; attach its dirimp values.
	storeAttribs(spc, id, p.Dirimp)
	return nil
}

// consumeRTPBCN stores one rtpbcn feature.
//
// properties must be a *rtpbcnProperties. The feature is written with
// the consumer's first prepared statement inside a savepoint.
// It returns ErrFeatureDuplicated if the statement yields no row and
// ErrFeatureIgnored, after reporting to the feedback, on any other error.
func consumeRTPBCN(
	spc *SQLPointConsumer,
	points pointSlice,
	properties interface{},
	epsg int,
) error {
	p := properties.(*rtpbcnProperties)

	var id int64
	insert := func() error {
		row := spc.stmts[0].QueryRowContext(
			spc.ctx,
			// geometry
			points.asWKB(), epsg,
			// common properties
			p.Datsta, p.Datend, p.Persta, p.Perend,
			p.Objnam, p.Nobjnm, p.Inform, p.Ninfom,
			p.Scamin, p.Picrep, p.Txtdsc, p.Sordat, p.Sorind,
			// type specific properties
			p.Condtn, p.Siggrp, p.Catrtb, p.Radwal,
		)
		return row.Scan(&id)
	}

	err := spc.savepoint(insert)
	if err == sql.ErrNoRows {
		// Nothing inserted: filtered by responsibility area
		// or a duplicate.
		return ErrFeatureDuplicated
	}
	if err != nil {
		spc.feedback.Error(pgxutils.ReadableError{Err: err}.Error())
		return ErrFeatureIgnored
	}
	return nil
}

// consumeTOPMAR stores one topmar feature.
//
// properties must be a *topmarProperties. The feature is written with
// the consumer's first prepared statement inside a savepoint.
// It returns ErrFeatureDuplicated if the statement yields no row and
// ErrFeatureIgnored, after reporting to the feedback, on any other error.
func consumeTOPMAR(
	spc *SQLPointConsumer,
	points pointSlice,
	properties interface{},
	epsg int,
) error {
	p := properties.(*topmarProperties)

	var id int64
	insert := func() error {
		row := spc.stmts[0].QueryRowContext(
			spc.ctx,
			// geometry
			points.asWKB(), epsg,
			// common properties
			p.Datsta, p.Datend, p.Persta, p.Perend,
			p.Objnam, p.Nobjnm, p.Inform, p.Ninfom,
			p.Scamin, p.Picrep, p.Txtdsc, p.Sordat, p.Sorind,
			// type specific properties
			p.Colour, p.Colpat, p.Condtn, p.Topshp,
		)
		return row.Scan(&id)
	}

	err := spc.savepoint(insert)
	if err == sql.ErrNoRows {
		// Nothing inserted: filtered by responsibility area
		// or a duplicate.
		return ErrFeatureDuplicated
	}
	if err != nil {
		spc.feedback.Error(pgxutils.ReadableError{Err: err}.Error())
		return ErrFeatureIgnored
	}
	return nil
}