comparison pkg/imports/wp.go @ 2086:6096ec4951f8

Waterway profiles imports: Download the geometries from WFS and join them with the CSV data.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Thu, 31 Jan 2019 18:42:57 +0100
parents ddbac0f22ffb
children 5d3d2e823314
comparison
equal deleted inserted replaced
2085:bca8bda0b805 2086:6096ec4951f8
16 import ( 16 import (
17 "bufio" 17 "bufio"
18 "context" 18 "context"
19 "database/sql" 19 "database/sql"
20 "encoding/csv" 20 "encoding/csv"
21 "encoding/json"
22 "errors"
21 "fmt" 23 "fmt"
22 "io" 24 "io"
23 "os" 25 "os"
24 "path/filepath" 26 "path/filepath"
25 "strconv" 27 "strconv"
27 "time" 29 "time"
28 30
29 "gemma.intevation.de/gemma/pkg/common" 31 "gemma.intevation.de/gemma/pkg/common"
30 "gemma.intevation.de/gemma/pkg/misc" 32 "gemma.intevation.de/gemma/pkg/misc"
31 "gemma.intevation.de/gemma/pkg/models" 33 "gemma.intevation.de/gemma/pkg/models"
34 "gemma.intevation.de/gemma/pkg/wfs"
32 "github.com/jackc/pgx/pgtype" 35 "github.com/jackc/pgx/pgtype"
33 ) 36 )
34 37
// WaterwayProfiles holds the settings of a waterway profiles import:
// where to find the CSV data and which WFS service and feature type
// provide the matching geometries.
type WaterwayProfiles struct {
	// Dir is the directory that contains the CSV file "wp.csv".
	Dir string `json:"dir"`
	// URL is the GetCapabilities URL of the WFS service.
	URL string `json:"url"`
	// FeatureType selects the feature type of the WFS service.
	FeatureType string `json:"feature-type"`
	// SortBy works around misconfigured services to
	// establish a sort order to get the features.
	SortBy string `json:"sort-by"`
}
38 48
39 const WPJobKind JobKind = "wp" 49 const WPJobKind JobKind = "wp"
40 50
// wpJobCreator is a stateless type. Its methods are not part of this
// excerpt; presumably it is the job creator registered for WPJobKind
// (TODO confirm against the registration code).
type wpJobCreator struct{}
63 "waterway_profiles", 73 "waterway_profiles",
64 } 74 }
65 } 75 }
66 76
67 const ( 77 const (
78 createGeomTempTableSQL = `
79 CREATE TEMP TABLE wp_geoms (
80 geom geometry(linestring, %d)
81 ) ON COMMIT DROP`
82
83 createTempIndexSQL = `
84 CREATE INDEX ON wp_geoms USING GIST(geom)`
85
86 analyzeTempTableSQL = `ANALYZE wp_geoms`
87
88 insertGeomTmpTableSQL = `
89 INSERT INTO wp_geoms (geom) VALUES (
90 ST_Transform(ST_GeomFromWKB($1, $2::int), %d)
91 )`
92
68 insertWaterwayProfileSQL = ` 93 insertWaterwayProfileSQL = `
94 WITH point AS (
95 SELECT ST_Transform(geom::geometry, $14::int) geom
96 FROM waterway.distance_marks_virtual
97 WHERE location_code =
98 ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)
99 )
69 INSERT INTO waterway.waterway_profiles ( 100 INSERT INTO waterway.waterway_profiles (
70 location, 101 location,
102 geom,
71 validity, 103 validity,
72 lnwl, 104 lnwl,
73 mwl, 105 mwl,
74 hnwl, 106 hnwl,
75 fe30, 107 fe30,
76 fe100, 108 fe100,
77 date_info, 109 date_info,
78 source_organization 110 source_organization
79 ) VALUES ( 111 ) VALUES (
80 ($1, $2, $3, $4, $5), 112 ($1, $2, $3, $4, $5),
113 ( SELECT ST_Transform(geom, 4326)::geography
114 FROM wp_geoms
115 WHERE geom && ( SELECT geom from point ) AND
116 ST_Intersects(geom, ( SELECT geom FROM point ))
117 ),
81 $6, 118 $6,
82 $7, 119 $7,
83 $8, 120 $8,
84 $9, 121 $9,
85 $10, 122 $10,
133 if err != nil { 170 if err != nil {
134 return nil, err 171 return nil, err
135 } 172 }
136 defer tx.Rollback() 173 defer tx.Rollback()
137 174
138 // TODO: Download profile geometries from WFS. 175 epsg, err := wp.downloadGeometries(
139 176 ctx, importID, tx, start, feedback)
140 summary, err := wp.processCSV(ctx, importID, tx, start, feedback) 177 if err != nil {
178 return nil, fmt.Errorf("error downloading geometries: %v", err)
179 }
180
181 summary, err := wp.processCSV(
182 ctx, importID, tx, start, epsg, feedback)
141 if err != nil { 183 if err != nil {
142 return nil, fmt.Errorf("error processing CVS: %v", err) 184 return nil, fmt.Errorf("error processing CVS: %v", err)
143 } 185 }
144 186
145 if err := tx.Commit(); err != nil { 187 if err := tx.Commit(); err != nil {
149 } 191 }
150 192
151 feedback.Info("Importing waterway profiles took %s", 193 feedback.Info("Importing waterway profiles took %s",
152 time.Since(start)) 194 time.Since(start))
153 return summary, nil 195 return summary, nil
196 }
197
198 func (wp *WaterwayProfiles) downloadGeometries(
199 ctx context.Context,
200 importID int64,
201 tx *sql.Tx,
202 start time.Time,
203 feedback Feedback,
204 ) (int, error) {
205 feedback.Info("Start downloading geometries from WFS.")
206
207 feedback.Info("Loading capabilities from %s", wp.URL)
208 caps, err := wfs.GetCapabilities(wp.URL)
209 if err != nil {
210 feedback.Error("Loading capabilities failed: %v", err)
211 return 0, err
212 }
213
214 ft := caps.FindFeatureType(wp.FeatureType)
215 if ft == nil {
216 return 0, fmt.Errorf("Unknown feature type '%s'", wp.FeatureType)
217 }
218
219 feedback.Info("Found feature type '%s", wp.FeatureType)
220
221 epsg, err := wfs.CRSToEPSG(ft.DefaultCRS)
222 if err != nil {
223 feedback.Error("Unsupported CRS name '%s'", ft.DefaultCRS)
224 return 0, err
225 }
226
227 if wp.SortBy != "" {
228 feedback.Info("Features will be sorted by '%s'", wp.SortBy)
229 }
230
231 urls, err := wfs.GetFeaturesGET(
232 caps, wp.FeatureType, "application/json", wp.SortBy)
233 if err != nil {
234 feedback.Error("Cannot create GetFeature URLs. %v", err)
235 return 0, err
236 }
237
238 var (
239 unsupported = stringCounter{}
240 missingProperties int
241 features int
242 )
243
244 var insertStmt *sql.Stmt
245 defer func() {
246 if insertStmt != nil {
247 insertStmt.Close()
248 }
249 }()
250
251 var usedEPGS int
252
253 setup := func(epsg int) error {
254 if insertStmt != nil {
255 return nil
256 }
257 feedback.Info("Using EPSG: %d", epsg)
258 usedEPGS = epsg
259
260 tblSQL := fmt.Sprintf(createGeomTempTableSQL, epsg)
261 if _, err := tx.ExecContext(ctx, tblSQL); err != nil {
262 return err
263 }
264 if _, err := tx.ExecContext(ctx, createTempIndexSQL); err != nil {
265 return err
266 }
267 insertSQL := fmt.Sprintf(insertGeomTmpTableSQL, epsg)
268 var err error
269 insertStmt, err = tx.PrepareContext(ctx, insertSQL)
270 return err
271 }
272
273 if err := wfs.DownloadURLs(urls, func(r io.Reader) error {
274 rfc, err := wfs.ParseRawFeatureCollection(r)
275 if err != nil {
276 return fmt.Errorf("parsing GetFeature document failed: %v", err)
277 }
278 if rfc.CRS != nil {
279 crsName := rfc.CRS.Properties.Name
280 if epsg, err = wfs.CRSToEPSG(crsName); err != nil {
281 feedback.Error("Unsupported CRS: %d", crsName)
282 return err
283 }
284 }
285
286 // No features -> ignore.
287 if rfc.Features == nil {
288 return nil
289 }
290
291 if err := setup(epsg); err != nil {
292 return err
293 }
294
295 for _, feature := range rfc.Features {
296 if feature.Geometry.Coordinates == nil {
297 missingProperties++
298 continue
299 }
300
301 switch feature.Geometry.Type {
302 case "LineString":
303 var l lineSlice
304 if err := json.Unmarshal(*feature.Geometry.Coordinates, &l); err != nil {
305 return err
306 }
307 if _, err := insertStmt.ExecContext(
308 ctx,
309 l.asWKB(),
310 epsg,
311 ); err != nil {
312 return err
313 }
314 features++
315 default:
316 unsupported[feature.Geometry.Type]++
317 }
318 }
319 return nil
320 }); err != nil {
321 feedback.Error("Downloading features failed: %v", err)
322 return 0, err
323 }
324
325 if missingProperties > 0 {
326 feedback.Warn("Missing properties: %d", missingProperties)
327 }
328
329 if len(unsupported) != 0 {
330 feedback.Warn("Unsupported types found: %s", unsupported)
331 }
332
333 if features == 0 {
334 return 0, errors.New("No features found")
335 }
336 if _, err := tx.ExecContext(ctx, analyzeTempTableSQL); err != nil {
337 return 0, err
338 }
339 return usedEPGS, nil
154 } 340 }
155 341
156 func (wp *WaterwayProfiles) processCSV( 342 func (wp *WaterwayProfiles) processCSV(
157 ctx context.Context, 343 ctx context.Context,
158 importID int64, 344 importID int64,
159 tx *sql.Tx, 345 tx *sql.Tx,
160 start time.Time, 346 start time.Time,
347 epsg int,
161 feedback Feedback, 348 feedback Feedback,
162 ) (interface{}, error) { 349 ) (interface{}, error) {
350 feedback.Info("Start processing CSV file.")
163 351
164 f, err := os.Open(filepath.Join(wp.Dir, "wp.csv")) 352 f, err := os.Open(filepath.Join(wp.Dir, "wp.csv"))
165 if err != nil { 353 if err != nil {
166 return nil, err 354 return nil, err
167 } 355 }
355 hnwl, 543 hnwl,
356 fe30, 544 fe30,
357 fe100, 545 fe100,
358 dateInfo, 546 dateInfo,
359 source, 547 source,
548 epsg,
360 ).Scan(&id); err != nil { 549 ).Scan(&id); err != nil {
361 return nil, err 550 return nil, err
362 } 551 }
363 552
364 if _, err := trackStmt.ExecContext( 553 if _, err := trackStmt.ExecContext(