changeset 4894:8eb36d0d5bdf fairway-marks-import

Draft implementation of fairway marks import Currently works (at least) for IENC feature BCNLAT. In order to be able to store different types of fairway marks, more tables and import types are needed. Currently stupidly stores every entry received from the data source. Sorting out duplicates and invalidating entries removed from the data source needs to implemented.
author Tom Gottfried <tom@intevation.de>
date Tue, 14 Jan 2020 15:01:42 +0100
parents b55120fa8913
children 9f799077a3e6
files pkg/controllers/routes.go pkg/imports/fm.go pkg/imports/modelconvert.go pkg/models/imports.go schema/gemma.sql
diffstat 5 files changed, 379 insertions(+), 4 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/controllers/routes.go	Thu Jan 09 17:23:54 2020 +0100
+++ b/pkg/controllers/routes.go	Tue Jan 14 15:01:42 2020 +0100
@@ -4,7 +4,7 @@
 // SPDX-License-Identifier: AGPL-3.0-or-later
 // License-Filename: LICENSES/AGPL-3.0.txt
 //
-// Copyright (C) 2018 by via donau
+// Copyright (C) 2018, 2020 by via donau
 //   – Österreichische Wasserstraßen-Gesellschaft mbH
 // Software engineering by Intevation GmbH
 //
@@ -241,7 +241,7 @@
 
 	kinds := strings.Join([]string{
 		"bn", "gm", "fa", "wx", "wa",
-		"wg", "dmv", "fd", "dma",
+		"wg", "dmv", "fd", "dma", "fm",
 		"sec", "dsec", "dst", "dsr",
 	}, "|")
 
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pkg/imports/fm.go	Tue Jan 14 15:01:42 2020 +0100
@@ -0,0 +1,334 @@
+// This is Free Software under GNU Affero General Public License v >= 3.0
+// without warranty, see README.md and license for details.
+//
+// SPDX-License-Identifier: AGPL-3.0-or-later
+// License-Filename: LICENSES/AGPL-3.0.txt
+//
+// Copyright (C) 2020 by via donau
+//   – Österreichische Wasserstraßen-Gesellschaft mbH
+// Software engineering by Intevation GmbH
+//
+// Author(s):
+//  * Tom Gottfried <tom.gottfried@intevation.de>
+
+package imports
+
+import (
+	"context"
+	"database/sql"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io"
+	"strconv"
+	"time"
+
+	"gemma.intevation.de/gemma/pkg/pgxutils"
+	"gemma.intevation.de/gemma/pkg/wfs"
+)
+
+// FairwayMarks is an import job to import
+// fairway marks in form of point geometries
+// and attribute data from a WFS service.
+type FairwayMarks struct {
+	// URL the GetCapabilities URL of the WFS service.
+	URL string `json:"url"`
+	// FeatureType selects the feature type of the WFS service.
+	FeatureType string `json:"feature-type"`
+	// SortBy works around misconfigured services to
+	// establish a sort order to get the features.
+	SortBy string `json:"sort-by"`
+	// User is an optional username for Basic Auth.
+	User string `json:"user,omitempty"`
+	// Password is an optional password for Basic Auth.
+	Password string `json:"password,omitempty"`
+}
+
+// Description gives a short info about relevant facts of this import.
+func (fm *FairwayMarks) Description() (string, error) {
+	return fm.URL + "|" + fm.FeatureType, nil
+}
+
+// FMJobKind is the import queue type identifier.
+const FMJobKind JobKind = "fm"
+
+type fmJobCreator struct{}
+
+func init() {
+	RegisterJobCreator(FMJobKind, fmJobCreator{})
+}
+
+func (fmJobCreator) Description() string { return "fairway marks" }
+
+func (fmJobCreator) AutoAccept() bool { return true }
+
+func (fmJobCreator) Create() Job { return new(FairwayMarks) }
+
+func (fmJobCreator) Depends() [2][]string {
+	return [2][]string{
+		{"fairway_marks"},
+		{},
+	}
+}
+
+// StageDone is a NOP for fairway marks imports.
+func (fmJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
+	return nil
+}
+
+// CleanUp for fairway marks imports is a NOP.
+func (*FairwayMarks) CleanUp() error { return nil }
+
+type fairwayMarksProperties struct {
+	Datsta      *string `json:"hydro_datsta"`
+	Datend      *string `json:"hydro_datend"`
+	Persta      *string `json:"hydro_persta"`
+	Perend      *string `json:"hydro_perend"`
+	Objnam      *string `json:"hydro_objnam"`
+	Nobjnm      *string `json:"hydro_nobjnm"`
+	Inform      *string `json:"hydro_inform"`
+	Ninfom      *string `json:"hydro_ninfom"`
+	Scamin      *int    `json:"hydro_scamin"`
+	Picrep      *string `json:"hydro_picrep"`
+	Txtdsc      *string `json:"hydro_txtdsc"`
+	Sordat      *string `json:"hydro_sordat"`
+	Sorind      *string `json:"hydro_sorind"`
+	Colour      *string `json:"hydro_colour"`
+	Colpat      *string `json:"hydro_colpat"`
+	Condtn      *int    `json:"hydro_condtn"`
+	Bcnshp      *int    `json:"hydro_bcnshp"`
+	HydroCatlam *int64  `json:"hydro_catlam,omitempty"`
+	IENCCatlam  *int64  `json:"ienc_catlam,omitempty"`
+	Dirimp      *string `json:"ienc_dirimp,omitempty"`
+}
+
+const (
+	insertFairwayMarksSQL = `
+with a as (
+  select users.current_user_area_utm() AS a
+)
+INSERT INTO waterway.fairway_marks (
+  geom,
+  datsta,
+  datend,
+  persta,
+  perend,
+  objnam,
+  nobjnm,
+  inform,
+  ninfom,
+  scamin,
+  picrep,
+  txtdsc,
+  sordat,
+  sorind,
+  colour,
+  colpat,
+  condtn,
+  bcnshp,
+  catlam,
+  dirimp
+)
+SELECT newfm, $3, $4, $5, $6, $7, $8, $9,
+    $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21
+  FROM ST_Transform(ST_GeomFromWKB($1, $2::integer), 4326) AS newfm (newfm)
+  WHERE pg_has_role('sys_admin', 'MEMBER')
+    OR ST_Intersects((select a from a),
+      ST_Transform(newfm, (select ST_SRID(a) from a)))
+RETURNING id
+`
+)
+
+// Do executes the actual fairway marks import.
+func (fm *FairwayMarks) Do(
+	ctx context.Context,
+	importID int64,
+	conn *sql.Conn,
+	feedback Feedback,
+) (interface{}, error) {
+
+	start := time.Now()
+
+	feedback.Info("Import fairway marks")
+
+	feedback.Info("Loading capabilities from %s", fm.URL)
+	caps, err := wfs.GetCapabilities(fm.URL)
+	if err != nil {
+		feedback.Error("Loading capabilities failed: %v", err)
+		return nil, err
+	}
+
+	ft := caps.FindFeatureType(fm.FeatureType)
+	if ft == nil {
+		return nil, fmt.Errorf("unknown feature type '%s'", fm.FeatureType)
+	}
+
+	feedback.Info("Found feature type '%s", fm.FeatureType)
+
+	epsg, err := wfs.CRSToEPSG(ft.DefaultCRS)
+	if err != nil {
+		feedback.Error("Unsupported CRS: '%s'", ft.DefaultCRS)
+		return nil, err
+	}
+
+	if fm.SortBy != "" {
+		feedback.Info("Features will be sorted by '%s'", fm.SortBy)
+	}
+
+	dl, err := wfs.GetFeatures(caps, fm.FeatureType, fm.SortBy)
+	if err != nil {
+		feedback.Error("Cannot create GetFeature URLs. %v", err)
+		return nil, err
+	}
+
+	tx, err := conn.BeginTx(ctx, nil)
+	if err != nil {
+		return nil, err
+	}
+	defer tx.Rollback()
+
+	insertStmt, err := tx.PrepareContext(ctx, insertFairwayMarksSQL)
+	if err != nil {
+		return nil, err
+	}
+	defer insertStmt.Close()
+
+	var (
+		unsupported       = stringCounter{}
+		missingProperties int
+		badProperties     int
+		outside           int
+		features          int
+	)
+
+	if err := dl.Download(fm.User, fm.Password, func(url string, r io.Reader) error {
+		feedback.Info("Get features from: '%s'", url)
+		rfc, err := wfs.ParseRawFeatureCollection(r)
+		if err != nil {
+			return fmt.Errorf("parsing GetFeature document failed: %v", err)
+		}
+		if rfc.CRS != nil {
+			crsName := rfc.CRS.Properties.Name
+			if epsg, err = wfs.CRSToEPSG(crsName); err != nil {
+				feedback.Error("Unsupported CRS: %d", crsName)
+				return err
+			}
+		}
+
+		// No features -> ignore.
+		if rfc.Features == nil {
+			return nil
+		}
+
+		feedback.Info("Using EPSG: %d", epsg)
+
+		savepoint := Savepoint(ctx, tx, "feature")
+
+		for _, feature := range rfc.Features {
+			if feature.Properties == nil || feature.Geometry.Coordinates == nil {
+				missingProperties++
+				continue
+			}
+
+			var props fairwayMarksProperties
+
+			if err := json.Unmarshal(*feature.Properties, &props); err != nil {
+				badProperties++
+				continue
+			}
+
+			var catlam sql.NullInt64
+			if props.HydroCatlam != nil {
+				catlam = sql.NullInt64{Int64: *props.HydroCatlam, Valid: true}
+			} else if props.IENCCatlam != nil {
+				catlam = sql.NullInt64{Int64: *props.IENCCatlam, Valid: true}
+			}
+
+			var dirimp sql.NullInt64
+			if props.Dirimp != nil {
+				if value, err := strconv.ParseInt(*props.Dirimp, 10, 64); err == nil {
+					dirimp = sql.NullInt64{Int64: value, Valid: true}
+				}
+			}
+
+			switch feature.Geometry.Type {
+			case "Point":
+				var p pointSlice
+				if err := json.Unmarshal(*feature.Geometry.Coordinates, &p); err != nil {
+					return err
+				}
+				var fmid int64
+				err := savepoint(func() error {
+					err := insertStmt.QueryRowContext(
+						ctx,
+						p.asWKB(),
+						epsg,
+						props.Datsta,
+						props.Datend,
+						props.Persta,
+						props.Perend,
+						props.Objnam,
+						props.Nobjnm,
+						props.Inform,
+						props.Ninfom,
+						props.Scamin,
+						props.Picrep,
+						props.Txtdsc,
+						props.Sordat,
+						props.Sorind,
+						props.Colour,
+						props.Colpat,
+						props.Condtn,
+						props.Bcnshp,
+						catlam,
+						dirimp,
+					).Scan(&fmid)
+					return err
+				})
+				switch {
+				case err == sql.ErrNoRows:
+					outside++
+					// ignore -> filtered by responsibility_areas
+				case err != nil:
+					feedback.Error(pgxutils.ReadableError{Err: err}.Error())
+				default:
+					features++
+				}
+			default:
+				unsupported[feature.Geometry.Type]++
+			}
+		}
+		return nil
+	}); err != nil {
+		return nil, err
+	}
+
+	if badProperties > 0 {
+		feedback.Warn("Bad properties: %d", badProperties)
+	}
+
+	if missingProperties > 0 {
+		feedback.Warn("Missing properties: %d", missingProperties)
+	}
+
+	if len(unsupported) != 0 {
+		feedback.Warn("Unsupported types found: %s", unsupported)
+	}
+
+	if outside > 0 {
+		feedback.Info("Features outside responsibility area: %d", outside)
+	}
+
+	if features == 0 {
+		err := errors.New("no features found")
+		feedback.Error("%v", err)
+		return nil, err
+	}
+
+	if err = tx.Commit(); err == nil {
+		feedback.Info("Storing %d features took %s",
+			features, time.Since(start))
+	}
+
+	return nil, err
+}
--- a/pkg/imports/modelconvert.go	Thu Jan 09 17:23:54 2020 +0100
+++ b/pkg/imports/modelconvert.go	Tue Jan 14 15:01:42 2020 +0100
@@ -4,7 +4,7 @@
 // SPDX-License-Identifier: AGPL-3.0-or-later
 // License-Filename: LICENSES/AGPL-3.0.txt
 //
-// Copyright (C) 2018 by via donau
+// Copyright (C) 2018, 2020 by via donau
 //   – Österreichische Wasserstraßen-Gesellschaft mbH
 // Software engineering by Intevation GmbH
 //
@@ -27,6 +27,7 @@
 	DMVJobKind:  func() interface{} { return new(models.DistanceMarksVirtualImport) },
 	FDJobKind:   func() interface{} { return new(models.FairwayDimensionImport) },
 	DMAJobKind:  func() interface{} { return new(models.DistanceMarksAshoreImport) },
+	FMJobKind:   func() interface{} { return new(models.FairwayMarksImport) },
 	STJobKind:   func() interface{} { return new(models.StretchImport) },
 	SECJobKind:  func() interface{} { return new(models.SectionImport) },
 	DSECJobKind: func() interface{} { return new(models.SectionDelete) },
@@ -136,6 +137,17 @@
 		}
 	},
 
+	FMJobKind: func(input interface{}) interface{} {
+		fmi := input.(*models.FairwayMarksImport)
+		return &FairwayMarks{
+			URL:         fmi.URL,
+			FeatureType: fmi.FeatureType,
+			SortBy:      nilString(fmi.SortBy),
+			User:        nilString(fmi.User),
+			Password:    nilString(fmi.Password),
+		}
+	},
+
 	STJobKind: func(input interface{}) interface{} {
 		sti := input.(*models.StretchImport)
 		return &Stretch{
--- a/pkg/models/imports.go	Thu Jan 09 17:23:54 2020 +0100
+++ b/pkg/models/imports.go	Tue Jan 14 15:01:42 2020 +0100
@@ -4,7 +4,7 @@
 // SPDX-License-Identifier: AGPL-3.0-or-later
 // License-Filename: LICENSES/AGPL-3.0.txt
 //
-// Copyright (C) 2018, 2019 by via donau
+// Copyright (C) 2018, 2019, 2020 by via donau
 //   – Österreichische Wasserstraßen-Gesellschaft mbH
 // Software engineering by Intevation GmbH
 //
@@ -78,6 +78,11 @@
 		WFSImport
 	}
 
+	// FairwayMarksImport specifies an import of fairway marks.
+	FairwayMarksImport struct {
+		WFSImport
+	}
+
 	// FairwayDimensionImport specifies an import of the waterway axis.
 	FairwayDimensionImport struct {
 		WFSImport
--- a/schema/gemma.sql	Thu Jan 09 17:23:54 2020 +0100
+++ b/schema/gemma.sql	Tue Jan 14 15:01:42 2020 +0100
@@ -840,6 +840,30 @@
         CHECK(measure_type = 'minimum guaranteed'
             OR value_lifetime IS NOT NULL)
     )
+
+    CREATE TABLE fairway_marks (
+        id int PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
+        geom geography(POINT, 4326) NOT NULL,
+        datsta varchar,
+        datend varchar,
+        persta varchar,
+        perend varchar,
+        objnam varchar,
+        nobjnm varchar,
+        inform varchar,
+        ninfom varchar,
+        scamin int,
+        picrep varchar,
+        txtdsc varchar,
+        sordat varchar,
+        sorind varchar,
+        colour varchar,
+        colpat varchar,
+        condtn int,
+        bcnshp int,
+        catlam int,
+        dirimp smallint REFERENCES dirimps
+    )
 ;