# HG changeset patch # User Thomas Junk # Date 1580897642 -3600 # Node ID b2d88c6806911f814d9b40862585741e74cdf26d # Parent 79a5d0c0d2f50bf517784c4599a5912fa9b915bc# Parent af38a19f615aaa60cc9ed834c3a86770dd1c6fdd merge with default diff -r af38a19f615a -r b2d88c680691 pkg/controllers/routes.go --- a/pkg/controllers/routes.go Tue Feb 04 16:29:49 2020 +0100 +++ b/pkg/controllers/routes.go Wed Feb 05 11:14:02 2020 +0100 @@ -4,7 +4,7 @@ // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // -// Copyright (C) 2018 by via donau +// Copyright (C) 2018, 2020 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // @@ -241,7 +241,7 @@ kinds := strings.Join([]string{ "bn", "gm", "fa", "wx", "wa", - "wg", "dmv", "fd", "dma", + "wg", "dmv", "fd", "dma", "fm", "sec", "dsec", "dst", "dsr", }, "|") diff -r af38a19f615a -r b2d88c680691 pkg/imports/fm.go --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pkg/imports/fm.go Wed Feb 05 11:14:02 2020 +0100 @@ -0,0 +1,342 @@ +// This is Free Software under GNU Affero General Public License v >= 3.0 +// without warranty, see README.md and license for details. +// +// SPDX-License-Identifier: AGPL-3.0-or-later +// License-Filename: LICENSES/AGPL-3.0.txt +// +// Copyright (C) 2020 by via donau +// – Österreichische Wasserstraßen-Gesellschaft mbH +// Software engineering by Intevation GmbH +// +// Author(s): +// * Tom Gottfried + +package imports + +import ( + "context" + "database/sql" + "encoding/json" + "errors" + "fmt" + "io" + "strconv" + "time" + + "gemma.intevation.de/gemma/pkg/pgxutils" + "gemma.intevation.de/gemma/pkg/wfs" +) + +// FairwayMarks is an import job to import +// fairway marks in form of point geometries +// and attribute data from a WFS service. +type FairwayMarks struct { + // URL the GetCapabilities URL of the WFS service. + URL string `json:"url"` + // FeatureType selects the feature type of the WFS service. + FeatureType string `json:"feature-type"` + // SortBy works around misconfigured services to + // establish a sort order to get the features. + SortBy string `json:"sort-by"` + // User is an optional username for Basic Auth. + User string `json:"user,omitempty"` + // Password is an optional password for Basic Auth. + Password string `json:"password,omitempty"` +} + +// Description gives a short info about relevant facts of this import. +func (fm *FairwayMarks) Description() (string, error) { + return fm.URL + "|" + fm.FeatureType, nil +} + +// FMJobKind is the import queue type identifier. +const FMJobKind JobKind = "fm" + +type fmJobCreator struct{} + +func init() { + RegisterJobCreator(FMJobKind, fmJobCreator{}) +} + +func (fmJobCreator) Description() string { return "fairway marks" } + +func (fmJobCreator) AutoAccept() bool { return true } + +func (fmJobCreator) Create() Job { return new(FairwayMarks) } + +func (fmJobCreator) Depends() [2][]string { + return [2][]string{ + {"fairway_marks"}, + {}, + } +} + +// StageDone is a NOP for fairway marks imports. +func (fmJobCreator) StageDone(context.Context, *sql.Tx, int64) error { + return nil +} + +// CleanUp for fairway marks imports is a NOP. +func (*FairwayMarks) CleanUp() error { return nil } + +type fairwayMarksProperties struct { + Datsta *string `json:"hydro_datsta"` + Datend *string `json:"hydro_datend"` + Persta *string `json:"hydro_persta"` + Perend *string `json:"hydro_perend"` + Objnam *string `json:"hydro_objnam"` + Nobjnm *string `json:"hydro_nobjnm"` + Inform *string `json:"hydro_inform"` + Ninfom *string `json:"hydro_ninfom"` + Scamin *int `json:"hydro_scamin"` + Picrep *string `json:"hydro_picrep"` + Txtdsc *string `json:"hydro_txtdsc"` + Sordat *string `json:"hydro_sordat"` + Sorind *string `json:"hydro_sorind"` + Colour *string `json:"hydro_colour"` + Colpat *string `json:"hydro_colpat"` + Condtn *int `json:"hydro_condtn"` + Bcnshp *int `json:"hydro_bcnshp"` + HydroCatlam *int64 `json:"hydro_catlam,omitempty"` + IENCCatlam *int64 `json:"ienc_catlam,omitempty"` + Dirimp *string `json:"ienc_dirimp,omitempty"` +} + +const ( + insertFairwayMarksSQL = ` +with a as ( + select users.current_user_area_utm() AS a +) +INSERT INTO waterway.fairway_marks ( + geom, + datsta, + datend, + persta, + perend, + objnam, + nobjnm, + inform, + ninfom, + scamin, + picrep, + txtdsc, + sordat, + sorind, + colour, + colpat, + condtn, + bcnshp, + catlam, + dirimp +) +SELECT newfm, $3, $4, $5, $6, $7, $8, $9, + $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21 + FROM ST_Transform(ST_GeomFromWKB($1, $2::integer), 4326) AS newfm (newfm) + WHERE pg_has_role('sys_admin', 'MEMBER') + OR ST_Intersects((select a from a), + ST_Transform(newfm, (select ST_SRID(a) from a))) +ON CONFLICT ( + CAST((0, geom, + datsta, datend, persta, perend, objnam, nobjnm, inform, ninfom, + scamin, picrep, txtdsc, sordat, sorind, colour, colpat, condtn, + bcnshp, catlam, dirimp) AS waterway.fairway_marks) + ) + DO NOTHING +RETURNING id +` +) + +// Do executes the actual fairway marks import. +func (fm *FairwayMarks) Do( + ctx context.Context, + importID int64, + conn *sql.Conn, + feedback Feedback, +) (interface{}, error) { + + start := time.Now() + + feedback.Info("Import fairway marks") + + feedback.Info("Loading capabilities from %s", fm.URL) + caps, err := wfs.GetCapabilities(fm.URL) + if err != nil { + feedback.Error("Loading capabilities failed: %v", err) + return nil, err + } + + ft := caps.FindFeatureType(fm.FeatureType) + if ft == nil { + return nil, fmt.Errorf("unknown feature type '%s'", fm.FeatureType) + } + + feedback.Info("Found feature type '%s", fm.FeatureType) + + epsg, err := wfs.CRSToEPSG(ft.DefaultCRS) + if err != nil { + feedback.Error("Unsupported CRS: '%s'", ft.DefaultCRS) + return nil, err + } + + if fm.SortBy != "" { + feedback.Info("Features will be sorted by '%s'", fm.SortBy) + } + + dl, err := wfs.GetFeatures(caps, fm.FeatureType, fm.SortBy) + if err != nil { + feedback.Error("Cannot create GetFeature URLs. %v", err) + return nil, err + } + + tx, err := conn.BeginTx(ctx, nil) + if err != nil { + return nil, err + } + defer tx.Rollback() + + insertStmt, err := tx.PrepareContext(ctx, insertFairwayMarksSQL) + if err != nil { + return nil, err + } + defer insertStmt.Close() + + var ( + unsupported = stringCounter{} + missingProperties int + badProperties int + outsideOrDup int + features int + ) + + if err := dl.Download(fm.User, fm.Password, func(url string, r io.Reader) error { + feedback.Info("Get features from: '%s'", url) + rfc, err := wfs.ParseRawFeatureCollection(r) + if err != nil { + return fmt.Errorf("parsing GetFeature document failed: %v", err) + } + if rfc.CRS != nil { + crsName := rfc.CRS.Properties.Name + if epsg, err = wfs.CRSToEPSG(crsName); err != nil { + feedback.Error("Unsupported CRS: %d", crsName) + return err + } + } + + // No features -> ignore. + if rfc.Features == nil { + return nil + } + + feedback.Info("Using EPSG: %d", epsg) + + savepoint := Savepoint(ctx, tx, "feature") + + for _, feature := range rfc.Features { + if feature.Properties == nil || feature.Geometry.Coordinates == nil { + missingProperties++ + continue + } + + var props fairwayMarksProperties + + if err := json.Unmarshal(*feature.Properties, &props); err != nil { + badProperties++ + continue + } + + var catlam sql.NullInt64 + if props.HydroCatlam != nil { + catlam = sql.NullInt64{Int64: *props.HydroCatlam, Valid: true} + } else if props.IENCCatlam != nil { + catlam = sql.NullInt64{Int64: *props.IENCCatlam, Valid: true} + } + + var dirimp sql.NullInt64 + if props.Dirimp != nil { + if value, err := strconv.ParseInt(*props.Dirimp, 10, 64); err == nil { + dirimp = sql.NullInt64{Int64: value, Valid: true} + } + } + + switch feature.Geometry.Type { + case "Point": + var p pointSlice + if err := json.Unmarshal(*feature.Geometry.Coordinates, &p); err != nil { + return err + } + var fmid int64 + err := savepoint(func() error { + err := insertStmt.QueryRowContext( + ctx, + p.asWKB(), + epsg, + props.Datsta, + props.Datend, + props.Persta, + props.Perend, + props.Objnam, + props.Nobjnm, + props.Inform, + props.Ninfom, + props.Scamin, + props.Picrep, + props.Txtdsc, + props.Sordat, + props.Sorind, + props.Colour, + props.Colpat, + props.Condtn, + props.Bcnshp, + catlam, + dirimp, + ).Scan(&fmid) + return err + }) + switch { + case err == sql.ErrNoRows: + outsideOrDup++ + // ignore -> filtered by responsibility_areas + case err != nil: + feedback.Error(pgxutils.ReadableError{Err: err}.Error()) + default: + features++ + } + default: + unsupported[feature.Geometry.Type]++ + } + } + return nil + }); err != nil { + return nil, err + } + + if badProperties > 0 { + feedback.Warn("Bad properties: %d", badProperties) + } + + if missingProperties > 0 { + feedback.Warn("Missing properties: %d", missingProperties) + } + + if len(unsupported) != 0 { + feedback.Warn("Unsupported types found: %s", unsupported) + } + + if outsideOrDup > 0 { + feedback.Info( + "Features outside responsibility area and duplicates: %d", + outsideOrDup) + } + + if features == 0 { + err := errors.New("no valid new features found") + return nil, err + } + + if err = tx.Commit(); err == nil { + feedback.Info("Storing %d features took %s", + features, time.Since(start)) + } + + return nil, err +} diff -r af38a19f615a -r b2d88c680691 pkg/imports/modelconvert.go --- a/pkg/imports/modelconvert.go Tue Feb 04 16:29:49 2020 +0100 +++ b/pkg/imports/modelconvert.go Wed Feb 05 11:14:02 2020 +0100 @@ -4,7 +4,7 @@ // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // -// Copyright (C) 2018 by via donau +// Copyright (C) 2018, 2020 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // @@ -27,6 +27,7 @@ DMVJobKind: func() interface{} { return new(models.DistanceMarksVirtualImport) }, FDJobKind: func() interface{} { return new(models.FairwayDimensionImport) }, DMAJobKind: func() interface{} { return new(models.DistanceMarksAshoreImport) }, + FMJobKind: func() interface{} { return new(models.FairwayMarksImport) }, STJobKind: func() interface{} { return new(models.StretchImport) }, SECJobKind: func() interface{} { return new(models.SectionImport) }, DSECJobKind: func() interface{} { return new(models.SectionDelete) }, @@ -136,6 +137,17 @@ } }, + FMJobKind: func(input interface{}) interface{} { + fmi := input.(*models.FairwayMarksImport) + return &FairwayMarks{ + URL: fmi.URL, + FeatureType: fmi.FeatureType, + SortBy: nilString(fmi.SortBy), + User: nilString(fmi.User), + Password: nilString(fmi.Password), + } + }, + STJobKind: func(input interface{}) interface{} { sti := input.(*models.StretchImport) return &Stretch{ diff -r af38a19f615a -r b2d88c680691 pkg/models/imports.go --- a/pkg/models/imports.go Tue Feb 04 16:29:49 2020 +0100 +++ b/pkg/models/imports.go Wed Feb 05 11:14:02 2020 +0100 @@ -4,7 +4,7 @@ // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // -// Copyright (C) 2018, 2019 by via donau +// Copyright (C) 2018, 2019, 2020 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // @@ -78,6 +78,11 @@ WFSImport } + // FairwayMarksImport specifies an import of fairway marks. + FairwayMarksImport struct { + WFSImport + } + // FairwayDimensionImport specifies an import of the waterway axis. FairwayDimensionImport struct { WFSImport diff -r af38a19f615a -r b2d88c680691 schema/gemma.sql --- a/schema/gemma.sql Tue Feb 04 16:29:49 2020 +0100 +++ b/schema/gemma.sql Wed Feb 05 11:14:02 2020 +0100 @@ -840,6 +840,38 @@ CHECK(measure_type = 'minimum guaranteed' OR value_lifetime IS NOT NULL) ) + + CREATE TABLE fairway_marks ( + id int PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY, + geom geography(POINT, 4326) NOT NULL, + datsta varchar, + datend varchar, + persta varchar, + perend varchar, + objnam varchar, + nobjnm varchar, + inform varchar, + ninfom varchar, + scamin int, + picrep varchar, + txtdsc varchar, + sordat varchar, + sorind varchar, + colour varchar, + colpat varchar, + condtn int, + bcnshp int, + catlam int, + dirimp smallint REFERENCES dirimps + ) + -- Prevent identical entries using composite type comparison + -- (i.e. considering two NULL values in a field equal): + CREATE UNIQUE INDEX fairway_marks_distinct_rows ON fairway_marks + ((CAST((0, geom, + datsta, datend, persta, perend, objnam, nobjnm, inform, ninfom, + scamin, picrep, txtdsc, sordat, sorind, colour, colpat, condtn, + bcnshp, catlam, dirimp) AS fairway_marks) + )) ;