changeset 4899:b2d88c680691 fairway-marks-import

merge with default
author Thomas Junk <thomas.junk@intevation.de>
date Wed, 05 Feb 2020 11:14:02 +0100
parents 79a5d0c0d2f5 (diff) af38a19f615a (current diff)
children bbcea42bba87
files
diffstat 5 files changed, 395 insertions(+), 4 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/controllers/routes.go	Tue Feb 04 16:29:49 2020 +0100
+++ b/pkg/controllers/routes.go	Wed Feb 05 11:14:02 2020 +0100
@@ -4,7 +4,7 @@
 // SPDX-License-Identifier: AGPL-3.0-or-later
 // License-Filename: LICENSES/AGPL-3.0.txt
 //
-// Copyright (C) 2018 by via donau
+// Copyright (C) 2018, 2020 by via donau
 //   – Österreichische Wasserstraßen-Gesellschaft mbH
 // Software engineering by Intevation GmbH
 //
@@ -241,7 +241,7 @@
 
 	kinds := strings.Join([]string{
 		"bn", "gm", "fa", "wx", "wa",
-		"wg", "dmv", "fd", "dma",
+		"wg", "dmv", "fd", "dma", "fm",
 		"sec", "dsec", "dst", "dsr",
 	}, "|")
 
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pkg/imports/fm.go	Wed Feb 05 11:14:02 2020 +0100
@@ -0,0 +1,342 @@
+// This is Free Software under GNU Affero General Public License v >= 3.0
+// without warranty, see README.md and license for details.
+//
+// SPDX-License-Identifier: AGPL-3.0-or-later
+// License-Filename: LICENSES/AGPL-3.0.txt
+//
+// Copyright (C) 2020 by via donau
+//   – Österreichische Wasserstraßen-Gesellschaft mbH
+// Software engineering by Intevation GmbH
+//
+// Author(s):
+//  * Tom Gottfried <tom.gottfried@intevation.de>
+
+package imports
+
+import (
+	"context"
+	"database/sql"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io"
+	"strconv"
+	"time"
+
+	"gemma.intevation.de/gemma/pkg/pgxutils"
+	"gemma.intevation.de/gemma/pkg/wfs"
+)
+
+// FairwayMarks is an import job to import
+// fairway marks in form of point geometries
+// and attribute data from a WFS service.
+type FairwayMarks struct {
+	// URL the GetCapabilities URL of the WFS service.
+	URL string `json:"url"`
+	// FeatureType selects the feature type of the WFS service.
+	FeatureType string `json:"feature-type"`
+	// SortBy works around misconfigured services to
+	// establish a sort order to get the features.
+	SortBy string `json:"sort-by"`
+	// User is an optional username for Basic Auth.
+	User string `json:"user,omitempty"`
+	// Password is an optional password for Basic Auth.
+	Password string `json:"password,omitempty"`
+}
+
+// Description gives a short info about relevant facts of this import.
+func (fm *FairwayMarks) Description() (string, error) {
+	return fm.URL + "|" + fm.FeatureType, nil
+}
+
+// FMJobKind is the import queue type identifier.
+const FMJobKind JobKind = "fm"
+
+type fmJobCreator struct{}
+
+func init() {
+	RegisterJobCreator(FMJobKind, fmJobCreator{})
+}
+
+func (fmJobCreator) Description() string { return "fairway marks" }
+
+func (fmJobCreator) AutoAccept() bool { return true }
+
+func (fmJobCreator) Create() Job { return new(FairwayMarks) }
+
+func (fmJobCreator) Depends() [2][]string {
+	return [2][]string{
+		{"fairway_marks"},
+		{},
+	}
+}
+
+// StageDone is a NOP for fairway marks imports.
+func (fmJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
+	return nil
+}
+
+// CleanUp for fairway marks imports is a NOP.
+func (*FairwayMarks) CleanUp() error { return nil }
+
+type fairwayMarksProperties struct {
+	Datsta      *string `json:"hydro_datsta"`
+	Datend      *string `json:"hydro_datend"`
+	Persta      *string `json:"hydro_persta"`
+	Perend      *string `json:"hydro_perend"`
+	Objnam      *string `json:"hydro_objnam"`
+	Nobjnm      *string `json:"hydro_nobjnm"`
+	Inform      *string `json:"hydro_inform"`
+	Ninfom      *string `json:"hydro_ninfom"`
+	Scamin      *int    `json:"hydro_scamin"`
+	Picrep      *string `json:"hydro_picrep"`
+	Txtdsc      *string `json:"hydro_txtdsc"`
+	Sordat      *string `json:"hydro_sordat"`
+	Sorind      *string `json:"hydro_sorind"`
+	Colour      *string `json:"hydro_colour"`
+	Colpat      *string `json:"hydro_colpat"`
+	Condtn      *int    `json:"hydro_condtn"`
+	Bcnshp      *int    `json:"hydro_bcnshp"`
+	HydroCatlam *int64  `json:"hydro_catlam,omitempty"`
+	IENCCatlam  *int64  `json:"ienc_catlam,omitempty"`
+	Dirimp      *string `json:"ienc_dirimp,omitempty"`
+}
+
+const (
+	insertFairwayMarksSQL = `
+with a as (
+  select users.current_user_area_utm() AS a
+)
+INSERT INTO waterway.fairway_marks (
+  geom,
+  datsta,
+  datend,
+  persta,
+  perend,
+  objnam,
+  nobjnm,
+  inform,
+  ninfom,
+  scamin,
+  picrep,
+  txtdsc,
+  sordat,
+  sorind,
+  colour,
+  colpat,
+  condtn,
+  bcnshp,
+  catlam,
+  dirimp
+)
+SELECT newfm, $3, $4, $5, $6, $7, $8, $9,
+    $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21
+  FROM ST_Transform(ST_GeomFromWKB($1, $2::integer), 4326) AS newfm (newfm)
+  WHERE pg_has_role('sys_admin', 'MEMBER')
+    OR ST_Intersects((select a from a),
+      ST_Transform(newfm, (select ST_SRID(a) from a)))
+ON CONFLICT (
+    CAST((0, geom,
+      datsta, datend, persta, perend, objnam, nobjnm, inform, ninfom,
+      scamin, picrep, txtdsc, sordat, sorind, colour, colpat, condtn,
+      bcnshp, catlam, dirimp) AS waterway.fairway_marks)
+  )
+  DO NOTHING
+RETURNING id
+`
+)
+
+// Do executes the actual fairway marks import.
+func (fm *FairwayMarks) Do(
+	ctx context.Context,
+	importID int64,
+	conn *sql.Conn,
+	feedback Feedback,
+) (interface{}, error) {
+
+	start := time.Now()
+
+	feedback.Info("Import fairway marks")
+
+	feedback.Info("Loading capabilities from %s", fm.URL)
+	caps, err := wfs.GetCapabilities(fm.URL)
+	if err != nil {
+		feedback.Error("Loading capabilities failed: %v", err)
+		return nil, err
+	}
+
+	ft := caps.FindFeatureType(fm.FeatureType)
+	if ft == nil {
+		return nil, fmt.Errorf("unknown feature type '%s'", fm.FeatureType)
+	}
+
+	feedback.Info("Found feature type '%s", fm.FeatureType)
+
+	epsg, err := wfs.CRSToEPSG(ft.DefaultCRS)
+	if err != nil {
+		feedback.Error("Unsupported CRS: '%s'", ft.DefaultCRS)
+		return nil, err
+	}
+
+	if fm.SortBy != "" {
+		feedback.Info("Features will be sorted by '%s'", fm.SortBy)
+	}
+
+	dl, err := wfs.GetFeatures(caps, fm.FeatureType, fm.SortBy)
+	if err != nil {
+		feedback.Error("Cannot create GetFeature URLs. %v", err)
+		return nil, err
+	}
+
+	tx, err := conn.BeginTx(ctx, nil)
+	if err != nil {
+		return nil, err
+	}
+	defer tx.Rollback()
+
+	insertStmt, err := tx.PrepareContext(ctx, insertFairwayMarksSQL)
+	if err != nil {
+		return nil, err
+	}
+	defer insertStmt.Close()
+
+	var (
+		unsupported       = stringCounter{}
+		missingProperties int
+		badProperties     int
+		outsideOrDup      int
+		features          int
+	)
+
+	if err := dl.Download(fm.User, fm.Password, func(url string, r io.Reader) error {
+		feedback.Info("Get features from: '%s'", url)
+		rfc, err := wfs.ParseRawFeatureCollection(r)
+		if err != nil {
+			return fmt.Errorf("parsing GetFeature document failed: %v", err)
+		}
+		if rfc.CRS != nil {
+			crsName := rfc.CRS.Properties.Name
+			if epsg, err = wfs.CRSToEPSG(crsName); err != nil {
+				feedback.Error("Unsupported CRS: %d", crsName)
+				return err
+			}
+		}
+
+		// No features -> ignore.
+		if rfc.Features == nil {
+			return nil
+		}
+
+		feedback.Info("Using EPSG: %d", epsg)
+
+		savepoint := Savepoint(ctx, tx, "feature")
+
+		for _, feature := range rfc.Features {
+			if feature.Properties == nil || feature.Geometry.Coordinates == nil {
+				missingProperties++
+				continue
+			}
+
+			var props fairwayMarksProperties
+
+			if err := json.Unmarshal(*feature.Properties, &props); err != nil {
+				badProperties++
+				continue
+			}
+
+			var catlam sql.NullInt64
+			if props.HydroCatlam != nil {
+				catlam = sql.NullInt64{Int64: *props.HydroCatlam, Valid: true}
+			} else if props.IENCCatlam != nil {
+				catlam = sql.NullInt64{Int64: *props.IENCCatlam, Valid: true}
+			}
+
+			var dirimp sql.NullInt64
+			if props.Dirimp != nil {
+				if value, err := strconv.ParseInt(*props.Dirimp, 10, 64); err == nil {
+					dirimp = sql.NullInt64{Int64: value, Valid: true}
+				}
+			}
+
+			switch feature.Geometry.Type {
+			case "Point":
+				var p pointSlice
+				if err := json.Unmarshal(*feature.Geometry.Coordinates, &p); err != nil {
+					return err
+				}
+				var fmid int64
+				err := savepoint(func() error {
+					err := insertStmt.QueryRowContext(
+						ctx,
+						p.asWKB(),
+						epsg,
+						props.Datsta,
+						props.Datend,
+						props.Persta,
+						props.Perend,
+						props.Objnam,
+						props.Nobjnm,
+						props.Inform,
+						props.Ninfom,
+						props.Scamin,
+						props.Picrep,
+						props.Txtdsc,
+						props.Sordat,
+						props.Sorind,
+						props.Colour,
+						props.Colpat,
+						props.Condtn,
+						props.Bcnshp,
+						catlam,
+						dirimp,
+					).Scan(&fmid)
+					return err
+				})
+				switch {
+				case err == sql.ErrNoRows:
+					outsideOrDup++
+					// ignore -> filtered by responsibility_areas
+				case err != nil:
+					feedback.Error(pgxutils.ReadableError{Err: err}.Error())
+				default:
+					features++
+				}
+			default:
+				unsupported[feature.Geometry.Type]++
+			}
+		}
+		return nil
+	}); err != nil {
+		return nil, err
+	}
+
+	if badProperties > 0 {
+		feedback.Warn("Bad properties: %d", badProperties)
+	}
+
+	if missingProperties > 0 {
+		feedback.Warn("Missing properties: %d", missingProperties)
+	}
+
+	if len(unsupported) != 0 {
+		feedback.Warn("Unsupported types found: %s", unsupported)
+	}
+
+	if outsideOrDup > 0 {
+		feedback.Info(
+			"Features outside responsibility area and duplicates: %d",
+			outsideOrDup)
+	}
+
+	if features == 0 {
+		err := errors.New("no valid new features found")
+		return nil, err
+	}
+
+	if err = tx.Commit(); err == nil {
+		feedback.Info("Storing %d features took %s",
+			features, time.Since(start))
+	}
+
+	return nil, err
+}
--- a/pkg/imports/modelconvert.go	Tue Feb 04 16:29:49 2020 +0100
+++ b/pkg/imports/modelconvert.go	Wed Feb 05 11:14:02 2020 +0100
@@ -4,7 +4,7 @@
 // SPDX-License-Identifier: AGPL-3.0-or-later
 // License-Filename: LICENSES/AGPL-3.0.txt
 //
-// Copyright (C) 2018 by via donau
+// Copyright (C) 2018, 2020 by via donau
 //   – Österreichische Wasserstraßen-Gesellschaft mbH
 // Software engineering by Intevation GmbH
 //
@@ -27,6 +27,7 @@
 	DMVJobKind:  func() interface{} { return new(models.DistanceMarksVirtualImport) },
 	FDJobKind:   func() interface{} { return new(models.FairwayDimensionImport) },
 	DMAJobKind:  func() interface{} { return new(models.DistanceMarksAshoreImport) },
+	FMJobKind:   func() interface{} { return new(models.FairwayMarksImport) },
 	STJobKind:   func() interface{} { return new(models.StretchImport) },
 	SECJobKind:  func() interface{} { return new(models.SectionImport) },
 	DSECJobKind: func() interface{} { return new(models.SectionDelete) },
@@ -136,6 +137,17 @@
 		}
 	},
 
+	FMJobKind: func(input interface{}) interface{} {
+		fmi := input.(*models.FairwayMarksImport)
+		return &FairwayMarks{
+			URL:         fmi.URL,
+			FeatureType: fmi.FeatureType,
+			SortBy:      nilString(fmi.SortBy),
+			User:        nilString(fmi.User),
+			Password:    nilString(fmi.Password),
+		}
+	},
+
 	STJobKind: func(input interface{}) interface{} {
 		sti := input.(*models.StretchImport)
 		return &Stretch{
--- a/pkg/models/imports.go	Tue Feb 04 16:29:49 2020 +0100
+++ b/pkg/models/imports.go	Wed Feb 05 11:14:02 2020 +0100
@@ -4,7 +4,7 @@
 // SPDX-License-Identifier: AGPL-3.0-or-later
 // License-Filename: LICENSES/AGPL-3.0.txt
 //
-// Copyright (C) 2018, 2019 by via donau
+// Copyright (C) 2018, 2019, 2020 by via donau
 //   – Österreichische Wasserstraßen-Gesellschaft mbH
 // Software engineering by Intevation GmbH
 //
@@ -78,6 +78,11 @@
 		WFSImport
 	}
 
+	// FairwayMarksImport specifies an import of fairway marks.
+	FairwayMarksImport struct {
+		WFSImport
+	}
+
 	// FairwayDimensionImport specifies an import of the waterway axis.
 	FairwayDimensionImport struct {
 		WFSImport
--- a/schema/gemma.sql	Tue Feb 04 16:29:49 2020 +0100
+++ b/schema/gemma.sql	Wed Feb 05 11:14:02 2020 +0100
@@ -840,6 +840,38 @@
         CHECK(measure_type = 'minimum guaranteed'
             OR value_lifetime IS NOT NULL)
     )
+
+    CREATE TABLE fairway_marks (
+        id int PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
+        geom geography(POINT, 4326) NOT NULL,
+        datsta varchar,
+        datend varchar,
+        persta varchar,
+        perend varchar,
+        objnam varchar,
+        nobjnm varchar,
+        inform varchar,
+        ninfom varchar,
+        scamin int,
+        picrep varchar,
+        txtdsc varchar,
+        sordat varchar,
+        sorind varchar,
+        colour varchar,
+        colpat varchar,
+        condtn int,
+        bcnshp int,
+        catlam int,
+        dirimp smallint REFERENCES dirimps
+    )
+    -- Prevent identical entries using composite type comparison
+    -- (i.e. considering two NULL values in a field equal):
+    CREATE UNIQUE INDEX fairway_marks_distinct_rows ON fairway_marks
+        ((CAST((0, geom,
+            datsta, datend, persta, perend, objnam, nobjnm, inform, ninfom,
+            scamin, picrep, txtdsc, sordat, sorind, colour, colpat, condtn,
+            bcnshp, catlam, dirimp) AS fairway_marks)
+        ))
 ;