# HG changeset patch # User Sascha L. Teichmann # Date 1583255133 -3600 # Node ID 4bf6cde2d996a1138ae435ecbdd88e7be0e75ab5 # Parent ff965141d085fe04a54b47d7bfa08f1bf6b50a79 Generalize WFS point import job to be able to import other geometry types, too. diff -r ff965141d085 -r 4bf6cde2d996 pkg/imports/fm.go --- a/pkg/imports/fm.go Tue Mar 03 15:33:04 2020 +0100 +++ b/pkg/imports/fm.go Tue Mar 03 18:05:33 2020 +0100 @@ -209,7 +209,7 @@ func init() { RegisterJobCreator(BCNLATHYDROJobKind, - &PointWFSJobCreator{ + &GeometryWFSJobCreator{ description: "fairway marks bcnlat (HYDRO)", depends: [2][]string{{"fairway_marks_bcnlat_hydro"}, {}}, newConsumer: newSQLConsumer( @@ -219,12 +219,12 @@ ), consume, createInvalidation("bcnlat_hydro"), - func() interface{} { return new(bcnlatHydroProperties) }, + newPointSlice(func() interface{} { return new(bcnlatHydroProperties) }), ), }) RegisterJobCreator(BCNLATIENCJobKind, - &PointWFSJobCreator{ + &GeometryWFSJobCreator{ description: "fairway marks bcnlat (IENC)", depends: [2][]string{{"fairway_marks_bcnlat_ienc"}, {}}, newConsumer: newSQLConsumer( @@ -235,12 +235,12 @@ ), consume, createInvalidation("bcnlat_ienc"), - func() interface{} { return new(bcnlatIencProperties) }, + newPointSlice(func() interface{} { return new(bcnlatIencProperties) }), ), }) RegisterJobCreator(BOYLATHYDROJobKind, - &PointWFSJobCreator{ + &GeometryWFSJobCreator{ description: "fairway marks boylat (HYDRO)", depends: [2][]string{{"fairway_marks_boylat_hydro"}, {}}, newConsumer: newSQLConsumer( @@ -251,12 +251,12 @@ ), consume, createInvalidation("boylat_hydro"), - func() interface{} { return new(boylatHydroProperties) }, + newPointSlice(func() interface{} { return new(boylatHydroProperties) }), ), }) RegisterJobCreator(BOYLATIENCJobKind, - &PointWFSJobCreator{ + &GeometryWFSJobCreator{ description: "fairway marks boylat (IENC)", depends: [2][]string{{"fairway_marks_boylat_ienc"}, {}}, newConsumer: newSQLConsumer( @@ -267,12 +267,12 @@ ), consume, createInvalidation("boylat_ienc"), - func() interface{} { return new(boylatIencProperties) }, + newPointSlice(func() interface{} { return new(boylatIencProperties) }), ), }) RegisterJobCreator(BOYCARJobKind, - &PointWFSJobCreator{ + &GeometryWFSJobCreator{ description: "fairway marks boycar", depends: [2][]string{{"fairway_marks_boycar"}, {}}, newConsumer: newSQLConsumer( @@ -283,12 +283,12 @@ ), consume, createInvalidation("boycar"), - func() interface{} { return new(boycarProperties) }, + newPointSlice(func() interface{} { return new(boycarProperties) }), ), }) RegisterJobCreator(BOYSAWJobKind, - &PointWFSJobCreator{ + &GeometryWFSJobCreator{ description: "fairway marks boysaw", depends: [2][]string{{"fairway_marks_boysaw"}, {}}, newConsumer: newSQLConsumer( @@ -298,12 +298,12 @@ ), consume, createInvalidation("boysaw"), - func() interface{} { return new(boysawProperties) }, + newPointSlice(func() interface{} { return new(boysawProperties) }), ), }) RegisterJobCreator(BOYSPPJobKind, - &PointWFSJobCreator{ + &GeometryWFSJobCreator{ description: "fairway marks boyspp", depends: [2][]string{{"fairway_marks_boyspp"}, {}}, newConsumer: newSQLConsumer( @@ -314,12 +314,12 @@ ), consume, createInvalidation("boyspp"), - func() interface{} { return new(boysppProperties) }, + newPointSlice(func() interface{} { return new(boysppProperties) }), ), }) RegisterJobCreator(DAYMARHYDROJobKind, - &PointWFSJobCreator{ + &GeometryWFSJobCreator{ description: "fairway marks daymar (HYDRO)", depends: [2][]string{{"fairway_marks_daymar_hydro"}, {}}, newConsumer: newSQLConsumer( @@ -329,12 +329,12 @@ ), consume, createInvalidation("daymar_hydro"), - func() interface{} { return new(daymarHydroProperties) }, + newPointSlice(func() interface{} { return new(daymarHydroProperties) }), ), }) RegisterJobCreator(DAYMARIENCJobKind, - &PointWFSJobCreator{ + &GeometryWFSJobCreator{ description: "fairway marks daymar (IENC)", depends: [2][]string{{"fairway_marks_daymar_ienc"}, {}}, newConsumer: newSQLConsumer( @@ -345,12 +345,12 @@ ), consume, createInvalidation("daymar_ienc"), - func() interface{} { return new(daymarIencProperties) }, + newPointSlice(func() interface{} { return new(daymarIencProperties) }), ), }) RegisterJobCreator(LIGHTSJobKind, - &PointWFSJobCreator{ + &GeometryWFSJobCreator{ description: "fairway marks lights", depends: [2][]string{{"fairway_marks_lights"}, {}}, newConsumer: newSQLConsumer( @@ -364,12 +364,12 @@ ), consume, createInvalidation("lights"), - func() interface{} { return new(lightsProperties) }, + newPointSlice(func() interface{} { return new(lightsProperties) }), ), }) RegisterJobCreator(NOTMRKJobKind, - &PointWFSJobCreator{ + &GeometryWFSJobCreator{ description: "fairway marks notmrk", depends: [2][]string{{"fairway_marks_lights"}, {}}, newConsumer: newSQLConsumer( @@ -383,12 +383,12 @@ ), consume, createInvalidation("notmrk"), - func() interface{} { return new(notmrkProperties) }, + newPointSlice(func() interface{} { return new(notmrkProperties) }), ), }) RegisterJobCreator(RTPBCNJobKind, - &PointWFSJobCreator{ + &GeometryWFSJobCreator{ description: "fairway marks rtpbcn", depends: [2][]string{{"fairway_marks_rtpbcn"}, {}}, newConsumer: newSQLConsumer( @@ -398,12 +398,12 @@ ), consume, createInvalidation("rtpbcn"), - func() interface{} { return new(rtpbcnProperties) }, + newPointSlice(func() interface{} { return new(rtpbcnProperties) }), ), }) RegisterJobCreator(TOPMARJobKind, - &PointWFSJobCreator{ + &GeometryWFSJobCreator{ description: "fairway marks topmar", depends: [2][]string{{"fairway_marks_topmar"}, {}}, newConsumer: newSQLConsumer( @@ -413,7 +413,7 @@ ), consume, createInvalidation("topmar"), - func() interface{} { return new(topmarProperties) }, + newPointSlice(func() interface{} { return new(topmarProperties) }), ), }) } @@ -509,11 +509,11 @@ ` ) -func createInvalidation(fmType string) func(*SQLPointConsumer) error { +func createInvalidation(fmType string) func(*SQLGeometryConsumer) error { invalidateFairwayMarksSQL := fmt.Sprintf(invalidateFairwayMarksSQLtmpl, fmType) - return func(spc *SQLPointConsumer) error { + return func(spc *SQLGeometryConsumer) error { res, err := spc.tx.ExecContext(spc.ctx, invalidateFairwayMarksSQL) if err != nil { return err @@ -545,7 +545,7 @@ ) } -func storeAttribs(spc *SQLPointConsumer, id int64, attrs *string) { +func storeAttribs(spc *SQLGeometryConsumer, id int64, attrs *string) { if attrs == nil || *attrs == "" { return } @@ -566,9 +566,8 @@ } func consume( - spc *SQLPointConsumer, - points pointSlice, - properties interface{}, + spc *SQLGeometryConsumer, + points, properties interface{}, epsg int, ) error { var fmid int64 @@ -577,7 +576,7 @@ spc.ctx, append( []interface{}{ - points.asWKB(), + points.(*pointSlice).asWKB(), epsg, }, structs.Values(properties)...)..., diff -r ff965141d085 -r 4bf6cde2d996 pkg/imports/pointwfs.go --- a/pkg/imports/pointwfs.go Tue Mar 03 15:33:04 2020 +0100 +++ b/pkg/imports/pointwfs.go Tue Mar 03 18:05:33 2020 +0100 @@ -34,59 +34,60 @@ ) type ( - WFSPointConsumer interface { + WFSGeometryConsumer interface { Commit() error Rollback() error - NewProperties() interface{} - Consume(points pointSlice, properties interface{}, epsg int) error + NewFeature() (string, interface{}, interface{}) + + Consume(geom, properties interface{}, epsg int) error } - PointWFSJobCreator struct { + GeometryWFSJobCreator struct { description string depends [2][]string - newConsumer func(context.Context, *sql.Conn, Feedback) (WFSPointConsumer, error) + newConsumer func(context.Context, *sql.Conn, Feedback) (WFSGeometryConsumer, error) } - PointWFSJob struct { + GeometryWFSJob struct { models.WFSImport - creator *PointWFSJobCreator + creator *GeometryWFSJobCreator } ) -func (pwjc *PointWFSJobCreator) Description() string { - return pwjc.description +func (gwjc *GeometryWFSJobCreator) Description() string { + return gwjc.description } -func (pwjc *PointWFSJobCreator) Depends() [2][]string { - return pwjc.depends +func (gwjc *GeometryWFSJobCreator) Depends() [2][]string { + return gwjc.depends } -func (*PointWFSJobCreator) AutoAccept() bool { +func (*GeometryWFSJobCreator) AutoAccept() bool { return true } // StageDone is a NOP for WFS imports. -func (*PointWFSJobCreator) StageDone(context.Context, *sql.Tx, int64) error { +func (*GeometryWFSJobCreator) StageDone(context.Context, *sql.Tx, int64) error { return nil } -func (pwjc *PointWFSJobCreator) Create() Job { - return &PointWFSJob{creator: pwjc} +func (gwjc *GeometryWFSJobCreator) Create() Job { + return &GeometryWFSJob{creator: gwjc} } // Description gives a short info about relevant facts of this import. -func (pwj *PointWFSJob) Description() (string, error) { - return pwj.URL + "|" + pwj.FeatureType, nil +func (gwj *GeometryWFSJob) Description() (string, error) { + return gwj.URL + "|" + gwj.FeatureType, nil } // CleanUp for WFS imports is a NOP. -func (*PointWFSJob) CleanUp() error { +func (*GeometryWFSJob) CleanUp() error { return nil } -func (pwj *PointWFSJob) Do( +func (gwj *GeometryWFSJob) Do( ctx context.Context, importID int64, conn *sql.Conn, @@ -95,21 +96,21 @@ start := time.Now() - feedback.Info("Import %s", pwj.creator.Description()) + feedback.Info("Import %s", gwj.creator.Description()) - feedback.Info("Loading capabilities from %s", pwj.URL) - caps, err := wfs.GetCapabilities(pwj.URL) + feedback.Info("Loading capabilities from %s", gwj.URL) + caps, err := wfs.GetCapabilities(gwj.URL) if err != nil { feedback.Error("Loading capabilities failed: %v", err) return nil, err } - ft := caps.FindFeatureType(pwj.FeatureType) + ft := caps.FindFeatureType(gwj.FeatureType) if ft == nil { - return nil, fmt.Errorf("unknown feature type '%s'", pwj.FeatureType) + return nil, fmt.Errorf("unknown feature type '%s'", gwj.FeatureType) } - feedback.Info("Found feature type '%s'", pwj.FeatureType) + feedback.Info("Found feature type '%s'", gwj.FeatureType) epsg, err := wfs.CRSToEPSG(ft.DefaultCRS) if err != nil { @@ -117,11 +118,11 @@ return nil, err } - if nilString(pwj.SortBy) != "" { - feedback.Info("Features will be sorted by '%s'", *pwj.SortBy) + if nilString(gwj.SortBy) != "" { + feedback.Info("Features will be sorted by '%s'", *gwj.SortBy) } - dl, err := wfs.GetFeatures(caps, pwj.FeatureType, nilString(pwj.SortBy)) + dl, err := wfs.GetFeatures(caps, gwj.FeatureType, nilString(gwj.SortBy)) if err != nil { feedback.Error("Cannot create GetFeature URLs. %v", err) return nil, err @@ -135,13 +136,13 @@ features int ) - consumer, err := pwj.creator.newConsumer(ctx, conn, feedback) + consumer, err := gwj.creator.newConsumer(ctx, conn, feedback) if err != nil { return nil, err } defer consumer.Rollback() - if err := dl.Download(nilString(pwj.User), nilString(pwj.Password), func(url string, r io.Reader) error { + if err := dl.Download(nilString(gwj.User), nilString(gwj.Password), func(url string, r io.Reader) error { feedback.Info("Get features from: '%s'", url) rfc, err := wfs.ParseRawFeatureCollection(r) if err != nil { @@ -171,20 +172,19 @@ continue } - props := consumer.NewProperties() + kind, geom, props := consumer.NewFeature() + if err := json.Unmarshal(*feature.Properties, props); err != nil { badProperties++ continue } - switch feature.Geometry.Type { - case "Point": - var p pointSlice - if err := json.Unmarshal(*feature.Geometry.Coordinates, &p); err != nil { + if feature.Geometry.Type == kind { + if err := json.Unmarshal(*feature.Geometry.Coordinates, geom); err != nil { return err } - err := consumer.Consume(p, props, epsg) + err := consumer.Consume(geom, props, epsg) switch { case err == ErrFeatureDuplicated: dupes++ @@ -195,8 +195,7 @@ default: features++ } - - default: + } else { unsupported[feature.Geometry.Type]++ } } @@ -243,37 +242,37 @@ } type ( - SQLPointConsumer struct { - ctx context.Context - tx *sql.Tx - feedback Feedback - newProperties func() interface{} - consume func(*SQLPointConsumer, pointSlice, interface{}, int) error - preCommit func(*SQLPointConsumer) error - savepoint func(func() error) error - stmts []*sql.Stmt + SQLGeometryConsumer struct { + ctx context.Context + tx *sql.Tx + feedback Feedback + consume func(*SQLGeometryConsumer, interface{}, interface{}, int) error + newFeature func() (string, interface{}, interface{}) + preCommit func(*SQLGeometryConsumer) error + savepoint func(func() error) error + stmts []*sql.Stmt } ) -func (spc *SQLPointConsumer) Rollback() error { - if tx := spc.tx; tx != nil { - spc.releaseStmts() - spc.tx = nil - spc.ctx = nil +func (sgc *SQLGeometryConsumer) Rollback() error { + if tx := sgc.tx; tx != nil { + sgc.releaseStmts() + sgc.tx = nil + sgc.ctx = nil return tx.Rollback() } return nil } -func (spc *SQLPointConsumer) Commit() error { +func (sgc *SQLGeometryConsumer) Commit() error { var err error - if tx := spc.tx; tx != nil { - if spc.preCommit != nil { - err = spc.preCommit(spc) + if tx := sgc.tx; tx != nil { + if sgc.preCommit != nil { + err = sgc.preCommit(sgc) } - spc.releaseStmts() - spc.tx = nil - spc.ctx = nil + sgc.releaseStmts() + sgc.tx = nil + sgc.ctx = nil if err2 := tx.Commit(); err2 != nil { // A real error on commit overrules the first. err = err2 @@ -282,63 +281,70 @@ return err } -func (spc *SQLPointConsumer) NewProperties() interface{} { - return spc.newProperties() +func (sgc *SQLGeometryConsumer) NewFeature() (string, interface{}, interface{}) { + return sgc.newFeature() } -func (spc *SQLPointConsumer) Consume( - points pointSlice, +func (sgc *SQLGeometryConsumer) Consume( + geom, properties interface{}, + epsg int, +) error { + return sgc.consume(sgc, geom, properties, epsg) +} + +func (sgc *SQLGeometryConsumer) ConsumePolygon( + polygon polygonSlice, properties interface{}, epsg int, ) error { - return spc.consume(spc, points, properties, epsg) + return sgc.consume(sgc, polygon, properties, epsg) } func newSQLConsumer( - init func(*SQLPointConsumer) error, - consume func(*SQLPointConsumer, pointSlice, interface{}, int) error, - preCommit func(*SQLPointConsumer) error, - newProperties func() interface{}, + init func(*SQLGeometryConsumer) error, + consume func(*SQLGeometryConsumer, interface{}, interface{}, int) error, + preCommit func(*SQLGeometryConsumer) error, + newFeature func() (string, interface{}, interface{}), -) func(context.Context, *sql.Conn, Feedback) (WFSPointConsumer, error) { - return func(ctx context.Context, conn *sql.Conn, feedback Feedback) (WFSPointConsumer, error) { +) func(context.Context, *sql.Conn, Feedback) (WFSGeometryConsumer, error) { + return func(ctx context.Context, conn *sql.Conn, feedback Feedback) (WFSGeometryConsumer, error) { tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } - spc := &SQLPointConsumer{ - ctx: ctx, - tx: tx, - feedback: feedback, - newProperties: newProperties, - consume: consume, - preCommit: preCommit, - savepoint: Savepoint(ctx, tx, "feature"), + sgc := &SQLGeometryConsumer{ + ctx: ctx, + tx: tx, + feedback: feedback, + consume: consume, + newFeature: newFeature, + preCommit: preCommit, + savepoint: Savepoint(ctx, tx, "feature"), } - if err := init(spc); err != nil { + if err := init(sgc); err != nil { tx.Rollback() return nil, err } - return spc, nil + return sgc, nil } } -func (spc *SQLPointConsumer) releaseStmts() { - for i := len(spc.stmts); i > 0; i-- { - spc.stmts[i-1].Close() - spc.stmts[i-1] = nil +func (sgc *SQLGeometryConsumer) releaseStmts() { + for i := len(sgc.stmts); i > 0; i-- { + sgc.stmts[i-1].Close() + sgc.stmts[i-1] = nil } - spc.stmts = nil + sgc.stmts = nil } -func prepareStmnts(queries ...string) func(*SQLPointConsumer) error { - return func(spc *SQLPointConsumer) error { +func prepareStmnts(queries ...string) func(*SQLGeometryConsumer) error { + return func(sgc *SQLGeometryConsumer) error { for _, query := range queries { - stmt, err := spc.tx.PrepareContext(spc.ctx, query) + stmt, err := sgc.tx.PrepareContext(sgc.ctx, query) if err != nil { return err } - spc.stmts = append(spc.stmts, stmt) + sgc.stmts = append(sgc.stmts, stmt) } return nil } diff -r ff965141d085 -r 4bf6cde2d996 pkg/imports/wkb.go --- a/pkg/imports/wkb.go Tue Mar 03 15:33:04 2020 +0100 +++ b/pkg/imports/wkb.go Tue Mar 03 18:05:33 2020 +0100 @@ -30,6 +30,12 @@ polygonSlice [][][]float64 ) +func newPointSlice(newProperties func() interface{}) func() (string, interface{}, interface{}) { + return func() (string, interface{}, interface{}) { + return "Point", new(pointSlice), newProperties() + } +} + func (ls lineSlice) asWKB() []byte { size := 1 + 4 + 4 + len(ls)*(2*8)