view pkg/imports/fm.go @ 5718:3d497077f888 uploadwg

Implemented direct file upload as an alternative import method for WG. For testing and data corrections it is useful to be able to import waterway gauge data directly by uploading an XML file.
author Sascha Wilde <wilde@sha-bang.de>
date Thu, 18 Apr 2024 19:23:19 +0200
parents 6270951dda28
children
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2020 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Tom Gottfried <tom.gottfried@intevation.de>
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"database/sql"
	"fmt"
	"strings"

	"gemma.intevation.de/gemma/pkg/pgxutils"
	"github.com/fatih/structs"
)

type (
	// FairwayMarksProperties are common to all types of fairway marks.
	//
	// consume hands the field values of a properties struct (obtained via
	// structs.Values) positionally to the insert statement, right after
	// the geometry and its EPSG code. The thirteen fields declared here
	// therefore feed the parameters $3 to $15 of insertFMSQLtmpl and their
	// declaration order has to agree with the column order used there.
	FairwayMarksProperties struct {
		Datsta *string `json:"hydro_datsta"`
		Datend *string `json:"hydro_datend"`
		Persta *string `json:"hydro_persta"`
		Perend *string `json:"hydro_perend"`
		Objnam *string `json:"hydro_objnam"`
		Nobjnm *string `json:"hydro_nobjnm"`
		Inform *string `json:"hydro_inform"`
		Ninfom *string `json:"hydro_ninfom"`
		Scamin *int    `json:"hydro_scamin"`
		Picrep *string `json:"hydro_picrep"`
		Txtdsc *string `json:"hydro_txtdsc"`
		Sordat *string `json:"hydro_sordat"`
		Sorind *string `json:"hydro_sorind"`
	}

	// BcnlatProperties are embedded in other properties.
	// They hold the attributes shared by the HYDRO and the IENC flavour
	// of the bcnlat import.
	BcnlatProperties struct {
		FairwayMarksProperties
		Colour *string `json:"hydro_colour"`
		Colpat *string `json:"hydro_colpat"`
		Condtn *int    `json:"hydro_condtn"`
		Bcnshp *int    `json:"hydro_bcnshp"`
	}

	// bcnlatHydroProperties are the properties of the bcnlat (HYDRO)
	// import; catlam is read from the hydro_ prefixed attribute.
	bcnlatHydroProperties struct {
		BcnlatProperties
		Catlam *int64 `json:"hydro_catlam"`
	}

	// bcnlatIencProperties are the properties of the bcnlat (IENC)
	// import; catlam is read from the ienc_ prefixed attribute.
	bcnlatIencProperties struct {
		BcnlatProperties
		Catlam *int64  `json:"ienc_catlam"`
		// Dirimp is tagged structs:"-" and is thus not part of the values
		// passed to the main insert statement. It is exposed via attrib()
		// and stored separately by storeAttribs.
		Dirimp *string `json:"ienc_dirimp" structs:"-"`
	}

	// BoylatProperties are embedded in other properties.
	// They hold the attributes shared by the HYDRO and the IENC flavour
	// of the boylat import.
	//
	// NOTE(review): because of the embedding, Boyshp precedes the Marsys
	// and Catlam fields of the embedding types in the flattened value
	// order. The column list passed to createInsertFMSQL for the boylat
	// imports has to use that same order.
	BoylatProperties struct {
		FairwayMarksProperties
		Colour *string `json:"hydro_colour"`
		Colpat *string `json:"hydro_colpat"`
		Conrad *int    `json:"hydro_conrad"`
		Boyshp *int    `json:"hydro_boyshp"`
	}

	// boylatHydroProperties are the properties of the boylat (HYDRO)
	// import; marsys and catlam are read from hydro_ prefixed attributes.
	boylatHydroProperties struct {
		BoylatProperties
		Marsys *int64 `json:"hydro_marsys"`
		Catlam *int64 `json:"hydro_catlam"`
	}

	// boylatIencProperties are the properties of the boylat (IENC)
	// import; marsys and catlam are read from ienc_ prefixed attributes.
	boylatIencProperties struct {
		BoylatProperties
		Marsys *int64 `json:"ienc_marsys"`
		Catlam *int64 `json:"ienc_catlam"`
	}

	// boycarProperties are the properties of the boycar import.
	boycarProperties struct {
		FairwayMarksProperties
		Colour *string `json:"hydro_colour"`
		Colpat *string `json:"hydro_colpat"`
		Conrad *int    `json:"hydro_conrad"`
		Marsys *int    `json:"hydro_marsys"`
		Boyshp *int    `json:"hydro_boyshp"`
		Catcam *int    `json:"hydro_catcam"`
	}

	// boysawProperties are the properties of the boysaw import.
	boysawProperties struct {
		FairwayMarksProperties
		Colour *string `json:"hydro_colour"`
		Colpat *string `json:"hydro_colpat"`
		Conrad *int    `json:"hydro_conrad"`
		Marsys *int64  `json:"hydro_marsys"`
		Boyshp *int    `json:"hydro_boyshp"`
	}

	// boysppProperties are the properties of the boyspp import.
	boysppProperties struct {
		FairwayMarksProperties
		Colour *string `json:"hydro_colour"`
		Colpat *string `json:"hydro_colpat"`
		Conrad *int    `json:"hydro_conrad"`
		Marsys *int64  `json:"hydro_marsys"`
		Boyshp *int    `json:"hydro_boyshp"`
		Catspm *string `json:"hydro_catspm"`
	}

	// DaymarProperties are embedded in other properties.
	// They hold the attributes shared by the HYDRO and the IENC flavour
	// of the daymar import.
	DaymarProperties struct {
		FairwayMarksProperties
		Colour *string `json:"hydro_colour"`
		Colpat *string `json:"hydro_colpat"`
		Condtn *int    `json:"hydro_condtn"`
		Topshp *int    `json:"hydro_topshp"`
	}

	// daymarHydroProperties are the properties of the daymar (HYDRO)
	// import; they add nothing to DaymarProperties.
	daymarHydroProperties struct {
		DaymarProperties
	}

	// daymarIencProperties are the properties of the daymar (IENC) import.
	daymarIencProperties struct {
		DaymarProperties
		// Dirimp is tagged structs:"-" and is thus not part of the values
		// passed to the main insert statement. It is exposed via attrib()
		// and stored separately by storeAttribs.
		Dirimp *string  `json:"ienc_dirimp" structs:"-"`
		Orient *float64 `json:"hydro_orient"`
	}

	// lightsProperties are the properties of the lights import.
	lightsProperties struct {
		FairwayMarksProperties
		Colour *string  `json:"hydro_colour"`
		Condtn *int     `json:"hydro_condtn"`
		Orient *float64 `json:"hydro_orient"`
		Catlit *string  `json:"hydro_catlit"`
		Exclit *int     `json:"hydro_exclit"`
		Litchr *int     `json:"hydro_litchr"`
		Litvis *string  `json:"hydro_litvis"`
		Mltylt *int     `json:"hydro_mltylt"`
		Sectr1 *float64 `json:"hydro_sectr1"`
		Sectr2 *float64 `json:"hydro_sectr2"`
		Siggrp *string  `json:"hydro_siggrp"`
		Sigper *float64 `json:"hydro_sigper"`
		Sigseq *string  `json:"hydro_sigseq"`
		Status *string  `json:"hydro_status"`
	}

	// notmrkProperties are the properties of the notmrk import.
	notmrkProperties struct {
		FairwayMarksProperties
		Condtn *int     `json:"hydro_condtn"`
		// NOTE(review): Marsys is read from the attribute "hydro_bcnshp"
		// but ends up in the column marsys -- confirm this is intended.
		Marsys *int     `json:"hydro_bcnshp"`
		// Dirimp is tagged structs:"-" and is thus not part of the values
		// passed to the main insert statement. It is exposed via attrib()
		// and stored separately by storeAttribs.
		Dirimp *string  `json:"ienc_dirimp" structs:"-"`
		Orient *float64 `json:"hydro_orient"`
		Status *string  `json:"hydro_status"`
		Addmrk *string  `json:"ienc_addmrk"`
		Catnmk *int     `json:"ienc_catnmk"`
		Disipd *float64 `json:"ienc_disipd"`
		Disipu *float64 `json:"ienc_disipu"`
		Disbk1 *float64 `json:"ienc_disbk1"`
		Disbk2 *float64 `json:"ienc_disbk2"`
		Fnctnm *int     `json:"ienc_fnctnm"`
		Bnkwtw *int     `json:"ienc_bnkwtw"`
	}

	// rtpbcnProperties are the properties of the rtpbcn import.
	rtpbcnProperties struct {
		FairwayMarksProperties
		Condtn *int    `json:"hydro_condtn"`
		Siggrp *string `json:"hydro_siggrp"`
		Catrtb *int    `json:"hydro_catrtb"`
		Radwal *string `json:"hydro_radwal"`
	}

	// topmarProperties are the properties of the topmar import.
	topmarProperties struct {
		FairwayMarksProperties
		Colour *string `json:"hydro_colour"`
		Colpat *string `json:"hydro_colpat"`
		Condtn *int    `json:"hydro_condtn"`
		Topshp *int    `json:"hydro_topshp"`
	}
)

// attrib returns the raw dirimp attribute of a bcnlat (IENC) feature.
// consume uses it to hand the value over to storeAttribs, as the field
// is excluded from the values of the main insert statement.
func (p *bcnlatIencProperties) attrib() *string {
	return p.Dirimp
}

// attrib returns the raw dirimp attribute of a daymar (IENC) feature.
// consume uses it to hand the value over to storeAttribs, as the field
// is excluded from the values of the main insert statement.
func (p *daymarIencProperties) attrib() *string {
	return p.Dirimp
}

// attrib returns the raw dirimp attribute of a notmrk feature.
// consume uses it to hand the value over to storeAttribs, as the field
// is excluded from the values of the main insert statement.
func (p *notmrkProperties) attrib() *string {
	return p.Dirimp
}

// Job kinds of the fairway marks imports. For each of them init registers
// a WFSFeatureJobCreator under the respective name.
const (
	// BCNLATHYDROJobKind is the unique name for the bcnlat (HYDRO) import.
	BCNLATHYDROJobKind JobKind = "fm_bcnlat_hydro"
	// BCNLATIENCJobKind is the unique name for the bcnlat (IENC) import.
	BCNLATIENCJobKind JobKind = "fm_bcnlat_ienc"
	// BOYLATHYDROJobKind is the unique name for the boylat (HYDRO) import.
	BOYLATHYDROJobKind JobKind = "fm_boylat_hydro"
	// BOYLATIENCJobKind is the unique name for the boylat (IENC) import.
	BOYLATIENCJobKind JobKind = "fm_boylat_ienc"
	// BOYCARJobKind is the unique name for the boycar import.
	BOYCARJobKind JobKind = "fm_boycar"
	// BOYSAWJobKind is the unique name for the boysaw import.
	BOYSAWJobKind JobKind = "fm_boysaw"
	// BOYSPPJobKind is the unique name for the boyspp import.
	BOYSPPJobKind JobKind = "fm_boyspp"
	// DAYMARHYDROJobKind is the unique name for the daymar (HYDRO) import.
	DAYMARHYDROJobKind JobKind = "fm_daymar_hydro"
	// DAYMARIENCJobKind is the unique name for the daymar (IENC) import.
	DAYMARIENCJobKind JobKind = "fm_daymar_ienc"
	// LIGHTSJobKind is the unique name for the lights import.
	LIGHTSJobKind JobKind = "fm_lights"
	// NOTMRKJobKind is the unique name for the notmrk import.
	NOTMRKJobKind JobKind = "fm_notmrk"
	// RTPBCNJobKind is the unique name for the rtpbcn import.
	RTPBCNJobKind JobKind = "fm_rtpbcn"
	// TOPMARJobKind is the unique name for the topmar import.
	TOPMARJobKind JobKind = "fm_topmar"
)

// init registers a job creator for every kind of fairway marks import.
//
// Each registration consists of
//   - the insert statement generated by createInsertFMSQL for the table
//     waterway.fairway_marks_<type> (plus, for types carrying a dirimp
//     attribute, the statement storing the dirimp values as second
//     statement, which storeAttribs executes as spc.stmts[1]),
//   - consume as the function storing a single feature,
//   - the invalidation created by createInvalidation for the same table,
//   - a point feature factory producing the matching properties struct.
//
// The column names handed to createInsertFMSQL are bound positionally to
// the values consume extracts from the properties struct with
// structs.Values. They therefore have to be listed in exactly the order
// in which the type specific fields appear in the (flattened) struct.
func init() {
	RegisterJobCreator(BCNLATHYDROJobKind,
		&WFSFeatureJobCreator{
			description: "fairway marks bcnlat (HYDRO)",
			depends:     [2][]string{{"fairway_marks_bcnlat_hydro"}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("bcnlat_hydro",
						"colour", "colpat", "condtn", "bcnshp", "catlam"),
				),
				consume,
				createInvalidation("bcnlat_hydro"),
				newPointFeature(func() any { return new(bcnlatHydroProperties) }),
			),
		})

	RegisterJobCreator(BCNLATIENCJobKind,
		&WFSFeatureJobCreator{
			description: "fairway marks bcnlat (IENC)",
			depends:     [2][]string{{"fairway_marks_bcnlat_ienc"}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("bcnlat_ienc",
						"colour", "colpat", "condtn", "bcnshp", "catlam"),
					insertBcnlatDirimpSQL,
				),
				consume,
				createInvalidation("bcnlat_ienc"),
				newPointFeature(func() any { return new(bcnlatIencProperties) }),
			),
		})

	// The boylat properties embed BoylatProperties, which ends with
	// Boyshp; Marsys and Catlam follow in the embedding struct. The
	// columns are listed in that order so that every value is written
	// to the column of the same name.
	RegisterJobCreator(BOYLATHYDROJobKind,
		&WFSFeatureJobCreator{
			description: "fairway marks boylat (HYDRO)",
			depends:     [2][]string{{"fairway_marks_boylat_hydro"}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("boylat_hydro",
						"colour", "colpat", "conrad",
						"boyshp", "marsys", "catlam"),
				),
				consume,
				createInvalidation("boylat_hydro"),
				newPointFeature(func() any { return new(boylatHydroProperties) }),
			),
		})

	// Same column order as for the HYDRO flavour, see above.
	RegisterJobCreator(BOYLATIENCJobKind,
		&WFSFeatureJobCreator{
			description: "fairway marks boylat (IENC)",
			depends:     [2][]string{{"fairway_marks_boylat_ienc"}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("boylat_ienc",
						"colour", "colpat", "conrad",
						"boyshp", "marsys", "catlam"),
				),
				consume,
				createInvalidation("boylat_ienc"),
				newPointFeature(func() any { return new(boylatIencProperties) }),
			),
		})

	RegisterJobCreator(BOYCARJobKind,
		&WFSFeatureJobCreator{
			description: "fairway marks boycar",
			depends:     [2][]string{{"fairway_marks_boycar"}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("boycar",
						"colour", "colpat", "conrad",
						"marsys", "boyshp", "catcam"),
				),
				consume,
				createInvalidation("boycar"),
				newPointFeature(func() any { return new(boycarProperties) }),
			),
		})

	RegisterJobCreator(BOYSAWJobKind,
		&WFSFeatureJobCreator{
			description: "fairway marks boysaw",
			depends:     [2][]string{{"fairway_marks_boysaw"}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("boysaw",
						"colour", "colpat", "conrad", "marsys", "boyshp"),
				),
				consume,
				createInvalidation("boysaw"),
				newPointFeature(func() any { return new(boysawProperties) }),
			),
		})

	RegisterJobCreator(BOYSPPJobKind,
		&WFSFeatureJobCreator{
			description: "fairway marks boyspp",
			depends:     [2][]string{{"fairway_marks_boyspp"}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("boyspp",
						"colour", "colpat", "conrad",
						"marsys", "boyshp", "catspm"),
				),
				consume,
				createInvalidation("boyspp"),
				newPointFeature(func() any { return new(boysppProperties) }),
			),
		})

	RegisterJobCreator(DAYMARHYDROJobKind,
		&WFSFeatureJobCreator{
			description: "fairway marks daymar (HYDRO)",
			depends:     [2][]string{{"fairway_marks_daymar_hydro"}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("daymar_hydro",
						"colour", "colpat", "condtn", "topshp"),
				),
				consume,
				createInvalidation("daymar_hydro"),
				newPointFeature(func() any { return new(daymarHydroProperties) }),
			),
		})

	RegisterJobCreator(DAYMARIENCJobKind,
		&WFSFeatureJobCreator{
			description: "fairway marks daymar (IENC)",
			depends:     [2][]string{{"fairway_marks_daymar_ienc"}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("daymar_ienc",
						"colour", "colpat", "condtn", "topshp", "orient"),
					insertDaymarDirimpSQL,
				),
				consume,
				createInvalidation("daymar_ienc"),
				newPointFeature(func() any { return new(daymarIencProperties) }),
			),
		})

	RegisterJobCreator(LIGHTSJobKind,
		&WFSFeatureJobCreator{
			description: "fairway marks lights",
			depends:     [2][]string{{"fairway_marks_lights"}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("lights",
						"colour", "condtn", "orient",
						"catlit", "exclit", "litchr",
						"litvis", "mltylt", "sectr1",
						"sectr2", "siggrp", "sigper",
						"sigseq", "status"),
				),
				consume,
				createInvalidation("lights"),
				newPointFeature(func() any { return new(lightsProperties) }),
			),
		})

	// The notmrk import writes to fairway_marks_notmrk, so that is the
	// table it declares as dependency (like all other imports do with
	// their own table).
	RegisterJobCreator(NOTMRKJobKind,
		&WFSFeatureJobCreator{
			description: "fairway marks notmrk",
			depends:     [2][]string{{"fairway_marks_notmrk"}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("notmrk",
						"condtn", "marsys", "orient",
						"status", "addmrk", "catnmk",
						"disipd", "disipu", "disbk1",
						"disbk2", "fnctnm", "bnkwtw"),
					insertNotmrkDirimpSQL,
				),
				consume,
				createInvalidation("notmrk"),
				newPointFeature(func() any { return new(notmrkProperties) }),
			),
		})

	RegisterJobCreator(RTPBCNJobKind,
		&WFSFeatureJobCreator{
			description: "fairway marks rtpbcn",
			depends:     [2][]string{{"fairway_marks_rtpbcn"}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("rtpbcn",
						"condtn", "siggrp", "catrtb", "radwal"),
				),
				consume,
				createInvalidation("rtpbcn"),
				newPointFeature(func() any { return new(rtpbcnProperties) }),
			),
		})

	RegisterJobCreator(TOPMARJobKind,
		&WFSFeatureJobCreator{
			description: "fairway marks topmar",
			depends:     [2][]string{{"fairway_marks_topmar"}},
			newConsumer: newSQLConsumer(
				prepareStmnts(
					createInsertFMSQL("topmar",
						"colour", "colpat", "condtn", "topshp"),
				),
				consume,
				createInvalidation("topmar"),
				newPointFeature(func() any { return new(topmarProperties) }),
			),
		})
}

const (
	// insertFMSQLtmpl is the format string for the statement storing a
	// single fairway mark. createInsertFMSQL completes it with
	//   %[1]s  the type specific suffix of the table name,
	//   %[2]s  the comma separated type specific column names,
	//   %[3]s  the placeholders ($16, ...) for these columns.
	//
	// Statement parameters: $1 is the geometry as WKB, $2 its EPSG code,
	// $3 to $15 are the attributes common to all fairway marks.
	//
	// The geometry is only considered if the user is member of sys_admin
	// or the geometry intersects the area of the current user. If a
	// currently valid entry with identical geometry and attributes
	// exists, only its last_found is updated and no row is returned;
	// otherwise a new entry is inserted and its id is returned.
	//
	// Instead of the row comparisons in the WHERE clauses of the CTE with
	// the UPDATE and the INSERT ... SELECT, we could have used the
	// row-based UNIQUE indexes as arbiter indexes in an
	// INSERT ... ON CONFLICT ... DO UPDATE, but that turned out to be
	// able to bypass the UNIQUE index in some cases.
	insertFMSQLtmpl = `
WITH a AS (
  SELECT users.current_user_area_utm() AS a
),
g AS (
  SELECT newfm
  FROM ST_Transform(ST_GeomFromWKB($1, $2::integer), 4326) AS newfm (newfm)
  WHERE pg_has_role('sys_admin', 'MEMBER')
    OR ST_Intersects((select a from a),
      ST_Transform(newfm, (select ST_SRID(a) from a)))
),
t AS (
  -- Currently valid and otherwise identical entry's validity.
  /* If there are no intermittent updates of validity,
     there will always be only one currently valid and
     otherwise identical entry. */
  UPDATE waterway.fairway_marks_%[1]s SET last_found = current_timestamp
  WHERE validity @> current_timestamp
    AND (geom,
        datsta, datend, persta, perend, objnam, nobjnm, inform, ninfom,
        scamin, picrep, txtdsc, sordat, sorind,
        %[2]s
      ) IS NOT DISTINCT FROM (
        (SELECT newfm FROM g),
        $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15,
        %[3]s)
  RETURNING 1
)
INSERT INTO waterway.fairway_marks_%[1]s (
  geom,
  datsta,
  datend,
  persta,
  perend,
  objnam,
  nobjnm,
  inform,
  ninfom,
  scamin,
  picrep,
  txtdsc,
  sordat,
  sorind,
  %[2]s
)
SELECT newfm, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15,
    %[3]s
  FROM g
  WHERE NOT EXISTS(SELECT 1 FROM t)
RETURNING id
`

	// invalidateFairwayMarksSQLtmpl is the format string for the statement
	// ending the validity of all entries not found in the data source;
	// %s is the type specific suffix of the table name.
	//
	// It affects currently valid entries whose last_found is older than
	// current_timestamp and that intersect the area of the current user
	// (all such entries for members of sys_admin). Presumably this relies
	// on current_timestamp being constant within the import transaction,
	// so entries touched by the insert statement are left alone.
	invalidateFairwayMarksSQLtmpl = `
WITH
  a AS (
    SELECT users.current_user_area_utm() AS a
  )
UPDATE waterway.fairway_marks_%s
  SET validity = tstzrange(lower(validity), current_timestamp)
  WHERE validity @> current_timestamp
    AND last_found < current_timestamp
    AND (pg_has_role('sys_admin', 'MEMBER')
      OR ST_Intersects((select a from a),
        ST_Transform(CAST(geom AS geometry), (select ST_SRID(a) from a))))
 `

	// insertBcnlatDirimpSQL stores one dirimp value ($2) for the bcnlat
	// fairway mark with the given id ($1).
	insertBcnlatDirimpSQL = `
INSERT INTO waterway.fairway_marks_bcnlat_dirimps (fm_bcnlat_id, dirimp)
  VALUES ($1, $2)
`

	// insertDaymarDirimpSQL stores one dirimp value ($2) for the daymar
	// fairway mark with the given id ($1).
	insertDaymarDirimpSQL = `
INSERT INTO waterway.fairway_marks_daymar_dirimps (fm_daymar_id, dirimp)
  VALUES ($1, $2)
`

	// insertNotmrkDirimpSQL stores one dirimp value ($2) for the notmrk
	// fairway mark with the given id ($1).
	insertNotmrkDirimpSQL = `
INSERT INTO waterway.fairway_marks_notmrk_dirimps (fm_notmrk_id, dirimp)
  VALUES ($1, $2)
`
)

// createInvalidation returns a function that ends the validity of all
// fairway marks of the given type that were not found in the data source.
//
// The statement is built once from invalidateFairwayMarksSQLtmpl. The
// returned function executes it in the transaction of the consumer and
// reports the number of affected rows via the feedback of the consumer.
// If no row was affected it returns ErrFeaturesUnmodified; errors of the
// execution are passed through.
func createInvalidation(fmType string) func(*SQLGeometryConsumer) error {
	stmt := fmt.Sprintf(invalidateFairwayMarksSQLtmpl, fmType)

	invalidate := func(spc *SQLGeometryConsumer) error {
		result, execErr := spc.tx.ExecContext(spc.ctx, stmt)
		if execErr != nil {
			return execErr
		}
		removed, countErr := result.RowsAffected()
		switch {
		case countErr != nil:
			return countErr
		case removed == 0:
			// Nothing vanished from the data source.
			return ErrFeaturesUnmodified
		}
		spc.feedback.Info("Number of features removed from data source: %d", removed)
		return nil
	}
	return invalidate
}

// Create INSERT statement for specific fairway marks type
// createInsertFMSQL builds the INSERT statement for a specific type of
// fairway marks from insertFMSQLtmpl.
//
// fmType is the type specific suffix of the table name, attributes are
// the type specific column names. The attributes are bound to the
// placeholders $16, $17, ... in the given order; the placeholder $16 is
// emitted in any case.
func createInsertFMSQL(fmType string, attributes ...string) string {
	var placeholders strings.Builder
	placeholders.WriteString("$16")
	// Parameters up to $15 are taken by geometry, EPSG code and the
	// common attributes, so the remaining attributes continue with $17.
	for n := 17; n < 16+len(attributes); n++ {
		fmt.Fprintf(&placeholders, ",$%d", n)
	}
	columns := strings.Join(attributes, ",")

	return fmt.Sprintf(insertFMSQLtmpl, fmType, columns, placeholders.String())
}

// storeAttribs stores the comma separated dirimp values in attrs for the
// fairway mark with the given id.
//
// Every value is written in a savepoint of its own using the second
// prepared statement of the consumer. A failing value is reported via
// the feedback of the consumer and does not prevent the remaining values
// from being stored. A nil or empty attrs is ignored.
func storeAttribs(spc *SQLGeometryConsumer, id int64, attrs *string) {
	if attrs == nil || len(*attrs) == 0 {
		return
	}
	for _, value := range strings.Split(*attrs, ",") {
		err := spc.savepoint(func() error {
			_, execErr := spc.stmts[1].ExecContext(spc.ctx, id, value)
			return execErr
		})
		if err == nil {
			continue
		}
		// Report the failure together with the offending value.
		spc.feedback.Warn(
			pgxutils.ReadableError{Err: err}.Error())
		spc.feedback.Info(
			"Tried to import '%s' as dirimp value",
			value)
	}
}

// consume stores a single fairway mark feature.
//
// The geometry (as WKB), the EPSG code and the values of the properties
// struct are passed, in this order, to the first prepared statement of
// the consumer, which is executed inside a savepoint.
//
// It returns ErrFeatureDuplicated if the statement yields no row, i.e.
// the feature was filtered by the responsibility area or is a duplicate.
// Any other error is reported via the feedback of the consumer and
// ErrFeatureIgnored is returned. On success, properties offering an
// attrib method get the returned value stored with storeAttribs.
func consume(
	spc *SQLGeometryConsumer,
	geom, properties any,
	epsg int,
) error {
	var id int64
	insert := func() error {
		args := []any{
			geom.(interface{ asWKB() []byte }).asWKB(),
			epsg,
		}
		args = append(args, structs.Values(properties)...)
		return spc.stmts[0].QueryRowContext(spc.ctx, args...).Scan(&id)
	}

	err := spc.savepoint(insert)
	if err == sql.ErrNoRows {
		// Filtered by responsibility area or a duplicate.
		return ErrFeatureDuplicated
	}
	if err != nil {
		spc.feedback.Error(pgxutils.ReadableError{Err: err}.Error())
		return ErrFeatureIgnored
	}

	if carrier, ok := properties.(interface{ attrib() *string }); ok {
		storeAttribs(spc, id, carrier.attrib())
	}
	return nil
}