# HG changeset patch # User Sascha L. Teichmann # Date 1547319211 -3600 # Node ID 614c6c766691aff676daa5106627b42f71edcc7c # Parent 724758455a4e44d25b096f00bcb353ceda0e0d57 Waterway area import: Implemented. diff -r 724758455a4e -r 614c6c766691 pkg/controllers/manualimports.go --- a/pkg/controllers/manualimports.go Fri Jan 11 19:46:21 2019 +0100 +++ b/pkg/controllers/manualimports.go Sat Jan 12 19:53:31 2019 +0100 @@ -76,6 +76,17 @@ return wx, due, retries, wxi.SendEmail } +func importWaterwayArea(input interface{}) (interface{}, time.Time, int, bool) { + wai := input.(*models.WaterwayAreaImport) + wa := &imports.WaterwayArea{ + URL: wai.URL, + FeatureType: wai.FeatureType, + SortBy: wai.SortBy, + } + due, retries := retry(wai.Attributes) + return wa, due, retries, wai.SendEmail +} + func manualImport( kind imports.JobKind, setup func(interface{}) (interface{}, time.Time, int, bool), diff -r 724758455a4e -r 614c6c766691 pkg/controllers/routes.go --- a/pkg/controllers/routes.go Fri Jan 11 19:46:21 2019 +0100 +++ b/pkg/controllers/routes.go Sat Jan 12 19:53:31 2019 +0100 @@ -199,6 +199,12 @@ NoConn: true, })).Methods(http.MethodPost) + api.Handle("/imports/waterwayarea", waterwayAdmin(&JSONHandler{ + Input: func() interface{} { return new(models.WaterwayAreaImport) }, + Handle: manualImport(imports.WAJobKind, importWaterwayArea), + NoConn: true, + })).Methods(http.MethodPost) + // Import scheduler configuration api.Handle("/imports/config/{id:[0-9]+}/run", waterwayAdmin(&JSONHandler{ diff -r 724758455a4e -r 614c6c766691 pkg/imports/misc.go --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pkg/imports/misc.go Sat Jan 12 19:53:31 2019 +0100 @@ -0,0 +1,29 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later +// License-Filename: LICENSES/AGPL-3.0.txt +// +// Copyright (C) 2018 by via donau +// – Österreichische Wasserstraßen-Gesellschaft mbH +// Software engineering by Intevation GmbH +// +// Author(s): +// * Sascha L. Teichmann + +package imports + +import ( + "fmt" + "strings" +) + +type stringCounter map[string]int + +func (sc stringCounter) String() string { + var b strings.Builder + for t, c := range sc { + if b.Len() > 0 { + b.WriteString(", ") + } + b.WriteString(fmt.Sprintf("%s: %d", t, c)) + } + return b.String() +} diff -r 724758455a4e -r 614c6c766691 pkg/imports/polygon.go --- a/pkg/imports/polygon.go Fri Jan 11 19:46:21 2019 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,98 +0,0 @@ -// This is Free Software under GNU Affero General Public License v >= 3.0 -// without warranty, see README.md and license for details. -// -// SPDX-License-Identifier: AGPL-3.0-or-later -// License-Filename: LICENSES/AGPL-3.0.txt -// -// Copyright (C) 2018 by via donau -// – Österreichische Wasserstraßen-Gesellschaft mbH -// Software engineering by Intevation GmbH -// -// Author(s): -// * Sascha L. Teichmann - -package imports - -import ( - "bytes" - "encoding/binary" - "fmt" - "math" - - shp "github.com/jonas-p/go-shp" -) - -type ( - point struct { - X float64 - Y float64 - } - lineString []point - polygon []lineString -) - -const ( - wkbNDR byte = 1 - wkbPolygon uint32 = 3 -) - -func shapeToPolygon(s shp.Shape) (polygon, error) { - switch p := s.(type) { - case *shp.Polygon: - return toPolygon(p.NumParts, p.Parts, p.Points), nil - case *shp.PolygonZ: - return toPolygon(p.NumParts, p.Parts, p.Points), nil - case *shp.PolygonM: - return toPolygon(p.NumParts, p.Parts, p.Points), nil - } - return nil, fmt.Errorf("Unsupported shape type %T", s) -} - -func toPolygon(numParts int32, parts []int32, points []shp.Point) polygon { - out := make(polygon, numParts) - var pos int32 - - for i := range out { - var howMany int32 - if i+1 >= len(parts) { - howMany = int32(len(points)) - pos - } else { - howMany = parts[i+1] - parts[i] - } - - line := make(lineString, howMany) - for j := int32(0); j < howMany; j, pos = j+1, pos+1 { - p := &points[pos] - line[j] = point{p.X, p.Y} - } - out[i] = line - } - return out -} - -func (p polygon) asWKB() []byte { - if p == nil { - return nil - } - // pre-calculate size to avoid reallocations. - size := 1 + 4 + 4 - for _, ring := range p { - size += 4 + len(ring)*2*8 - } - - buf := bytes.NewBuffer(make([]byte, 0, size)) - - binary.Write(buf, binary.LittleEndian, wkbNDR) - binary.Write(buf, binary.LittleEndian, wkbPolygon) - binary.Write(buf, binary.LittleEndian, uint32(len(p))) - - for _, ring := range p { - binary.Write(buf, binary.LittleEndian, uint32(len(ring))) - for _, v := range ring { - binary.Write(buf, binary.LittleEndian, math.Float64bits(v.X)) - binary.Write(buf, binary.LittleEndian, math.Float64bits(v.Y)) - } - } - - return buf.Bytes() -} diff -r 724758455a4e -r 614c6c766691 pkg/imports/scheduled.go --- a/pkg/imports/scheduled.go Fri Jan 11 19:46:21 2019 +0100 +++ b/pkg/imports/scheduled.go Sat Jan 12 19:53:31 2019 +0100 @@ -70,6 +70,23 @@ SortBy: sb, }, nil }, + + WAJobKind: func(cfg *IDConfig) (interface{}, error) { + log.Println("info: schedule 'wa' import") + ft, found := cfg.Attributes.Get("feature-type") + if !found { + return nil, errors.New("cannot find 'feature-type' attribute") + } + sb, found := cfg.Attributes.Get("sort-by") + if !found { + return nil, errors.New("cannot find 'sort-by' attribute") + } + return &WaterwayArea{ + URL: *cfg.URL, + FeatureType: ft, + SortBy: sb, + }, nil + }, } func init() { diff -r 724758455a4e -r 614c6c766691 pkg/imports/wa.go --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pkg/imports/wa.go Sat Jan 12 19:53:31 2019 +0100 @@ -0,0 +1,257 @@ +// This is Free Software under GNU Affero General Public License v >= 3.0 +// without warranty, see README.md and license for details. +// +// SPDX-License-Identifier: AGPL-3.0-or-later +// License-Filename: LICENSES/AGPL-3.0.txt +// +// Copyright (C) 2018 by via donau +// – Österreichische Wasserstraßen-Gesellschaft mbH +// Software engineering by Intevation GmbH +// +// Author(s): +// * Sascha L. Teichmann + +package imports + +import ( + "context" + "database/sql" + "encoding/json" + "errors" + "fmt" + "io" + "strconv" + "time" + + "gemma.intevation.de/gemma/pkg/common" + "gemma.intevation.de/gemma/pkg/wfs" +) + +// WaterwayArea is an import job to import +// the waterway area in form of polygon geometries +// and attribute data from a WFS service. +type WaterwayArea struct { + // URL the GetCapabilities URL of the WFS service. + URL string `json:"url"` + // FeatureType selects the feature type of the WFS service. + FeatureType string `json:"feature-type"` + // SortBy works around misconfigured services to + // establish a sort order to get the features. + SortBy string `json:"sort-by"` +} + +// WAJobKind is the import queue type identifier. +const WAJobKind JobKind = "wa" + +type waJobCreator struct{} + +func init() { + RegisterJobCreator(WAJobKind, waJobCreator{}) +} + +func (waJobCreator) Description() string { return "waterway area" } + +func (waJobCreator) AutoAccept() bool { return true } + +func (waJobCreator) Create(_ JobKind, data string) (Job, error) { + wa := new(WaterwayArea) + if err := common.FromJSONString(data, wa); err != nil { + return nil, err + } + return wa, nil +} + +func (waJobCreator) Depends() []string { + return []string{ + "waterway_area", + } +} + +// StageDone is a NOP for waterway area imports. +func (waJobCreator) StageDone(context.Context, *sql.Tx, int64) error { + return nil +} + +// CleanUp for waterway area imports is a NOP. +func (*WaterwayArea) CleanUp() error { return nil } + +type waterwayAreaProperties struct { + Catccl *string `json:"ienc_catccl"` + Dirimp *string `json:"ienc_dirimp"` +} + +const ( + deleteWaterwayAreaSQL = `DELETE FROM waterway.waterway_area` + insertWaterwayAreaSQL = ` +INSERT INTO waterway.waterway_area (area, catccl, dirimp) +VALUES ( + ST_Transform(ST_GeomFromWKB($1, $2::integer), 4326)::geography, + $3, + $4 +)` +) + +// Do executes the actual waterway axis import. +func (wx *WaterwayArea) Do( + ctx context.Context, + importID int64, + conn *sql.Conn, + feedback Feedback, +) (interface{}, error) { + + start := time.Now() + + feedback.Info("Import waterway area") + + feedback.Info("Loading capabilities from %s", wx.URL) + caps, err := wfs.GetCapabilities(wx.URL) + if err != nil { + feedback.Error("Loading capabilities failed: %v", err) + return nil, err + } + + ft := caps.FindFeatureType(wx.FeatureType) + if ft == nil { + return nil, fmt.Errorf("Unknown feature type '%s'", wx.FeatureType) + } + + feedback.Info("Found feature type '%s", wx.FeatureType) + + epsg, err := wfs.CRSToEPSG(ft.DefaultCRS) + if err != nil { + feedback.Error("Unsupported CRS name '%s'", ft.DefaultCRS) + return nil, err + } + + if wx.SortBy != "" { + feedback.Info("Features will be sorted by '%s'", wx.SortBy) + } + + urls, err := wfs.GetFeaturesGET( + caps, wx.FeatureType, "application/json", wx.SortBy) + if err != nil { + feedback.Error("Cannot create GetFeature URLs. %v", err) + return nil, err + } + + tx, err := conn.BeginTx(ctx, nil) + if err != nil { + return nil, err + } + defer tx.Rollback() + + insertStmt, err := tx.PrepareContext(ctx, insertWaterwayAreaSQL) + if err != nil { + return nil, err + } + defer insertStmt.Close() + + // Delete the old features. + if _, err := tx.ExecContext(ctx, deleteWaterwayAreaSQL); err != nil { + return nil, err + } + + var ( + unsupported = stringCounter{} + missingProperties int + badProperties int + features int + ) + + if err := wfs.DownloadURLs(urls, func(r io.Reader) error { + rfc, err := wfs.ParseRawFeatureCollection(r) + if err != nil { + return fmt.Errorf("parsing GetFeature document failed: %v", err) + } + if rfc.CRS != nil { + crsName := rfc.CRS.Properties.Name + if epsg, err = wfs.CRSToEPSG(crsName); err != nil { + feedback.Error("Unsupported CRS: %d", crsName) + return err + } + } + + // No features -> ignore. + if rfc.Features == nil { + return nil + } + + feedback.Info("Using EPSG: %d", epsg) + + for _, feature := range rfc.Features { + if feature.Properties == nil || feature.Geometry.Coordinates == nil { + missingProperties++ + continue + } + + var props waterwayAreaProperties + + if err := json.Unmarshal(*feature.Properties, &props); err != nil { + badProperties++ + continue + } + + var catccl sql.NullInt64 + if props.Catccl != nil { + if value, err := strconv.ParseInt(*props.Catccl, 10, 64); err == nil { + catccl = sql.NullInt64{Int64: value, Valid: true} + } + } + var dirimp sql.NullInt64 + if props.Dirimp != nil { + if value, err := strconv.ParseInt(*props.Dirimp, 10, 64); err == nil { + dirimp = sql.NullInt64{Int64: value, Valid: true} + } + } + + switch feature.Geometry.Type { + case "Polygon": + var p polygon + if err := json.Unmarshal(*feature.Geometry.Coordinates, &p); err != nil { + return err + } + if _, err := insertStmt.ExecContext( + ctx, + p.asWKB(), + epsg, + catccl, + dirimp, + ); err != nil { + return err + } + features++ + default: + unsupported[feature.Geometry.Type]++ + } + } + return nil + }); err != nil { + feedback.Error("Downloading features failed: %v", err) + return nil, err + } + + if features == 0 { + err := errors.New("No features found") + feedback.Error("%v", err) + return nil, err + } + + if badProperties > 0 { + feedback.Warn("Bad properties: %d", badProperties) + } + + if missingProperties > 0 { + feedback.Warn("Missing properties: %d", missingProperties) + } + + if len(unsupported) != 0 { + feedback.Warn("Unsupported types found: %s", unsupported) + } + + if err = tx.Commit(); err == nil { + feedback.Info("Storing %d features took %s", + features, time.Since(start)) + } + + return nil, err +} diff -r 724758455a4e -r 614c6c766691 pkg/imports/wkb.go --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pkg/imports/wkb.go Sat Jan 12 19:53:31 2019 +0100 @@ -0,0 +1,162 @@ +// This is Free Software under GNU Affero General Public License v >= 3.0 +// without warranty, see README.md and license for details. +// +// SPDX-License-Identifier: AGPL-3.0-or-later +// License-Filename: LICENSES/AGPL-3.0.txt +// +// Copyright (C) 2018 by via donau +// – Österreichische Wasserstraßen-Gesellschaft mbH +// Software engineering by Intevation GmbH +// +// Author(s): +// * Sascha L. Teichmann + +package imports + +import ( + "bytes" + "encoding/binary" + "fmt" + "math" + + shp "github.com/jonas-p/go-shp" +) + +type ( + lineSlice [][]float64 + polygonSlice [][][]float64 + + point struct { + X float64 + Y float64 + } + lineString []point + polygon []lineString +) + +const ( + wkbNDR byte = 1 + + wkbLineString uint32 = 2 + wkbPolygon uint32 = 3 +) + +func (l lineSlice) asWKB() []byte { + + size := 1 + 4 + 4 + len(l)*(2*8) + + buf := bytes.NewBuffer(make([]byte, 0, size)) + + binary.Write(buf, binary.LittleEndian, wkbNDR) + binary.Write(buf, binary.LittleEndian, wkbLineString) + binary.Write(buf, binary.LittleEndian, uint32(len(l))) + + for _, c := range l { + var lat, lon float64 + if len(c) > 0 { + lat = c[0] + } + if len(c) > 1 { + lon = c[1] + } + binary.Write(buf, binary.LittleEndian, math.Float64bits(lat)) + binary.Write(buf, binary.LittleEndian, math.Float64bits(lon)) + } + + return buf.Bytes() +} + +func (p polygonSlice) asWKB() []byte { + if p == nil { + return nil + } + // pre-calculate size to avoid reallocations. + size := 1 + 4 + 4 + for _, ring := range p { + size += 4 + len(ring)*2*8 + } + + buf := bytes.NewBuffer(make([]byte, 0, size)) + + binary.Write(buf, binary.LittleEndian, wkbNDR) + binary.Write(buf, binary.LittleEndian, wkbPolygon) + binary.Write(buf, binary.LittleEndian, uint32(len(p))) + + for _, ring := range p { + binary.Write(buf, binary.LittleEndian, uint32(len(ring))) + for _, v := range ring { + var lat, lon float64 + if len(v) > 0 { + lat = v[0] + } + if len(v) > 1 { + lon = v[1] + } + binary.Write(buf, binary.LittleEndian, math.Float64bits(lat)) + binary.Write(buf, binary.LittleEndian, math.Float64bits(lon)) + } + } + + return buf.Bytes() +} + +func shapeToPolygon(s shp.Shape) (polygon, error) { + switch p := s.(type) { + case *shp.Polygon: + return toPolygon(p.NumParts, p.Parts, p.Points), nil + case *shp.PolygonZ: + return toPolygon(p.NumParts, p.Parts, p.Points), nil + case *shp.PolygonM: + return toPolygon(p.NumParts, p.Parts, p.Points), nil + } + return nil, fmt.Errorf("Unsupported shape type %T", s) +} + +func toPolygon(numParts int32, parts []int32, points []shp.Point) polygon { + out := make(polygon, numParts) + var pos int32 + + for i := range out { + var howMany int32 + if i+1 >= len(parts) { + howMany = int32(len(points)) - pos + } else { + howMany = parts[i+1] - parts[i] + } + + line := make(lineString, howMany) + for j := int32(0); j < howMany; j, pos = j+1, pos+1 { + p := &points[pos] + line[j] = point{p.X, p.Y} + } + out[i] = line + } + return out +} + +func (p polygon) asWKB() []byte { + if p == nil { + return nil + } + // pre-calculate size to avoid reallocations. + size := 1 + 4 + 4 + for _, ring := range p { + size += 4 + len(ring)*2*8 + } + + buf := bytes.NewBuffer(make([]byte, 0, size)) + + binary.Write(buf, binary.LittleEndian, wkbNDR) + binary.Write(buf, binary.LittleEndian, wkbPolygon) + binary.Write(buf, binary.LittleEndian, uint32(len(p))) + + for _, ring := range p { + binary.Write(buf, binary.LittleEndian, uint32(len(ring))) + for _, v := range ring { + binary.Write(buf, binary.LittleEndian, math.Float64bits(v.X)) + binary.Write(buf, binary.LittleEndian, math.Float64bits(v.Y)) + } + } + + return buf.Bytes() +} diff -r 724758455a4e -r 614c6c766691 pkg/imports/wx.go --- a/pkg/imports/wx.go Fri Jan 11 19:46:21 2019 +0100 +++ b/pkg/imports/wx.go Sat Jan 12 19:53:31 2019 +0100 @@ -14,16 +14,12 @@ package imports import ( - "bytes" "context" "database/sql" - "encoding/binary" "encoding/json" "errors" "fmt" "io" - "math" - "strings" "time" "gemma.intevation.de/gemma/pkg/common" @@ -75,7 +71,7 @@ return nil } -// CleanUp for waterway imports is a NOP. +// CleanUp for waterway axis imports is a NOP. func (*WaterwayAxis) CleanUp() error { return nil } type waterwayAxisProperties struct { @@ -83,35 +79,6 @@ NObjNnm *string `json:"hydro_nobjnm"` } -type line [][]float64 - -const wkbLineString uint32 = 2 - -func (l line) asWKB() []byte { - - size := 1 + 4 + 4 + len(l)*(2*8) - - buf := bytes.NewBuffer(make([]byte, 0, size)) - - binary.Write(buf, binary.LittleEndian, wkbNDR) - binary.Write(buf, binary.LittleEndian, wkbLineString) - binary.Write(buf, binary.LittleEndian, uint32(len(l))) - - for _, c := range l { - var lat, lon float64 - if len(c) > 0 { - lat = c[0] - } - if len(c) > 1 { - lon = c[1] - } - binary.Write(buf, binary.LittleEndian, math.Float64bits(lat)) - binary.Write(buf, binary.LittleEndian, math.Float64bits(lon)) - } - - return buf.Bytes() -} - const ( deleteWaterwayAxisSQL = `DELETE FROM waterway.waterway_axis` insertWaterwayAxisSQL = ` @@ -123,7 +90,7 @@ )` ) -// Do executes the actual waterway exis import. +// Do executes the actual waterway axis import. func (wx *WaterwayAxis) Do( ctx context.Context, importID int64, @@ -184,7 +151,7 @@ } var ( - unsupportedTypes = map[string]int{} + unsupported = stringCounter{} missingProperties int badProperties int features int @@ -230,7 +197,7 @@ switch feature.Geometry.Type { case "LineString": - var l line + var l lineSlice if err := json.Unmarshal(*feature.Geometry.Coordinates, &l); err != nil { return err } @@ -245,7 +212,7 @@ } features++ case "MultiLineString": - var ls []line + var ls []lineSlice if err := json.Unmarshal(*feature.Geometry.Coordinates, &ls); err != nil { return err } @@ -262,7 +229,7 @@ features++ } default: - unsupportedTypes[feature.Geometry.Type]++ + unsupported[feature.Geometry.Type]++ } } return nil @@ -285,15 +252,8 @@ feedback.Warn("Missing properties: %d", missingProperties) } - if len(unsupportedTypes) != 0 { - var b strings.Builder - for t, c := range unsupportedTypes { - if b.Len() > 0 { - b.WriteString(", ") - } - b.WriteString(fmt.Sprintf("%s: %d", t, c)) - } - feedback.Warn("Unsupported types found: %s", b.String()) + if len(unsupported) != 0 { + feedback.Warn("Unsupported types found: %s", unsupported) } if err = tx.Commit(); err == nil { diff -r 724758455a4e -r 614c6c766691 pkg/models/waterway.go --- a/pkg/models/waterway.go Fri Jan 11 19:46:21 2019 +0100 +++ b/pkg/models/waterway.go Sat Jan 12 19:53:31 2019 +0100 @@ -15,17 +15,34 @@ import "gemma.intevation.de/gemma/pkg/common" -// WaterwayAxisImport specifies an import of the waterway axis. -type WaterwayAxisImport struct { - // URL is the capabilities URL of the WFS. - URL string `json:"url"` - // FeatureType is the layer to use. - FeatureType string `json:"feature-type"` - // SortBy sorts the feature by this key. - SortBy string `json:"sort-by"` - // SendEmail is set to true if an email should be send after - // importing the axis. - SendEmail bool `json:"send-email"` - // Attributes are optional attributes. - Attributes common.Attributes `json:"attributes,omitempty"` -} +type ( + // WaterwayAxisImport specifies an import of the waterway axis. + WaterwayAxisImport struct { + // URL is the capabilities URL of the WFS. + URL string `json:"url"` + // FeatureType is the layer to use. + FeatureType string `json:"feature-type"` + // SortBy sorts the feature by this key. + SortBy string `json:"sort-by"` + // SendEmail is set to true if an email should be send after + // importing the axis. + SendEmail bool `json:"send-email"` + // Attributes are optional attributes. + Attributes common.Attributes `json:"attributes,omitempty"` + } + + // WaterwayAreaImport specifies an import of the waterway area. + WaterwayAreaImport struct { + // URL is the capabilities URL of the WFS. + URL string `json:"url"` + // FeatureType is the layer to use. + FeatureType string `json:"feature-type"` + // SortBy sorts the feature by this key. + SortBy string `json:"sort-by"` + // SendEmail is set to true if an email should be send after + // importing the axis. + SendEmail bool `json:"send-email"` + // Attributes are optional attributes. + Attributes common.Attributes `json:"attributes,omitempty"` + } +)