Mercurial > gemma
comparison pkg/imports/wp.go @ 2086:6096ec4951f8
Waterway profiles imports: Download the geometries from WFS and join them with the CSV data.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Thu, 31 Jan 2019 18:42:57 +0100 |
parents | ddbac0f22ffb |
children | 5d3d2e823314 |
comparison
equal
deleted
inserted
replaced
2085:bca8bda0b805 | 2086:6096ec4951f8 |
---|---|
16 import ( | 16 import ( |
17 "bufio" | 17 "bufio" |
18 "context" | 18 "context" |
19 "database/sql" | 19 "database/sql" |
20 "encoding/csv" | 20 "encoding/csv" |
21 "encoding/json" | |
22 "errors" | |
21 "fmt" | 23 "fmt" |
22 "io" | 24 "io" |
23 "os" | 25 "os" |
24 "path/filepath" | 26 "path/filepath" |
25 "strconv" | 27 "strconv" |
27 "time" | 29 "time" |
28 | 30 |
29 "gemma.intevation.de/gemma/pkg/common" | 31 "gemma.intevation.de/gemma/pkg/common" |
30 "gemma.intevation.de/gemma/pkg/misc" | 32 "gemma.intevation.de/gemma/pkg/misc" |
31 "gemma.intevation.de/gemma/pkg/models" | 33 "gemma.intevation.de/gemma/pkg/models" |
34 "gemma.intevation.de/gemma/pkg/wfs" | |
32 "github.com/jackc/pgx/pgtype" | 35 "github.com/jackc/pgx/pgtype" |
33 ) | 36 ) |
34 | 37 |
35 type WaterwayProfiles struct { | 38 type WaterwayProfiles struct { |
36 Dir string `json:"dir"` | 39 Dir string `json:"dir"` |
40 // URL the GetCapabilities URL of the WFS service. | |
41 URL string `json:"url"` | |
42 // FeatureType selects the feature type of the WFS service. | |
43 FeatureType string `json:"feature-type"` | |
44 // SortBy works around misconfigured services to | |
45 // establish a sort order to get the features. | |
46 SortBy string `json:"sort-by"` | |
37 } | 47 } |
38 | 48 |
39 const WPJobKind JobKind = "wp" | 49 const WPJobKind JobKind = "wp" |
40 | 50 |
41 type wpJobCreator struct{} | 51 type wpJobCreator struct{} |
63 "waterway_profiles", | 73 "waterway_profiles", |
64 } | 74 } |
65 } | 75 } |
66 | 76 |
67 const ( | 77 const ( |
78 createGeomTempTableSQL = ` | |
79 CREATE TEMP TABLE wp_geoms ( | |
80 geom geometry(linestring, %d) | |
81 ) ON COMMIT DROP` | |
82 | |
83 createTempIndexSQL = ` | |
84 CREATE INDEX ON wp_geoms USING GIST(geom)` | |
85 | |
86 analyzeTempTableSQL = `ANALYZE wp_geoms` | |
87 | |
88 insertGeomTmpTableSQL = ` | |
89 INSERT INTO wp_geoms (geom) VALUES ( | |
90 ST_Transform(ST_GeomFromWKB($1, $2::int), %d) | |
91 )` | |
92 | |
68 insertWaterwayProfileSQL = ` | 93 insertWaterwayProfileSQL = ` |
94 WITH point AS ( | |
95 SELECT ST_Transform(geom::geometry, $14::int) geom | |
96 FROM waterway.distance_marks_virtual | |
97 WHERE location_code = | |
98 ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int) | |
99 ) | |
69 INSERT INTO waterway.waterway_profiles ( | 100 INSERT INTO waterway.waterway_profiles ( |
70 location, | 101 location, |
102 geom, | |
71 validity, | 103 validity, |
72 lnwl, | 104 lnwl, |
73 mwl, | 105 mwl, |
74 hnwl, | 106 hnwl, |
75 fe30, | 107 fe30, |
76 fe100, | 108 fe100, |
77 date_info, | 109 date_info, |
78 source_organization | 110 source_organization |
79 ) VALUES ( | 111 ) VALUES ( |
80 ($1, $2, $3, $4, $5), | 112 ($1, $2, $3, $4, $5), |
113 ( SELECT ST_Transform(geom, 4326)::geography | |
114 FROM wp_geoms | |
115 WHERE geom && ( SELECT geom from point ) AND | |
116 ST_Intersects(geom, ( SELECT geom FROM point )) | |
117 ), | |
81 $6, | 118 $6, |
82 $7, | 119 $7, |
83 $8, | 120 $8, |
84 $9, | 121 $9, |
85 $10, | 122 $10, |
133 if err != nil { | 170 if err != nil { |
134 return nil, err | 171 return nil, err |
135 } | 172 } |
136 defer tx.Rollback() | 173 defer tx.Rollback() |
137 | 174 |
138 // TODO: Download profile geometries from WFS. | 175 epsg, err := wp.downloadGeometries( |
139 | 176 ctx, importID, tx, start, feedback) |
140 summary, err := wp.processCSV(ctx, importID, tx, start, feedback) | 177 if err != nil { |
178 return nil, fmt.Errorf("error downloading geometries: %v", err) | |
179 } | |
180 | |
181 summary, err := wp.processCSV( | |
182 ctx, importID, tx, start, epsg, feedback) | |
141 if err != nil { | 183 if err != nil { |
142 return nil, fmt.Errorf("error processing CVS: %v", err) | 184 return nil, fmt.Errorf("error processing CVS: %v", err) |
143 } | 185 } |
144 | 186 |
145 if err := tx.Commit(); err != nil { | 187 if err := tx.Commit(); err != nil { |
149 } | 191 } |
150 | 192 |
151 feedback.Info("Importing waterway profiles took %s", | 193 feedback.Info("Importing waterway profiles took %s", |
152 time.Since(start)) | 194 time.Since(start)) |
153 return summary, nil | 195 return summary, nil |
196 } | |
197 | |
198 func (wp *WaterwayProfiles) downloadGeometries( | |
199 ctx context.Context, | |
200 importID int64, | |
201 tx *sql.Tx, | |
202 start time.Time, | |
203 feedback Feedback, | |
204 ) (int, error) { | |
205 feedback.Info("Start downloading geometries from WFS.") | |
206 | |
207 feedback.Info("Loading capabilities from %s", wp.URL) | |
208 caps, err := wfs.GetCapabilities(wp.URL) | |
209 if err != nil { | |
210 feedback.Error("Loading capabilities failed: %v", err) | |
211 return 0, err | |
212 } | |
213 | |
214 ft := caps.FindFeatureType(wp.FeatureType) | |
215 if ft == nil { | |
216 return 0, fmt.Errorf("Unknown feature type '%s'", wp.FeatureType) | |
217 } | |
218 | |
219 feedback.Info("Found feature type '%s", wp.FeatureType) | |
220 | |
221 epsg, err := wfs.CRSToEPSG(ft.DefaultCRS) | |
222 if err != nil { | |
223 feedback.Error("Unsupported CRS name '%s'", ft.DefaultCRS) | |
224 return 0, err | |
225 } | |
226 | |
227 if wp.SortBy != "" { | |
228 feedback.Info("Features will be sorted by '%s'", wp.SortBy) | |
229 } | |
230 | |
231 urls, err := wfs.GetFeaturesGET( | |
232 caps, wp.FeatureType, "application/json", wp.SortBy) | |
233 if err != nil { | |
234 feedback.Error("Cannot create GetFeature URLs. %v", err) | |
235 return 0, err | |
236 } | |
237 | |
238 var ( | |
239 unsupported = stringCounter{} | |
240 missingProperties int | |
241 features int | |
242 ) | |
243 | |
244 var insertStmt *sql.Stmt | |
245 defer func() { | |
246 if insertStmt != nil { | |
247 insertStmt.Close() | |
248 } | |
249 }() | |
250 | |
251 var usedEPGS int | |
252 | |
253 setup := func(epsg int) error { | |
254 if insertStmt != nil { | |
255 return nil | |
256 } | |
257 feedback.Info("Using EPSG: %d", epsg) | |
258 usedEPGS = epsg | |
259 | |
260 tblSQL := fmt.Sprintf(createGeomTempTableSQL, epsg) | |
261 if _, err := tx.ExecContext(ctx, tblSQL); err != nil { | |
262 return err | |
263 } | |
264 if _, err := tx.ExecContext(ctx, createTempIndexSQL); err != nil { | |
265 return err | |
266 } | |
267 insertSQL := fmt.Sprintf(insertGeomTmpTableSQL, epsg) | |
268 var err error | |
269 insertStmt, err = tx.PrepareContext(ctx, insertSQL) | |
270 return err | |
271 } | |
272 | |
273 if err := wfs.DownloadURLs(urls, func(r io.Reader) error { | |
274 rfc, err := wfs.ParseRawFeatureCollection(r) | |
275 if err != nil { | |
276 return fmt.Errorf("parsing GetFeature document failed: %v", err) | |
277 } | |
278 if rfc.CRS != nil { | |
279 crsName := rfc.CRS.Properties.Name | |
280 if epsg, err = wfs.CRSToEPSG(crsName); err != nil { | |
281 feedback.Error("Unsupported CRS: %d", crsName) | |
282 return err | |
283 } | |
284 } | |
285 | |
286 // No features -> ignore. | |
287 if rfc.Features == nil { | |
288 return nil | |
289 } | |
290 | |
291 if err := setup(epsg); err != nil { | |
292 return err | |
293 } | |
294 | |
295 for _, feature := range rfc.Features { | |
296 if feature.Geometry.Coordinates == nil { | |
297 missingProperties++ | |
298 continue | |
299 } | |
300 | |
301 switch feature.Geometry.Type { | |
302 case "LineString": | |
303 var l lineSlice | |
304 if err := json.Unmarshal(*feature.Geometry.Coordinates, &l); err != nil { | |
305 return err | |
306 } | |
307 if _, err := insertStmt.ExecContext( | |
308 ctx, | |
309 l.asWKB(), | |
310 epsg, | |
311 ); err != nil { | |
312 return err | |
313 } | |
314 features++ | |
315 default: | |
316 unsupported[feature.Geometry.Type]++ | |
317 } | |
318 } | |
319 return nil | |
320 }); err != nil { | |
321 feedback.Error("Downloading features failed: %v", err) | |
322 return 0, err | |
323 } | |
324 | |
325 if missingProperties > 0 { | |
326 feedback.Warn("Missing properties: %d", missingProperties) | |
327 } | |
328 | |
329 if len(unsupported) != 0 { | |
330 feedback.Warn("Unsupported types found: %s", unsupported) | |
331 } | |
332 | |
333 if features == 0 { | |
334 return 0, errors.New("No features found") | |
335 } | |
336 if _, err := tx.ExecContext(ctx, analyzeTempTableSQL); err != nil { | |
337 return 0, err | |
338 } | |
339 return usedEPGS, nil | |
154 } | 340 } |
155 | 341 |
156 func (wp *WaterwayProfiles) processCSV( | 342 func (wp *WaterwayProfiles) processCSV( |
157 ctx context.Context, | 343 ctx context.Context, |
158 importID int64, | 344 importID int64, |
159 tx *sql.Tx, | 345 tx *sql.Tx, |
160 start time.Time, | 346 start time.Time, |
347 epsg int, | |
161 feedback Feedback, | 348 feedback Feedback, |
162 ) (interface{}, error) { | 349 ) (interface{}, error) { |
350 feedback.Info("Start processing CSV file.") | |
163 | 351 |
164 f, err := os.Open(filepath.Join(wp.Dir, "wp.csv")) | 352 f, err := os.Open(filepath.Join(wp.Dir, "wp.csv")) |
165 if err != nil { | 353 if err != nil { |
166 return nil, err | 354 return nil, err |
167 } | 355 } |
355 hnwl, | 543 hnwl, |
356 fe30, | 544 fe30, |
357 fe100, | 545 fe100, |
358 dateInfo, | 546 dateInfo, |
359 source, | 547 source, |
548 epsg, | |
360 ).Scan(&id); err != nil { | 549 ).Scan(&id); err != nil { |
361 return nil, err | 550 return nil, err |
362 } | 551 } |
363 | 552 |
364 if _, err := trackStmt.ExecContext( | 553 if _, err := trackStmt.ExecContext( |