comparison pkg/imports/wfsjob.go @ 4979:ee12730acd73

Moved generalized WFS feature import to a better named file.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Tue, 03 Mar 2020 18:13:57 +0100
parents pkg/imports/pointwfs.go@35a3dc12050f
children 21d2acc080c9
comparison
equal deleted inserted replaced
4978:35a3dc12050f 4979:ee12730acd73
1 // This is Free Software under GNU Affero General Public License v >= 3.0
2 // without warranty, see README.md and license for details.
3 //
4 // SPDX-License-Identifier: AGPL-3.0-or-later
5 // License-Filename: LICENSES/AGPL-3.0.txt
6 //
7 // Copyright (C) 2020 by via donau
8 // – Österreichische Wasserstraßen-Gesellschaft mbH
9 // Software engineering by Intevation GmbH
10 //
11 // Author(s):
12 // * Tom Gottfried <tom.gottfried@intevation.de>
13 // * Sascha L. Teichmann <sascha.teichmann@intevation.de>
14
15 package imports
16
17 import (
18 "context"
19 "database/sql"
20 "encoding/json"
21 "errors"
22 "fmt"
23 "io"
24 "time"
25
26 "gemma.intevation.de/gemma/pkg/models"
27 "gemma.intevation.de/gemma/pkg/wfs"
28 )
29
// Sentinel errors used between WFS feature consumers and WFSFeatureJob.Do.
var (
	// ErrFeatureIgnored may be returned by a consumer's Consume method.
	// The import silently skips such a feature.
	ErrFeatureIgnored = errors.New("feature ignored")

	// ErrFeatureDuplicated may be returned by a consumer's Consume method.
	// The import counts such a feature and continues.
	ErrFeatureDuplicated = errors.New("feature duplicated")

	// ErrFeaturesUnmodified may be returned by a consumer's Commit method.
	// The import does not treat it as a failure on its own.
	ErrFeaturesUnmodified = errors.New("features unmodified")
)
35
36 type (
37 WFSFeatureConsumer interface {
38 Commit() error
39 Rollback() error
40
41 NewFeature() (kind string, geom interface{}, properties interface{})
42
43 Consume(geom, properties interface{}, epsg int) error
44 }
45
46 WFSFeatureJobCreator struct {
47 description string
48 depends [2][]string
49
50 newConsumer func(context.Context, *sql.Conn, Feedback) (WFSFeatureConsumer, error)
51 }
52
53 WFSFeatureJob struct {
54 models.WFSImport
55 creator *WFSFeatureJobCreator
56 }
57 )
58
59 func (wfjc *WFSFeatureJobCreator) Description() string {
60 return wfjc.description
61 }
62
63 func (wfjc *WFSFeatureJobCreator) Depends() [2][]string {
64 return wfjc.depends
65 }
66
67 func (*WFSFeatureJobCreator) AutoAccept() bool {
68 return true
69 }
70
71 // StageDone is a NOP for WFS imports.
72 func (*WFSFeatureJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
73 return nil
74 }
75
76 func (wfjc *WFSFeatureJobCreator) Create() Job {
77 return &WFSFeatureJob{creator: wfjc}
78 }
79
80 // Description gives a short info about relevant facts of this import.
81 func (wfj *WFSFeatureJob) Description() (string, error) {
82 return wfj.URL + "|" + wfj.FeatureType, nil
83 }
84
85 // CleanUp for WFS imports is a NOP.
86 func (*WFSFeatureJob) CleanUp() error {
87 return nil
88 }
89
90 func (wfj *WFSFeatureJob) Do(
91 ctx context.Context,
92 importID int64,
93 conn *sql.Conn,
94 feedback Feedback,
95 ) (interface{}, error) {
96
97 start := time.Now()
98
99 feedback.Info("Import %s", wfj.creator.Description())
100
101 feedback.Info("Loading capabilities from %s", wfj.URL)
102 caps, err := wfs.GetCapabilities(wfj.URL)
103 if err != nil {
104 feedback.Error("Loading capabilities failed: %v", err)
105 return nil, err
106 }
107
108 ft := caps.FindFeatureType(wfj.FeatureType)
109 if ft == nil {
110 return nil, fmt.Errorf("unknown feature type '%s'", wfj.FeatureType)
111 }
112
113 feedback.Info("Found feature type '%s'", wfj.FeatureType)
114
115 epsg, err := wfs.CRSToEPSG(ft.DefaultCRS)
116 if err != nil {
117 feedback.Error("Unsupported CRS: '%s'", ft.DefaultCRS)
118 return nil, err
119 }
120
121 if nilString(wfj.SortBy) != "" {
122 feedback.Info("Features will be sorted by '%s'", *wfj.SortBy)
123 }
124
125 dl, err := wfs.GetFeatures(caps, wfj.FeatureType, nilString(wfj.SortBy))
126 if err != nil {
127 feedback.Error("Cannot create GetFeature URLs. %v", err)
128 return nil, err
129 }
130
131 var (
132 unsupported = stringCounter{}
133 missingProperties int
134 badProperties int
135 dupes int
136 features int
137 )
138
139 consumer, err := wfj.creator.newConsumer(ctx, conn, feedback)
140 if err != nil {
141 return nil, err
142 }
143 defer consumer.Rollback()
144
145 if err := dl.Download(nilString(wfj.User), nilString(wfj.Password), func(url string, r io.Reader) error {
146 feedback.Info("Get features from: '%s'", url)
147 rfc, err := wfs.ParseRawFeatureCollection(r)
148 if err != nil {
149 return fmt.Errorf("parsing GetFeature document failed: %v", err)
150 }
151 if rfc.CRS != nil {
152 crsName := rfc.CRS.Properties.Name
153 if epsg, err = wfs.CRSToEPSG(crsName); err != nil {
154 feedback.Error("Unsupported CRS: %d", crsName)
155 return err
156 }
157 }
158
159 // No features -> ignore.
160 if rfc.Features == nil {
161 return nil
162 }
163
164 feedback.Info("Using EPSG: %d", epsg)
165
166 feedback.Info(
167 "Found %d features in data source", len(rfc.Features))
168
169 for _, feature := range rfc.Features {
170 if feature.Properties == nil || feature.Geometry.Coordinates == nil {
171 missingProperties++
172 continue
173 }
174
175 kind, geom, props := consumer.NewFeature()
176
177 if err := json.Unmarshal(*feature.Properties, props); err != nil {
178 badProperties++
179 continue
180 }
181
182 if feature.Geometry.Type == kind {
183 if err := json.Unmarshal(*feature.Geometry.Coordinates, geom); err != nil {
184 return err
185 }
186
187 err := consumer.Consume(geom, props, epsg)
188 switch {
189 case err == ErrFeatureDuplicated:
190 dupes++
191 case err == ErrFeatureIgnored:
192 // be silent
193 case err != nil:
194 return err
195 default:
196 features++
197 }
198 } else {
199 unsupported[feature.Geometry.Type]++
200 }
201 }
202 return nil
203 }); err != nil {
204 return nil, err
205 }
206
207 if dupes > 0 {
208 feedback.Info(
209 "Features outside responsibility area, duplicates or unchanged: %d",
210 dupes)
211 }
212
213 if badProperties > 0 {
214 feedback.Warn("Bad properties: %d", badProperties)
215 }
216
217 if missingProperties > 0 {
218 feedback.Warn("Missing properties: %d", missingProperties)
219 }
220
221 if len(unsupported) != 0 {
222 feedback.Warn("Unsupported types found: %s", unsupported)
223 }
224
225 if err = consumer.Commit(); err == nil || err == ErrFeaturesUnmodified {
226 feedback.Info("Storing %d features took %s",
227 features, time.Since(start))
228 }
229
230 // Commit before eventually returning UnchangedError because we might
231 // have updated last_found
232 if features == 0 && err == ErrFeaturesUnmodified {
233 return nil, UnchangedError("no valid new features found")
234 }
235
236 if err == ErrFeaturesUnmodified {
237 // It's not really an error.
238 err = nil
239 }
240
241 return nil, err
242 }
243
244 type (
245 SQLGeometryConsumer struct {
246 ctx context.Context
247 tx *sql.Tx
248 feedback Feedback
249 consume func(*SQLGeometryConsumer, interface{}, interface{}, int) error
250 newFeature func() (string, interface{}, interface{})
251 preCommit func(*SQLGeometryConsumer) error
252 savepoint func(func() error) error
253 stmts []*sql.Stmt
254 }
255 )
256
257 func (sgc *SQLGeometryConsumer) Rollback() error {
258 if tx := sgc.tx; tx != nil {
259 sgc.releaseStmts()
260 sgc.tx = nil
261 sgc.ctx = nil
262 return tx.Rollback()
263 }
264 return nil
265 }
266
267 func (sgc *SQLGeometryConsumer) Commit() error {
268 var err error
269 if tx := sgc.tx; tx != nil {
270 if sgc.preCommit != nil {
271 err = sgc.preCommit(sgc)
272 }
273 sgc.releaseStmts()
274 sgc.tx = nil
275 sgc.ctx = nil
276 if err2 := tx.Commit(); err2 != nil {
277 // A real error on commit overrules the first.
278 err = err2
279 }
280 }
281 return err
282 }
283
284 func (sgc *SQLGeometryConsumer) NewFeature() (string, interface{}, interface{}) {
285 return sgc.newFeature()
286 }
287
288 func (sgc *SQLGeometryConsumer) Consume(
289 geom, properties interface{},
290 epsg int,
291 ) error {
292 return sgc.consume(sgc, geom, properties, epsg)
293 }
294
295 func (sgc *SQLGeometryConsumer) ConsumePolygon(
296 polygon polygonSlice,
297 properties interface{},
298 epsg int,
299 ) error {
300 return sgc.consume(sgc, polygon, properties, epsg)
301 }
302
303 func newSQLConsumer(
304 init func(*SQLGeometryConsumer) error,
305 consume func(*SQLGeometryConsumer, interface{}, interface{}, int) error,
306 preCommit func(*SQLGeometryConsumer) error,
307 newFeature func() (string, interface{}, interface{}),
308
309 ) func(context.Context, *sql.Conn, Feedback) (WFSFeatureConsumer, error) {
310 return func(ctx context.Context, conn *sql.Conn, feedback Feedback) (WFSFeatureConsumer, error) {
311 tx, err := conn.BeginTx(ctx, nil)
312 if err != nil {
313 return nil, err
314 }
315 sgc := &SQLGeometryConsumer{
316 ctx: ctx,
317 tx: tx,
318 feedback: feedback,
319 consume: consume,
320 newFeature: newFeature,
321 preCommit: preCommit,
322 savepoint: Savepoint(ctx, tx, "feature"),
323 }
324 if err := init(sgc); err != nil {
325 tx.Rollback()
326 return nil, err
327 }
328 return sgc, nil
329 }
330 }
331
332 func (sgc *SQLGeometryConsumer) releaseStmts() {
333 for i := len(sgc.stmts); i > 0; i-- {
334 sgc.stmts[i-1].Close()
335 sgc.stmts[i-1] = nil
336 }
337 sgc.stmts = nil
338 }
339
340 func prepareStmnts(queries ...string) func(*SQLGeometryConsumer) error {
341 return func(sgc *SQLGeometryConsumer) error {
342 for _, query := range queries {
343 stmt, err := sgc.tx.PrepareContext(sgc.ctx, query)
344 if err != nil {
345 return err
346 }
347 sgc.stmts = append(sgc.stmts, stmt)
348 }
349 return nil
350 }
351 }