view pkg/imports/fm.go @ 4973:adeb4f8c0d6c fairway-marks-import

Honour the fact that structs.Values() can access only exported fields. Promoted fields not being handed over to the prepared statements led to lots of errors due to wrong argument counts. Still an issue: since Dirimp is handled specially, some prepared statements now get one argument too many.
author Tom Gottfried <tom@intevation.de>
date Fri, 28 Feb 2020 20:00:05 +0100
parents 0e180d651fc6
children e1d8217954a0
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2020 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Tom Gottfried <tom.gottfried@intevation.de>
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"database/sql"
	"fmt"
	"strings"

	"gemma.intevation.de/gemma/pkg/pgxutils"
	"github.com/fatih/structs"
)

type (
	// FairwayMarksProperties holds the properties common to all types
	// of fairway marks.
	//
	// All property structs in this group are flattened with
	// structs.Values() into the positional arguments of the prepared
	// INSERT statement. Embedded structs and fields therefore have to be
	// exported, and the declaration order of the fields determines the
	// order of the statement arguments. Fields tagged `structs:"-"` are
	// left out of that argument list.
	FairwayMarksProperties struct {
		Datsta *string `json:"hydro_datsta"`
		Datend *string `json:"hydro_datend"`
		Persta *string `json:"hydro_persta"`
		Perend *string `json:"hydro_perend"`
		Objnam *string `json:"hydro_objnam"`
		Nobjnm *string `json:"hydro_nobjnm"`
		Inform *string `json:"hydro_inform"`
		Ninfom *string `json:"hydro_ninfom"`
		Scamin *int    `json:"hydro_scamin"`
		Picrep *string `json:"hydro_picrep"`
		Txtdsc *string `json:"hydro_txtdsc"`
		Sordat *string `json:"hydro_sordat"`
		Sorind *string `json:"hydro_sorind"`
	}

	// BcnlatProperties holds the properties shared by the HYDRO and
	// IENC flavours of bcnlat fairway marks.
	BcnlatProperties struct {
		FairwayMarksProperties
		Colour *string `json:"hydro_colour"`
		Colpat *string `json:"hydro_colpat"`
		Condtn *int    `json:"hydro_condtn"`
		Bcnshp *int    `json:"hydro_bcnshp"`
	}

	// bcnlatHydroProperties are the properties of bcnlat (HYDRO) marks.
	bcnlatHydroProperties struct {
		BcnlatProperties
		Catlam *int64 `json:"hydro_catlam"`
	}

	// bcnlatIencProperties are the properties of bcnlat (IENC) marks.
	bcnlatIencProperties struct {
		BcnlatProperties
		Catlam *int64 `json:"ienc_catlam"`
		// Dirimp is stored in a separate table (see attrib) and must
		// not be handed to the INSERT statement of the mark itself.
		Dirimp *string `json:"ienc_dirimp" structs:"-"`
	}

	// BoylatProperties holds the properties shared by the HYDRO and
	// IENC flavours of boylat fairway marks.
	BoylatProperties struct {
		FairwayMarksProperties
		Colour *string `json:"hydro_colour"`
		Colpat *string `json:"hydro_colpat"`
		Conrad *int    `json:"hydro_conrad"`
		Boyshp *int    `json:"hydro_boyshp"`
	}

	// boylatHydroProperties are the properties of boylat (HYDRO) marks.
	boylatHydroProperties struct {
		BoylatProperties
		Marsys *int64 `json:"hydro_marsys"`
		Catlam *int64 `json:"hydro_catlam"`
	}

	// boylatIencProperties are the properties of boylat (IENC) marks.
	boylatIencProperties struct {
		BoylatProperties
		Marsys *int64 `json:"ienc_marsys"`
		Catlam *int64 `json:"ienc_catlam"`
	}

	// boycarProperties are the properties of boycar marks.
	boycarProperties struct {
		FairwayMarksProperties
		Colour *string `json:"hydro_colour"`
		Colpat *string `json:"hydro_colpat"`
		Conrad *int    `json:"hydro_conrad"`
		Marsys *int    `json:"hydro_marsys"`
		Boyshp *int    `json:"hydro_boyshp"`
		Catcam *int    `json:"hydro_catcam"`
	}

	// boysawProperties are the properties of boysaw marks.
	boysawProperties struct {
		FairwayMarksProperties
		Colour *string `json:"hydro_colour"`
		Colpat *string `json:"hydro_colpat"`
		Conrad *int    `json:"hydro_conrad"`
		Marsys *int64  `json:"hydro_marsys"`
		Boyshp *int    `json:"hydro_boyshp"`
	}

	// boysppProperties are the properties of boyspp marks.
	boysppProperties struct {
		FairwayMarksProperties
		Colour *string `json:"hydro_colour"`
		Colpat *string `json:"hydro_colpat"`
		Conrad *int    `json:"hydro_conrad"`
		Marsys *int64  `json:"hydro_marsys"`
		Boyshp *int    `json:"hydro_boyshp"`
		Catspm *string `json:"hydro_catspm"`
	}

	// DaymarProperties holds the properties shared by the HYDRO and
	// IENC flavours of daymar fairway marks.
	DaymarProperties struct {
		FairwayMarksProperties
		Colour *string `json:"hydro_colour"`
		Colpat *string `json:"hydro_colpat"`
		Condtn *int    `json:"hydro_condtn"`
		Topshp *int    `json:"hydro_topshp"`
	}

	// daymarHydroProperties are the properties of daymar (HYDRO) marks.
	daymarHydroProperties struct {
		DaymarProperties
	}

	// daymarIencProperties are the properties of daymar (IENC) marks.
	daymarIencProperties struct {
		DaymarProperties
		// Dirimp is stored in a separate table (see attrib) and must
		// not be handed to the INSERT statement of the mark itself.
		Dirimp *string  `json:"ienc_dirimp" structs:"-"`
		Orient *float64 `json:"hydro_orient"`
	}

	// lightsProperties are the properties of lights marks.
	lightsProperties struct {
		FairwayMarksProperties
		Colour *string  `json:"hydro_colour"`
		Condtn *int     `json:"hydro_condtn"`
		Orient *float64 `json:"hydro_orient"`
		Catlit *string  `json:"hydro_catlit"`
		Exclit *int     `json:"hydro_exclit"`
		Litchr *int     `json:"hydro_litchr"`
		Litvis *string  `json:"hydro_litvis"`
		Mltylt *int     `json:"hydro_mltylt"`
		Sectr1 *float64 `json:"hydro_sectr1"`
		Sectr2 *float64 `json:"hydro_sectr2"`
		Siggrp *string  `json:"hydro_siggrp"`
		Sigper *float64 `json:"hydro_sigper"`
		Sigseq *string  `json:"hydro_sigseq"`
		Status *string  `json:"hydro_status"`
	}

	// notmrkProperties are the properties of notmrk marks.
	notmrkProperties struct {
		FairwayMarksProperties
		Condtn *int `json:"hydro_condtn"`
		Marsys *int `json:"hydro_marsys"`
		// Dirimp is stored in a separate table (see attrib) and must
		// not be handed to the INSERT statement of the mark itself.
		Dirimp *string  `json:"ienc_dirimp" structs:"-"`
		Orient *float64 `json:"hydro_orient"`
		Status *string  `json:"hydro_status"`
		Addmrk *string  `json:"ienc_addmrk"`
		Catnmk *int     `json:"ienc_catnmk"`
		Disipd *float64 `json:"ienc_disipd"`
		Disipu *float64 `json:"ienc_disipu"`
		Disbk1 *float64 `json:"ienc_disbk1"`
		Disbk2 *float64 `json:"ienc_disbk2"`
		Fnctnm *int     `json:"ienc_fnctnm"`
		Bnkwtw *int     `json:"ienc_bnkwtw"`
	}

	// rtpbcnProperties are the properties of rtpbcn marks.
	rtpbcnProperties struct {
		FairwayMarksProperties
		Condtn *int    `json:"hydro_condtn"`
		Siggrp *string `json:"hydro_siggrp"`
		Catrtb *int    `json:"hydro_catrtb"`
		Radwal *string `json:"hydro_radwal"`
	}

	// topmarProperties are the properties of topmar marks.
	topmarProperties struct {
		FairwayMarksProperties
		Colour *string `json:"hydro_colour"`
		Colpat *string `json:"hydro_colpat"`
		Condtn *int    `json:"hydro_condtn"`
		Topshp *int    `json:"hydro_topshp"`
	}
)

// attrib hands out the raw Dirimp value of a bcnlat (IENC) mark so that
// it can be stored separately from the mark itself.
func (p *bcnlatIencProperties) attrib() *string {
	dirimp := p.Dirimp
	return dirimp
}

// attrib hands out the raw Dirimp value of a daymar (IENC) mark so that
// it can be stored separately from the mark itself.
func (p *daymarIencProperties) attrib() *string {
	dirimp := p.Dirimp
	return dirimp
}

// attrib hands out the raw Dirimp value of a notmrk mark so that
// it can be stored separately from the mark itself.
func (p *notmrkProperties) attrib() *string {
	dirimp := p.Dirimp
	return dirimp
}

// Job kinds of the fairway marks imports. Each kind is registered with
// its own job creator in init; the "_hydro" and "_ienc" suffixes
// distinguish the two flavours of a mark type that are imported
// into separate tables.
const (
	BCNLATHYDROJobKind JobKind = "fm_bcnlat_hydro"
	BCNLATIENCJobKind  JobKind = "fm_bcnlat_ienc"
	BOYLATHYDROJobKind JobKind = "fm_boylat_hydro"
	BOYLATIENCJobKind  JobKind = "fm_boylat_ienc"
	BOYCARJobKind      JobKind = "fm_boycar"
	BOYSAWJobKind      JobKind = "fm_boysaw"
	BOYSPPJobKind      JobKind = "fm_boyspp"
	DAYMARHYDROJobKind JobKind = "fm_daymar_hydro"
	DAYMARIENCJobKind  JobKind = "fm_daymar_ienc"
	LIGHTSJobKind      JobKind = "fm_lights"
	NOTMRKJobKind      JobKind = "fm_notmrk"
	RTPBCNJobKind      JobKind = "fm_rtpbcn"
	TOPMARJobKind      JobKind = "fm_topmar"
)

// init registers one job creator per fairway marks type.
//
// Each creator prepares the INSERT statement for its table (plus the
// statement storing dirimp values where the type has them), uses
// consume to store a single feature and invalidates entries that were
// not found in the data source any more.
//
// The attribute names handed to createInsertFMSQL have to be listed in
// exactly the order in which structs.Values() flattens the matching
// properties struct: fields of embedded structs first, then the
// struct's own fields in declaration order.
func init() {
	RegisterJobCreator(BCNLATHYDROJobKind,
		&PointWFSJobCreator{
			description: "fairway marks bcnlat (HYDRO)",
			depends:     [2][]string{{"fairway_marks_bcnlat_hydro"}, {}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("bcnlat_hydro",
						"colour", "colpat", "condtn", "bcnshp", "catlam"),
				),
				consume,
				createInvalidation("bcnlat_hydro"),
				func() interface{} { return new(bcnlatHydroProperties) },
			),
		})

	RegisterJobCreator(BCNLATIENCJobKind,
		&PointWFSJobCreator{
			description: "fairway marks bcnlat (IENC)",
			depends:     [2][]string{{"fairway_marks_bcnlat_ienc"}, {}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("bcnlat_ienc",
						"colour", "colpat", "condtn", "bcnshp", "catlam"),
					insertBcnlatDirimpSQL,
				),
				consume,
				createInvalidation("bcnlat_ienc"),
				func() interface{} { return new(bcnlatIencProperties) },
			),
		})

	// boylat: Boyshp is part of the embedded BoylatProperties and thus
	// precedes Marsys and Catlam in the flattened value list.
	RegisterJobCreator(BOYLATHYDROJobKind,
		&PointWFSJobCreator{
			description: "fairway marks boylat (HYDRO)",
			depends:     [2][]string{{"fairway_marks_boylat_hydro"}, {}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("boylat_hydro",
						"colour", "colpat", "conrad",
						"boyshp", "marsys", "catlam"),
				),
				consume,
				createInvalidation("boylat_hydro"),
				func() interface{} { return new(boylatHydroProperties) },
			),
		})

	RegisterJobCreator(BOYLATIENCJobKind,
		&PointWFSJobCreator{
			description: "fairway marks boylat (IENC)",
			depends:     [2][]string{{"fairway_marks_boylat_ienc"}, {}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("boylat_ienc",
						"colour", "colpat", "conrad",
						"boyshp", "marsys", "catlam"),
				),
				consume,
				createInvalidation("boylat_ienc"),
				func() interface{} { return new(boylatIencProperties) },
			),
		})

	RegisterJobCreator(BOYCARJobKind,
		&PointWFSJobCreator{
			description: "fairway marks boycar",
			depends:     [2][]string{{"fairway_marks_boycar"}, {}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("boycar",
						"colour", "colpat", "conrad",
						"marsys", "boyshp", "catcam"),
				),
				consume,
				createInvalidation("boycar"),
				func() interface{} { return new(boycarProperties) },
			),
		})

	RegisterJobCreator(BOYSAWJobKind,
		&PointWFSJobCreator{
			description: "fairway marks boysaw",
			depends:     [2][]string{{"fairway_marks_boysaw"}, {}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("boysaw",
						"colour", "colpat", "conrad", "marsys", "boyshp"),
				),
				consume,
				createInvalidation("boysaw"),
				func() interface{} { return new(boysawProperties) },
			),
		})

	RegisterJobCreator(BOYSPPJobKind,
		&PointWFSJobCreator{
			description: "fairway marks boyspp",
			depends:     [2][]string{{"fairway_marks_boyspp"}, {}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("boyspp",
						"colour", "colpat", "conrad",
						"marsys", "boyshp", "catspm"),
				),
				consume,
				createInvalidation("boyspp"),
				func() interface{} { return new(boysppProperties) },
			),
		})

	RegisterJobCreator(DAYMARHYDROJobKind,
		&PointWFSJobCreator{
			description: "fairway marks daymar (HYDRO)",
			depends:     [2][]string{{"fairway_marks_daymar_hydro"}, {}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("daymar_hydro",
						"colour", "colpat", "condtn", "topshp"),
				),
				consume,
				createInvalidation("daymar_hydro"),
				func() interface{} { return new(daymarHydroProperties) },
			),
		})

	RegisterJobCreator(DAYMARIENCJobKind,
		&PointWFSJobCreator{
			description: "fairway marks daymar (IENC)",
			depends:     [2][]string{{"fairway_marks_daymar_ienc"}, {}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("daymar_ienc",
						"colour", "colpat", "condtn", "topshp", "orient"),
					insertDaymarDirimpSQL,
				),
				consume,
				createInvalidation("daymar_ienc"),
				func() interface{} { return new(daymarIencProperties) },
			),
		})

	RegisterJobCreator(LIGHTSJobKind,
		&PointWFSJobCreator{
			description: "fairway marks lights",
			depends:     [2][]string{{"fairway_marks_lights"}, {}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("lights",
						"colour", "condtn", "orient",
						"catlit", "exclit", "litchr",
						"litvis", "mltylt", "sectr1",
						"sectr2", "siggrp", "sigper",
						"sigseq", "status"),
				),
				consume,
				createInvalidation("lights"),
				func() interface{} { return new(lightsProperties) },
			),
		})

	// notmrk writes to fairway_marks_notmrk, so that is the table the
	// job depends on.
	RegisterJobCreator(NOTMRKJobKind,
		&PointWFSJobCreator{
			description: "fairway marks notmrk",
			depends:     [2][]string{{"fairway_marks_notmrk"}, {}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("notmrk",
						"condtn", "marsys", "orient",
						"status", "addmrk", "catnmk",
						"disipd", "disipu", "disbk1",
						"disbk2", "fnctnm", "bnkwtw"),
					insertNotmrkDirimpSQL,
				),
				consume,
				createInvalidation("notmrk"),
				func() interface{} { return new(notmrkProperties) },
			),
		})

	RegisterJobCreator(RTPBCNJobKind,
		&PointWFSJobCreator{
			description: "fairway marks rtpbcn",
			depends:     [2][]string{{"fairway_marks_rtpbcn"}, {}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("rtpbcn",
						"condtn", "siggrp", "catrtb", "radwal"),
				),
				consume,
				createInvalidation("rtpbcn"),
				func() interface{} { return new(rtpbcnProperties) },
			),
		})

	RegisterJobCreator(TOPMARJobKind,
		&PointWFSJobCreator{
			description: "fairway marks topmar",
			depends:     [2][]string{{"fairway_marks_topmar"}, {}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("topmar",
						"colour", "colpat", "condtn", "topshp"),
				),
				consume,
				createInvalidation("topmar"),
				func() interface{} { return new(topmarProperties) },
			),
		})
}

const (
	// insertFMSQLtmpl is the format string for the INSERT statement of a
	// fairway marks type. It is completed with
	//   %[1]s: the type, used as suffix of the table name,
	//   %[2]s: the comma separated type specific column names,
	//   %[3]s: the placeholders matching these columns.
	// Statement arguments: $1 is the geometry as WKB, $2 its SRID and
	// $3 to $15 are the properties common to all fairway marks.
	//
	// The statement returns the id of the inserted row. No row is
	// returned if the geometry is filtered out by the area check in "g"
	// or if a currently valid, otherwise identical entry exists; in the
	// latter case only last_found of that entry is updated.
	//
	// Instead of the row comparisons in the WHERE clauses of the CTE
	// with the UPDATE and the INSERT ... SELECT, we could have used the
	// row-based UNIQUE indexes as arbiter indexes in an
	// INSERT ... ON CONFLICT ... DO UPDATE, but that turned out to be
	// able to bypass the UNIQUE index in some cases.
	insertFMSQLtmpl = `
WITH a AS (
  SELECT users.current_user_area_utm() AS a
),
g AS (
  SELECT newfm
  FROM ST_Transform(ST_GeomFromWKB($1, $2::integer), 4326) AS newfm (newfm)
  WHERE pg_has_role('sys_admin', 'MEMBER')
    OR ST_Intersects((select a from a),
      ST_Transform(newfm, (select ST_SRID(a) from a)))
),
t AS (
  -- Currently valid and otherwise identical entry's validity.
  /* If there are no intermittent updates of validity,
     there will always be only one currently valid and
     otherwise identical entry. */
  UPDATE waterway.fairway_marks_%[1]s SET last_found = current_timestamp
  WHERE validity @> current_timestamp
    AND (geom,
        datsta, datend, persta, perend, objnam, nobjnm, inform, ninfom,
        scamin, picrep, txtdsc, sordat, sorind,
        %[2]s
      ) IS NOT DISTINCT FROM (
        (SELECT newfm FROM g),
        $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15,
        %[3]s)
  RETURNING 1
)
INSERT INTO waterway.fairway_marks_%[1]s (
  geom,
  datsta,
  datend,
  persta,
  perend,
  objnam,
  nobjnm,
  inform,
  ninfom,
  scamin,
  picrep,
  txtdsc,
  sordat,
  sorind,
  %[2]s
)
SELECT newfm, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15,
    %[3]s
  FROM g
  WHERE NOT EXISTS(SELECT 1 FROM t)
RETURNING id
`

	// invalidateFairwayMarksSQLtmpl is the format string for the
	// statement ending the validity of entries of a fairway marks type
	// (the single %s is the table name suffix). It assumes that validity
	// ended for all currently valid entries not found in the data
	// source, i.e. those whose last_found is older than
	// current_timestamp. Users without the sys_admin role only affect
	// entries intersecting their area.
	invalidateFairwayMarksSQLtmpl = `
WITH
  a AS (
    SELECT users.current_user_area_utm() AS a
  )
UPDATE waterway.fairway_marks_%s
  SET validity = tstzrange(lower(validity), current_timestamp)
  WHERE validity @> current_timestamp
    AND last_found < current_timestamp
    AND (pg_has_role('sys_admin', 'MEMBER')
      OR ST_Intersects((select a from a),
        ST_Transform(CAST(geom AS geometry), (select ST_SRID(a) from a))))
 `

	// insertBcnlatDirimpSQL stores one dirimp value ($2) for the bcnlat
	// fairway mark with id $1.
	insertBcnlatDirimpSQL = `
INSERT INTO waterway.fairway_marks_bcnlat_dirimps (fm_bcnlat_id, dirimp)
  VALUES ($1, $2)
`

	// insertDaymarDirimpSQL stores one dirimp value ($2) for the daymar
	// fairway mark with id $1.
	insertDaymarDirimpSQL = `
INSERT INTO waterway.fairway_marks_daymar_dirimps (fm_daymar_id, dirimp)
  VALUES ($1, $2)
`

	// insertNotmrkDirimpSQL stores one dirimp value ($2) for the notmrk
	// fairway mark with id $1.
	insertNotmrkDirimpSQL = `
INSERT INTO waterway.fairway_marks_notmrk_dirimps (fm_notmrk_id, dirimp)
  VALUES ($1, $2)
`
)

// createInvalidation builds the invalidation step for the given fairway
// marks type. The returned function runs the invalidation statement in
// the consumer's transaction and reports the number of affected rows as
// info feedback. It returns ErrFeaturesUnmodified if no row was
// affected.
func createInvalidation(fmType string) func(*SQLPointConsumer) error {
	stmt := fmt.Sprintf(invalidateFairwayMarksSQLtmpl, fmType)

	invalidate := func(spc *SQLPointConsumer) error {
		result, err := spc.tx.ExecContext(spc.ctx, stmt)
		if err != nil {
			return err
		}

		removed, err := result.RowsAffected()
		switch {
		case err != nil:
			return err
		case removed == 0:
			return ErrFeaturesUnmodified
		}

		spc.feedback.Info("Number of features removed from data source: %d", removed)
		return nil
	}

	return invalidate
}

// createInsertFMSQL completes insertFMSQLtmpl for a specific fairway
// marks type. The attributes are the names of the type specific
// columns; they are bound to the placeholders $16, $17, ... in the
// given order.
func createInsertFMSQL(fmType string, attributes ...string) string {
	// The first placeholder is always emitted, even without attributes.
	placeholders := []string{"$16"}
	for n := 17; n < 16+len(attributes); n++ {
		placeholders = append(placeholders, fmt.Sprintf("$%d", n))
	}

	columns := strings.Join(attributes, ",")
	params := strings.Join(placeholders, ",")

	return fmt.Sprintf(insertFMSQLtmpl, fmType, columns, params)
}

// storeAttribs stores the comma separated dirimp values in attrs for the
// fairway mark with the given id, using the consumer's second prepared
// statement. A nil or empty attrs is a no-op.
//
// Each value is inserted within its own savepoint. A failing value is
// reported as a warning and does not prevent the remaining values from
// being stored.
func storeAttribs(spc *SQLPointConsumer, id int64, attrs *string) {
	if attrs == nil || *attrs == "" {
		return
	}
	for _, dirimp := range strings.Split(*attrs, ",") {
		err := spc.savepoint(func() error {
			_, err := spc.stmts[1].ExecContext(spc.ctx, id, dirimp)
			return err
		})
		if err == nil {
			continue
		}
		// Hand the message over as an argument, not as the format
		// string: database errors may contain '%' characters.
		spc.feedback.Warn(
			"%s", pgxutils.ReadableError{Err: err}.Error())
		spc.feedback.Info(
			"Tried to import '%s' as dirimp value",
			dirimp)
	}
}

// consume stores a single fairway mark feature.
//
// The geometry (as WKB), its EPSG code and the values of properties as
// flattened by structs.Values() are handed to the consumer's first
// prepared statement within a savepoint. If the properties provide an
// attrib method, its value is stored afterwards with storeAttribs
// under the id of the new fairway mark.
//
// It returns ErrFeatureDuplicated if the statement yields no row,
// ErrFeatureIgnored (after reporting the error as feedback) if the
// statement fails, and nil otherwise.
func consume(
	spc *SQLPointConsumer,
	points pointSlice,
	properties interface{},
	epsg int,
) error {
	var fmid int64
	err := spc.savepoint(func() error {
		args := append(
			[]interface{}{points.asWKB(), epsg},
			structs.Values(properties)...)
		return spc.stmts[0].QueryRowContext(spc.ctx, args...).Scan(&fmid)
	})

	switch {
	case err == sql.ErrNoRows:
		// Nothing inserted: filtered by responsibility area
		// or a duplicate.
		return ErrFeatureDuplicated
	case err != nil:
		// Hand the message over as an argument, not as the format
		// string: database errors may contain '%' characters.
		spc.feedback.Error(
			"%s", pgxutils.ReadableError{Err: err}.Error())
		return ErrFeatureIgnored
	}

	if attr, ok := properties.(interface{ attrib() *string }); ok {
		storeAttribs(spc, fmid, attr.attrib())
	}
	return nil
}