comparison pkg/imports/fd.go @ 1841:491f5b68da9e

Implemented fairway dimension import.
author Raimund Renkert <raimund.renkert@intevation.de>
date Thu, 17 Jan 2019 09:22:17 +0100
parents
children b1b0db195cc5
comparison
equal deleted inserted replaced
1837:00d63eb9306a 1841:491f5b68da9e
1 // This is Free Software under GNU Affero General Public License v >= 3.0
2 // without warranty, see README.md and license for details.
3 //
4 // SPDX-License-Identifier: AGPL-3.0-or-later
5 // License-Filename: LICENSES/AGPL-3.0.txt
6 //
7 // Copyright (C) 2018 by via donau
8 // – Österreichische Wasserstraßen-Gesellschaft mbH
9 // Software engineering by Intevation GmbH
10 //
11 // Author(s):
12 // * Raimund Renkert <raimund.renkert@intevation.de>
13
14 package imports
15
16 import (
17 "context"
18 "database/sql"
19 "encoding/json"
20 "errors"
21 "fmt"
22 "io"
23 "time"
24
25 "gemma.intevation.de/gemma/pkg/common"
26 "gemma.intevation.de/gemma/pkg/wfs"
27 )
28
// FairwayDimension is an import job to import
// the fairway dimensions in form of polygon geometries
// and attribute data from a WFS service.
//
// Apart from URL, FeatureType and SortBy, the fields are not read from the
// service: they are stored unchanged with every imported polygon.
type FairwayDimension struct {
	// URL the GetCapabilities URL of the WFS service.
	URL string `json:"url"`

	// FeatureType selects the feature type of the WFS service.
	FeatureType string `json:"feature-type"`

	// SortBy is handed to the WFS client when the GetFeature URLs are built.
	SortBy string `json:"sort-by"`

	// LOS is written to the level_of_service column.
	LOS int `json:"los"`

	// MinWidth is written to the min_width column.
	MinWidth int `json:"min-width"`

	// MaxWidth is written to the max_width column.
	MaxWidth int `json:"max-width"`

	// Depth is written to the min_depth column.
	Depth int `json:"depth"`

	// SourceOrganization is written to the source_organization column.
	SourceOrganization string `json:"source-organization"`
}
44
// fdTime wraps time.Time to decode dates that are delivered as JSON strings
// in the layout "20060102" (year, month, day without separators).
type fdTime struct{ time.Time }

// UnmarshalJSON decodes a JSON string in the layout "20060102" into fdt.
// If data is not a JSON string or the string does not match the layout,
// the error is returned and fdt is left unchanged.
func (fdt *fdTime) UnmarshalJSON(data []byte) error {
	var raw string
	if err := json.Unmarshal(data, &raw); err != nil {
		return err
	}

	parsed, err := time.Parse("20060102", raw)
	if err != nil {
		return err
	}

	fdt.Time = parsed
	return nil
}
58
59 // FDJobKind is the import queue type identifier.
60 const FDJobKind JobKind = "fd"
61
62 type fdJobCreator struct{}
63
64 func init() {
65 RegisterJobCreator(FDJobKind, fdJobCreator{})
66 }
67
68 func (fdJobCreator) Description() string { return "fairway dimension" }
69
70 func (fdJobCreator) AutoAccept() bool { return true }
71
72 func (fdJobCreator) Create(_ JobKind, data string) (Job, error) {
73 fd := new(FairwayDimension)
74 if err := common.FromJSONString(data, fd); err != nil {
75 return nil, err
76 }
77 return fd, nil
78 }
79
80 func (fdJobCreator) Depends() []string {
81 return []string{
82 "fairway_dimensions",
83 }
84 }
85
86 // StageDone is a NOP for fairway dimensions imports.
87 func (fdJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
88 return nil
89 }
90
91 // CleanUp for fairway dimension imports is a NOP.
92 func (*FairwayDimension) CleanUp() error { return nil }
93
94 type fairwayDimensionProperties struct {
95 HydroSorDat fdTime `json:"hydro_sordat"`
96 }
97
// deleteFairwayDimensionSQL removes the fairway dimensions whose area is
// covered by the responsibility area of the current user's country.
// Both geometries are compared in the UTM projection chosen by best_utm.
const deleteFairwayDimensionSQL = `
WITH resp AS (
SELECT best_utm(area::geometry) AS t,
ST_Transform(area::geometry, best_utm(area::geometry)) AS a
FROM users.responsibility_areas
WHERE country = users.current_user_country()
)
DELETE FROM waterway.fairway_dimensions
WHERE ST_Covers(
(SELECT a FROM resp),
ST_Transform(area::geometry, (SELECT t FROM resp)))
`

// insertFairwayDimensionSQL stores the parts of a polygon, given as WKB ($1)
// with its EPSG code ($2), that intersect the responsibility area of the
// current user's country. $3 to $8 are the attribute values stored with
// each resulting part.
//
// The ST_MakeValid and ST_Buffer calls in the statement are a workaround
// to avoid errors due to reprojection.
const insertFairwayDimensionSQL = `
WITH resp AS (
SELECT best_utm(area::geometry) AS t,
ST_Transform(area::geometry, best_utm(area::geometry)) AS a
FROM users.responsibility_areas
WHERE country = users.current_user_country()
)
INSERT INTO waterway.fairway_dimensions (area, level_of_service, min_width, max_width, min_depth, date_info, source_organization)
SELECT ST_Transform(clipped.geom, 4326)::geography, $3, $4, $5, $6, $7, $8 FROM (
SELECT (ST_Dump(
ST_Intersection(
(SELECT ST_Buffer(a, -0.0001) FROM resp),
ST_MakeValid(ST_Transform(
ST_GeomFromWKB($1, $2::integer),
(SELECT t FROM resp)
))
)
)).geom AS geom
) AS clipped
WHERE clipped.geom IS NOT NULL
`
136
137 // Do executes the actual fairway dimension import.
138 func (fd *FairwayDimension) Do(
139 ctx context.Context,
140 importID int64,
141 conn *sql.Conn,
142 feedback Feedback,
143 ) (interface{}, error) {
144
145 start := time.Now()
146
147 feedback.Info("Import fairway dimension")
148
149 feedback.Info("Loading capabilities from %s", fd.URL)
150 caps, err := wfs.GetCapabilities(fd.URL)
151 if err != nil {
152 feedback.Error("Loading capabilities failed: %v", err)
153 return nil, err
154 }
155
156 ft := caps.FindFeatureType(fd.FeatureType)
157 if ft == nil {
158 return nil, fmt.Errorf("Unknown feature type '%s'", fd.FeatureType)
159 }
160
161 feedback.Info("Found feature type '%s'", fd.FeatureType)
162
163 epsg, err := wfs.CRSToEPSG(ft.DefaultCRS)
164 if err != nil {
165 feedback.Error("Unsupported CRS name '%s'", ft.DefaultCRS)
166 return nil, err
167 }
168
169 urls, err := wfs.GetFeaturesGET(
170 caps, fd.FeatureType, "application/json", fd.SortBy)
171 if err != nil {
172 feedback.Error("Cannot create GetFeature URLs. %v", err)
173 return nil, err
174 }
175
176 tx, err := conn.BeginTx(ctx, nil)
177 if err != nil {
178 return nil, err
179 }
180 defer tx.Rollback()
181
182 insertStmt, err := tx.PrepareContext(ctx, insertFairwayDimensionSQL)
183 if err != nil {
184 return nil, err
185 }
186 defer insertStmt.Close()
187
188 // Delete the old features.
189 if _, err := tx.ExecContext(ctx, deleteFairwayDimensionSQL); err != nil {
190 return nil, err
191 }
192
193 var (
194 unsupported = stringCounter{}
195 missingProperties int
196 badProperties int
197 features int
198 )
199
200 if err := wfs.DownloadURLs(urls, func(r io.Reader) error {
201 rfc, err := wfs.ParseRawFeatureCollection(r)
202 if err != nil {
203 return fmt.Errorf("parsing GetFeature document failed: %v", err)
204 }
205 if rfc.CRS != nil {
206 crsName := rfc.CRS.Properties.Name
207 if epsg, err = wfs.CRSToEPSG(crsName); err != nil {
208 feedback.Error("Unsupported CRS: %d", crsName)
209 return err
210 }
211 }
212
213 // No features -> ignore.
214 if rfc.Features == nil {
215 return nil
216 }
217
218 feedback.Info("Using EPSG: %d", epsg)
219
220 for _, feature := range rfc.Features {
221 if feature.Geometry.Coordinates == nil {
222 missingProperties++
223 continue
224 }
225
226 var props fairwayDimensionProperties
227
228 if err := json.Unmarshal(*feature.Properties, &props); err != nil {
229 badProperties++
230 continue
231 }
232 switch feature.Geometry.Type {
233 case "Polygon":
234 var p polygonSlice
235 if err := json.Unmarshal(*feature.Geometry.Coordinates, &p); err != nil {
236 return err
237 }
238 if _, err := insertStmt.ExecContext(
239 ctx,
240 p.asWKB(),
241 epsg,
242 fd.LOS,
243 fd.MinWidth,
244 fd.MaxWidth,
245 fd.Depth,
246 props.HydroSorDat.Time,
247 fd.SourceOrganization,
248 ); err != nil {
249 feedback.Error("error: %s", err)
250 return err
251 }
252 features++
253 default:
254 unsupported[feature.Geometry.Type]++
255 }
256 }
257 return nil
258 }); err != nil {
259 feedback.Error("Downloading features failed: %v", err)
260 return nil, err
261 }
262
263 if badProperties > 0 {
264 feedback.Warn("Bad properties: %d", badProperties)
265 }
266
267 if missingProperties > 0 {
268 feedback.Warn("Missing properties: %d", missingProperties)
269 }
270
271 if len(unsupported) != 0 {
272 feedback.Warn("Unsupported types found: %s", unsupported)
273 }
274
275 if features == 0 {
276 err := errors.New("No features found")
277 feedback.Error("%v", err)
278 return nil, err
279 }
280
281 if err = tx.Commit(); err == nil {
282 feedback.Info("Storing %d features took %s",
283 features, time.Since(start))
284 }
285
286 return nil, err
287 }