view pkg/imports/fm.go @ 5670:b75d0b303328

Various fixes and improvements of gauges import: - Allow update of erased data (and thereby set erased to false) - Fix source_organization to work with ERDMS2 - Give ISRS of new and updated gauges in summary - Fixed reference of null pointers if revlevels are missing - Fixed reference of null pointer on update errors - Added ISRS to reference_code warning
author Sascha Wilde <wilde@sha-bang.de>
date Fri, 08 Dec 2023 17:29:56 +0100
parents 1222b777f51f
children 6270951dda28
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2020 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Tom Gottfried <tom.gottfried@intevation.de>
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"database/sql"
	"fmt"
	"strings"

	"gemma.intevation.de/gemma/pkg/pgxutils"
	"github.com/fatih/structs"
)

type (
	// FairwayMarksProperties are common to all types of fairway marks
	FairwayMarksProperties struct {
		Datsta *string `json:"hydro_datsta"`
		Datend *string `json:"hydro_datend"`
		Persta *string `json:"hydro_persta"`
		Perend *string `json:"hydro_perend"`
		Objnam *string `json:"hydro_objnam"`
		Nobjnm *string `json:"hydro_nobjnm"`
		Inform *string `json:"hydro_inform"`
		Ninfom *string `json:"hydro_ninfom"`
		Scamin *int    `json:"hydro_scamin"`
		Picrep *string `json:"hydro_picrep"`
		Txtdsc *string `json:"hydro_txtdsc"`
		Sordat *string `json:"hydro_sordat"`
		Sorind *string `json:"hydro_sorind"`
	}

	// BcnlatProperties are embedded in other properties.
	BcnlatProperties struct {
		FairwayMarksProperties
		Colour *string `json:"hydro_colour"`
		Colpat *string `json:"hydro_colpat"`
		Condtn *int    `json:"hydro_condtn"`
		Bcnshp *int    `json:"hydro_bcnshp"`
	}

	bcnlatHydroProperties struct {
		BcnlatProperties
		Catlam *int64 `json:"hydro_catlam"`
	}

	bcnlatIencProperties struct {
		BcnlatProperties
		Catlam *int64  `json:"ienc_catlam"`
		Dirimp *string `json:"ienc_dirimp" structs:"-"`
	}

	// BoylatProperties are embedded in other properties.
	BoylatProperties struct {
		FairwayMarksProperties
		Colour *string `json:"hydro_colour"`
		Colpat *string `json:"hydro_colpat"`
		Conrad *int    `json:"hydro_conrad"`
		Boyshp *int    `json:"hydro_boyshp"`
	}

	boylatHydroProperties struct {
		BoylatProperties
		Marsys *int64 `json:"hydro_marsys"`
		Catlam *int64 `json:"hydro_catlam"`
	}

	boylatIencProperties struct {
		BoylatProperties
		Marsys *int64 `json:"ienc_marsys"`
		Catlam *int64 `json:"ienc_catlam"`
	}

	boycarProperties struct {
		FairwayMarksProperties
		Colour *string `json:"hydro_colour"`
		Colpat *string `json:"hydro_colpat"`
		Conrad *int    `json:"hydro_conrad"`
		Marsys *int    `json:"hydro_marsys"`
		Boyshp *int    `json:"hydro_boyshp"`
		Catcam *int    `json:"hydro_catcam"`
	}

	boysawProperties struct {
		FairwayMarksProperties
		Colour *string `json:"hydro_colour"`
		Colpat *string `json:"hydro_colpat"`
		Conrad *int    `json:"hydro_conrad"`
		Marsys *int64  `json:"hydro_marsys"`
		Boyshp *int    `json:"hydro_boyshp"`
	}

	boysppProperties struct {
		FairwayMarksProperties
		Colour *string `json:"hydro_colour"`
		Colpat *string `json:"hydro_colpat"`
		Conrad *int    `json:"hydro_conrad"`
		Marsys *int64  `json:"hydro_marsys"`
		Boyshp *int    `json:"hydro_boyshp"`
		Catspm *string `json:"hydro_catspm"`
	}

	// DaymarProperties are embedded in other properties.
	DaymarProperties struct {
		FairwayMarksProperties
		Colour *string `json:"hydro_colour"`
		Colpat *string `json:"hydro_colpat"`
		Condtn *int    `json:"hydro_condtn"`
		Topshp *int    `json:"hydro_topshp"`
	}

	daymarHydroProperties struct {
		DaymarProperties
	}

	daymarIencProperties struct {
		DaymarProperties
		Dirimp *string  `json:"ienc_dirimp" structs:"-"`
		Orient *float64 `json:"hydro_orient"`
	}

	lightsProperties struct {
		FairwayMarksProperties
		Colour *string  `json:"hydro_colour"`
		Condtn *int     `json:"hydro_condtn"`
		Orient *float64 `json:"hydro_orient"`
		Catlit *string  `json:"hydro_catlit"`
		Exclit *int     `json:"hydro_exclit"`
		Litchr *int     `json:"hydro_litchr"`
		Litvis *string  `json:"hydro_litvis"`
		Mltylt *int     `json:"hydro_mltylt"`
		Sectr1 *float64 `json:"hydro_sectr1"`
		Sectr2 *float64 `json:"hydro_sectr2"`
		Siggrp *string  `json:"hydro_siggrp"`
		Sigper *float64 `json:"hydro_sigper"`
		Sigseq *string  `json:"hydro_sigseq"`
		Status *string  `json:"hydro_status"`
	}

	notmrkProperties struct {
		FairwayMarksProperties
		Condtn *int     `json:"hydro_condtn"`
		Marsys *int     `json:"hydro_bcnshp"`
		Dirimp *string  `json:"ienc_dirimp" structs:"-"`
		Orient *float64 `json:"hydro_orient"`
		Status *string  `json:"hydro_status"`
		Addmrk *string  `json:"ienc_addmrk"`
		Catnmk *int     `json:"ienc_catnmk"`
		Disipd *float64 `json:"ienc_disipd"`
		Disipu *float64 `json:"ienc_disipu"`
		Disbk1 *float64 `json:"ienc_disbk1"`
		Disbk2 *float64 `json:"ienc_disbk2"`
		Fnctnm *int     `json:"ienc_fnctnm"`
		Bnkwtw *int     `json:"ienc_bnkwtw"`
	}

	rtpbcnProperties struct {
		FairwayMarksProperties
		Condtn *int    `json:"hydro_condtn"`
		Siggrp *string `json:"hydro_siggrp"`
		Catrtb *int    `json:"hydro_catrtb"`
		Radwal *string `json:"hydro_radwal"`
	}

	topmarProperties struct {
		FairwayMarksProperties
		Colour *string `json:"hydro_colour"`
		Colpat *string `json:"hydro_colpat"`
		Condtn *int    `json:"hydro_condtn"`
		Topshp *int    `json:"hydro_topshp"`
	}
)

func (props *bcnlatIencProperties) attrib() *string {
	return props.Dirimp
}

func (props *daymarIencProperties) attrib() *string {
	return props.Dirimp
}

func (props *notmrkProperties) attrib() *string {
	return props.Dirimp
}

const (
	// BCNLATHYDROJobKind is the unique name for the respective import.
	BCNLATHYDROJobKind JobKind = "fm_bcnlat_hydro"
	// BCNLATIENCJobKind is the unique name for the respective import.
	BCNLATIENCJobKind JobKind = "fm_bcnlat_ienc"
	// BOYLATHYDROJobKind is the unique name for the respective import.
	BOYLATHYDROJobKind JobKind = "fm_boylat_hydro"
	// BOYLATIENCJobKind is the unique name for the respective import.
	BOYLATIENCJobKind JobKind = "fm_boylat_ienc"
	// BOYCARJobKind is the unique name for the respective import.
	BOYCARJobKind JobKind = "fm_boycar"
	// BOYSAWJobKind is the unique name for the respective import.
	BOYSAWJobKind JobKind = "fm_boysaw"
	// BOYSPPJobKind is the unique name for the respective import.
	BOYSPPJobKind JobKind = "fm_boyspp"
	// DAYMARHYDROJobKind is the unique name for the respective import.
	DAYMARHYDROJobKind JobKind = "fm_daymar_hydro"
	// DAYMARIENCJobKind is the unique name for the respective import.
	DAYMARIENCJobKind JobKind = "fm_daymar_ienc"
	// LIGHTSJobKind is the unique name for the respective import.
	LIGHTSJobKind JobKind = "fm_lights"
	// NOTMRKJobKind is the unique name for the respective import.
	NOTMRKJobKind JobKind = "fm_notmrk"
	// RTPBCNJobKind is the unique name for the respective import.
	RTPBCNJobKind JobKind = "fm_rtpbcn"
	// TOPMARJobKind is the unique name for the respective import.
	TOPMARJobKind JobKind = "fm_topmar"
)

func init() {
	RegisterJobCreator(BCNLATHYDROJobKind,
		&WFSFeatureJobCreator{
			description: "fairway marks bcnlat (HYDRO)",
			depends:     [2][]string{{"fairway_marks_bcnlat_hydro"}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("bcnlat_hydro",
						"colour", "colpat", "condtn", "bcnshp", "catlam"),
				),
				consume,
				createInvalidation("bcnlat_hydro"),
				newPointFeature(func() interface{} { return new(bcnlatHydroProperties) }),
			),
		})

	RegisterJobCreator(BCNLATIENCJobKind,
		&WFSFeatureJobCreator{
			description: "fairway marks bcnlat (IENC)",
			depends:     [2][]string{{"fairway_marks_bcnlat_ienc"}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("bcnlat_ienc",
						"colour", "colpat", "condtn", "bcnshp", "catlam"),
					insertBcnlatDirimpSQL,
				),
				consume,
				createInvalidation("bcnlat_ienc"),
				newPointFeature(func() interface{} { return new(bcnlatIencProperties) }),
			),
		})

	RegisterJobCreator(BOYLATHYDROJobKind,
		&WFSFeatureJobCreator{
			description: "fairway marks boylat (HYDRO)",
			depends:     [2][]string{{"fairway_marks_boylat_hydro"}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("boylat_hydro",
						"colour", "colpat", "conrad",
						"marsys", "boyshp", "catlam"),
				),
				consume,
				createInvalidation("boylat_hydro"),
				newPointFeature(func() interface{} { return new(boylatHydroProperties) }),
			),
		})

	RegisterJobCreator(BOYLATIENCJobKind,
		&WFSFeatureJobCreator{
			description: "fairway marks boylat (IENC)",
			depends:     [2][]string{{"fairway_marks_boylat_ienc"}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("boylat_ienc",
						"colour", "colpat", "conrad",
						"marsys", "boyshp", "catlam"),
				),
				consume,
				createInvalidation("boylat_ienc"),
				newPointFeature(func() interface{} { return new(boylatIencProperties) }),
			),
		})

	RegisterJobCreator(BOYCARJobKind,
		&WFSFeatureJobCreator{
			description: "fairway marks boycar",
			depends:     [2][]string{{"fairway_marks_boycar"}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("boycar",
						"colour", "colpat", "conrad",
						"marsys", "boyshp", "catcam"),
				),
				consume,
				createInvalidation("boycar"),
				newPointFeature(func() interface{} { return new(boycarProperties) }),
			),
		})

	RegisterJobCreator(BOYSAWJobKind,
		&WFSFeatureJobCreator{
			description: "fairway marks boysaw",
			depends:     [2][]string{{"fairway_marks_boysaw"}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("boysaw",
						"colour", "colpat", "conrad", "marsys", "boyshp"),
				),
				consume,
				createInvalidation("boysaw"),
				newPointFeature(func() interface{} { return new(boysawProperties) }),
			),
		})

	RegisterJobCreator(BOYSPPJobKind,
		&WFSFeatureJobCreator{
			description: "fairway marks boyspp",
			depends:     [2][]string{{"fairway_marks_boyspp"}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("boyspp",
						"colour", "colpat", "conrad",
						"marsys", "boyshp", "catspm"),
				),
				consume,
				createInvalidation("boyspp"),
				newPointFeature(func() interface{} { return new(boysppProperties) }),
			),
		})

	RegisterJobCreator(DAYMARHYDROJobKind,
		&WFSFeatureJobCreator{
			description: "fairway marks daymar (HYDRO)",
			depends:     [2][]string{{"fairway_marks_daymar_hydro"}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("daymar_hydro",
						"colour", "colpat", "condtn", "topshp"),
				),
				consume,
				createInvalidation("daymar_hydro"),
				newPointFeature(func() interface{} { return new(daymarHydroProperties) }),
			),
		})

	RegisterJobCreator(DAYMARIENCJobKind,
		&WFSFeatureJobCreator{
			description: "fairway marks daymar (IENC)",
			depends:     [2][]string{{"fairway_marks_daymar_ienc"}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("daymar_ienc",
						"colour", "colpat", "condtn", "topshp", "orient"),
					insertDaymarDirimpSQL,
				),
				consume,
				createInvalidation("daymar_ienc"),
				newPointFeature(func() interface{} { return new(daymarIencProperties) }),
			),
		})

	RegisterJobCreator(LIGHTSJobKind,
		&WFSFeatureJobCreator{
			description: "fairway marks lights",
			depends:     [2][]string{{"fairway_marks_lights"}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("lights",
						"colour", "condtn", "orient",
						"catlit", "exclit", "litchr",
						"litvis", "mltylt", "sectr1",
						"sectr2", "siggrp", "sigper",
						"sigseq", "status"),
				),
				consume,
				createInvalidation("lights"),
				newPointFeature(func() interface{} { return new(lightsProperties) }),
			),
		})

	RegisterJobCreator(NOTMRKJobKind,
		&WFSFeatureJobCreator{
			description: "fairway marks notmrk",
			depends:     [2][]string{{"fairway_marks_lights"}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("notmrk",
						"condtn", "marsys", "orient",
						"status", "addmrk", "catnmk",
						"disipd", "disipu", "disbk1",
						"disbk2", "fnctnm", "bnkwtw"),
					insertNotmrkDirimpSQL,
				),
				consume,
				createInvalidation("notmrk"),
				newPointFeature(func() interface{} { return new(notmrkProperties) }),
			),
		})

	RegisterJobCreator(RTPBCNJobKind,
		&WFSFeatureJobCreator{
			description: "fairway marks rtpbcn",
			depends:     [2][]string{{"fairway_marks_rtpbcn"}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("rtpbcn",
						"condtn", "siggrp", "catrtb", "radwal"),
				),
				consume,
				createInvalidation("rtpbcn"),
				newPointFeature(func() interface{} { return new(rtpbcnProperties) }),
			),
		})

	RegisterJobCreator(TOPMARJobKind,
		&WFSFeatureJobCreator{
			description: "fairway marks topmar",
			depends:     [2][]string{{"fairway_marks_topmar"}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("topmar",
						"colour", "colpat", "condtn", "topshp"),
				),
				consume,
				createInvalidation("topmar"),
				newPointFeature(func() interface{} { return new(topmarProperties) }),
			),
		})
}

const (
	// Format string to be completed with type and additional attributes
	/* Instead of the row comparisons in the WHERE clauses
	of the CTE with the UPDATE and the INSERT ... SELECT, we could have
	used the row-based UNIQUE indexes as arbiter indexes
	in an INSERT ... ON CONFLICT ... DO UPDATE, but that turned out
	to be able to bypass the UNIQUE index in some cases.
	*/
	insertFMSQLtmpl = `
WITH a AS (
  SELECT users.current_user_area_utm() AS a
),
g AS (
  SELECT newfm
  FROM ST_Transform(ST_GeomFromWKB($1, $2::integer), 4326) AS newfm (newfm)
  WHERE pg_has_role('sys_admin', 'MEMBER')
    OR ST_Intersects((select a from a),
      ST_Transform(newfm, (select ST_SRID(a) from a)))
),
t AS (
  -- Currently valid and otherwise identical entry's validity.
  /* If there are no intermittent updates of validity,
     there will always be only one currently valid and
     otherwise identical entry. */
  UPDATE waterway.fairway_marks_%[1]s SET last_found = current_timestamp
  WHERE validity @> current_timestamp
    AND (geom,
        datsta, datend, persta, perend, objnam, nobjnm, inform, ninfom,
        scamin, picrep, txtdsc, sordat, sorind,
        %[2]s
      ) IS NOT DISTINCT FROM (
        (SELECT newfm FROM g),
        $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15,
        %[3]s)
  RETURNING 1
)
INSERT INTO waterway.fairway_marks_%[1]s (
  geom,
  datsta,
  datend,
  persta,
  perend,
  objnam,
  nobjnm,
  inform,
  ninfom,
  scamin,
  picrep,
  txtdsc,
  sordat,
  sorind,
  %[2]s
)
SELECT newfm, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15,
    %[3]s
  FROM g
  WHERE NOT EXISTS(SELECT 1 FROM t)
RETURNING id
`

	// Assume validity ended for all entries not found in data source
	invalidateFairwayMarksSQLtmpl = `
WITH
  a AS (
    SELECT users.current_user_area_utm() AS a
  )
UPDATE waterway.fairway_marks_%s
  SET validity = tstzrange(lower(validity), current_timestamp)
  WHERE validity @> current_timestamp
    AND last_found < current_timestamp
    AND (pg_has_role('sys_admin', 'MEMBER')
      OR ST_Intersects((select a from a),
        ST_Transform(CAST(geom AS geometry), (select ST_SRID(a) from a))))
 `

	insertBcnlatDirimpSQL = `
INSERT INTO waterway.fairway_marks_bcnlat_dirimps (fm_bcnlat_id, dirimp)
  VALUES ($1, $2)
`

	insertDaymarDirimpSQL = `
INSERT INTO waterway.fairway_marks_daymar_dirimps (fm_daymar_id, dirimp)
  VALUES ($1, $2)
`

	insertNotmrkDirimpSQL = `
INSERT INTO waterway.fairway_marks_notmrk_dirimps (fm_notmrk_id, dirimp)
  VALUES ($1, $2)
`
)

func createInvalidation(fmType string) func(*SQLGeometryConsumer) error {

	invalidateFairwayMarksSQL := fmt.Sprintf(invalidateFairwayMarksSQLtmpl, fmType)

	return func(spc *SQLGeometryConsumer) error {
		res, err := spc.tx.ExecContext(spc.ctx, invalidateFairwayMarksSQL)
		if err != nil {
			return err
		}
		old, err := res.RowsAffected()
		if err != nil {
			return err
		}
		if old == 0 {
			return ErrFeaturesUnmodified
		}
		spc.feedback.Info("Number of features removed from data source: %d", old)
		return nil
	}
}

// Create INSERT statement for specific fairway marks type
func createInsertFMSQL(fmType string, attributes ...string) string {
	attNums := "$16"
	for i := 1; i < len(attributes); i++ {
		attNums += fmt.Sprintf(",$%d", 16+i)
	}

	return fmt.Sprintf(
		insertFMSQLtmpl,
		fmType,
		strings.Join(attributes, ","),
		attNums,
	)
}

func storeAttribs(spc *SQLGeometryConsumer, id int64, attrs *string) {
	if attrs == nil || *attrs == "" {
		return
	}
	dirimps := strings.Split(*attrs, ",")
	for _, dirimp := range dirimps {
		if err := spc.savepoint(func() error {
			_, err := spc.stmts[1].ExecContext(
				spc.ctx, id, dirimp)
			return err
		}); err != nil {
			spc.feedback.Warn(
				pgxutils.ReadableError{Err: err}.Error())
			spc.feedback.Info(
				"Tried to import '%s' as dirimp value",
				dirimp)
		}
	}
}

func consume(
	spc *SQLGeometryConsumer,
	geom, properties interface{},
	epsg int,
) error {
	var fmid int64
	err := spc.savepoint(func() error {
		return spc.stmts[0].QueryRowContext(
			spc.ctx,
			append(
				[]interface{}{
					geom.(interface{ asWKB() []byte }).asWKB(),
					epsg,
				},
				structs.Values(properties)...)...,
		).Scan(&fmid)
	})
	switch {
	case err == sql.ErrNoRows:
		return ErrFeatureDuplicated
		// ignore -> filtered by responsibility area or a duplicate
	case err != nil:
		spc.feedback.Error(pgxutils.ReadableError{Err: err}.Error())
		return ErrFeatureIgnored
	default:
		if attr, ok := properties.(interface{ attrib() *string }); ok {
			storeAttribs(spc, fmid, attr.attrib())
		}
	}
	return nil
}