changeset 4904:53d929f658f3 fairway-marks-import

Separate code common to all types of fairway mark imports Currently the only implemented type of fairway marks is BCNLAT.
author Tom Gottfried <tom@intevation.de>
date Thu, 06 Feb 2020 19:09:14 +0100
parents 69cc3d3047ab
children 8cb201b551b3
files pkg/imports/fm.go pkg/imports/fm_bcnlat.go
diffstat 2 files changed, 283 insertions(+), 213 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/imports/fm.go	Wed Feb 05 17:00:29 2020 +0100
+++ b/pkg/imports/fm.go	Thu Feb 06 19:09:14 2020 +0100
@@ -14,22 +14,16 @@
 package imports
 
 import (
-	"context"
-	"database/sql"
 	"encoding/json"
-	"errors"
 	"fmt"
 	"io"
-	"strconv"
-	"time"
 
-	"gemma.intevation.de/gemma/pkg/pgxutils"
 	"gemma.intevation.de/gemma/pkg/wfs"
 )
 
-// FairwayMarks is an import job to import
-// fairway marks in form of point geometries
-// and attribute data from a WFS service.
+// FairwayMarks is a struct
+// to be used as the basis for imports of
+// specific types for fairway marks.
 type FairwayMarks struct {
 	// URL the GetCapabilities URL of the WFS service.
 	URL string `json:"url"`
@@ -44,138 +38,56 @@
 	Password string `json:"password,omitempty"`
 }
 
-// Description gives a short info about relevant facts of this import.
-func (fm *FairwayMarks) Description() (string, error) {
-	return fm.URL + "|" + fm.FeatureType, nil
-}
-
-// FMJobKind is the import queue type identifier.
-const FMJobKind JobKind = "fm"
-
-type fmJobCreator struct{}
-
-func init() {
-	RegisterJobCreator(FMJobKind, fmJobCreator{})
-}
-
-func (fmJobCreator) Description() string { return "fairway marks" }
-
-func (fmJobCreator) AutoAccept() bool { return true }
-
-func (fmJobCreator) Create() Job { return new(FairwayMarks) }
-
-func (fmJobCreator) Depends() [2][]string {
-	return [2][]string{
-		{"fairway_marks"},
-		{},
-	}
-}
-
-// StageDone is a NOP for fairway marks imports.
-func (fmJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
-	return nil
+// Properties common to all types of fairway marks
+type fairwayMarksProperties struct {
+	Datsta *string `json:"hydro_datsta"`
+	Datend *string `json:"hydro_datend"`
+	Persta *string `json:"hydro_persta"`
+	Perend *string `json:"hydro_perend"`
+	Objnam *string `json:"hydro_objnam"`
+	Nobjnm *string `json:"hydro_nobjnm"`
+	Inform *string `json:"hydro_inform"`
+	Ninfom *string `json:"hydro_ninfom"`
+	Scamin *int    `json:"hydro_scamin"`
+	Picrep *string `json:"hydro_picrep"`
+	Txtdsc *string `json:"hydro_txtdsc"`
+	Sordat *string `json:"hydro_sordat"`
+	Sorind *string `json:"hydro_sorind"`
 }
 
-// CleanUp for fairway marks imports is a NOP.
-func (*FairwayMarks) CleanUp() error { return nil }
-
-type fairwayMarksProperties struct {
-	Datsta      *string `json:"hydro_datsta"`
-	Datend      *string `json:"hydro_datend"`
-	Persta      *string `json:"hydro_persta"`
-	Perend      *string `json:"hydro_perend"`
-	Objnam      *string `json:"hydro_objnam"`
-	Nobjnm      *string `json:"hydro_nobjnm"`
-	Inform      *string `json:"hydro_inform"`
-	Ninfom      *string `json:"hydro_ninfom"`
-	Scamin      *int    `json:"hydro_scamin"`
-	Picrep      *string `json:"hydro_picrep"`
-	Txtdsc      *string `json:"hydro_txtdsc"`
-	Sordat      *string `json:"hydro_sordat"`
-	Sorind      *string `json:"hydro_sorind"`
-	Colour      *string `json:"hydro_colour"`
-	Colpat      *string `json:"hydro_colpat"`
-	Condtn      *int    `json:"hydro_condtn"`
-	Bcnshp      *int    `json:"hydro_bcnshp"`
-	HydroCatlam *int64  `json:"hydro_catlam,omitempty"`
-	IENCCatlam  *int64  `json:"ienc_catlam,omitempty"`
-	Dirimp      *string `json:"ienc_dirimp,omitempty"`
-}
-
-const (
-	insertFairwayMarksSQL = `
-with a as (
-  select users.current_user_area_utm() AS a
-)
-INSERT INTO waterway.fairway_marks (
-  geom,
-  datsta,
-  datend,
-  persta,
-  perend,
-  objnam,
-  nobjnm,
-  inform,
-  ninfom,
-  scamin,
-  picrep,
-  txtdsc,
-  sordat,
-  sorind,
-  colour,
-  colpat,
-  condtn,
-  bcnshp,
-  catlam,
-  dirimp
-)
-SELECT newfm, $3, $4, $5, $6, $7, $8, $9,
-    $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21
-  FROM ST_Transform(ST_GeomFromWKB($1, $2::integer), 4326) AS newfm (newfm)
-  WHERE pg_has_role('sys_admin', 'MEMBER')
-    OR ST_Intersects((select a from a),
-      ST_Transform(newfm, (select ST_SRID(a) from a)))
-ON CONFLICT (
-    CAST((0, geom,
-      datsta, datend, persta, perend, objnam, nobjnm, inform, ninfom,
-      scamin, picrep, txtdsc, sordat, sorind, colour, colpat, condtn,
-      bcnshp, catlam, dirimp) AS waterway.fairway_marks)
-  )
-  DO NOTHING
-RETURNING id
-`
-)
-
-// Do executes the actual fairway marks import.
-func (fm *FairwayMarks) Do(
-	ctx context.Context,
-	importID int64,
-	conn *sql.Conn,
+// Common operation of FM imports to get features from WFS service
+func getFMFeatures(
 	feedback Feedback,
-) (interface{}, error) {
-
-	start := time.Now()
-
-	feedback.Info("Import fairway marks")
+	fm FairwayMarks,
+	// Pointer to a struct representing featuretype's properties
+	props interface{},
+) (
+	// Elements can be converted to []interface{}{p, fp} with
+	// p being a pointSlice and fp of the type of argument props.
+	fms [][]interface{},
+	epsg int,
+	err error,
+) {
 
 	feedback.Info("Loading capabilities from %s", fm.URL)
 	caps, err := wfs.GetCapabilities(fm.URL)
 	if err != nil {
 		feedback.Error("Loading capabilities failed: %v", err)
-		return nil, err
+		return
 	}
 
 	ft := caps.FindFeatureType(fm.FeatureType)
 	if ft == nil {
-		return nil, fmt.Errorf("unknown feature type '%s'", fm.FeatureType)
+		err = fmt.Errorf("unknown feature type '%s'", fm.FeatureType)
+		return
 	}
 
 	feedback.Info("Found feature type '%s", fm.FeatureType)
 
-	epsg, err := wfs.CRSToEPSG(ft.DefaultCRS)
+	epsg, err = wfs.CRSToEPSG(ft.DefaultCRS)
 	if err != nil {
 		feedback.Error("Unsupported CRS: '%s'", ft.DefaultCRS)
-		return nil, err
+		return
 	}
 
 	if fm.SortBy != "" {
@@ -185,30 +97,16 @@
 	dl, err := wfs.GetFeatures(caps, fm.FeatureType, fm.SortBy)
 	if err != nil {
 		feedback.Error("Cannot create GetFeature URLs. %v", err)
-		return nil, err
-	}
-
-	tx, err := conn.BeginTx(ctx, nil)
-	if err != nil {
-		return nil, err
+		return
 	}
-	defer tx.Rollback()
-
-	insertStmt, err := tx.PrepareContext(ctx, insertFairwayMarksSQL)
-	if err != nil {
-		return nil, err
-	}
-	defer insertStmt.Close()
 
 	var (
 		unsupported       = stringCounter{}
 		missingProperties int
 		badProperties     int
-		outsideOrDup      int
-		features          int
 	)
 
-	if err := dl.Download(fm.User, fm.Password, func(url string, r io.Reader) error {
+	err = dl.Download(fm.User, fm.Password, func(url string, r io.Reader) error {
 		feedback.Info("Get features from: '%s'", url)
 		rfc, err := wfs.ParseRawFeatureCollection(r)
 		if err != nil {
@@ -229,85 +127,34 @@
 
 		feedback.Info("Using EPSG: %d", epsg)
 
-		savepoint := Savepoint(ctx, tx, "feature")
-
 		for _, feature := range rfc.Features {
 			if feature.Properties == nil || feature.Geometry.Coordinates == nil {
 				missingProperties++
 				continue
 			}
 
-			var props fairwayMarksProperties
-
-			if err := json.Unmarshal(*feature.Properties, &props); err != nil {
+			if err := json.Unmarshal(*feature.Properties, props); err != nil {
 				badProperties++
 				continue
 			}
 
-			var catlam sql.NullInt64
-			if props.HydroCatlam != nil {
-				catlam = sql.NullInt64{Int64: *props.HydroCatlam, Valid: true}
-			} else if props.IENCCatlam != nil {
-				catlam = sql.NullInt64{Int64: *props.IENCCatlam, Valid: true}
-			}
-
-			var dirimp sql.NullInt64
-			if props.Dirimp != nil {
-				if value, err := strconv.ParseInt(*props.Dirimp, 10, 64); err == nil {
-					dirimp = sql.NullInt64{Int64: value, Valid: true}
-				}
-			}
-
 			switch feature.Geometry.Type {
 			case "Point":
 				var p pointSlice
 				if err := json.Unmarshal(*feature.Geometry.Coordinates, &p); err != nil {
 					return err
 				}
-				var fmid int64
-				err := savepoint(func() error {
-					err := insertStmt.QueryRowContext(
-						ctx,
-						p.asWKB(),
-						epsg,
-						props.Datsta,
-						props.Datend,
-						props.Persta,
-						props.Perend,
-						props.Objnam,
-						props.Nobjnm,
-						props.Inform,
-						props.Ninfom,
-						props.Scamin,
-						props.Picrep,
-						props.Txtdsc,
-						props.Sordat,
-						props.Sorind,
-						props.Colour,
-						props.Colpat,
-						props.Condtn,
-						props.Bcnshp,
-						catlam,
-						dirimp,
-					).Scan(&fmid)
-					return err
-				})
-				switch {
-				case err == sql.ErrNoRows:
-					outsideOrDup++
-					// ignore -> filtered by responsibility_areas
-				case err != nil:
-					feedback.Error(pgxutils.ReadableError{Err: err}.Error())
-				default:
-					features++
-				}
+
+				f := []interface{}{p, props}
+				fms = append(fms, f)
 			default:
 				unsupported[feature.Geometry.Type]++
 			}
 		}
 		return nil
-	}); err != nil {
-		return nil, err
+	})
+	if err != nil {
+		return
 	}
 
 	if badProperties > 0 {
@@ -322,21 +169,7 @@
 		feedback.Warn("Unsupported types found: %s", unsupported)
 	}
 
-	if outsideOrDup > 0 {
-		feedback.Info(
-			"Features outside responsibility area and duplicates: %d",
-			outsideOrDup)
-	}
+	feedback.Info("Found %d usable features in data source", len(fms))
 
-	if features == 0 {
-		err := errors.New("no valid new features found")
-		return nil, err
-	}
-
-	if err = tx.Commit(); err == nil {
-		feedback.Info("Storing %d features took %s",
-			features, time.Since(start))
-	}
-
-	return nil, err
+	return
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pkg/imports/fm_bcnlat.go	Thu Feb 06 19:09:14 2020 +0100
@@ -0,0 +1,237 @@
+// This is Free Software under GNU Affero General Public License v >= 3.0
+// without warranty, see README.md and license for details.
+//
+// SPDX-License-Identifier: AGPL-3.0-or-later
+// License-Filename: LICENSES/AGPL-3.0.txt
+//
+// Copyright (C) 2020 by via donau
+//   – Österreichische Wasserstraßen-Gesellschaft mbH
+// Software engineering by Intevation GmbH
+//
+// Author(s):
+//  * Tom Gottfried <tom.gottfried@intevation.de>
+
+package imports
+
+import (
+	"context"
+	"database/sql"
+	"errors"
+	"strconv"
+	"time"
+
+	"gemma.intevation.de/gemma/pkg/pgxutils"
+)
+
+// Bcnlat is an import job to import
+// fairway marks of type BCNLAT in form of point geometries
+// and attribute data from a WFS service.
+type Bcnlat struct {
+	FairwayMarks
+}
+
+// Description gives a short info about relevant facts of this import.
+func (bcnlat *Bcnlat) Description() (string, error) {
+	return bcnlat.URL + "|" + bcnlat.FeatureType, nil
+}
+
+// FMJobKind is the import queue type identifier.
+const FMJobKind JobKind = "fm"
+
+type bcnlatJobCreator struct{}
+
+func init() {
+	RegisterJobCreator(FMJobKind, bcnlatJobCreator{})
+}
+
+func (bcnlatJobCreator) Description() string { return "fairway marks bcnlat" }
+
+func (bcnlatJobCreator) AutoAccept() bool { return true }
+
+func (bcnlatJobCreator) Create() Job { return new(Bcnlat) }
+
+func (bcnlatJobCreator) Depends() [2][]string {
+	return [2][]string{
+		{"fairway_marks"},
+		{},
+	}
+}
+
+// StageDone is a NOP for fairway marks imports.
+func (bcnlatJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
+	return nil
+}
+
+// CleanUp for fairway marks imports is a NOP.
+func (*Bcnlat) CleanUp() error { return nil }
+
+type bcnlatProperties struct {
+	fairwayMarksProperties
+	Colour      *string `json:"hydro_colour"`
+	Colpat      *string `json:"hydro_colpat"`
+	Condtn      *int    `json:"hydro_condtn"`
+	Bcnshp      *int    `json:"hydro_bcnshp"`
+	HydroCatlam *int64  `json:"hydro_catlam,omitempty"`
+	IENCCatlam  *int64  `json:"ienc_catlam,omitempty"`
+	Dirimp      *string `json:"ienc_dirimp,omitempty"`
+}
+
+const (
+	insertBCNLATSQL = `
+with a as (
+  select users.current_user_area_utm() AS a
+)
+INSERT INTO waterway.fairway_marks (
+  geom,
+  datsta,
+  datend,
+  persta,
+  perend,
+  objnam,
+  nobjnm,
+  inform,
+  ninfom,
+  scamin,
+  picrep,
+  txtdsc,
+  sordat,
+  sorind,
+  colour,
+  colpat,
+  condtn,
+  bcnshp,
+  catlam,
+  dirimp
+)
+SELECT newfm, $3, $4, $5, $6, $7, $8, $9,
+    $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21
+  FROM ST_Transform(ST_GeomFromWKB($1, $2::integer), 4326) AS newfm (newfm)
+  WHERE pg_has_role('sys_admin', 'MEMBER')
+    OR ST_Intersects((select a from a),
+      ST_Transform(newfm, (select ST_SRID(a) from a)))
+ON CONFLICT (
+    CAST((0, geom,
+      datsta, datend, persta, perend, objnam, nobjnm, inform, ninfom,
+      scamin, picrep, txtdsc, sordat, sorind, colour, colpat, condtn,
+      bcnshp, catlam, dirimp) AS waterway.fairway_marks)
+  )
+  DO NOTHING
+RETURNING id
+`
+)
+
+// Do executes the actual import.
+func (fm *Bcnlat) Do(
+	ctx context.Context,
+	importID int64,
+	conn *sql.Conn,
+	feedback Feedback,
+) (interface{}, error) {
+
+	start := time.Now()
+
+	feedback.Info("Import fairway marks of type BCNLAT/bcnlat")
+
+	var props bcnlatProperties
+	fms, epsg, err := getFMFeatures(
+		feedback,
+		fm.FairwayMarks,
+		&props,
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	tx, err := conn.BeginTx(ctx, nil)
+	if err != nil {
+		return nil, err
+	}
+	defer tx.Rollback()
+
+	insertStmt, err := tx.PrepareContext(ctx, insertBCNLATSQL)
+	if err != nil {
+		return nil, err
+	}
+	defer insertStmt.Close()
+
+	savepoint := Savepoint(ctx, tx, "feature")
+
+	var outsideOrDup int
+
+	var features int
+	for _, fm := range fms {
+
+		p := fm[0].(pointSlice)
+		fp := fm[1].(*bcnlatProperties)
+
+		var catlam sql.NullInt64
+		if fp.HydroCatlam != nil {
+			catlam = sql.NullInt64{Int64: *fp.HydroCatlam, Valid: true}
+		} else if fp.IENCCatlam != nil {
+			catlam = sql.NullInt64{Int64: *fp.IENCCatlam, Valid: true}
+		}
+
+		var dirimp sql.NullInt64
+		if fp.Dirimp != nil {
+			if value, err := strconv.ParseInt(*fp.Dirimp, 10, 64); err == nil {
+				dirimp = sql.NullInt64{Int64: value, Valid: true}
+			}
+		}
+
+		var fmid int64
+		err := savepoint(func() error {
+			err := insertStmt.QueryRowContext(
+				ctx,
+				p.asWKB(),
+				epsg,
+				fp.Datsta,
+				fp.Datend,
+				fp.Persta,
+				fp.Perend,
+				fp.Objnam,
+				fp.Nobjnm,
+				fp.Inform,
+				fp.Ninfom,
+				fp.Scamin,
+				fp.Picrep,
+				fp.Txtdsc,
+				fp.Sordat,
+				fp.Sorind,
+				fp.Colour,
+				fp.Colpat,
+				fp.Condtn,
+				fp.Bcnshp,
+				catlam,
+				dirimp,
+			).Scan(&fmid)
+			return err
+		})
+		switch {
+		case err == sql.ErrNoRows:
+			outsideOrDup++
+			// ignore -> filtered by responsibility_areas
+		case err != nil:
+			feedback.Error(pgxutils.ReadableError{Err: err}.Error())
+		default:
+			features++
+		}
+	}
+
+	if outsideOrDup > 0 {
+		feedback.Info(
+			"Features outside responsibility area and duplicates: %d",
+			outsideOrDup)
+	}
+
+	if features == 0 {
+		err := errors.New("no valid new features found")
+		return nil, err
+	}
+
+	if err = tx.Commit(); err == nil {
+		feedback.Info("Storing %d features took %s",
+			features, time.Since(start))
+	}
+
+	return nil, err
+}