changeset 4946:b0dbc0f2c748 fairway-marks-import

Simplified importing of fairway marks.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Wed, 19 Feb 2020 15:08:35 +0100
parents 97533bbfaa2d
children 407103c299a0
files pkg/imports/fm.go pkg/imports/fm_bcnlat.go pkg/imports/fm_boycar.go pkg/imports/fm_boylat.go pkg/imports/fm_boysaw.go pkg/imports/fm_boyspp.go pkg/imports/fm_daymar.go pkg/imports/fm_lights.go pkg/imports/fm_notmrk.go pkg/imports/fm_rtpbcn.go pkg/imports/fm_topmar.go pkg/imports/modelconvert.go pkg/imports/pointwfs.go pkg/models/imports.go
diffstat 14 files changed, 1155 insertions(+), 2066 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/imports/fm.go	Tue Feb 18 13:00:25 2020 +0100
+++ b/pkg/imports/fm.go	Wed Feb 19 15:08:35 2020 +0100
@@ -1,6 +1,3 @@
-// This is Free Software under GNU Affero General Public License v >= 3.0
-// without warranty, see README.md and license for details.
-//
 // SPDX-License-Identifier: AGPL-3.0-or-later
 // License-Filename: LICENSES/AGPL-3.0.txt
 //
@@ -10,58 +7,321 @@
 //
 // Author(s):
 //  * Tom Gottfried <tom.gottfried@intevation.de>
+//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
 
 package imports
 
 import (
-	"context"
 	"database/sql"
-	"encoding/json"
 	"fmt"
-	"io"
 	"strings"
-	"time"
+
+	"gemma.intevation.de/gemma/pkg/pgxutils"
+)
+
+type (
+	// Properties common to all types of fairway marks
+	fairwayMarksProperties struct {
+		Datsta *string `json:"hydro_datsta"`
+		Datend *string `json:"hydro_datend"`
+		Persta *string `json:"hydro_persta"`
+		Perend *string `json:"hydro_perend"`
+		Objnam *string `json:"hydro_objnam"`
+		Nobjnm *string `json:"hydro_nobjnm"`
+		Inform *string `json:"hydro_inform"`
+		Ninfom *string `json:"hydro_ninfom"`
+		Scamin *int    `json:"hydro_scamin"`
+		Picrep *string `json:"hydro_picrep"`
+		Txtdsc *string `json:"hydro_txtdsc"`
+		Sordat *string `json:"hydro_sordat"`
+		Sorind *string `json:"hydro_sorind"`
+	}
+
+	bcnlatProperties struct {
+		fairwayMarksProperties
+		Colour      *string `json:"hydro_colour"`
+		Colpat      *string `json:"hydro_colpat"`
+		Condtn      *int    `json:"hydro_condtn"`
+		Bcnshp      *int    `json:"hydro_bcnshp"`
+		HydroCatlam *int64  `json:"hydro_catlam,omitempty"`
+		IENCCatlam  *int64  `json:"ienc_catlam,omitempty"`
+		Dirimp      *string `json:"ienc_dirimp,omitempty"`
+	}
+
+	boylatProperties struct {
+		fairwayMarksProperties
+		Colour      *string `json:"hydro_colour"`
+		Colpat      *string `json:"hydro_colpat"`
+		Conrad      *int    `json:"hydro_conrad"`
+		HydroMarsys *int64  `json:"hydro_marsys,omitempty"`
+		IENCMarsys  *int64  `json:"ienc_marsys,omitempty"`
+		Boyshp      *int    `json:"hydro_boyshp"`
+		HydroCatlam *int64  `json:"hydro_catlam,omitempty"`
+		IENCCatlam  *int64  `json:"ienc_catlam,omitempty"`
+	}
+
+	boycarProperties struct {
+		fairwayMarksProperties
+		Colour *string `json:"hydro_colour"`
+		Colpat *string `json:"hydro_colpat"`
+		Conrad *int    `json:"hydro_conrad"`
+		Marsys *int    `json:"hydro_marsys"`
+		Boyshp *int    `json:"hydro_boyshp"`
+		Catcam *int    `json:"hydro_catcam"`
+	}
+
+	boysawProperties struct {
+		fairwayMarksProperties
+		Colour *string `json:"hydro_colour"`
+		Colpat *string `json:"hydro_colpat"`
+		Conrad *int    `json:"hydro_conrad"`
+		Marsys *int64  `json:"hydro_marsys"`
+		Boyshp *int    `json:"hydro_boyshp"`
+	}
+
+	boysppProperties struct {
+		fairwayMarksProperties
+		Colour *string `json:"hydro_colour"`
+		Colpat *string `json:"hydro_colpat"`
+		Conrad *int    `json:"hydro_conrad"`
+		Marsys *int64  `json:"hydro_marsys"`
+		Boyshp *int    `json:"hydro_boyshp"`
+		Catspm *string `json:"hydro_catspm"`
+	}
 
-	"gemma.intevation.de/gemma/pkg/wfs"
+	daymarProperties struct {
+		fairwayMarksProperties
+		Colour *string  `json:"hydro_colour"`
+		Colpat *string  `json:"hydro_colpat"`
+		Condtn *int     `json:"hydro_condtn"`
+		Dirimp *string  `json:"ienc_dirimp,omitempty"`
+		Topshp *int     `json:"hydro_topshp"`
+		Orient *float64 `json:"hydro_orient,omitempty"`
+	}
+
+	lightsProperties struct {
+		fairwayMarksProperties
+		Colour *string  `json:"hydro_colour"`
+		Condtn *int     `json:"hydro_condtn"`
+		Orient *float64 `json:"hydro_orient"`
+		Catlit *string  `json:"hydro_catlit"`
+		Exclit *int     `json:"hydro_exclit"`
+		Litchr *int     `json:"hydro_litchr"`
+		Litvis *string  `json:"hydro_litvis"`
+		Mltylt *int     `json:"hydro_mltylt"`
+		Sectr1 *float64 `json:"hydro_sectr1"`
+		Sectr2 *float64 `json:"hydro_sectr2"`
+		Siggrp *string  `json:"hydro_siggrp"`
+		Sigper *float64 `json:"hydro_sigper"`
+		Sigseq *string  `json:"hydro_sigseq"`
+		Status *string  `json:"hydro_status"`
+	}
+
+	notmrkProperties struct {
+		fairwayMarksProperties
+		Condtn *int     `json:"hydro_condtn"`
+		Marsys *int     `json:"hydro_bcnshp"`
+		Dirimp *string  `json:"ienc_dirimp"`
+		Orient *float64 `json:"hydro_orient"`
+		Status *string  `json:"hydro_status"`
+		Addmrk *string  `json:"ienc_addmrk"`
+		Catnmk *int     `json:"ienc_catnmk"`
+		Disipd *float64 `json:"ienc_disipd"`
+		Disipu *float64 `json:"ienc_disipu"`
+		Disbk1 *float64 `json:"ienc_disbk1"`
+		Disbk2 *float64 `json:"ienc_disbk2"`
+		Fnctnm *int     `json:"ienc_fnctnm"`
+		Bnkwtw *int     `json:"ienc_bnkwtw"`
+	}
+
+	rtpbcnProperties struct {
+		fairwayMarksProperties
+		Condtn *int    `json:"hydro_condtn"`
+		Siggrp *string `json:"hydro_siggrp"`
+		Catrtb *int    `json:"hydro_catrtb"`
+		Radwal *string `json:"hydro_radwal"`
+	}
+
+	topmarProperties struct {
+		fairwayMarksProperties
+		Colour *string `json:"hydro_colour"`
+		Colpat *string `json:"hydro_colpat"`
+		Condtn *int    `json:"hydro_condtn"`
+		Topshp *int    `json:"hydro_topshp"`
+	}
+)
+
+const (
+	BCNLATJobKind JobKind = "fm_bcnlat"
+	BOYLATJobKind JobKind = "fm_boylat"
+	BOYCARJobKind JobKind = "fm_boycar"
+	BOYSAWJobKind JobKind = "fm_boysaw"
+	BOYSPPJobKind JobKind = "fm_boyspp"
+	DAYMARJobKind JobKind = "fm_daymar"
+	LIGHTSJobKind JobKind = "fm_lights"
+	NOTMRKJobKind JobKind = "fm_notmrk"
+	RTPBCNJobKind JobKind = "fm_rtpbcn"
+	TOPMARJobKind JobKind = "fm_topmar"
 )
 
-// FairwayMarks is a struct
-// to be used as the basis for imports of
-// specific types for fairway marks.
-type FairwayMarks struct {
-	// URL the GetCapabilities URL of the WFS service.
-	URL string `json:"url"`
-	// FeatureType selects the feature type of the WFS service.
-	FeatureType string `json:"feature-type"`
-	// SortBy works around misconfigured services to
-	// establish a sort order to get the features.
-	SortBy string `json:"sort-by"`
-	// User is an optional username for Basic Auth.
-	User string `json:"user,omitempty"`
-	// Password is an optional password for Basic Auth.
-	Password string `json:"password,omitempty"`
-}
+func init() {
+	RegisterJobCreator(BCNLATJobKind,
+		&PointWFSJobCreator{
+			description: "fairway marks bcnlat",
+			depends:     [2][]string{{"fairway_marks_bcnlat"}, {}},
+			newConsumer: newSQLConsumer(
+				prepareStmnts(
+					createInsertFMSQL("fm_bcnlat",
+						"colour", "colpat", "condtn", "bcnshp", "catlam"),
+					insertBcnlatDirimpSQL,
+				),
+				consumeBCNLAT,
+				func() interface{} { return new(bcnlatProperties) },
+			),
+		})
+
+	RegisterJobCreator(BOYLATJobKind,
+		&PointWFSJobCreator{
+			description: "fairway marks boylat",
+			depends:     [2][]string{{"fairway_marks_boylat"}, {}},
+			newConsumer: newSQLConsumer(
+				prepareStmnts(
+					createInsertFMSQL("boylat",
+						"colour", "colpat", "conrad",
+						"marsys", "boyshp", "catlam"),
+				),
+				consumeBOYLAT,
+				func() interface{} { return new(boylatProperties) },
+			),
+		})
+
+	RegisterJobCreator(BOYCARJobKind,
+		&PointWFSJobCreator{
+			description: "fairway marks boycar",
+			depends:     [2][]string{{"fairway_marks_boycar"}, {}},
+			newConsumer: newSQLConsumer(
+				prepareStmnts(
+					createInsertFMSQL("boycar",
+						"colour", "colpat", "conrad",
+						"marsys", "boyshp", "catcam"),
+				),
+				consumeBOYCAR,
+				func() interface{} { return new(boycarProperties) },
+			),
+		})
+
+	RegisterJobCreator(BOYSAWJobKind,
+		&PointWFSJobCreator{
+			description: "fairway marks boysaw",
+			depends:     [2][]string{{"fairway_marks_boysaw"}, {}},
+			newConsumer: newSQLConsumer(
+				prepareStmnts(
+					createInsertFMSQL("boysaw",
+						"colour", "colpat", "conrad", "marsys", "boyshp"),
+				),
+				consumeBOYSAW,
+				func() interface{} { return new(boysawProperties) },
+			),
+		})
+
+	RegisterJobCreator(BOYSPPJobKind,
+		&PointWFSJobCreator{
+			description: "fairway marks boyspp",
+			depends:     [2][]string{{"fairway_marks_boyspp"}, {}},
+			newConsumer: newSQLConsumer(
+				prepareStmnts(
+					createInsertFMSQL("boyspp",
+						"colour", "colpat", "conrad",
+						"marsys", "boyshp", "catspm"),
+				),
+				consumeBOYSPP,
+				func() interface{} { return new(boysppProperties) },
+			),
+		})
 
-// Properties common to all types of fairway marks
-type fairwayMarksProperties struct {
-	Datsta *string `json:"hydro_datsta"`
-	Datend *string `json:"hydro_datend"`
-	Persta *string `json:"hydro_persta"`
-	Perend *string `json:"hydro_perend"`
-	Objnam *string `json:"hydro_objnam"`
-	Nobjnm *string `json:"hydro_nobjnm"`
-	Inform *string `json:"hydro_inform"`
-	Ninfom *string `json:"hydro_ninfom"`
-	Scamin *int    `json:"hydro_scamin"`
-	Picrep *string `json:"hydro_picrep"`
-	Txtdsc *string `json:"hydro_txtdsc"`
-	Sordat *string `json:"hydro_sordat"`
-	Sorind *string `json:"hydro_sorind"`
+	RegisterJobCreator(DAYMARJobKind,
+		&PointWFSJobCreator{
+			description: "fairway marks daymar",
+			depends:     [2][]string{{"fairway_marks_daymar"}, {}},
+			newConsumer: newSQLConsumer(
+				prepareStmnts(
+					createInsertFMSQL("daymar",
+						"colour", "colpat", "condtn", "topshp", "orient"),
+					insertDaymarDirimpSQL,
+				),
+				consumeDAYMAR,
+				func() interface{} { return new(daymarProperties) },
+			),
+		})
+
+	RegisterJobCreator(LIGHTSJobKind,
+		&PointWFSJobCreator{
+			description: "fairway marks lights",
+			depends:     [2][]string{{"fairway_marks_lights"}, {}},
+			newConsumer: newSQLConsumer(
+				prepareStmnts(
+					createInsertFMSQL("lights",
+						"colour", "condtn", "orient",
+						"catlit", "exclit", "litchr",
+						"litvis", "mltylt", "sectr1",
+						"sectr2", "siggrp", "sigper",
+						"sigseq", "status"),
+				),
+				consumeLIGHTS,
+				func() interface{} { return new(lightsProperties) },
+			),
+		})
+
+	RegisterJobCreator(NOTMRKJobKind,
+		&PointWFSJobCreator{
+			description: "fairway marks notmrk",
+			depends:     [2][]string{{"fairway_marks_lights"}, {}},
+			newConsumer: newSQLConsumer(
+				prepareStmnts(
+					createInsertFMSQL("notmrk",
+						"condtn", "marsys", "orient",
+						"status", "addmrk", "catnmk",
+						"disipd", "disipu", "disbk1",
+						"disbk2", "fnctnm", "bnkwtw"),
+					insertNotmrkDirimpSQL,
+				),
+				consumeNOTMRK,
+				func() interface{} { return new(notmrkProperties) },
+			),
+		})
+
+	RegisterJobCreator(RTPBCNJobKind,
+		&PointWFSJobCreator{
+			description: "fairway marks rtpbcn",
+			depends:     [2][]string{{"fairway_marks_rtpbcn"}, {}},
+			newConsumer: newSQLConsumer(
+				prepareStmnts(
+					createInsertFMSQL("rtpbcn",
+						"condtn", "siggrp", "catrtb", "radwal"),
+				),
+				consumeRTPBCN,
+				func() interface{} { return new(rtpbcnProperties) },
+			),
+		})
+
+	RegisterJobCreator(TOPMARJobKind,
+		&PointWFSJobCreator{
+			description: "fairway marks topmar",
+			depends:     [2][]string{{"fairway_marks_topmar"}, {}},
+			newConsumer: newSQLConsumer(
+				prepareStmnts(
+					createInsertFMSQL("topmar",
+						"colour", "colpat", "condtn", "topshp"),
+				),
+				consumeTOPMAR,
+				func() interface{} { return new(topmarProperties) },
+			),
+		})
 }
 
 const (
 	// Format string to be completed with type and additional attributes
-	insertFairwayMarkSQL = `
+	insertFMSQLtmpl = `
 WITH a AS (
   SELECT users.current_user_area_utm() AS a
 )
@@ -98,176 +358,563 @@
   DO NOTHING
 RETURNING id
 `
+
+	insertBcnlatDirimpSQL = `
+INSERT INTO waterway.fairway_marks_bcnlat_dirimps (fm_bcnlat_id, dirimp)
+  VALUES ($1, $2)
+`
+
+	insertDaymarDirimpSQL = `
+INSERT INTO waterway.fairway_marks_daymar_dirimps (fm_daymar_id, dirimp)
+  VALUES ($1, $2)
+`
+
+	insertNotmrkDirimpSQL = `
+INSERT INTO waterway.fairway_marks_notmrk_dirimps (fm_notmrk_id, dirimp)
+  VALUES ($1, $2)
+`
 )
 
 // Create INSERT statement for specific fairway marks type
-func getFMInsertSQL(fmType string, attributes ...string) string {
+func createInsertFMSQL(fmType string, attributes ...string) string {
 	attNums := "$16"
 	for i := 1; i < len(attributes); i++ {
 		attNums += fmt.Sprintf(",$%d", 16+i)
 	}
 
 	return fmt.Sprintf(
-		insertFairwayMarkSQL,
+		insertFMSQLtmpl,
 		fmType,
 		strings.Join(attributes, ","),
 		attNums,
 	)
 }
 
-// Common operation of FM imports to get features from WFS service
-func getFMFeatures(
-	ctx context.Context,
-	conn *sql.Conn,
-	feedback Feedback,
-	fm FairwayMarks,
-	// Constructor returning pointer to struct
-	// representing featuretype's properties
-	newProps func() interface{},
-	// Construct pointer to featuretype from given pointSlice and properties
-	newFeat func(pointSlice, interface{}) interface{},
-	// Store features in type specific database tables
-	storeFMs func(
-		tx *sql.Tx,
-		epsg int,
-		// Slice of features to be converted to featuretypes type
-		fms []interface{},
-	) (outsideOrDup int, features int, err error),
-) (err error) {
+func coalesceInt64(ints ...*int64) sql.NullInt64 {
+	for _, i := range ints {
+		if i != nil {
+			return sql.NullInt64{Int64: *i, Valid: true}
+		}
+	}
+	return sql.NullInt64{}
+}
 
-	start := time.Now()
-
-	feedback.Info("Import fairway marks")
+func consumeBCNLAT(
+	spc *SQLPointConsumer,
+	points pointSlice,
+	properties interface{},
+	epsg int,
+) error {
+	props := properties.(*bcnlatProperties)
 
-	feedback.Info("Loading capabilities from %s", fm.URL)
-	caps, err := wfs.GetCapabilities(fm.URL)
-	if err != nil {
-		feedback.Error("Loading capabilities failed: %v", err)
-		return
-	}
-
-	ft := caps.FindFeatureType(fm.FeatureType)
-	if ft == nil {
-		err = fmt.Errorf("unknown feature type '%s'", fm.FeatureType)
-		return
-	}
+	catlam := coalesceInt64(props.HydroCatlam, props.IENCCatlam)
 
-	feedback.Info("Found feature type '%s", fm.FeatureType)
-
-	epsg, err := wfs.CRSToEPSG(ft.DefaultCRS)
-	if err != nil {
-		feedback.Error("Unsupported CRS: '%s'", ft.DefaultCRS)
-		return
-	}
-
-	if fm.SortBy != "" {
-		feedback.Info("Features will be sorted by '%s'", fm.SortBy)
-	}
-
-	dl, err := wfs.GetFeatures(caps, fm.FeatureType, fm.SortBy)
-	if err != nil {
-		feedback.Error("Cannot create GetFeature URLs. %v", err)
-		return
-	}
-
-	var (
-		fms               []interface{}
-		unsupported       = stringCounter{}
-		missingProperties int
-		badProperties     int
-	)
-
-	err = dl.Download(fm.User, fm.Password, func(url string, r io.Reader) error {
-		feedback.Info("Get features from: '%s'", url)
-		rfc, err := wfs.ParseRawFeatureCollection(r)
-		if err != nil {
-			return fmt.Errorf("parsing GetFeature document failed: %v", err)
-		}
-		if rfc.CRS != nil {
-			crsName := rfc.CRS.Properties.Name
-			if epsg, err = wfs.CRSToEPSG(crsName); err != nil {
-				feedback.Error("Unsupported CRS: %d", crsName)
-				return err
+	var fmid int64
+	err := spc.savepoint(func() error {
+		return spc.stmts[0].QueryRowContext(
+			spc.ctx,
+			points.asWKB(),
+			epsg,
+			props.Datsta,
+			props.Datend,
+			props.Persta,
+			props.Perend,
+			props.Objnam,
+			props.Nobjnm,
+			props.Inform,
+			props.Ninfom,
+			props.Scamin,
+			props.Picrep,
+			props.Txtdsc,
+			props.Sordat,
+			props.Sorind,
+			props.Colour,
+			props.Colpat,
+			props.Condtn,
+			props.Bcnshp,
+			catlam,
+		).Scan(&fmid)
+	})
+	switch {
+	case err == sql.ErrNoRows:
+		return ErrFeatureDuplicated
+		// ignore -> filtered by responsibility area or a duplicate
+		// TODO: handle eventual changes to dirimp
+	case err != nil:
+		spc.feedback.Error(pgxutils.ReadableError{Err: err}.Error())
+		return ErrFeatureIgnored
+	default:
+		if props.Dirimp != nil && *props.Dirimp != "" {
+			dirimps := strings.Split(*props.Dirimp, ",")
+			for _, dirimp := range dirimps {
+				if err := spc.savepoint(func() error {
+					_, err := spc.stmts[1].ExecContext(
+						spc.ctx, fmid, dirimp)
+					return err
+				}); err != nil {
+					spc.feedback.Warn(
+						pgxutils.ReadableError{Err: err}.Error())
+					spc.feedback.Info(
+						"Tried to import '%s' as dirimp value",
+						dirimp)
+				}
 			}
 		}
+	}
+	return nil
+}
 
-		// No features -> ignore.
-		if rfc.Features == nil {
-			return nil
-		}
+func consumeBOYLAT(
+	spc *SQLPointConsumer,
+	points pointSlice,
+	properties interface{},
+	epsg int,
+) error {
+	props := properties.(*boylatProperties)
+
+	marsys := coalesceInt64(props.HydroMarsys, props.IENCMarsys)
+	catlam := coalesceInt64(props.HydroCatlam, props.IENCCatlam)
 
-		feedback.Info("Using EPSG: %d", epsg)
+	var fmid int64
+	err := spc.savepoint(func() error {
+		return spc.stmts[0].QueryRowContext(
+			spc.ctx,
+			points.asWKB(),
+			epsg,
+			props.Datsta,
+			props.Datend,
+			props.Persta,
+			props.Perend,
+			props.Objnam,
+			props.Nobjnm,
+			props.Inform,
+			props.Ninfom,
+			props.Scamin,
+			props.Picrep,
+			props.Txtdsc,
+			props.Sordat,
+			props.Sorind,
+			props.Colour,
+			props.Colpat,
+			props.Conrad,
+			marsys,
+			props.Boyshp,
+			catlam,
+		).Scan(&fmid)
+	})
+	switch {
+	case err == sql.ErrNoRows:
+		return ErrFeatureDuplicated
+		// ignore -> filtered by responsibility_areas
+	case err != nil:
+		spc.feedback.Error(pgxutils.ReadableError{Err: err}.Error())
+		return ErrFeatureIgnored
+	}
+	return nil
+}
 
-		for _, feature := range rfc.Features {
-			if feature.Properties == nil || feature.Geometry.Coordinates == nil {
-				missingProperties++
-				continue
-			}
+func consumeBOYCAR(
+	spc *SQLPointConsumer,
+	points pointSlice,
+	properties interface{},
+	epsg int,
+) error {
+	props := properties.(*boycarProperties)
+	var fmid int64
+	err := spc.savepoint(func() error {
+		return spc.stmts[0].QueryRowContext(
+			spc.ctx,
+			points.asWKB(),
+			epsg,
+			props.Datsta,
+			props.Datend,
+			props.Persta,
+			props.Perend,
+			props.Objnam,
+			props.Nobjnm,
+			props.Inform,
+			props.Ninfom,
+			props.Scamin,
+			props.Picrep,
+			props.Txtdsc,
+			props.Sordat,
+			props.Sorind,
+			props.Colour,
+			props.Colpat,
+			props.Conrad,
+			props.Marsys,
+			props.Boyshp,
+			props.Catcam,
+		).Scan(&fmid)
+	})
+	switch {
+	case err == sql.ErrNoRows:
+		return ErrFeatureDuplicated
+		// ignore -> filtered by responsibility_areas
+	case err != nil:
+		spc.feedback.Error(pgxutils.ReadableError{Err: err}.Error())
+		return ErrFeatureIgnored
+	}
+	return nil
+}
 
-			props := newProps()
-			if err := json.Unmarshal(*feature.Properties, props); err != nil {
-				badProperties++
-				continue
-			}
+func consumeBOYSAW(
+	spc *SQLPointConsumer,
+	points pointSlice,
+	properties interface{},
+	epsg int,
+) error {
+	props := properties.(*boysawProperties)
+	var fmid int64
+	err := spc.savepoint(func() error {
+		return spc.stmts[0].QueryRowContext(
+			spc.ctx,
+			points.asWKB(),
+			epsg,
+			props.Datsta,
+			props.Datend,
+			props.Persta,
+			props.Perend,
+			props.Objnam,
+			props.Nobjnm,
+			props.Inform,
+			props.Ninfom,
+			props.Scamin,
+			props.Picrep,
+			props.Txtdsc,
+			props.Sordat,
+			props.Sorind,
+			props.Colour,
+			props.Colpat,
+			props.Conrad,
+			props.Marsys,
+			props.Boyshp,
+		).Scan(&fmid)
+	})
+	switch {
+	case err == sql.ErrNoRows:
+		return ErrFeatureDuplicated
+		// ignore -> filtered by responsibility_areas
+	case err != nil:
+		spc.feedback.Error(pgxutils.ReadableError{Err: err}.Error())
+		return ErrFeatureIgnored
+	}
+	return nil
+}
 
-			switch feature.Geometry.Type {
-			case "Point":
-				var p pointSlice
-				if err := json.Unmarshal(*feature.Geometry.Coordinates, &p); err != nil {
+func consumeBOYSPP(
+	spc *SQLPointConsumer,
+	points pointSlice,
+	properties interface{},
+	epsg int,
+) error {
+	props := properties.(*boysppProperties)
+	var fmid int64
+	err := spc.savepoint(func() error {
+		return spc.stmts[0].QueryRowContext(
+			spc.ctx,
+			points.asWKB(),
+			epsg,
+			props.Datsta,
+			props.Datend,
+			props.Persta,
+			props.Perend,
+			props.Objnam,
+			props.Nobjnm,
+			props.Inform,
+			props.Ninfom,
+			props.Scamin,
+			props.Picrep,
+			props.Txtdsc,
+			props.Sordat,
+			props.Sorind,
+			props.Colour,
+			props.Colpat,
+			props.Conrad,
+			props.Marsys,
+			props.Boyshp,
+			props.Catspm,
+		).Scan(&fmid)
+	})
+	switch {
+	case err == sql.ErrNoRows:
+		return ErrFeatureDuplicated
+		// ignore -> filtered by responsibility_areas
+	case err != nil:
+		spc.feedback.Error(pgxutils.ReadableError{Err: err}.Error())
+		return ErrFeatureIgnored
+	}
+	return nil
+}
+
+func consumeDAYMAR(
+	spc *SQLPointConsumer,
+	points pointSlice,
+	properties interface{},
+	epsg int,
+) error {
+	props := properties.(*daymarProperties)
+
+	var fmid int64
+	err := spc.savepoint(func() error {
+		return spc.stmts[0].QueryRowContext(
+			spc.ctx,
+			points.asWKB(),
+			epsg,
+			props.Datsta,
+			props.Datend,
+			props.Persta,
+			props.Perend,
+			props.Objnam,
+			props.Nobjnm,
+			props.Inform,
+			props.Ninfom,
+			props.Scamin,
+			props.Picrep,
+			props.Txtdsc,
+			props.Sordat,
+			props.Sorind,
+			props.Colour,
+			props.Colpat,
+			props.Condtn,
+			props.Topshp,
+			props.Orient,
+		).Scan(&fmid)
+	})
+	switch {
+	case err == sql.ErrNoRows:
+		return ErrFeatureDuplicated
+		// ignore -> filtered by responsibility area or a duplicate
+		// TODO: handle eventual changes to dirimp
+	case err != nil:
+		spc.feedback.Error(pgxutils.ReadableError{Err: err}.Error())
+		return ErrFeatureIgnored
+	default:
+		if props.Dirimp != nil && *props.Dirimp != "" {
+			dirimps := strings.Split(*props.Dirimp, ",")
+			for _, dirimp := range dirimps {
+				if err := spc.savepoint(func() error {
+					_, err := spc.stmts[1].ExecContext(
+						spc.ctx, fmid, dirimp)
 					return err
+				}); err != nil {
+					spc.feedback.Warn(
+						pgxutils.ReadableError{Err: err}.Error())
+					spc.feedback.Info(
+						"Tried to import '%s' as dirimp value",
+						dirimp)
 				}
-
-				f := newFeat(p, props)
-				fms = append(fms, f)
-			default:
-				unsupported[feature.Geometry.Type]++
 			}
 		}
-		return nil
-	})
-	if err != nil {
-		return
 	}
+	return nil
+}
 
-	if badProperties > 0 {
-		feedback.Warn("Bad properties: %d", badProperties)
-	}
+func consumeLIGHTS(
+	spc *SQLPointConsumer,
+	points pointSlice,
+	properties interface{},
+	epsg int,
+) error {
+	props := properties.(*lightsProperties)
 
-	if missingProperties > 0 {
-		feedback.Warn("Missing properties: %d", missingProperties)
-	}
-
-	if len(unsupported) != 0 {
-		feedback.Warn("Unsupported types found: %s", unsupported)
-	}
-
-	feedback.Info("Found %d usable features in data source", len(fms))
-
-	tx, err := conn.BeginTx(ctx, nil)
-	if err != nil {
-		return
+	var fmid int64
+	err := spc.savepoint(func() error {
+		return spc.stmts[0].QueryRowContext(
+			spc.ctx,
+			points.asWKB(),
+			epsg,
+			props.Datsta,
+			props.Datend,
+			props.Persta,
+			props.Perend,
+			props.Objnam,
+			props.Nobjnm,
+			props.Inform,
+			props.Ninfom,
+			props.Scamin,
+			props.Picrep,
+			props.Txtdsc,
+			props.Sordat,
+			props.Sorind,
+			props.Colour,
+			props.Condtn,
+			props.Orient,
+			props.Catlit,
+			props.Exclit,
+			props.Litchr,
+			props.Litvis,
+			props.Mltylt,
+			props.Sectr1,
+			props.Sectr2,
+			props.Siggrp,
+			props.Sigper,
+			props.Sigseq,
+			props.Status,
+		).Scan(&fmid)
+	})
+	switch {
+	case err == sql.ErrNoRows:
+		return ErrFeatureDuplicated
+		// ignore -> filtered by responsibility area or a duplicate
+	case err != nil:
+		spc.feedback.Error(pgxutils.ReadableError{Err: err}.Error())
+		return ErrFeatureIgnored
 	}
-	defer tx.Rollback()
+	return nil
+}
 
-	outsideOrDup, features, err := storeFMs(tx, epsg, fms)
-	if err != nil {
-		return
-	}
-
-	if outsideOrDup > 0 {
-		feedback.Info(
-			"Features outside responsibility area and duplicates: %d",
-			outsideOrDup)
+func consumeNOTMRK(
+	spc *SQLPointConsumer,
+	points pointSlice,
+	properties interface{},
+	epsg int,
+) error {
+	props := properties.(*notmrkProperties)
+	var fmid int64
+	err := spc.savepoint(func() error {
+		return spc.stmts[0].QueryRowContext(
+			spc.ctx,
+			points.asWKB(),
+			epsg,
+			props.Datsta,
+			props.Datend,
+			props.Persta,
+			props.Perend,
+			props.Objnam,
+			props.Nobjnm,
+			props.Inform,
+			props.Ninfom,
+			props.Scamin,
+			props.Picrep,
+			props.Txtdsc,
+			props.Sordat,
+			props.Sorind,
+			props.Condtn,
+			props.Marsys,
+			props.Orient,
+			props.Status,
+			props.Addmrk,
+			props.Catnmk,
+			props.Disipd,
+			props.Disipu,
+			props.Disbk1,
+			props.Disbk2,
+			props.Fnctnm,
+			props.Bnkwtw,
+		).Scan(&fmid)
+	})
+	switch {
+	case err == sql.ErrNoRows:
+		return ErrFeatureDuplicated
+		// ignore -> filtered by responsibility area or a duplicate
+		// TODO: handle eventual changes to dirimp
+	case err != nil:
+		spc.feedback.Error(pgxutils.ReadableError{Err: err}.Error())
+		return ErrFeatureIgnored
+	default:
+		if props.Dirimp != nil && *props.Dirimp != "" {
+			dirimps := strings.Split(*props.Dirimp, ",")
+			for _, dirimp := range dirimps {
+				if err := spc.savepoint(func() error {
+					_, err := spc.stmts[1].ExecContext(
+						spc.ctx, fmid, dirimp)
+					return err
+				}); err != nil {
+					spc.feedback.Warn(
+						pgxutils.ReadableError{Err: err}.Error())
+					spc.feedback.Info(
+						"Tried to import '%s' as dirimp value",
+						dirimp)
+				}
+			}
+		}
 	}
-
-	if features == 0 {
-		return UnchangedError("no valid new features found")
-	}
+	return nil
+}
 
-	if err = tx.Commit(); err == nil {
-		feedback.Info("Storing %d features took %s",
-			features, time.Since(start))
+func consumeRTPBCN(
+	spc *SQLPointConsumer,
+	points pointSlice,
+	properties interface{},
+	epsg int,
+) error {
+	props := properties.(*rtpbcnProperties)
+	var fmid int64
+	err := spc.savepoint(func() error {
+		return spc.stmts[0].QueryRowContext(
+			spc.ctx,
+			points.asWKB(),
+			epsg,
+			props.Datsta,
+			props.Datend,
+			props.Persta,
+			props.Perend,
+			props.Objnam,
+			props.Nobjnm,
+			props.Inform,
+			props.Ninfom,
+			props.Scamin,
+			props.Picrep,
+			props.Txtdsc,
+			props.Sordat,
+			props.Sorind,
+			props.Condtn,
+			props.Siggrp,
+			props.Catrtb,
+			props.Radwal,
+		).Scan(&fmid)
+	})
+	switch {
+	case err == sql.ErrNoRows:
+		return ErrFeatureDuplicated
+		// ignore -> filtered by responsibility area or a duplicate
+	case err != nil:
+		spc.feedback.Error(pgxutils.ReadableError{Err: err}.Error())
+		return ErrFeatureIgnored
 	}
+	return nil
+}
 
-	return
+func consumeTOPMAR(
+	spc *SQLPointConsumer,
+	points pointSlice,
+	properties interface{},
+	epsg int,
+) error {
+	props := properties.(*topmarProperties)
+	var fmid int64
+	err := spc.savepoint(func() error {
+		return spc.stmts[0].QueryRowContext(
+			spc.ctx,
+			points.asWKB(),
+			epsg,
+			props.Datsta,
+			props.Datend,
+			props.Persta,
+			props.Perend,
+			props.Objnam,
+			props.Nobjnm,
+			props.Inform,
+			props.Ninfom,
+			props.Scamin,
+			props.Picrep,
+			props.Txtdsc,
+			props.Sordat,
+			props.Sorind,
+			props.Colour,
+			props.Colpat,
+			props.Condtn,
+			props.Topshp,
+		).Scan(&fmid)
+	})
+	switch {
+	case err == sql.ErrNoRows:
+		return ErrFeatureDuplicated
+		// ignore -> filtered by responsibility area or a duplicate
+	case err != nil:
+		spc.feedback.Error(pgxutils.ReadableError{Err: err}.Error())
+		return ErrFeatureIgnored
+	}
+	return nil
 }
--- a/pkg/imports/fm_bcnlat.go	Tue Feb 18 13:00:25 2020 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,203 +0,0 @@
-// This is Free Software under GNU Affero General Public License v >= 3.0
-// without warranty, see README.md and license for details.
-//
-// SPDX-License-Identifier: AGPL-3.0-or-later
-// License-Filename: LICENSES/AGPL-3.0.txt
-//
-// Copyright (C) 2020 by via donau
-//   – Österreichische Wasserstraßen-Gesellschaft mbH
-// Software engineering by Intevation GmbH
-//
-// Author(s):
-//  * Tom Gottfried <tom.gottfried@intevation.de>
-
-package imports
-
-import (
-	"context"
-	"database/sql"
-	"strings"
-
-	"gemma.intevation.de/gemma/pkg/pgxutils"
-)
-
-// Bcnlat is an import job to import
-// fairway marks of type BCNLAT in form of point geometries
-// and attribute data from a WFS service.
-type Bcnlat struct {
-	FairwayMarks
-}
-
-// Description gives a short info about relevant facts of this import.
-func (bcnlat *Bcnlat) Description() (string, error) {
-	return bcnlat.URL + "|" + bcnlat.FeatureType, nil
-}
-
-// BCNLATJobKind is the import queue type identifier.
-const BCNLATJobKind JobKind = "fm_bcnlat"
-
-type bcnlatJobCreator struct{}
-
-func init() {
-	RegisterJobCreator(BCNLATJobKind, bcnlatJobCreator{})
-}
-
-func (bcnlatJobCreator) Description() string { return "fairway marks bcnlat" }
-
-func (bcnlatJobCreator) AutoAccept() bool { return true }
-
-func (bcnlatJobCreator) Create() Job { return new(Bcnlat) }
-
-func (bcnlatJobCreator) Depends() [2][]string {
-	return [2][]string{
-		{"fairway_marks_bcnlat"},
-		{},
-	}
-}
-
-// StageDone is a NOP for fairway marks imports.
-func (bcnlatJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
-	return nil
-}
-
-// CleanUp for fairway marks imports is a NOP.
-func (*Bcnlat) CleanUp() error { return nil }
-
-type bcnlatProperties struct {
-	fairwayMarksProperties
-	Colour      *string `json:"hydro_colour"`
-	Colpat      *string `json:"hydro_colpat"`
-	Condtn      *int    `json:"hydro_condtn"`
-	Bcnshp      *int    `json:"hydro_bcnshp"`
-	HydroCatlam *int64  `json:"hydro_catlam,omitempty"`
-	IENCCatlam  *int64  `json:"ienc_catlam,omitempty"`
-	Dirimp      *string `json:"ienc_dirimp,omitempty"`
-}
-
-type bcnlatFeaturetype struct {
-	geom  pointSlice
-	props *bcnlatProperties
-}
-
-const (
-	insertBcnlatDirimpSQL = `
-INSERT INTO waterway.fairway_marks_bcnlat_dirimps (fm_bcnlat_id, dirimp)
-  VALUES ($1, $2)
-`
-)
-
-// Do executes the actual import.
-func (fm *Bcnlat) Do(
-	ctx context.Context,
-	importID int64,
-	conn *sql.Conn,
-	feedback Feedback,
-) (interface{}, error) {
-
-	err := getFMFeatures(
-		ctx,
-		conn,
-		feedback,
-		fm.FairwayMarks,
-		func() interface{} { return new(bcnlatProperties) },
-		func(p pointSlice, props interface{}) interface{} {
-			return &bcnlatFeaturetype{p, props.(*bcnlatProperties)}
-		},
-		func(
-			tx *sql.Tx, epsg int, fms []interface{},
-		) (outsideOrDup int, features int, err error) {
-
-			feedback.Info("Store fairway marks of type BCNLAT/bcnlat")
-
-			insertStmt, err := tx.PrepareContext(
-				ctx,
-				getFMInsertSQL("bcnlat",
-					"colour", "colpat", "condtn", "bcnshp", "catlam"),
-			)
-			if err != nil {
-				return
-			}
-			defer insertStmt.Close()
-
-			insertBcnlatDirimpStmt, err := tx.PrepareContext(
-				ctx, insertBcnlatDirimpSQL)
-			if err != nil {
-				return
-			}
-			defer insertBcnlatDirimpStmt.Close()
-
-			savepoint := Savepoint(ctx, tx, "feature")
-
-			for _, fm := range fms {
-
-				f := fm.(*bcnlatFeaturetype)
-
-				var catlam sql.NullInt64
-				if f.props.HydroCatlam != nil {
-					catlam = sql.NullInt64{
-						Int64: *f.props.HydroCatlam, Valid: true}
-				} else if f.props.IENCCatlam != nil {
-					catlam = sql.NullInt64{
-						Int64: *f.props.IENCCatlam, Valid: true}
-				}
-
-				var fmid int64
-				err := savepoint(func() error {
-					err := insertStmt.QueryRowContext(
-						ctx,
-						f.geom.asWKB(),
-						epsg,
-						f.props.Datsta,
-						f.props.Datend,
-						f.props.Persta,
-						f.props.Perend,
-						f.props.Objnam,
-						f.props.Nobjnm,
-						f.props.Inform,
-						f.props.Ninfom,
-						f.props.Scamin,
-						f.props.Picrep,
-						f.props.Txtdsc,
-						f.props.Sordat,
-						f.props.Sorind,
-						f.props.Colour,
-						f.props.Colpat,
-						f.props.Condtn,
-						f.props.Bcnshp,
-						catlam,
-					).Scan(&fmid)
-					return err
-				})
-				switch {
-				case err == sql.ErrNoRows:
-					outsideOrDup++
-					// ignore -> filtered by responsibility area or a duplicate
-					// TODO: handle eventual changes to dirimp
-				case err != nil:
-					feedback.Error(pgxutils.ReadableError{Err: err}.Error())
-				default:
-					features++
-
-					if f.props.Dirimp != nil && *f.props.Dirimp != "" {
-						dirimps := strings.Split(*f.props.Dirimp, ",")
-						for _, dirimp := range dirimps {
-							if err := savepoint(func() error {
-								_, err := insertBcnlatDirimpStmt.ExecContext(
-									ctx, fmid, dirimp)
-								return err
-							}); err != nil {
-								feedback.Warn(
-									pgxutils.ReadableError{Err: err}.Error())
-								feedback.Info(
-									"Tried to import '%s' as dirimp value",
-									dirimp)
-							}
-						}
-					}
-				}
-			}
-			return
-		})
-
-	return nil, err
-}
--- a/pkg/imports/fm_boycar.go	Tue Feb 18 13:00:25 2020 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,162 +0,0 @@
-// This is Free Software under GNU Affero General Public License v >= 3.0
-// without warranty, see README.md and license for details.
-//
-// SPDX-License-Identifier: AGPL-3.0-or-later
-// License-Filename: LICENSES/AGPL-3.0.txt
-//
-// Copyright (C) 2020 by via donau
-//   – Österreichische Wasserstraßen-Gesellschaft mbH
-// Software engineering by Intevation GmbH
-//
-// Author(s):
-//  * Tom Gottfried <tom.gottfried@intevation.de>
-
-package imports
-
-import (
-	"context"
-	"database/sql"
-
-	"gemma.intevation.de/gemma/pkg/pgxutils"
-)
-
-// Boycar is an import job to import
-// fairway marks of type BOYCAR in form of point geometries
-// and attribute data from a WFS service.
-type Boycar struct {
-	FairwayMarks
-}
-
-// Description gives a short info about relevant facts of this import.
-func (boycar *Boycar) Description() (string, error) {
-	return boycar.URL + "|" + boycar.FeatureType, nil
-}
-
-// BOYCARJobKind is the import queue type identifier.
-const BOYCARJobKind JobKind = "fm_boycar"
-
-type boycarJobCreator struct{}
-
-func init() {
-	RegisterJobCreator(BOYCARJobKind, boycarJobCreator{})
-}
-
-func (boycarJobCreator) Description() string { return "fairway marks boycar" }
-
-func (boycarJobCreator) AutoAccept() bool { return true }
-
-func (boycarJobCreator) Create() Job { return new(Boycar) }
-
-func (boycarJobCreator) Depends() [2][]string {
-	return [2][]string{
-		{"fairway_marks_boycar"},
-		{},
-	}
-}
-
-// StageDone is a NOP for fairway marks imports.
-func (boycarJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
-	return nil
-}
-
-// CleanUp for fairway marks imports is a NOP.
-func (*Boycar) CleanUp() error { return nil }
-
-type boycarProperties struct {
-	fairwayMarksProperties
-	Colour *string `json:"hydro_colour"`
-	Colpat *string `json:"hydro_colpat"`
-	Conrad *int    `json:"hydro_conrad"`
-	Marsys *int    `json:"hydro_marsys"`
-	Boyshp *int    `json:"hydro_boyshp"`
-	Catcam *int    `json:"hydro_catcam"`
-}
-
-type boycarFeaturetype struct {
-	geom  pointSlice
-	props *boycarProperties
-}
-
-// Do executes the actual import.
-func (fm *Boycar) Do(
-	ctx context.Context,
-	importID int64,
-	conn *sql.Conn,
-	feedback Feedback,
-) (interface{}, error) {
-
-	err := getFMFeatures(
-		ctx,
-		conn,
-		feedback,
-		fm.FairwayMarks,
-		func() interface{} { return new(boycarProperties) },
-		func(p pointSlice, props interface{}) interface{} {
-			return &boycarFeaturetype{p, props.(*boycarProperties)}
-		},
-		func(
-			tx *sql.Tx, epsg int, fms []interface{},
-		) (outsideOrDup int, features int, err error) {
-
-			feedback.Info("Store fairway marks of type BOYCAR")
-
-			insertStmt, err := tx.PrepareContext(
-				ctx,
-				getFMInsertSQL("boycar",
-					"colour", "colpat", "conrad",
-					"marsys", "boyshp", "catcam"),
-			)
-			if err != nil {
-				return
-			}
-			defer insertStmt.Close()
-
-			savepoint := Savepoint(ctx, tx, "feature")
-
-			for _, fm := range fms {
-
-				f := fm.(*boycarFeaturetype)
-
-				var fmid int64
-				err := savepoint(func() error {
-					err := insertStmt.QueryRowContext(
-						ctx,
-						f.geom.asWKB(),
-						epsg,
-						f.props.Datsta,
-						f.props.Datend,
-						f.props.Persta,
-						f.props.Perend,
-						f.props.Objnam,
-						f.props.Nobjnm,
-						f.props.Inform,
-						f.props.Ninfom,
-						f.props.Scamin,
-						f.props.Picrep,
-						f.props.Txtdsc,
-						f.props.Sordat,
-						f.props.Sorind,
-						f.props.Colour,
-						f.props.Colpat,
-						f.props.Conrad,
-						f.props.Marsys,
-						f.props.Boyshp,
-						f.props.Catcam,
-					).Scan(&fmid)
-					return err
-				})
-				switch {
-				case err == sql.ErrNoRows:
-					outsideOrDup++
-					// ignore -> filtered by responsibility_areas
-				case err != nil:
-					feedback.Error(pgxutils.ReadableError{Err: err}.Error())
-				default:
-					features++
-				}
-			}
-			return
-		})
-
-	return nil, err
-}
--- a/pkg/imports/fm_boylat.go	Tue Feb 18 13:00:25 2020 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,182 +0,0 @@
-// This is Free Software under GNU Affero General Public License v >= 3.0
-// without warranty, see README.md and license for details.
-//
-// SPDX-License-Identifier: AGPL-3.0-or-later
-// License-Filename: LICENSES/AGPL-3.0.txt
-//
-// Copyright (C) 2020 by via donau
-//   – Österreichische Wasserstraßen-Gesellschaft mbH
-// Software engineering by Intevation GmbH
-//
-// Author(s):
-//  * Tom Gottfried <tom.gottfried@intevation.de>
-
-package imports
-
-import (
-	"context"
-	"database/sql"
-
-	"gemma.intevation.de/gemma/pkg/pgxutils"
-)
-
-// Boylat is an import job to import
-// fairway marks of type BOYLAT in form of point geometries
-// and attribute data from a WFS service.
-type Boylat struct {
-	FairwayMarks
-}
-
-// Description gives a short info about relevant facts of this import.
-func (boylat *Boylat) Description() (string, error) {
-	return boylat.URL + "|" + boylat.FeatureType, nil
-}
-
-// BOYLATJobKind is the import queue type identifier.
-const BOYLATJobKind JobKind = "fm_boylat"
-
-type boylatJobCreator struct{}
-
-func init() {
-	RegisterJobCreator(BOYLATJobKind, boylatJobCreator{})
-}
-
-func (boylatJobCreator) Description() string { return "fairway marks boylat" }
-
-func (boylatJobCreator) AutoAccept() bool { return true }
-
-func (boylatJobCreator) Create() Job { return new(Boylat) }
-
-func (boylatJobCreator) Depends() [2][]string {
-	return [2][]string{
-		{"fairway_marks_boylat"},
-		{},
-	}
-}
-
-// StageDone is a NOP for fairway marks imports.
-func (boylatJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
-	return nil
-}
-
-// CleanUp for fairway marks imports is a NOP.
-func (*Boylat) CleanUp() error { return nil }
-
-type boylatProperties struct {
-	fairwayMarksProperties
-	Colour      *string `json:"hydro_colour"`
-	Colpat      *string `json:"hydro_colpat"`
-	Conrad      *int    `json:"hydro_conrad"`
-	HydroMarsys *int64  `json:"hydro_marsys,omitempty"`
-	IENCMarsys  *int64  `json:"ienc_marsys,omitempty"`
-	Boyshp      *int    `json:"hydro_boyshp"`
-	HydroCatlam *int64  `json:"hydro_catlam,omitempty"`
-	IENCCatlam  *int64  `json:"ienc_catlam,omitempty"`
-}
-
-type boylatFeaturetype struct {
-	geom  pointSlice
-	props *boylatProperties
-}
-
-// Do executes the actual import.
-func (fm *Boylat) Do(
-	ctx context.Context,
-	importID int64,
-	conn *sql.Conn,
-	feedback Feedback,
-) (interface{}, error) {
-
-	err := getFMFeatures(
-		ctx,
-		conn,
-		feedback,
-		fm.FairwayMarks,
-		func() interface{} { return new(boylatProperties) },
-		func(p pointSlice, props interface{}) interface{} {
-			return &boylatFeaturetype{p, props.(*boylatProperties)}
-		},
-		func(
-			tx *sql.Tx, epsg int, fms []interface{},
-		) (outsideOrDup int, features int, err error) {
-
-			feedback.Info("Store fairway marks of type BOYLAT")
-
-			insertStmt, err := tx.PrepareContext(
-				ctx,
-				getFMInsertSQL("boylat",
-					"colour", "colpat", "conrad",
-					"marsys", "boyshp", "catlam"),
-			)
-			if err != nil {
-				return
-			}
-			defer insertStmt.Close()
-
-			savepoint := Savepoint(ctx, tx, "feature")
-
-			for _, fm := range fms {
-
-				f := fm.(*boylatFeaturetype)
-
-				var marsys sql.NullInt64
-				if f.props.HydroMarsys != nil {
-					marsys = sql.NullInt64{
-						Int64: *f.props.HydroMarsys, Valid: true}
-				} else if f.props.IENCMarsys != nil {
-					marsys = sql.NullInt64{
-						Int64: *f.props.IENCMarsys, Valid: true}
-				}
-
-				var catlam sql.NullInt64
-				if f.props.HydroCatlam != nil {
-					catlam = sql.NullInt64{
-						Int64: *f.props.HydroCatlam, Valid: true}
-				} else if f.props.IENCCatlam != nil {
-					catlam = sql.NullInt64{
-						Int64: *f.props.IENCCatlam, Valid: true}
-				}
-
-				var fmid int64
-				err := savepoint(func() error {
-					err := insertStmt.QueryRowContext(
-						ctx,
-						f.geom.asWKB(),
-						epsg,
-						f.props.Datsta,
-						f.props.Datend,
-						f.props.Persta,
-						f.props.Perend,
-						f.props.Objnam,
-						f.props.Nobjnm,
-						f.props.Inform,
-						f.props.Ninfom,
-						f.props.Scamin,
-						f.props.Picrep,
-						f.props.Txtdsc,
-						f.props.Sordat,
-						f.props.Sorind,
-						f.props.Colour,
-						f.props.Colpat,
-						f.props.Conrad,
-						marsys,
-						f.props.Boyshp,
-						catlam,
-					).Scan(&fmid)
-					return err
-				})
-				switch {
-				case err == sql.ErrNoRows:
-					outsideOrDup++
-					// ignore -> filtered by responsibility_areas
-				case err != nil:
-					feedback.Error(pgxutils.ReadableError{Err: err}.Error())
-				default:
-					features++
-				}
-			}
-			return
-		})
-
-	return nil, err
-}
--- a/pkg/imports/fm_boysaw.go	Tue Feb 18 13:00:25 2020 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,159 +0,0 @@
-// This is Free Software under GNU Affero General Public License v >= 3.0
-// without warranty, see README.md and license for details.
-//
-// SPDX-License-Identifier: AGPL-3.0-or-later
-// License-Filename: LICENSES/AGPL-3.0.txt
-//
-// Copyright (C) 2020 by via donau
-//   – Österreichische Wasserstraßen-Gesellschaft mbH
-// Software engineering by Intevation GmbH
-//
-// Author(s):
-//  * Tom Gottfried <tom.gottfried@intevation.de>
-
-package imports
-
-import (
-	"context"
-	"database/sql"
-
-	"gemma.intevation.de/gemma/pkg/pgxutils"
-)
-
-// Boysaw is an import job to import
-// fairway marks of type BOYSAW in form of point geometries
-// and attribute data from a WFS service.
-type Boysaw struct {
-	FairwayMarks
-}
-
-// Description gives a short info about relevant facts of this import.
-func (boysaw *Boysaw) Description() (string, error) {
-	return boysaw.URL + "|" + boysaw.FeatureType, nil
-}
-
-// BOYSAWJobKind is the import queue type identifier.
-const BOYSAWJobKind JobKind = "fm_boysaw"
-
-type boysawJobCreator struct{}
-
-func init() {
-	RegisterJobCreator(BOYSAWJobKind, boysawJobCreator{})
-}
-
-func (boysawJobCreator) Description() string { return "fairway marks boysaw" }
-
-func (boysawJobCreator) AutoAccept() bool { return true }
-
-func (boysawJobCreator) Create() Job { return new(Boysaw) }
-
-func (boysawJobCreator) Depends() [2][]string {
-	return [2][]string{
-		{"fairway_marks_boysaw"},
-		{},
-	}
-}
-
-// StageDone is a NOP for fairway marks imports.
-func (boysawJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
-	return nil
-}
-
-// CleanUp for fairway marks imports is a NOP.
-func (*Boysaw) CleanUp() error { return nil }
-
-type boysawProperties struct {
-	fairwayMarksProperties
-	Colour *string `json:"hydro_colour"`
-	Colpat *string `json:"hydro_colpat"`
-	Conrad *int    `json:"hydro_conrad"`
-	Marsys *int64  `json:"hydro_marsys"`
-	Boyshp *int    `json:"hydro_boyshp"`
-}
-
-type boysawFeaturetype struct {
-	geom  pointSlice
-	props *boysawProperties
-}
-
-// Do executes the actual import.
-func (fm *Boysaw) Do(
-	ctx context.Context,
-	importID int64,
-	conn *sql.Conn,
-	feedback Feedback,
-) (interface{}, error) {
-
-	err := getFMFeatures(
-		ctx,
-		conn,
-		feedback,
-		fm.FairwayMarks,
-		func() interface{} { return new(boysawProperties) },
-		func(p pointSlice, props interface{}) interface{} {
-			return &boysawFeaturetype{p, props.(*boysawProperties)}
-		},
-		func(
-			tx *sql.Tx, epsg int, fms []interface{},
-		) (outsideOrDup int, features int, err error) {
-
-			feedback.Info("Store fairway marks of type BOYSAW")
-
-			insertStmt, err := tx.PrepareContext(
-				ctx,
-				getFMInsertSQL("boysaw",
-					"colour", "colpat", "conrad", "marsys", "boyshp"),
-			)
-			if err != nil {
-				return
-			}
-			defer insertStmt.Close()
-
-			savepoint := Savepoint(ctx, tx, "feature")
-
-			for _, fm := range fms {
-
-				f := fm.(*boysawFeaturetype)
-
-				var fmid int64
-				err := savepoint(func() error {
-					err := insertStmt.QueryRowContext(
-						ctx,
-						f.geom.asWKB(),
-						epsg,
-						f.props.Datsta,
-						f.props.Datend,
-						f.props.Persta,
-						f.props.Perend,
-						f.props.Objnam,
-						f.props.Nobjnm,
-						f.props.Inform,
-						f.props.Ninfom,
-						f.props.Scamin,
-						f.props.Picrep,
-						f.props.Txtdsc,
-						f.props.Sordat,
-						f.props.Sorind,
-						f.props.Colour,
-						f.props.Colpat,
-						f.props.Conrad,
-						f.props.Marsys,
-						f.props.Boyshp,
-					).Scan(&fmid)
-					return err
-				})
-				switch {
-				case err == sql.ErrNoRows:
-					outsideOrDup++
-					// ignore -> filtered by responsibility_areas
-				case err != nil:
-					feedback.Error(pgxutils.ReadableError{Err: err}.Error())
-				default:
-					features++
-				}
-			}
-			return
-		})
-
-	return nil, err
-}
--- a/pkg/imports/fm_boyspp.go	Tue Feb 18 13:00:25 2020 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,162 +0,0 @@
-// This is Free Software under GNU Affero General Public License v >= 3.0
-// without warranty, see README.md and license for details.
-//
-// SPDX-License-Identifier: AGPL-3.0-or-later
-// License-Filename: LICENSES/AGPL-3.0.txt
-//
-// Copyright (C) 2020 by via donau
-//   – Österreichische Wasserstraßen-Gesellschaft mbH
-// Software engineering by Intevation GmbH
-//
-// Author(s):
-//  * Tom Gottfried <tom.gottfried@intevation.de>
-
-package imports
-
-import (
-	"context"
-	"database/sql"
-
-	"gemma.intevation.de/gemma/pkg/pgxutils"
-)
-
-// Boyspp is an import job to import
-// fairway marks of type BOYSPP in form of point geometries
-// and attribute data from a WFS service.
-type Boyspp struct {
-	FairwayMarks
-}
-
-// Description gives a short info about relevant facts of this import.
-func (boyspp *Boyspp) Description() (string, error) {
-	return boyspp.URL + "|" + boyspp.FeatureType, nil
-}
-
-// BOYSPPJobKind is the import queue type identifier.
-const BOYSPPJobKind JobKind = "fm_boyspp"
-
-type boysppJobCreator struct{}
-
-func init() {
-	RegisterJobCreator(BOYSPPJobKind, boysppJobCreator{})
-}
-
-func (boysppJobCreator) Description() string { return "fairway marks boyspp" }
-
-func (boysppJobCreator) AutoAccept() bool { return true }
-
-func (boysppJobCreator) Create() Job { return new(Boyspp) }
-
-func (boysppJobCreator) Depends() [2][]string {
-	return [2][]string{
-		{"fairway_marks_boyspp"},
-		{},
-	}
-}
-
-// StageDone is a NOP for fairway marks imports.
-func (boysppJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
-	return nil
-}
-
-// CleanUp for fairway marks imports is a NOP.
-func (*Boyspp) CleanUp() error { return nil }
-
-type boysppProperties struct {
-	fairwayMarksProperties
-	Colour *string `json:"hydro_colour"`
-	Colpat *string `json:"hydro_colpat"`
-	Conrad *int    `json:"hydro_conrad"`
-	Marsys *int64  `json:"hydro_marsys"`
-	Boyshp *int    `json:"hydro_boyshp"`
-	Catspm *string `json:"hydro_catspm"`
-}
-
-type boysppFeaturetype struct {
-	geom  pointSlice
-	props *boysppProperties
-}
-
-// Do executes the actual import.
-func (fm *Boyspp) Do(
-	ctx context.Context,
-	importID int64,
-	conn *sql.Conn,
-	feedback Feedback,
-) (interface{}, error) {
-
-	err := getFMFeatures(
-		ctx,
-		conn,
-		feedback,
-		fm.FairwayMarks,
-		func() interface{} { return new(boysppProperties) },
-		func(p pointSlice, props interface{}) interface{} {
-			return &boysppFeaturetype{p, props.(*boysppProperties)}
-		},
-		func(
-			tx *sql.Tx, epsg int, fms []interface{},
-		) (outsideOrDup int, features int, err error) {
-
-			feedback.Info("Store fairway marks of type BOYSPP")
-
-			insertStmt, err := tx.PrepareContext(
-				ctx,
-				getFMInsertSQL("boyspp",
-					"colour", "colpat", "conrad",
-					"marsys", "boyshp", "catspm"),
-			)
-			if err != nil {
-				return
-			}
-			defer insertStmt.Close()
-
-			savepoint := Savepoint(ctx, tx, "feature")
-
-			for _, fm := range fms {
-
-				f := fm.(*boysppFeaturetype)
-
-				var fmid int64
-				err := savepoint(func() error {
-					err := insertStmt.QueryRowContext(
-						ctx,
-						f.geom.asWKB(),
-						epsg,
-						f.props.Datsta,
-						f.props.Datend,
-						f.props.Persta,
-						f.props.Perend,
-						f.props.Objnam,
-						f.props.Nobjnm,
-						f.props.Inform,
-						f.props.Ninfom,
-						f.props.Scamin,
-						f.props.Picrep,
-						f.props.Txtdsc,
-						f.props.Sordat,
-						f.props.Sorind,
-						f.props.Colour,
-						f.props.Colpat,
-						f.props.Conrad,
-						f.props.Marsys,
-						f.props.Boyshp,
-						f.props.Catspm,
-					).Scan(&fmid)
-					return err
-				})
-				switch {
-				case err == sql.ErrNoRows:
-					outsideOrDup++
-					// ignore -> filtered by responsibility_areas
-				case err != nil:
-					feedback.Error(pgxutils.ReadableError{Err: err}.Error())
-				default:
-					features++
-				}
-			}
-			return
-		})
-
-	return nil, err
-}
--- a/pkg/imports/fm_daymar.go	Tue Feb 18 13:00:25 2020 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,193 +0,0 @@
-// This is Free Software under GNU Affero General Public License v >= 3.0
-// without warranty, see README.md and license for details.
-//
-// SPDX-License-Identifier: AGPL-3.0-or-later
-// License-Filename: LICENSES/AGPL-3.0.txt
-//
-// Copyright (C) 2020 by via donau
-//   – Österreichische Wasserstraßen-Gesellschaft mbH
-// Software engineering by Intevation GmbH
-//
-// Author(s):
-//  * Tom Gottfried <tom.gottfried@intevation.de>
-
-package imports
-
-import (
-	"context"
-	"database/sql"
-	"strings"
-
-	"gemma.intevation.de/gemma/pkg/pgxutils"
-)
-
-// Daymar is an import job to import
-// fairway marks of type DAYMAR in form of point geometries
-// and attribute data from a WFS service.
-type Daymar struct {
-	FairwayMarks
-}
-
-// Description gives a short info about relevant facts of this import.
-func (daymar *Daymar) Description() (string, error) {
-	return daymar.URL + "|" + daymar.FeatureType, nil
-}
-
-// DAYMARJobKind is the import queue type identifier.
-const DAYMARJobKind JobKind = "fm_daymar"
-
-type daymarJobCreator struct{}
-
-func init() {
-	RegisterJobCreator(DAYMARJobKind, daymarJobCreator{})
-}
-
-func (daymarJobCreator) Description() string { return "fairway marks daymar" }
-
-func (daymarJobCreator) AutoAccept() bool { return true }
-
-func (daymarJobCreator) Create() Job { return new(Daymar) }
-
-func (daymarJobCreator) Depends() [2][]string {
-	return [2][]string{
-		{"fairway_marks_daymar"},
-		{},
-	}
-}
-
-// StageDone is a NOP for fairway marks imports.
-func (daymarJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
-	return nil
-}
-
-// CleanUp for fairway marks imports is a NOP.
-func (*Daymar) CleanUp() error { return nil }
-
-type daymarProperties struct {
-	fairwayMarksProperties
-	Colour *string  `json:"hydro_colour"`
-	Colpat *string  `json:"hydro_colpat"`
-	Condtn *int     `json:"hydro_condtn"`
-	Dirimp *string  `json:"ienc_dirimp,omitempty"`
-	Topshp *int     `json:"hydro_topshp"`
-	Orient *float64 `json:"hydro_orient,omitempty"`
-}
-
-type daymarFeaturetype struct {
-	geom  pointSlice
-	props *daymarProperties
-}
-
-const (
-	insertDaymarDirimpSQL = `
-INSERT INTO waterway.fairway_marks_daymar_dirimps (fm_daymar_id, dirimp)
-  VALUES ($1, $2)
-`
-)
-
-// Do executes the actual import.
-func (fm *Daymar) Do(
-	ctx context.Context,
-	importID int64,
-	conn *sql.Conn,
-	feedback Feedback,
-) (interface{}, error) {
-
-	err := getFMFeatures(
-		ctx,
-		conn,
-		feedback,
-		fm.FairwayMarks,
-		func() interface{} { return new(daymarProperties) },
-		func(p pointSlice, props interface{}) interface{} {
-			return &daymarFeaturetype{p, props.(*daymarProperties)}
-		},
-		func(
-			tx *sql.Tx, epsg int, fms []interface{},
-		) (outsideOrDup int, features int, err error) {
-
-			feedback.Info("Store fairway marks of type DAYMAR/daymar")
-
-			insertStmt, err := tx.PrepareContext(
-				ctx,
-				getFMInsertSQL("daymar",
-					"colour", "colpat", "condtn", "topshp", "orient"),
-			)
-			if err != nil {
-				return
-			}
-			defer insertStmt.Close()
-
-			insertDaymarDirimpStmt, err := tx.PrepareContext(
-				ctx, insertDaymarDirimpSQL)
-			if err != nil {
-				return
-			}
-			defer insertDaymarDirimpStmt.Close()
-
-			savepoint := Savepoint(ctx, tx, "feature")
-
-			for _, fm := range fms {
-
-				f := fm.(*daymarFeaturetype)
-
-				var fmid int64
-				err := savepoint(func() error {
-					err := insertStmt.QueryRowContext(
-						ctx,
-						f.geom.asWKB(),
-						epsg,
-						f.props.Datsta,
-						f.props.Datend,
-						f.props.Persta,
-						f.props.Perend,
-						f.props.Objnam,
-						f.props.Nobjnm,
-						f.props.Inform,
-						f.props.Ninfom,
-						f.props.Scamin,
-						f.props.Picrep,
-						f.props.Txtdsc,
-						f.props.Sordat,
-						f.props.Sorind,
-						f.props.Colour,
-						f.props.Colpat,
-						f.props.Condtn,
-						f.props.Topshp,
-						f.props.Orient,
-					).Scan(&fmid)
-					return err
-				})
-				switch {
-				case err == sql.ErrNoRows:
-					outsideOrDup++
-					// ignore -> filtered by responsibility area or a duplicate
-					// TODO: handle eventual changes to dirimp
-				case err != nil:
-					feedback.Error(pgxutils.ReadableError{Err: err}.Error())
-				default:
-					features++
-
-					if f.props.Dirimp != nil && *f.props.Dirimp != "" {
-						dirimps := strings.Split(*f.props.Dirimp, ",")
-						for _, dirimp := range dirimps {
-							if err := savepoint(func() error {
-								_, err := insertDaymarDirimpStmt.ExecContext(
-									ctx, fmid, dirimp)
-								return err
-							}); err != nil {
-								feedback.Warn(
-									pgxutils.ReadableError{Err: err}.Error())
-								feedback.Info(
-									"Tried to import '%s' as dirimp value",
-									dirimp)
-							}
-						}
-					}
-				}
-			}
-			return
-		})
-
-	return nil, err
-}
--- a/pkg/imports/fm_lights.go	Tue Feb 18 13:00:25 2020 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,181 +0,0 @@
-// This is Free Software under GNU Affero General Public License v >= 3.0
-// without warranty, see README.md and license for details.
-//
-// SPDX-License-Identifier: AGPL-3.0-or-later
-// License-Filename: LICENSES/AGPL-3.0.txt
-//
-// Copyright (C) 2020 by via donau
-//   – Österreichische Wasserstraßen-Gesellschaft mbH
-// Software engineering by Intevation GmbH
-//
-// Author(s):
-//  * Tom Gottfried <tom.gottfried@intevation.de>
-
-package imports
-
-import (
-	"context"
-	"database/sql"
-
-	"gemma.intevation.de/gemma/pkg/pgxutils"
-)
-
-// Lights is an import job to import
-// fairway marks of type LIGHTS in form of point geometries
-// and attribute data from a WFS service.
-type Lights struct {
-	FairwayMarks
-}
-
-// Description gives a short info about relevant facts of this import.
-func (lights *Lights) Description() (string, error) {
-	return lights.URL + "|" + lights.FeatureType, nil
-}
-
-// LIGHTSJobKind is the import queue type identifier.
-const LIGHTSJobKind JobKind = "fm_lights"
-
-type lightsJobCreator struct{}
-
-func init() {
-	RegisterJobCreator(LIGHTSJobKind, lightsJobCreator{})
-}
-
-func (lightsJobCreator) Description() string { return "fairway marks lights" }
-
-func (lightsJobCreator) AutoAccept() bool { return true }
-
-func (lightsJobCreator) Create() Job { return new(Lights) }
-
-func (lightsJobCreator) Depends() [2][]string {
-	return [2][]string{
-		{"fairway_marks_lights"},
-		{},
-	}
-}
-
-// StageDone is a NOP for fairway marks imports.
-func (lightsJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
-	return nil
-}
-
-// CleanUp for fairway marks imports is a NOP.
-func (*Lights) CleanUp() error { return nil }
-
-type lightsProperties struct {
-	fairwayMarksProperties
-	Colour *string  `json:"hydro_colour"`
-	Condtn *int     `json:"hydro_condtn"`
-	Orient *float64 `json:"hydro_orient"`
-	Catlit *string  `json:"hydro_catlit"`
-	Exclit *int     `json:"hydro_exclit"`
-	Litchr *int     `json:"hydro_litchr"`
-	Litvis *string  `json:"hydro_litvis"`
-	Mltylt *int     `json:"hydro_mltylt"`
-	Sectr1 *float64 `json:"hydro_sectr1"`
-	Sectr2 *float64 `json:"hydro_sectr2"`
-	Siggrp *string  `json:"hydro_siggrp"`
-	Sigper *float64 `json:"hydro_sigper"`
-	Sigseq *string  `json:"hydro_sigseq"`
-	Status *string  `json:"hydro_status"`
-}
-
-type lightsFeaturetype struct {
-	geom  pointSlice
-	props *lightsProperties
-}
-
-// Do executes the actual import.
-func (fm *Lights) Do(
-	ctx context.Context,
-	importID int64,
-	conn *sql.Conn,
-	feedback Feedback,
-) (interface{}, error) {
-
-	err := getFMFeatures(
-		ctx,
-		conn,
-		feedback,
-		fm.FairwayMarks,
-		func() interface{} { return new(lightsProperties) },
-		func(p pointSlice, props interface{}) interface{} {
-			return &lightsFeaturetype{p, props.(*lightsProperties)}
-		},
-		func(
-			tx *sql.Tx, epsg int, fms []interface{},
-		) (outsideOrDup int, features int, err error) {
-
-			feedback.Info("Store fairway marks of type LIGHTS")
-
-			insertStmt, err := tx.PrepareContext(
-				ctx,
-				getFMInsertSQL("lights",
-					"colour", "condtn", "orient",
-					"catlit", "exclit", "litchr",
-					"litvis", "mltylt", "sectr1",
-					"sectr2", "siggrp", "sigper",
-					"sigseq", "status"),
-			)
-			if err != nil {
-				return
-			}
-			defer insertStmt.Close()
-
-			savepoint := Savepoint(ctx, tx, "feature")
-
-			for _, fm := range fms {
-
-				f := fm.(*lightsFeaturetype)
-
-				var fmid int64
-				err := savepoint(func() error {
-					err := insertStmt.QueryRowContext(
-						ctx,
-						f.geom.asWKB(),
-						epsg,
-						f.props.Datsta,
-						f.props.Datend,
-						f.props.Persta,
-						f.props.Perend,
-						f.props.Objnam,
-						f.props.Nobjnm,
-						f.props.Inform,
-						f.props.Ninfom,
-						f.props.Scamin,
-						f.props.Picrep,
-						f.props.Txtdsc,
-						f.props.Sordat,
-						f.props.Sorind,
-						f.props.Colour,
-						f.props.Condtn,
-						f.props.Orient,
-						f.props.Catlit,
-						f.props.Exclit,
-						f.props.Litchr,
-						f.props.Litvis,
-						f.props.Mltylt,
-						f.props.Sectr1,
-						f.props.Sectr2,
-						f.props.Siggrp,
-						f.props.Sigper,
-						f.props.Sigseq,
-						f.props.Status,
-					).Scan(&fmid)
-					return err
-				})
-				switch {
-				case err == sql.ErrNoRows:
-					outsideOrDup++
-					// ignore -> filtered by responsibility area or a duplicate
-				case err != nil:
-					feedback.Error(pgxutils.ReadableError{Err: err}.Error())
-				default:
-					features++
-				}
-			}
-			return
-		})
-
-	return nil, err
-}
--- a/pkg/imports/fm_notmrk.go	Tue Feb 18 13:00:25 2020 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,210 +0,0 @@
-// This is Free Software under GNU Affero General Public License v >= 3.0
-// without warranty, see README.md and license for details.
-//
-// SPDX-License-Identifier: AGPL-3.0-or-later
-// License-Filename: LICENSES/AGPL-3.0.txt
-//
-// Copyright (C) 2020 by via donau
-//   – Österreichische Wasserstraßen-Gesellschaft mbH
-// Software engineering by Intevation GmbH
-//
-// Author(s):
-//  * Tom Gottfried <tom.gottfried@intevation.de>
-
-package imports
-
-import (
-	"context"
-	"database/sql"
-	"strings"
-
-	"gemma.intevation.de/gemma/pkg/pgxutils"
-)
-
-// Notmrk is an import job to import
-// fairway marks of type NOTMRK in form of point geometries
-// and attribute data from a WFS service.
-type Notmrk struct {
-	FairwayMarks
-}
-
-// Description gives a short info about relevant facts of this import.
-func (notmrk *Notmrk) Description() (string, error) {
-	return notmrk.URL + "|" + notmrk.FeatureType, nil
-}
-
-// NOTMRKJobKind is the import queue type identifier.
-const NOTMRKJobKind JobKind = "fm_notmrk"
-
-type notmrkJobCreator struct{}
-
-func init() {
-	RegisterJobCreator(NOTMRKJobKind, notmrkJobCreator{})
-}
-
-func (notmrkJobCreator) Description() string { return "fairway marks notmrk" }
-
-func (notmrkJobCreator) AutoAccept() bool { return true }
-
-func (notmrkJobCreator) Create() Job { return new(Notmrk) }
-
-func (notmrkJobCreator) Depends() [2][]string {
-	return [2][]string{
-		{"fairway_marks_notmrk"},
-		{},
-	}
-}
-
-// StageDone is a NOP for fairway marks imports.
-func (notmrkJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
-	return nil
-}
-
-// CleanUp for fairway marks imports is a NOP.
-func (*Notmrk) CleanUp() error { return nil }
-
-type notmrkProperties struct {
-	fairwayMarksProperties
-	Condtn *int     `json:"hydro_condtn"`
-	Marsys *int     `json:"hydro_bcnshp"`
-	Dirimp *string  `json:"ienc_dirimp"`
-	Orient *float64 `json:"hydro_orient"`
-	Status *string  `json:"hydro_status"`
-	Addmrk *string  `json:"ienc_addmrk"`
-	Catnmk *int     `json:"ienc_catnmk"`
-	Disipd *float64 `json:"ienc_disipd"`
-	Disipu *float64 `json:"ienc_disipu"`
-	Disbk1 *float64 `json:"ienc_disbk1"`
-	Disbk2 *float64 `json:"ienc_disbk2"`
-	Fnctnm *int     `json:"ienc_fnctnm"`
-	Bnkwtw *int     `json:"ienc_bnkwtw"`
-}
-
-type notmrkFeaturetype struct {
-	geom  pointSlice
-	props *notmrkProperties
-}
-
-const (
-	insertNotmrkDirimpSQL = `
-INSERT INTO waterway.fairway_marks_notmrk_dirimps (fm_notmrk_id, dirimp)
-  VALUES ($1, $2)
-`
-)
-
-// Do executes the actual import.
-func (fm *Notmrk) Do(
-	ctx context.Context,
-	importID int64,
-	conn *sql.Conn,
-	feedback Feedback,
-) (interface{}, error) {
-
-	err := getFMFeatures(
-		ctx,
-		conn,
-		feedback,
-		fm.FairwayMarks,
-		func() interface{} { return new(notmrkProperties) },
-		func(p pointSlice, props interface{}) interface{} {
-			return &notmrkFeaturetype{p, props.(*notmrkProperties)}
-		},
-		func(
-			tx *sql.Tx, epsg int, fms []interface{},
-		) (outsideOrDup int, features int, err error) {
-
-			feedback.Info("Store fairway marks of type NOTMRK")
-
-			insertStmt, err := tx.PrepareContext(
-				ctx,
-				getFMInsertSQL("notmrk",
-					"condtn", "marsys", "orient",
-					"status", "addmrk", "catnmk",
-					"disipd", "disipu", "disbk1",
-					"disbk2", "fnctnm", "bnkwtw"),
-			)
-			if err != nil {
-				return
-			}
-			defer insertStmt.Close()
-
-			insertNotmrkDirimpStmt, err := tx.PrepareContext(
-				ctx, insertNotmrkDirimpSQL)
-			if err != nil {
-				return
-			}
-			defer insertNotmrkDirimpStmt.Close()
-
-			savepoint := Savepoint(ctx, tx, "feature")
-
-			for _, fm := range fms {
-
-				f := fm.(*notmrkFeaturetype)
-
-				var fmid int64
-				err := savepoint(func() error {
-					err := insertStmt.QueryRowContext(
-						ctx,
-						f.geom.asWKB(),
-						epsg,
-						f.props.Datsta,
-						f.props.Datend,
-						f.props.Persta,
-						f.props.Perend,
-						f.props.Objnam,
-						f.props.Nobjnm,
-						f.props.Inform,
-						f.props.Ninfom,
-						f.props.Scamin,
-						f.props.Picrep,
-						f.props.Txtdsc,
-						f.props.Sordat,
-						f.props.Sorind,
-						f.props.Condtn,
-						f.props.Marsys,
-						f.props.Orient,
-						f.props.Status,
-						f.props.Addmrk,
-						f.props.Catnmk,
-						f.props.Disipd,
-						f.props.Disipu,
-						f.props.Disbk1,
-						f.props.Disbk2,
-						f.props.Fnctnm,
-						f.props.Bnkwtw,
-					).Scan(&fmid)
-					return err
-				})
-				switch {
-				case err == sql.ErrNoRows:
-					outsideOrDup++
-					// ignore -> filtered by responsibility area or a duplicate
-					// TODO: handle eventual changes to dirimp
-				case err != nil:
-					feedback.Error(pgxutils.ReadableError{Err: err}.Error())
-				default:
-					features++
-
-					if f.props.Dirimp != nil && *f.props.Dirimp != "" {
-						dirimps := strings.Split(*f.props.Dirimp, ",")
-						for _, dirimp := range dirimps {
-							if err := savepoint(func() error {
-								_, err := insertNotmrkDirimpStmt.ExecContext(
-									ctx, fmid, dirimp)
-								return err
-							}); err != nil {
-								feedback.Warn(
-									pgxutils.ReadableError{Err: err}.Error())
-								feedback.Info(
-									"Tried to import '%s' as dirimp value",
-									dirimp)
-							}
-						}
-					}
-				}
-			}
-			return
-		})
-
-	return nil, err
-}
--- a/pkg/imports/fm_rtpbcn.go	Tue Feb 18 13:00:25 2020 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,157 +0,0 @@
-// This is Free Software under GNU Affero General Public License v >= 3.0
-// without warranty, see README.md and license for details.
-//
-// SPDX-License-Identifier: AGPL-3.0-or-later
-// License-Filename: LICENSES/AGPL-3.0.txt
-//
-// Copyright (C) 2020 by via donau
-//   – Österreichische Wasserstraßen-Gesellschaft mbH
-// Software engineering by Intevation GmbH
-//
-// Author(s):
-//  * Tom Gottfried <tom.gottfried@intevation.de>
-
-package imports
-
-import (
-	"context"
-	"database/sql"
-
-	"gemma.intevation.de/gemma/pkg/pgxutils"
-)
-
-// Rtpbcn is an import job to import
-// fairway marks of type RTPBCN in form of point geometries
-// and attribute data from a WFS service.
-type Rtpbcn struct {
-	FairwayMarks
-}
-
-// Description gives a short info about relevant facts of this import.
-func (rtpbcn *Rtpbcn) Description() (string, error) {
-	return rtpbcn.URL + "|" + rtpbcn.FeatureType, nil
-}
-
-// RTPBCNJobKind is the import queue type identifier.
-const RTPBCNJobKind JobKind = "fm_rtpbcn"
-
-type rtpbcnJobCreator struct{}
-
-func init() {
-	RegisterJobCreator(RTPBCNJobKind, rtpbcnJobCreator{})
-}
-
-func (rtpbcnJobCreator) Description() string { return "fairway marks rtpbcn" }
-
-func (rtpbcnJobCreator) AutoAccept() bool { return true }
-
-func (rtpbcnJobCreator) Create() Job { return new(Rtpbcn) }
-
-func (rtpbcnJobCreator) Depends() [2][]string {
-	return [2][]string{
-		{"fairway_marks_rtpbcn"},
-		{},
-	}
-}
-
-// StageDone is a NOP for fairway marks imports.
-func (rtpbcnJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
-	return nil
-}
-
-// CleanUp for fairway marks imports is a NOP.
-func (*Rtpbcn) CleanUp() error { return nil }
-
-type rtpbcnProperties struct {
-	fairwayMarksProperties
-	Condtn *int    `json:"hydro_condtn"`
-	Siggrp *string `json:"hydro_siggrp"`
-	Catrtb *int    `json:"hydro_catrtb"`
-	Radwal *string `json:"hydro_radwal"`
-}
-
-type rtpbcnFeaturetype struct {
-	geom  pointSlice
-	props *rtpbcnProperties
-}
-
-// Do executes the actual import.
-func (fm *Rtpbcn) Do(
-	ctx context.Context,
-	importID int64,
-	conn *sql.Conn,
-	feedback Feedback,
-) (interface{}, error) {
-
-	err := getFMFeatures(
-		ctx,
-		conn,
-		feedback,
-		fm.FairwayMarks,
-		func() interface{} { return new(rtpbcnProperties) },
-		func(p pointSlice, props interface{}) interface{} {
-			return &rtpbcnFeaturetype{p, props.(*rtpbcnProperties)}
-		},
-		func(
-			tx *sql.Tx, epsg int, fms []interface{},
-		) (outsideOrDup int, features int, err error) {
-
-			feedback.Info("Store fairway marks of type RTPBCN")
-
-			insertStmt, err := tx.PrepareContext(
-				ctx,
-				getFMInsertSQL("rtpbcn",
-					"condtn", "siggrp", "catrtb", "radwal"),
-			)
-			if err != nil {
-				return
-			}
-			defer insertStmt.Close()
-
-			savepoint := Savepoint(ctx, tx, "feature")
-
-			for _, fm := range fms {
-
-				f := fm.(*rtpbcnFeaturetype)
-
-				var fmid int64
-				err := savepoint(func() error {
-					err := insertStmt.QueryRowContext(
-						ctx,
-						f.geom.asWKB(),
-						epsg,
-						f.props.Datsta,
-						f.props.Datend,
-						f.props.Persta,
-						f.props.Perend,
-						f.props.Objnam,
-						f.props.Nobjnm,
-						f.props.Inform,
-						f.props.Ninfom,
-						f.props.Scamin,
-						f.props.Picrep,
-						f.props.Txtdsc,
-						f.props.Sordat,
-						f.props.Sorind,
-						f.props.Condtn,
-						f.props.Siggrp,
-						f.props.Catrtb,
-						f.props.Radwal,
-					).Scan(&fmid)
-					return err
-				})
-				switch {
-				case err == sql.ErrNoRows:
-					outsideOrDup++
-					// ignore -> filtered by responsibility area or a duplicate
-				case err != nil:
-					feedback.Error(pgxutils.ReadableError{Err: err}.Error())
-				default:
-					features++
-				}
-			}
-			return
-		})
-
-	return nil, err
-}
--- a/pkg/imports/fm_topmar.go	Tue Feb 18 13:00:25 2020 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,157 +0,0 @@
-// This is Free Software under GNU Affero General Public License v >= 3.0
-// without warranty, see README.md and license for details.
-//
-// SPDX-License-Identifier: AGPL-3.0-or-later
-// License-Filename: LICENSES/AGPL-3.0.txt
-//
-// Copyright (C) 2020 by via donau
-//   – Österreichische Wasserstraßen-Gesellschaft mbH
-// Software engineering by Intevation GmbH
-//
-// Author(s):
-//  * Tom Gottfried <tom.gottfried@intevation.de>
-
-package imports
-
-import (
-	"context"
-	"database/sql"
-
-	"gemma.intevation.de/gemma/pkg/pgxutils"
-)
-
-// Topmar is an import job to import
-// fairway marks of type TOPMAR in form of point geometries
-// and attribute data from a WFS service.
-type Topmar struct {
-	FairwayMarks
-}
-
-// Description gives a short info about relevant facts of this import.
-func (topmar *Topmar) Description() (string, error) {
-	return topmar.URL + "|" + topmar.FeatureType, nil
-}
-
-// TOPMARJobKind is the import queue type identifier.
-const TOPMARJobKind JobKind = "fm_topmar"
-
-type topmarJobCreator struct{}
-
-func init() {
-	RegisterJobCreator(TOPMARJobKind, topmarJobCreator{})
-}
-
-func (topmarJobCreator) Description() string { return "fairway marks topmar" }
-
-func (topmarJobCreator) AutoAccept() bool { return true }
-
-func (topmarJobCreator) Create() Job { return new(Topmar) }
-
-func (topmarJobCreator) Depends() [2][]string {
-	return [2][]string{
-		{"fairway_marks_topmar"},
-		{},
-	}
-}
-
-// StageDone is a NOP for fairway marks imports.
-func (topmarJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
-	return nil
-}
-
-// CleanUp for fairway marks imports is a NOP.
-func (*Topmar) CleanUp() error { return nil }
-
-type topmarProperties struct {
-	fairwayMarksProperties
-	Colour *string `json:"hydro_colour"`
-	Colpat *string `json:"hydro_colpat"`
-	Condtn *int    `json:"hydro_condtn"`
-	Topshp *int    `json:"hydro_topshp"`
-}
-
-type topmarFeaturetype struct {
-	geom  pointSlice
-	props *topmarProperties
-}
-
-// Do executes the actual import.
-func (fm *Topmar) Do(
-	ctx context.Context,
-	importID int64,
-	conn *sql.Conn,
-	feedback Feedback,
-) (interface{}, error) {
-
-	err := getFMFeatures(
-		ctx,
-		conn,
-		feedback,
-		fm.FairwayMarks,
-		func() interface{} { return new(topmarProperties) },
-		func(p pointSlice, props interface{}) interface{} {
-			return &topmarFeaturetype{p, props.(*topmarProperties)}
-		},
-		func(
-			tx *sql.Tx, epsg int, fms []interface{},
-		) (outsideOrDup int, features int, err error) {
-
-			feedback.Info("Store fairway marks of type TOPMAR")
-
-			insertStmt, err := tx.PrepareContext(
-				ctx,
-				getFMInsertSQL("topmar",
-					"colour", "colpat", "condtn", "topshp"),
-			)
-			if err != nil {
-				return
-			}
-			defer insertStmt.Close()
-
-			savepoint := Savepoint(ctx, tx, "feature")
-
-			for _, fm := range fms {
-
-				f := fm.(*topmarFeaturetype)
-
-				var fmid int64
-				err := savepoint(func() error {
-					err := insertStmt.QueryRowContext(
-						ctx,
-						f.geom.asWKB(),
-						epsg,
-						f.props.Datsta,
-						f.props.Datend,
-						f.props.Persta,
-						f.props.Perend,
-						f.props.Objnam,
-						f.props.Nobjnm,
-						f.props.Inform,
-						f.props.Ninfom,
-						f.props.Scamin,
-						f.props.Picrep,
-						f.props.Txtdsc,
-						f.props.Sordat,
-						f.props.Sorind,
-						f.props.Colour,
-						f.props.Colpat,
-						f.props.Condtn,
-						f.props.Topshp,
-					).Scan(&fmid)
-					return err
-				})
-				switch {
-				case err == sql.ErrNoRows:
-					outsideOrDup++
-					// ignore -> filtered by responsibility area or a duplicate
-				case err != nil:
-					feedback.Error(pgxutils.ReadableError{Err: err}.Error())
-				default:
-					features++
-				}
-			}
-			return
-		})
-
-	return nil, err
-}
--- a/pkg/imports/modelconvert.go	Tue Feb 18 13:00:25 2020 +0100
+++ b/pkg/imports/modelconvert.go	Wed Feb 19 15:08:35 2020 +0100
@@ -27,16 +27,16 @@
 	DMVJobKind:    func() interface{} { return new(models.DistanceMarksVirtualImport) },
 	FDJobKind:     func() interface{} { return new(models.FairwayDimensionImport) },
 	DMAJobKind:    func() interface{} { return new(models.DistanceMarksAshoreImport) },
-	BCNLATJobKind: func() interface{} { return new(models.FairwayMarksImport) },
-	BOYCARJobKind: func() interface{} { return new(models.FairwayMarksImport) },
-	BOYLATJobKind: func() interface{} { return new(models.FairwayMarksImport) },
-	BOYSAWJobKind: func() interface{} { return new(models.FairwayMarksImport) },
-	BOYSPPJobKind: func() interface{} { return new(models.FairwayMarksImport) },
-	DAYMARJobKind: func() interface{} { return new(models.FairwayMarksImport) },
-	LIGHTSJobKind: func() interface{} { return new(models.FairwayMarksImport) },
-	RTPBCNJobKind: func() interface{} { return new(models.FairwayMarksImport) },
-	TOPMARJobKind: func() interface{} { return new(models.FairwayMarksImport) },
-	NOTMRKJobKind: func() interface{} { return new(models.FairwayMarksImport) },
+	BCNLATJobKind: func() interface{} { return FindJobCreator(BCNLATJobKind).Create() },
+	BOYCARJobKind: func() interface{} { return FindJobCreator(BOYCARJobKind).Create() },
+	BOYLATJobKind: func() interface{} { return FindJobCreator(BOYLATJobKind).Create() },
+	BOYSAWJobKind: func() interface{} { return FindJobCreator(BOYSAWJobKind).Create() },
+	BOYSPPJobKind: func() interface{} { return FindJobCreator(BOYSPPJobKind).Create() },
+	DAYMARJobKind: func() interface{} { return FindJobCreator(DAYMARJobKind).Create() },
+	LIGHTSJobKind: func() interface{} { return FindJobCreator(LIGHTSJobKind).Create() },
+	RTPBCNJobKind: func() interface{} { return FindJobCreator(RTPBCNJobKind).Create() },
+	TOPMARJobKind: func() interface{} { return FindJobCreator(TOPMARJobKind).Create() },
+	NOTMRKJobKind: func() interface{} { return FindJobCreator(NOTMRKJobKind).Create() },
 	STJobKind:     func() interface{} { return new(models.StretchImport) },
 	SECJobKind:    func() interface{} { return new(models.SectionImport) },
 	DSECJobKind:   func() interface{} { return new(models.SectionDelete) },
@@ -146,116 +146,6 @@
 		}
 	},
 
-	BCNLATJobKind: func(input interface{}) interface{} {
-		fmi := input.(*models.FairwayMarksImport)
-		return &FairwayMarks{
-			URL:         fmi.URL,
-			FeatureType: fmi.FeatureType,
-			SortBy:      nilString(fmi.SortBy),
-			User:        nilString(fmi.User),
-			Password:    nilString(fmi.Password),
-		}
-	},
-
-	BOYCARJobKind: func(input interface{}) interface{} {
-		fmi := input.(*models.FairwayMarksImport)
-		return &FairwayMarks{
-			URL:         fmi.URL,
-			FeatureType: fmi.FeatureType,
-			SortBy:      nilString(fmi.SortBy),
-			User:        nilString(fmi.User),
-			Password:    nilString(fmi.Password),
-		}
-	},
-
-	BOYLATJobKind: func(input interface{}) interface{} {
-		fmi := input.(*models.FairwayMarksImport)
-		return &FairwayMarks{
-			URL:         fmi.URL,
-			FeatureType: fmi.FeatureType,
-			SortBy:      nilString(fmi.SortBy),
-			User:        nilString(fmi.User),
-			Password:    nilString(fmi.Password),
-		}
-	},
-
-	BOYSAWJobKind: func(input interface{}) interface{} {
-		fmi := input.(*models.FairwayMarksImport)
-		return &FairwayMarks{
-			URL:         fmi.URL,
-			FeatureType: fmi.FeatureType,
-			SortBy:      nilString(fmi.SortBy),
-			User:        nilString(fmi.User),
-			Password:    nilString(fmi.Password),
-		}
-	},
-
-	BOYSPPJobKind: func(input interface{}) interface{} {
-		fmi := input.(*models.FairwayMarksImport)
-		return &FairwayMarks{
-			URL:         fmi.URL,
-			FeatureType: fmi.FeatureType,
-			SortBy:      nilString(fmi.SortBy),
-			User:        nilString(fmi.User),
-			Password:    nilString(fmi.Password),
-		}
-	},
-
-	DAYMARJobKind: func(input interface{}) interface{} {
-		fmi := input.(*models.FairwayMarksImport)
-		return &FairwayMarks{
-			URL:         fmi.URL,
-			FeatureType: fmi.FeatureType,
-			SortBy:      nilString(fmi.SortBy),
-			User:        nilString(fmi.User),
-			Password:    nilString(fmi.Password),
-		}
-	},
-
-	LIGHTSJobKind: func(input interface{}) interface{} {
-		fmi := input.(*models.FairwayMarksImport)
-		return &FairwayMarks{
-			URL:         fmi.URL,
-			FeatureType: fmi.FeatureType,
-			SortBy:      nilString(fmi.SortBy),
-			User:        nilString(fmi.User),
-			Password:    nilString(fmi.Password),
-		}
-	},
-
-	RTPBCNJobKind: func(input interface{}) interface{} {
-		fmi := input.(*models.FairwayMarksImport)
-		return &FairwayMarks{
-			URL:         fmi.URL,
-			FeatureType: fmi.FeatureType,
-			SortBy:      nilString(fmi.SortBy),
-			User:        nilString(fmi.User),
-			Password:    nilString(fmi.Password),
-		}
-	},
-
-	TOPMARJobKind: func(input interface{}) interface{} {
-		fmi := input.(*models.FairwayMarksImport)
-		return &FairwayMarks{
-			URL:         fmi.URL,
-			FeatureType: fmi.FeatureType,
-			SortBy:      nilString(fmi.SortBy),
-			User:        nilString(fmi.User),
-			Password:    nilString(fmi.Password),
-		}
-	},
-
-	NOTMRKJobKind: func(input interface{}) interface{} {
-		fmi := input.(*models.FairwayMarksImport)
-		return &FairwayMarks{
-			URL:         fmi.URL,
-			FeatureType: fmi.FeatureType,
-			SortBy:      nilString(fmi.SortBy),
-			User:        nilString(fmi.User),
-			Password:    nilString(fmi.Password),
-		}
-	},
-
 	STJobKind: func(input interface{}) interface{} {
 		sti := input.(*models.StretchImport)
 		return &Stretch{
@@ -316,7 +206,7 @@
 func ConvertToInternal(kind JobKind, src interface{}) interface{} {
 	fn := convertModel[kind]
 	if fn == nil {
-		return nil
+		return src
 	}
 	return fn(src)
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pkg/imports/pointwfs.go	Wed Feb 19 15:08:35 2020 +0100
@@ -0,0 +1,323 @@
+// SPDX-License-Identifier: AGPL-3.0-or-later
+// License-Filename: LICENSES/AGPL-3.0.txt
+//
+// Copyright (C) 2020 by via donau
+//   – Österreichische Wasserstraßen-Gesellschaft mbH
+// Software engineering by Intevation GmbH
+//
+// Author(s):
+//  * Tom Gottfried <tom.gottfried@intevation.de>
+//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
+
+package imports
+
+import (
+	"context"
+	"database/sql"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io"
+	"time"
+
+	"gemma.intevation.de/gemma/pkg/models"
+	"gemma.intevation.de/gemma/pkg/wfs"
+)
+
+var (
+	ErrFeatureIgnored    = errors.New("feature ignored")
+	ErrFeatureDuplicated = errors.New("feature duplicated")
+)
+
+type (
+	WFSPointConsumer interface {
+		Commit() error
+		Rollback() error
+
+		NewProperties() interface{}
+		Consume(points pointSlice, properties interface{}, epsg int) error
+	}
+
+	PointWFSJobCreator struct {
+		description string
+		depends     [2][]string
+
+		newConsumer func(context.Context, *sql.Conn, Feedback) (WFSPointConsumer, error)
+	}
+
+	PointWFSJob struct {
+		models.WFSImport
+		creator *PointWFSJobCreator
+	}
+)
+
+func (pwjc *PointWFSJobCreator) Description() string {
+	return pwjc.description
+}
+
+func (pwjc *PointWFSJobCreator) Depends() [2][]string {
+	return pwjc.depends
+}
+
+func (*PointWFSJobCreator) AutoAccept() bool {
+	return true
+}
+
+// StageDone is a NOP for WFS imports.
+func (*PointWFSJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
+	return nil
+}
+
+func (pwjc *PointWFSJobCreator) Create() Job {
+	return &PointWFSJob{creator: pwjc}
+}
+
+// Description gives a short info about relevant facts of this import.
+func (pwj *PointWFSJob) Description() (string, error) {
+	return pwj.URL + "|" + pwj.FeatureType, nil
+}
+
+// CleanUp for WFS imports is a NOP.
+func (*PointWFSJob) CleanUp() error {
+	return nil
+}
+
+func (pwj *PointWFSJob) Do(
+	ctx context.Context,
+	importID int64,
+	conn *sql.Conn,
+	feedback Feedback,
+) (interface{}, error) {
+
+	start := time.Now()
+
+	feedback.Info("Import %s", pwj.creator.Description())
+
+	feedback.Info("Loading capabilities from %s", pwj.URL)
+	caps, err := wfs.GetCapabilities(pwj.URL)
+	if err != nil {
+		feedback.Error("Loading capabilities failed: %v", err)
+		return nil, err
+	}
+
+	ft := caps.FindFeatureType(pwj.FeatureType)
+	if ft == nil {
+		return nil, fmt.Errorf("unknown feature type '%s'", pwj.FeatureType)
+	}
+
+	feedback.Info("Found feature type '%s", pwj.FeatureType)
+
+	epsg, err := wfs.CRSToEPSG(ft.DefaultCRS)
+	if err != nil {
+		feedback.Error("Unsupported CRS: '%s'", ft.DefaultCRS)
+		return nil, err
+	}
+
+	if nilString(pwj.SortBy) != "" {
+		feedback.Info("Features will be sorted by '%s'", pwj.SortBy)
+	}
+
+	dl, err := wfs.GetFeatures(caps, pwj.FeatureType, nilString(pwj.SortBy))
+	if err != nil {
+		feedback.Error("Cannot create GetFeature URLs. %v", err)
+		return nil, err
+	}
+
+	var (
+		unsupported       = stringCounter{}
+		missingProperties int
+		badProperties     int
+		dupes             int
+		features          int
+	)
+
+	consumer, err := pwj.creator.newConsumer(ctx, conn, feedback)
+	if err != nil {
+		return nil, err
+	}
+	defer consumer.Rollback()
+
+	if err := dl.Download(nilString(pwj.User), nilString(pwj.Password), func(url string, r io.Reader) error {
+		feedback.Info("Get features from: '%s'", url)
+		rfc, err := wfs.ParseRawFeatureCollection(r)
+		if err != nil {
+			return fmt.Errorf("parsing GetFeature document failed: %v", err)
+		}
+		if rfc.CRS != nil {
+			crsName := rfc.CRS.Properties.Name
+			if epsg, err = wfs.CRSToEPSG(crsName); err != nil {
+				feedback.Error("Unsupported CRS: %d", crsName)
+				return err
+			}
+		}
+
+		// No features -> ignore.
+		if rfc.Features == nil {
+			return nil
+		}
+
+		feedback.Info("Using EPSG: %d", epsg)
+
+		for _, feature := range rfc.Features {
+			if feature.Properties == nil || feature.Geometry.Coordinates == nil {
+				missingProperties++
+				continue
+			}
+
+			props := consumer.NewProperties()
+			if err := json.Unmarshal(*feature.Properties, props); err != nil {
+				badProperties++
+				continue
+			}
+
+			switch feature.Geometry.Type {
+			case "Point":
+				var p pointSlice
+				if err := json.Unmarshal(*feature.Geometry.Coordinates, &p); err != nil {
+					return err
+				}
+
+				err := consumer.Consume(p, props, epsg)
+				switch {
+				case err == ErrFeatureDuplicated:
+					dupes++
+				case err == ErrFeatureIgnored:
+					// be silent
+				case err != nil:
+					return err
+				default:
+					features++
+				}
+
+			default:
+				unsupported[feature.Geometry.Type]++
+			}
+		}
+		return nil
+	}); err != nil {
+		return nil, err
+	}
+
+	if dupes > 0 {
+		feedback.Info(
+			"Features outside responsibility area and duplicates: %d",
+			dupes)
+	}
+
+	if badProperties > 0 {
+		feedback.Warn("Bad properties: %d", badProperties)
+	}
+
+	if missingProperties > 0 {
+		feedback.Warn("Missing properties: %d", missingProperties)
+	}
+
+	if len(unsupported) != 0 {
+		feedback.Warn("Unsupported types found: %s", unsupported)
+	}
+
+	if features == 0 {
+		return nil, UnchangedError("no valid new features found")
+	}
+
+	feedback.Info("Found %d usable features in data source", features)
+
+	if err = consumer.Commit(); err == nil {
+		feedback.Info("Storing %d features took %s",
+			features, time.Since(start))
+	}
+
+	return nil, nil
+}
+
+type (
+	SQLPointConsumer struct {
+		ctx           context.Context
+		tx            *sql.Tx
+		feedback      Feedback
+		newProperties func() interface{}
+		consume       func(*SQLPointConsumer, pointSlice, interface{}, int) error
+		savepoint     func(func() error) error
+		stmts         []*sql.Stmt
+	}
+)
+
+func (spc *SQLPointConsumer) Rollback() error {
+	if tx := spc.tx; tx != nil {
+		spc.releaseStmts()
+		spc.tx = nil
+		spc.ctx = nil
+		return tx.Rollback()
+	}
+	return nil
+}
+
+func (spc *SQLPointConsumer) Commit() error {
+	if tx := spc.tx; tx != nil {
+		spc.releaseStmts()
+		spc.tx = nil
+		spc.ctx = nil
+		return tx.Commit()
+	}
+	return nil
+}
+
+func (spc *SQLPointConsumer) NewProperties() interface{} {
+	return spc.newProperties()
+}
+
+func (spc *SQLPointConsumer) Consume(
+	points pointSlice,
+	properties interface{},
+	epsg int,
+) error {
+	return spc.consume(spc, points, properties, epsg)
+}
+
+func newSQLConsumer(
+	init func(*SQLPointConsumer) error,
+	consume func(*SQLPointConsumer, pointSlice, interface{}, int) error,
+	newProperties func() interface{},
+
+) func(context.Context, *sql.Conn, Feedback) (WFSPointConsumer, error) {
+	return func(ctx context.Context, conn *sql.Conn, feedback Feedback) (WFSPointConsumer, error) {
+		tx, err := conn.BeginTx(ctx, nil)
+		if err != nil {
+			return nil, err
+		}
+		spc := &SQLPointConsumer{
+			ctx:           ctx,
+			tx:            tx,
+			feedback:      feedback,
+			newProperties: newProperties,
+			consume:       consume,
+			savepoint:     Savepoint(ctx, tx, "feature"),
+		}
+		if err := init(spc); err != nil {
+			tx.Rollback()
+			return nil, err
+		}
+		return spc, nil
+	}
+}
+
+func (spc *SQLPointConsumer) releaseStmts() {
+	for i := len(spc.stmts); i > 0; i-- {
+		spc.stmts[i-1].Close()
+		spc.stmts[i-1] = nil
+	}
+	spc.stmts = nil
+}
+
+func prepareStmnts(queries ...string) func(*SQLPointConsumer) error {
+	return func(spc *SQLPointConsumer) error {
+		for _, query := range queries {
+			stmt, err := spc.tx.PrepareContext(spc.ctx, query)
+			if err != nil {
+				return err
+			}
+			spc.stmts = append(spc.stmts, stmt)
+		}
+		return nil
+	}
+}
--- a/pkg/models/imports.go	Tue Feb 18 13:00:25 2020 +0100
+++ b/pkg/models/imports.go	Wed Feb 19 15:08:35 2020 +0100
@@ -78,11 +78,6 @@
 		WFSImport
 	}
 
-	// FairwayMarksImport specifies an import of fairway marks.
-	FairwayMarksImport struct {
-		WFSImport
-	}
-
 	// FairwayDimensionImport specifies an import of the waterway axis.
 	FairwayDimensionImport struct {
 		WFSImport