Mercurial > gemma
comparison pkg/imports/wfsjob.go @ 4979:ee12730acd73
Moved generalized WFS feature import to a better named file.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Tue, 03 Mar 2020 18:13:57 +0100 |
parents | pkg/imports/pointwfs.go@35a3dc12050f |
children | 21d2acc080c9 |
comparison
equal
deleted
inserted
replaced
4978:35a3dc12050f | 4979:ee12730acd73 |
---|---|
1 // This is Free Software under GNU Affero General Public License v >= 3.0 | |
2 // without warranty, see README.md and license for details. | |
3 // | |
4 // SPDX-License-Identifier: AGPL-3.0-or-later | |
5 // License-Filename: LICENSES/AGPL-3.0.txt | |
6 // | |
7 // Copyright (C) 2020 by via donau | |
8 // – Österreichische Wasserstraßen-Gesellschaft mbH | |
9 // Software engineering by Intevation GmbH | |
10 // | |
11 // Author(s): | |
12 // * Tom Gottfried <tom.gottfried@intevation.de> | |
13 // * Sascha L. Teichmann <sascha.teichmann@intevation.de> | |
14 | |
15 package imports | |
16 | |
17 import ( | |
18 "context" | |
19 "database/sql" | |
20 "encoding/json" | |
21 "errors" | |
22 "fmt" | |
23 "io" | |
24 "time" | |
25 | |
26 "gemma.intevation.de/gemma/pkg/models" | |
27 "gemma.intevation.de/gemma/pkg/wfs" | |
28 ) | |
29 | |
// ErrFeatureIgnored may be returned by a consumer's Consume to signal
// that a feature was skipped on purpose. Do neither counts nor reports
// such features.
var ErrFeatureIgnored = errors.New("feature ignored")

// ErrFeatureDuplicated may be returned by a consumer's Consume. Do counts
// these features and reports the total once the download is finished.
var ErrFeatureDuplicated = errors.New("feature duplicated")

// ErrFeaturesUnmodified may be returned by a consumer's Commit. Do does
// not treat it as a failure; if additionally no feature was stored, Do
// reports an UnchangedError instead.
var ErrFeaturesUnmodified = errors.New("features unmodified")
35 | |
36 type ( | |
37 WFSFeatureConsumer interface { | |
38 Commit() error | |
39 Rollback() error | |
40 | |
41 NewFeature() (kind string, geom interface{}, properties interface{}) | |
42 | |
43 Consume(geom, properties interface{}, epsg int) error | |
44 } | |
45 | |
46 WFSFeatureJobCreator struct { | |
47 description string | |
48 depends [2][]string | |
49 | |
50 newConsumer func(context.Context, *sql.Conn, Feedback) (WFSFeatureConsumer, error) | |
51 } | |
52 | |
53 WFSFeatureJob struct { | |
54 models.WFSImport | |
55 creator *WFSFeatureJobCreator | |
56 } | |
57 ) | |
58 | |
59 func (wfjc *WFSFeatureJobCreator) Description() string { | |
60 return wfjc.description | |
61 } | |
62 | |
63 func (wfjc *WFSFeatureJobCreator) Depends() [2][]string { | |
64 return wfjc.depends | |
65 } | |
66 | |
67 func (*WFSFeatureJobCreator) AutoAccept() bool { | |
68 return true | |
69 } | |
70 | |
71 // StageDone is a NOP for WFS imports. | |
72 func (*WFSFeatureJobCreator) StageDone(context.Context, *sql.Tx, int64) error { | |
73 return nil | |
74 } | |
75 | |
76 func (wfjc *WFSFeatureJobCreator) Create() Job { | |
77 return &WFSFeatureJob{creator: wfjc} | |
78 } | |
79 | |
80 // Description gives a short info about relevant facts of this import. | |
81 func (wfj *WFSFeatureJob) Description() (string, error) { | |
82 return wfj.URL + "|" + wfj.FeatureType, nil | |
83 } | |
84 | |
85 // CleanUp for WFS imports is a NOP. | |
86 func (*WFSFeatureJob) CleanUp() error { | |
87 return nil | |
88 } | |
89 | |
90 func (wfj *WFSFeatureJob) Do( | |
91 ctx context.Context, | |
92 importID int64, | |
93 conn *sql.Conn, | |
94 feedback Feedback, | |
95 ) (interface{}, error) { | |
96 | |
97 start := time.Now() | |
98 | |
99 feedback.Info("Import %s", wfj.creator.Description()) | |
100 | |
101 feedback.Info("Loading capabilities from %s", wfj.URL) | |
102 caps, err := wfs.GetCapabilities(wfj.URL) | |
103 if err != nil { | |
104 feedback.Error("Loading capabilities failed: %v", err) | |
105 return nil, err | |
106 } | |
107 | |
108 ft := caps.FindFeatureType(wfj.FeatureType) | |
109 if ft == nil { | |
110 return nil, fmt.Errorf("unknown feature type '%s'", wfj.FeatureType) | |
111 } | |
112 | |
113 feedback.Info("Found feature type '%s'", wfj.FeatureType) | |
114 | |
115 epsg, err := wfs.CRSToEPSG(ft.DefaultCRS) | |
116 if err != nil { | |
117 feedback.Error("Unsupported CRS: '%s'", ft.DefaultCRS) | |
118 return nil, err | |
119 } | |
120 | |
121 if nilString(wfj.SortBy) != "" { | |
122 feedback.Info("Features will be sorted by '%s'", *wfj.SortBy) | |
123 } | |
124 | |
125 dl, err := wfs.GetFeatures(caps, wfj.FeatureType, nilString(wfj.SortBy)) | |
126 if err != nil { | |
127 feedback.Error("Cannot create GetFeature URLs. %v", err) | |
128 return nil, err | |
129 } | |
130 | |
131 var ( | |
132 unsupported = stringCounter{} | |
133 missingProperties int | |
134 badProperties int | |
135 dupes int | |
136 features int | |
137 ) | |
138 | |
139 consumer, err := wfj.creator.newConsumer(ctx, conn, feedback) | |
140 if err != nil { | |
141 return nil, err | |
142 } | |
143 defer consumer.Rollback() | |
144 | |
145 if err := dl.Download(nilString(wfj.User), nilString(wfj.Password), func(url string, r io.Reader) error { | |
146 feedback.Info("Get features from: '%s'", url) | |
147 rfc, err := wfs.ParseRawFeatureCollection(r) | |
148 if err != nil { | |
149 return fmt.Errorf("parsing GetFeature document failed: %v", err) | |
150 } | |
151 if rfc.CRS != nil { | |
152 crsName := rfc.CRS.Properties.Name | |
153 if epsg, err = wfs.CRSToEPSG(crsName); err != nil { | |
154 feedback.Error("Unsupported CRS: %d", crsName) | |
155 return err | |
156 } | |
157 } | |
158 | |
159 // No features -> ignore. | |
160 if rfc.Features == nil { | |
161 return nil | |
162 } | |
163 | |
164 feedback.Info("Using EPSG: %d", epsg) | |
165 | |
166 feedback.Info( | |
167 "Found %d features in data source", len(rfc.Features)) | |
168 | |
169 for _, feature := range rfc.Features { | |
170 if feature.Properties == nil || feature.Geometry.Coordinates == nil { | |
171 missingProperties++ | |
172 continue | |
173 } | |
174 | |
175 kind, geom, props := consumer.NewFeature() | |
176 | |
177 if err := json.Unmarshal(*feature.Properties, props); err != nil { | |
178 badProperties++ | |
179 continue | |
180 } | |
181 | |
182 if feature.Geometry.Type == kind { | |
183 if err := json.Unmarshal(*feature.Geometry.Coordinates, geom); err != nil { | |
184 return err | |
185 } | |
186 | |
187 err := consumer.Consume(geom, props, epsg) | |
188 switch { | |
189 case err == ErrFeatureDuplicated: | |
190 dupes++ | |
191 case err == ErrFeatureIgnored: | |
192 // be silent | |
193 case err != nil: | |
194 return err | |
195 default: | |
196 features++ | |
197 } | |
198 } else { | |
199 unsupported[feature.Geometry.Type]++ | |
200 } | |
201 } | |
202 return nil | |
203 }); err != nil { | |
204 return nil, err | |
205 } | |
206 | |
207 if dupes > 0 { | |
208 feedback.Info( | |
209 "Features outside responsibility area, duplicates or unchanged: %d", | |
210 dupes) | |
211 } | |
212 | |
213 if badProperties > 0 { | |
214 feedback.Warn("Bad properties: %d", badProperties) | |
215 } | |
216 | |
217 if missingProperties > 0 { | |
218 feedback.Warn("Missing properties: %d", missingProperties) | |
219 } | |
220 | |
221 if len(unsupported) != 0 { | |
222 feedback.Warn("Unsupported types found: %s", unsupported) | |
223 } | |
224 | |
225 if err = consumer.Commit(); err == nil || err == ErrFeaturesUnmodified { | |
226 feedback.Info("Storing %d features took %s", | |
227 features, time.Since(start)) | |
228 } | |
229 | |
230 // Commit before eventually returning UnchangedError because we might | |
231 // have updated last_found | |
232 if features == 0 && err == ErrFeaturesUnmodified { | |
233 return nil, UnchangedError("no valid new features found") | |
234 } | |
235 | |
236 if err == ErrFeaturesUnmodified { | |
237 // It's not really an error. | |
238 err = nil | |
239 } | |
240 | |
241 return nil, err | |
242 } | |
243 | |
244 type ( | |
245 SQLGeometryConsumer struct { | |
246 ctx context.Context | |
247 tx *sql.Tx | |
248 feedback Feedback | |
249 consume func(*SQLGeometryConsumer, interface{}, interface{}, int) error | |
250 newFeature func() (string, interface{}, interface{}) | |
251 preCommit func(*SQLGeometryConsumer) error | |
252 savepoint func(func() error) error | |
253 stmts []*sql.Stmt | |
254 } | |
255 ) | |
256 | |
257 func (sgc *SQLGeometryConsumer) Rollback() error { | |
258 if tx := sgc.tx; tx != nil { | |
259 sgc.releaseStmts() | |
260 sgc.tx = nil | |
261 sgc.ctx = nil | |
262 return tx.Rollback() | |
263 } | |
264 return nil | |
265 } | |
266 | |
267 func (sgc *SQLGeometryConsumer) Commit() error { | |
268 var err error | |
269 if tx := sgc.tx; tx != nil { | |
270 if sgc.preCommit != nil { | |
271 err = sgc.preCommit(sgc) | |
272 } | |
273 sgc.releaseStmts() | |
274 sgc.tx = nil | |
275 sgc.ctx = nil | |
276 if err2 := tx.Commit(); err2 != nil { | |
277 // A real error on commit overrules the first. | |
278 err = err2 | |
279 } | |
280 } | |
281 return err | |
282 } | |
283 | |
284 func (sgc *SQLGeometryConsumer) NewFeature() (string, interface{}, interface{}) { | |
285 return sgc.newFeature() | |
286 } | |
287 | |
288 func (sgc *SQLGeometryConsumer) Consume( | |
289 geom, properties interface{}, | |
290 epsg int, | |
291 ) error { | |
292 return sgc.consume(sgc, geom, properties, epsg) | |
293 } | |
294 | |
295 func (sgc *SQLGeometryConsumer) ConsumePolygon( | |
296 polygon polygonSlice, | |
297 properties interface{}, | |
298 epsg int, | |
299 ) error { | |
300 return sgc.consume(sgc, polygon, properties, epsg) | |
301 } | |
302 | |
303 func newSQLConsumer( | |
304 init func(*SQLGeometryConsumer) error, | |
305 consume func(*SQLGeometryConsumer, interface{}, interface{}, int) error, | |
306 preCommit func(*SQLGeometryConsumer) error, | |
307 newFeature func() (string, interface{}, interface{}), | |
308 | |
309 ) func(context.Context, *sql.Conn, Feedback) (WFSFeatureConsumer, error) { | |
310 return func(ctx context.Context, conn *sql.Conn, feedback Feedback) (WFSFeatureConsumer, error) { | |
311 tx, err := conn.BeginTx(ctx, nil) | |
312 if err != nil { | |
313 return nil, err | |
314 } | |
315 sgc := &SQLGeometryConsumer{ | |
316 ctx: ctx, | |
317 tx: tx, | |
318 feedback: feedback, | |
319 consume: consume, | |
320 newFeature: newFeature, | |
321 preCommit: preCommit, | |
322 savepoint: Savepoint(ctx, tx, "feature"), | |
323 } | |
324 if err := init(sgc); err != nil { | |
325 tx.Rollback() | |
326 return nil, err | |
327 } | |
328 return sgc, nil | |
329 } | |
330 } | |
331 | |
332 func (sgc *SQLGeometryConsumer) releaseStmts() { | |
333 for i := len(sgc.stmts); i > 0; i-- { | |
334 sgc.stmts[i-1].Close() | |
335 sgc.stmts[i-1] = nil | |
336 } | |
337 sgc.stmts = nil | |
338 } | |
339 | |
340 func prepareStmnts(queries ...string) func(*SQLGeometryConsumer) error { | |
341 return func(sgc *SQLGeometryConsumer) error { | |
342 for _, query := range queries { | |
343 stmt, err := sgc.tx.PrepareContext(sgc.ctx, query) | |
344 if err != nil { | |
345 return err | |
346 } | |
347 sgc.stmts = append(sgc.stmts, stmt) | |
348 } | |
349 return nil | |
350 } | |
351 } |