changeset 4977:4bf6cde2d996

Generalize WFS point import job to be able to import other geometry types, too.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Tue, 03 Mar 2020 18:05:33 +0100
parents ff965141d085
children 35a3dc12050f
files pkg/imports/fm.go pkg/imports/pointwfs.go pkg/imports/wkb.go
diffstat 3 files changed, 133 insertions(+), 122 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/imports/fm.go	Tue Mar 03 15:33:04 2020 +0100
+++ b/pkg/imports/fm.go	Tue Mar 03 18:05:33 2020 +0100
@@ -209,7 +209,7 @@
 
 func init() {
 	RegisterJobCreator(BCNLATHYDROJobKind,
-		&PointWFSJobCreator{
+		&GeometryWFSJobCreator{
 			description: "fairway marks bcnlat (HYDRO)",
 			depends:     [2][]string{{"fairway_marks_bcnlat_hydro"}, {}},
 			newConsumer: newSQLConsumer(
@@ -219,12 +219,12 @@
 				),
 				consume,
 				createInvalidation("bcnlat_hydro"),
-				func() interface{} { return new(bcnlatHydroProperties) },
+				newPointSlice(func() interface{} { return new(bcnlatHydroProperties) }),
 			),
 		})
 
 	RegisterJobCreator(BCNLATIENCJobKind,
-		&PointWFSJobCreator{
+		&GeometryWFSJobCreator{
 			description: "fairway marks bcnlat (IENC)",
 			depends:     [2][]string{{"fairway_marks_bcnlat_ienc"}, {}},
 			newConsumer: newSQLConsumer(
@@ -235,12 +235,12 @@
 				),
 				consume,
 				createInvalidation("bcnlat_ienc"),
-				func() interface{} { return new(bcnlatIencProperties) },
+				newPointSlice(func() interface{} { return new(bcnlatIencProperties) }),
 			),
 		})
 
 	RegisterJobCreator(BOYLATHYDROJobKind,
-		&PointWFSJobCreator{
+		&GeometryWFSJobCreator{
 			description: "fairway marks boylat (HYDRO)",
 			depends:     [2][]string{{"fairway_marks_boylat_hydro"}, {}},
 			newConsumer: newSQLConsumer(
@@ -251,12 +251,12 @@
 				),
 				consume,
 				createInvalidation("boylat_hydro"),
-				func() interface{} { return new(boylatHydroProperties) },
+				newPointSlice(func() interface{} { return new(boylatHydroProperties) }),
 			),
 		})
 
 	RegisterJobCreator(BOYLATIENCJobKind,
-		&PointWFSJobCreator{
+		&GeometryWFSJobCreator{
 			description: "fairway marks boylat (IENC)",
 			depends:     [2][]string{{"fairway_marks_boylat_ienc"}, {}},
 			newConsumer: newSQLConsumer(
@@ -267,12 +267,12 @@
 				),
 				consume,
 				createInvalidation("boylat_ienc"),
-				func() interface{} { return new(boylatIencProperties) },
+				newPointSlice(func() interface{} { return new(boylatIencProperties) }),
 			),
 		})
 
 	RegisterJobCreator(BOYCARJobKind,
-		&PointWFSJobCreator{
+		&GeometryWFSJobCreator{
 			description: "fairway marks boycar",
 			depends:     [2][]string{{"fairway_marks_boycar"}, {}},
 			newConsumer: newSQLConsumer(
@@ -283,12 +283,12 @@
 				),
 				consume,
 				createInvalidation("boycar"),
-				func() interface{} { return new(boycarProperties) },
+				newPointSlice(func() interface{} { return new(boycarProperties) }),
 			),
 		})
 
 	RegisterJobCreator(BOYSAWJobKind,
-		&PointWFSJobCreator{
+		&GeometryWFSJobCreator{
 			description: "fairway marks boysaw",
 			depends:     [2][]string{{"fairway_marks_boysaw"}, {}},
 			newConsumer: newSQLConsumer(
@@ -298,12 +298,12 @@
 				),
 				consume,
 				createInvalidation("boysaw"),
-				func() interface{} { return new(boysawProperties) },
+				newPointSlice(func() interface{} { return new(boysawProperties) }),
 			),
 		})
 
 	RegisterJobCreator(BOYSPPJobKind,
-		&PointWFSJobCreator{
+		&GeometryWFSJobCreator{
 			description: "fairway marks boyspp",
 			depends:     [2][]string{{"fairway_marks_boyspp"}, {}},
 			newConsumer: newSQLConsumer(
@@ -314,12 +314,12 @@
 				),
 				consume,
 				createInvalidation("boyspp"),
-				func() interface{} { return new(boysppProperties) },
+				newPointSlice(func() interface{} { return new(boysppProperties) }),
 			),
 		})
 
 	RegisterJobCreator(DAYMARHYDROJobKind,
-		&PointWFSJobCreator{
+		&GeometryWFSJobCreator{
 			description: "fairway marks daymar (HYDRO)",
 			depends:     [2][]string{{"fairway_marks_daymar_hydro"}, {}},
 			newConsumer: newSQLConsumer(
@@ -329,12 +329,12 @@
 				),
 				consume,
 				createInvalidation("daymar_hydro"),
-				func() interface{} { return new(daymarHydroProperties) },
+				newPointSlice(func() interface{} { return new(daymarHydroProperties) }),
 			),
 		})
 
 	RegisterJobCreator(DAYMARIENCJobKind,
-		&PointWFSJobCreator{
+		&GeometryWFSJobCreator{
 			description: "fairway marks daymar (IENC)",
 			depends:     [2][]string{{"fairway_marks_daymar_ienc"}, {}},
 			newConsumer: newSQLConsumer(
@@ -345,12 +345,12 @@
 				),
 				consume,
 				createInvalidation("daymar_ienc"),
-				func() interface{} { return new(daymarIencProperties) },
+				newPointSlice(func() interface{} { return new(daymarIencProperties) }),
 			),
 		})
 
 	RegisterJobCreator(LIGHTSJobKind,
-		&PointWFSJobCreator{
+		&GeometryWFSJobCreator{
 			description: "fairway marks lights",
 			depends:     [2][]string{{"fairway_marks_lights"}, {}},
 			newConsumer: newSQLConsumer(
@@ -364,12 +364,12 @@
 				),
 				consume,
 				createInvalidation("lights"),
-				func() interface{} { return new(lightsProperties) },
+				newPointSlice(func() interface{} { return new(lightsProperties) }),
 			),
 		})
 
 	RegisterJobCreator(NOTMRKJobKind,
-		&PointWFSJobCreator{
+		&GeometryWFSJobCreator{
 			description: "fairway marks notmrk",
 			depends:     [2][]string{{"fairway_marks_lights"}, {}},
 			newConsumer: newSQLConsumer(
@@ -383,12 +383,12 @@
 				),
 				consume,
 				createInvalidation("notmrk"),
-				func() interface{} { return new(notmrkProperties) },
+				newPointSlice(func() interface{} { return new(notmrkProperties) }),
 			),
 		})
 
 	RegisterJobCreator(RTPBCNJobKind,
-		&PointWFSJobCreator{
+		&GeometryWFSJobCreator{
 			description: "fairway marks rtpbcn",
 			depends:     [2][]string{{"fairway_marks_rtpbcn"}, {}},
 			newConsumer: newSQLConsumer(
@@ -398,12 +398,12 @@
 				),
 				consume,
 				createInvalidation("rtpbcn"),
-				func() interface{} { return new(rtpbcnProperties) },
+				newPointSlice(func() interface{} { return new(rtpbcnProperties) }),
 			),
 		})
 
 	RegisterJobCreator(TOPMARJobKind,
-		&PointWFSJobCreator{
+		&GeometryWFSJobCreator{
 			description: "fairway marks topmar",
 			depends:     [2][]string{{"fairway_marks_topmar"}, {}},
 			newConsumer: newSQLConsumer(
@@ -413,7 +413,7 @@
 				),
 				consume,
 				createInvalidation("topmar"),
-				func() interface{} { return new(topmarProperties) },
+				newPointSlice(func() interface{} { return new(topmarProperties) }),
 			),
 		})
 }
@@ -509,11 +509,11 @@
 `
 )
 
-func createInvalidation(fmType string) func(*SQLPointConsumer) error {
+func createInvalidation(fmType string) func(*SQLGeometryConsumer) error {
 
 	invalidateFairwayMarksSQL := fmt.Sprintf(invalidateFairwayMarksSQLtmpl, fmType)
 
-	return func(spc *SQLPointConsumer) error {
+	return func(spc *SQLGeometryConsumer) error {
 		res, err := spc.tx.ExecContext(spc.ctx, invalidateFairwayMarksSQL)
 		if err != nil {
 			return err
@@ -545,7 +545,7 @@
 	)
 }
 
-func storeAttribs(spc *SQLPointConsumer, id int64, attrs *string) {
+func storeAttribs(spc *SQLGeometryConsumer, id int64, attrs *string) {
 	if attrs == nil || *attrs == "" {
 		return
 	}
@@ -566,9 +566,8 @@
 }
 
 func consume(
-	spc *SQLPointConsumer,
-	points pointSlice,
-	properties interface{},
+	spc *SQLGeometryConsumer,
+	points, properties interface{},
 	epsg int,
 ) error {
 	var fmid int64
@@ -577,7 +576,7 @@
 			spc.ctx,
 			append(
 				[]interface{}{
-					points.asWKB(),
+					points.(*pointSlice).asWKB(),
 					epsg,
 				},
 				structs.Values(properties)...)...,
--- a/pkg/imports/pointwfs.go	Tue Mar 03 15:33:04 2020 +0100
+++ b/pkg/imports/pointwfs.go	Tue Mar 03 18:05:33 2020 +0100
@@ -34,59 +34,60 @@
 )
 
 type (
-	WFSPointConsumer interface {
+	WFSGeometryConsumer interface {
 		Commit() error
 		Rollback() error
 
-		NewProperties() interface{}
-		Consume(points pointSlice, properties interface{}, epsg int) error
+		NewFeature() (string, interface{}, interface{})
+
+		Consume(geom, properties interface{}, epsg int) error
 	}
 
-	PointWFSJobCreator struct {
+	GeometryWFSJobCreator struct {
 		description string
 		depends     [2][]string
 
-		newConsumer func(context.Context, *sql.Conn, Feedback) (WFSPointConsumer, error)
+		newConsumer func(context.Context, *sql.Conn, Feedback) (WFSGeometryConsumer, error)
 	}
 
-	PointWFSJob struct {
+	GeometryWFSJob struct {
 		models.WFSImport
-		creator *PointWFSJobCreator
+		creator *GeometryWFSJobCreator
 	}
 )
 
-func (pwjc *PointWFSJobCreator) Description() string {
-	return pwjc.description
+func (gwjc *GeometryWFSJobCreator) Description() string {
+	return gwjc.description
 }
 
-func (pwjc *PointWFSJobCreator) Depends() [2][]string {
-	return pwjc.depends
+func (gwjc *GeometryWFSJobCreator) Depends() [2][]string {
+	return gwjc.depends
 }
 
-func (*PointWFSJobCreator) AutoAccept() bool {
+func (*GeometryWFSJobCreator) AutoAccept() bool {
 	return true
 }
 
 // StageDone is a NOP for WFS imports.
-func (*PointWFSJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
+func (*GeometryWFSJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
 	return nil
 }
 
-func (pwjc *PointWFSJobCreator) Create() Job {
-	return &PointWFSJob{creator: pwjc}
+func (gwjc *GeometryWFSJobCreator) Create() Job {
+	return &GeometryWFSJob{creator: gwjc}
 }
 
 // Description gives a short info about relevant facts of this import.
-func (pwj *PointWFSJob) Description() (string, error) {
-	return pwj.URL + "|" + pwj.FeatureType, nil
+func (gwj *GeometryWFSJob) Description() (string, error) {
+	return gwj.URL + "|" + gwj.FeatureType, nil
 }
 
 // CleanUp for WFS imports is a NOP.
-func (*PointWFSJob) CleanUp() error {
+func (*GeometryWFSJob) CleanUp() error {
 	return nil
 }
 
-func (pwj *PointWFSJob) Do(
+func (gwj *GeometryWFSJob) Do(
 	ctx context.Context,
 	importID int64,
 	conn *sql.Conn,
@@ -95,21 +96,21 @@
 
 	start := time.Now()
 
-	feedback.Info("Import %s", pwj.creator.Description())
+	feedback.Info("Import %s", gwj.creator.Description())
 
-	feedback.Info("Loading capabilities from %s", pwj.URL)
-	caps, err := wfs.GetCapabilities(pwj.URL)
+	feedback.Info("Loading capabilities from %s", gwj.URL)
+	caps, err := wfs.GetCapabilities(gwj.URL)
 	if err != nil {
 		feedback.Error("Loading capabilities failed: %v", err)
 		return nil, err
 	}
 
-	ft := caps.FindFeatureType(pwj.FeatureType)
+	ft := caps.FindFeatureType(gwj.FeatureType)
 	if ft == nil {
-		return nil, fmt.Errorf("unknown feature type '%s'", pwj.FeatureType)
+		return nil, fmt.Errorf("unknown feature type '%s'", gwj.FeatureType)
 	}
 
-	feedback.Info("Found feature type '%s'", pwj.FeatureType)
+	feedback.Info("Found feature type '%s'", gwj.FeatureType)
 
 	epsg, err := wfs.CRSToEPSG(ft.DefaultCRS)
 	if err != nil {
@@ -117,11 +118,11 @@
 		return nil, err
 	}
 
-	if nilString(pwj.SortBy) != "" {
-		feedback.Info("Features will be sorted by '%s'", *pwj.SortBy)
+	if nilString(gwj.SortBy) != "" {
+		feedback.Info("Features will be sorted by '%s'", *gwj.SortBy)
 	}
 
-	dl, err := wfs.GetFeatures(caps, pwj.FeatureType, nilString(pwj.SortBy))
+	dl, err := wfs.GetFeatures(caps, gwj.FeatureType, nilString(gwj.SortBy))
 	if err != nil {
 		feedback.Error("Cannot create GetFeature URLs. %v", err)
 		return nil, err
@@ -135,13 +136,13 @@
 		features          int
 	)
 
-	consumer, err := pwj.creator.newConsumer(ctx, conn, feedback)
+	consumer, err := gwj.creator.newConsumer(ctx, conn, feedback)
 	if err != nil {
 		return nil, err
 	}
 	defer consumer.Rollback()
 
-	if err := dl.Download(nilString(pwj.User), nilString(pwj.Password), func(url string, r io.Reader) error {
+	if err := dl.Download(nilString(gwj.User), nilString(gwj.Password), func(url string, r io.Reader) error {
 		feedback.Info("Get features from: '%s'", url)
 		rfc, err := wfs.ParseRawFeatureCollection(r)
 		if err != nil {
@@ -171,20 +172,19 @@
 				continue
 			}
 
-			props := consumer.NewProperties()
+			kind, geom, props := consumer.NewFeature()
+
 			if err := json.Unmarshal(*feature.Properties, props); err != nil {
 				badProperties++
 				continue
 			}
 
-			switch feature.Geometry.Type {
-			case "Point":
-				var p pointSlice
-				if err := json.Unmarshal(*feature.Geometry.Coordinates, &p); err != nil {
+			if feature.Geometry.Type == kind {
+				if err := json.Unmarshal(*feature.Geometry.Coordinates, geom); err != nil {
 					return err
 				}
 
-				err := consumer.Consume(p, props, epsg)
+				err := consumer.Consume(geom, props, epsg)
 				switch {
 				case err == ErrFeatureDuplicated:
 					dupes++
@@ -195,8 +195,7 @@
 				default:
 					features++
 				}
-
-			default:
+			} else {
 				unsupported[feature.Geometry.Type]++
 			}
 		}
@@ -243,37 +242,37 @@
 }
 
 type (
-	SQLPointConsumer struct {
-		ctx           context.Context
-		tx            *sql.Tx
-		feedback      Feedback
-		newProperties func() interface{}
-		consume       func(*SQLPointConsumer, pointSlice, interface{}, int) error
-		preCommit     func(*SQLPointConsumer) error
-		savepoint     func(func() error) error
-		stmts         []*sql.Stmt
+	SQLGeometryConsumer struct {
+		ctx        context.Context
+		tx         *sql.Tx
+		feedback   Feedback
+		consume    func(*SQLGeometryConsumer, interface{}, interface{}, int) error
+		newFeature func() (string, interface{}, interface{})
+		preCommit  func(*SQLGeometryConsumer) error
+		savepoint  func(func() error) error
+		stmts      []*sql.Stmt
 	}
 )
 
-func (spc *SQLPointConsumer) Rollback() error {
-	if tx := spc.tx; tx != nil {
-		spc.releaseStmts()
-		spc.tx = nil
-		spc.ctx = nil
+func (sgc *SQLGeometryConsumer) Rollback() error {
+	if tx := sgc.tx; tx != nil {
+		sgc.releaseStmts()
+		sgc.tx = nil
+		sgc.ctx = nil
 		return tx.Rollback()
 	}
 	return nil
 }
 
-func (spc *SQLPointConsumer) Commit() error {
+func (sgc *SQLGeometryConsumer) Commit() error {
 	var err error
-	if tx := spc.tx; tx != nil {
-		if spc.preCommit != nil {
-			err = spc.preCommit(spc)
+	if tx := sgc.tx; tx != nil {
+		if sgc.preCommit != nil {
+			err = sgc.preCommit(sgc)
 		}
-		spc.releaseStmts()
-		spc.tx = nil
-		spc.ctx = nil
+		sgc.releaseStmts()
+		sgc.tx = nil
+		sgc.ctx = nil
 		if err2 := tx.Commit(); err2 != nil {
 			// A real error on commit overrules the first.
 			err = err2
@@ -282,63 +281,70 @@
 	return err
 }
 
-func (spc *SQLPointConsumer) NewProperties() interface{} {
-	return spc.newProperties()
+func (sgc *SQLGeometryConsumer) NewFeature() (string, interface{}, interface{}) {
+	return sgc.newFeature()
 }
 
-func (spc *SQLPointConsumer) Consume(
-	points pointSlice,
+func (sgc *SQLGeometryConsumer) Consume(
+	geom, properties interface{},
+	epsg int,
+) error {
+	return sgc.consume(sgc, geom, properties, epsg)
+}
+
+func (sgc *SQLGeometryConsumer) ConsumePolygon(
+	polygon polygonSlice,
 	properties interface{},
 	epsg int,
 ) error {
-	return spc.consume(spc, points, properties, epsg)
+	return sgc.consume(sgc, polygon, properties, epsg)
 }
 
 func newSQLConsumer(
-	init func(*SQLPointConsumer) error,
-	consume func(*SQLPointConsumer, pointSlice, interface{}, int) error,
-	preCommit func(*SQLPointConsumer) error,
-	newProperties func() interface{},
+	init func(*SQLGeometryConsumer) error,
+	consume func(*SQLGeometryConsumer, interface{}, interface{}, int) error,
+	preCommit func(*SQLGeometryConsumer) error,
+	newFeature func() (string, interface{}, interface{}),
 
-) func(context.Context, *sql.Conn, Feedback) (WFSPointConsumer, error) {
-	return func(ctx context.Context, conn *sql.Conn, feedback Feedback) (WFSPointConsumer, error) {
+) func(context.Context, *sql.Conn, Feedback) (WFSGeometryConsumer, error) {
+	return func(ctx context.Context, conn *sql.Conn, feedback Feedback) (WFSGeometryConsumer, error) {
 		tx, err := conn.BeginTx(ctx, nil)
 		if err != nil {
 			return nil, err
 		}
-		spc := &SQLPointConsumer{
-			ctx:           ctx,
-			tx:            tx,
-			feedback:      feedback,
-			newProperties: newProperties,
-			consume:       consume,
-			preCommit:     preCommit,
-			savepoint:     Savepoint(ctx, tx, "feature"),
+		sgc := &SQLGeometryConsumer{
+			ctx:        ctx,
+			tx:         tx,
+			feedback:   feedback,
+			consume:    consume,
+			newFeature: newFeature,
+			preCommit:  preCommit,
+			savepoint:  Savepoint(ctx, tx, "feature"),
 		}
-		if err := init(spc); err != nil {
+		if err := init(sgc); err != nil {
 			tx.Rollback()
 			return nil, err
 		}
-		return spc, nil
+		return sgc, nil
 	}
 }
 
-func (spc *SQLPointConsumer) releaseStmts() {
-	for i := len(spc.stmts); i > 0; i-- {
-		spc.stmts[i-1].Close()
-		spc.stmts[i-1] = nil
+func (sgc *SQLGeometryConsumer) releaseStmts() {
+	for i := len(sgc.stmts); i > 0; i-- {
+		sgc.stmts[i-1].Close()
+		sgc.stmts[i-1] = nil
 	}
-	spc.stmts = nil
+	sgc.stmts = nil
 }
 
-func prepareStmnts(queries ...string) func(*SQLPointConsumer) error {
-	return func(spc *SQLPointConsumer) error {
+func prepareStmnts(queries ...string) func(*SQLGeometryConsumer) error {
+	return func(sgc *SQLGeometryConsumer) error {
 		for _, query := range queries {
-			stmt, err := spc.tx.PrepareContext(spc.ctx, query)
+			stmt, err := sgc.tx.PrepareContext(sgc.ctx, query)
 			if err != nil {
 				return err
 			}
-			spc.stmts = append(spc.stmts, stmt)
+			sgc.stmts = append(sgc.stmts, stmt)
 		}
 		return nil
 	}
--- a/pkg/imports/wkb.go	Tue Mar 03 15:33:04 2020 +0100
+++ b/pkg/imports/wkb.go	Tue Mar 03 18:05:33 2020 +0100
@@ -30,6 +30,12 @@
 	polygonSlice [][][]float64
 )
 
+func newPointSlice(newProperties func() interface{}) func() (string, interface{}, interface{}) {
+	return func() (string, interface{}, interface{}) {
+		return "Point", new(pointSlice), newProperties()
+	}
+}
+
 func (ls lineSlice) asWKB() []byte {
 
 	size := 1 + 4 + 4 + len(ls)*(2*8)