Mercurial > gemma
comparison pkg/imports/wa.go @ 1785:614c6c766691
Waterway area import: Implemented.
author | Sascha L. Teichmann <teichmann@intevation.de> |
---|---|
date | Sat, 12 Jan 2019 19:53:31 +0100 |
parents | |
children | 09349ca27dd7 |
comparison
equal
deleted
inserted
replaced
1784:724758455a4e | 1785:614c6c766691 |
---|---|
1 // This is Free Software under GNU Affero General Public License v >= 3.0 | |
2 // without warranty, see README.md and license for details. | |
3 // | |
4 // SPDX-License-Identifier: AGPL-3.0-or-later | |
5 // License-Filename: LICENSES/AGPL-3.0.txt | |
6 // | |
7 // Copyright (C) 2018 by via donau | |
8 // – Österreichische Wasserstraßen-Gesellschaft mbH | |
9 // Software engineering by Intevation GmbH | |
10 // | |
11 // Author(s): | |
12 // * Sascha L. Teichmann <sascha.teichmann@intevation.de> | |
13 | |
14 package imports | |
15 | |
16 import ( | |
17 "context" | |
18 "database/sql" | |
19 "encoding/json" | |
20 "errors" | |
21 "fmt" | |
22 "io" | |
23 "strconv" | |
24 "time" | |
25 | |
26 "gemma.intevation.de/gemma/pkg/common" | |
27 "gemma.intevation.de/gemma/pkg/wfs" | |
28 ) | |
29 | |
// WaterwayArea describes an import job which fetches the waterway
// area (polygon geometries plus attribute data) from a WFS service.
type WaterwayArea struct {
	// URL is the GetCapabilities URL of the WFS service.
	URL string `json:"url"`

	// FeatureType names the feature type to be fetched from the service.
	FeatureType string `json:"feature-type"`

	// SortBy is a workaround for misconfigured services: it establishes
	// a sort order in which the features are fetched.
	SortBy string `json:"sort-by"`
}
42 | |
43 // WAJobKind is the import queue type identifier. | |
44 const WAJobKind JobKind = "wa" | |
45 | |
// waJobCreator is the stateless type whose methods create and
// describe waterway area import jobs.
type waJobCreator struct{}
47 | |
48 func init() { | |
49 RegisterJobCreator(WAJobKind, waJobCreator{}) | |
50 } | |
51 | |
52 func (waJobCreator) Description() string { return "waterway area" } | |
53 | |
54 func (waJobCreator) AutoAccept() bool { return true } | |
55 | |
56 func (waJobCreator) Create(_ JobKind, data string) (Job, error) { | |
57 wa := new(WaterwayArea) | |
58 if err := common.FromJSONString(data, wa); err != nil { | |
59 return nil, err | |
60 } | |
61 return wa, nil | |
62 } | |
63 | |
64 func (waJobCreator) Depends() []string { | |
65 return []string{ | |
66 "waterway_area", | |
67 } | |
68 } | |
69 | |
70 // StageDone is a NOP for waterway area imports. | |
71 func (waJobCreator) StageDone(context.Context, *sql.Tx, int64) error { | |
72 return nil | |
73 } | |
74 | |
75 // CleanUp for waterway area imports is a NOP. | |
76 func (*WaterwayArea) CleanUp() error { return nil } | |
77 | |
// waterwayAreaProperties holds the feature properties which are
// decoded from the JSON properties object of a downloaded feature.
// Both fields are pointers, so a property that is absent or null
// in the JSON stays nil.
type waterwayAreaProperties struct {
	// Catccl is the raw "ienc_catccl" property.
	Catccl *string `json:"ienc_catccl"`

	// Dirimp is the raw "ienc_dirimp" property.
	Dirimp *string `json:"ienc_dirimp"`
}
82 | |
const (
	// deleteWaterwayAreaSQL removes every row from waterway.waterway_area.
	deleteWaterwayAreaSQL = `DELETE FROM waterway.waterway_area`

	// insertWaterwayAreaSQL stores one area. Parameters:
	// $1 geometry as WKB, $2 EPSG code of that geometry (the geometry
	// is transformed to EPSG:4326 and cast to geography),
	// $3 catccl, $4 dirimp.
	insertWaterwayAreaSQL = `
INSERT INTO waterway.waterway_area (area, catccl, dirimp)
VALUES (
  ST_Transform(ST_GeomFromWKB($1, $2::integer), 4326)::geography,
  $3,
  $4
)`
)
93 | |
94 // Do executes the actual waterway axis import. | |
95 func (wx *WaterwayArea) Do( | |
96 ctx context.Context, | |
97 importID int64, | |
98 conn *sql.Conn, | |
99 feedback Feedback, | |
100 ) (interface{}, error) { | |
101 | |
102 start := time.Now() | |
103 | |
104 feedback.Info("Import waterway area") | |
105 | |
106 feedback.Info("Loading capabilities from %s", wx.URL) | |
107 caps, err := wfs.GetCapabilities(wx.URL) | |
108 if err != nil { | |
109 feedback.Error("Loading capabilities failed: %v", err) | |
110 return nil, err | |
111 } | |
112 | |
113 ft := caps.FindFeatureType(wx.FeatureType) | |
114 if ft == nil { | |
115 return nil, fmt.Errorf("Unknown feature type '%s'", wx.FeatureType) | |
116 } | |
117 | |
118 feedback.Info("Found feature type '%s", wx.FeatureType) | |
119 | |
120 epsg, err := wfs.CRSToEPSG(ft.DefaultCRS) | |
121 if err != nil { | |
122 feedback.Error("Unsupported CRS name '%s'", ft.DefaultCRS) | |
123 return nil, err | |
124 } | |
125 | |
126 if wx.SortBy != "" { | |
127 feedback.Info("Features will be sorted by '%s'", wx.SortBy) | |
128 } | |
129 | |
130 urls, err := wfs.GetFeaturesGET( | |
131 caps, wx.FeatureType, "application/json", wx.SortBy) | |
132 if err != nil { | |
133 feedback.Error("Cannot create GetFeature URLs. %v", err) | |
134 return nil, err | |
135 } | |
136 | |
137 tx, err := conn.BeginTx(ctx, nil) | |
138 if err != nil { | |
139 return nil, err | |
140 } | |
141 defer tx.Rollback() | |
142 | |
143 insertStmt, err := tx.PrepareContext(ctx, insertWaterwayAreaSQL) | |
144 if err != nil { | |
145 return nil, err | |
146 } | |
147 defer insertStmt.Close() | |
148 | |
149 // Delete the old features. | |
150 if _, err := tx.ExecContext(ctx, deleteWaterwayAreaSQL); err != nil { | |
151 return nil, err | |
152 } | |
153 | |
154 var ( | |
155 unsupported = stringCounter{} | |
156 missingProperties int | |
157 badProperties int | |
158 features int | |
159 ) | |
160 | |
161 if err := wfs.DownloadURLs(urls, func(r io.Reader) error { | |
162 rfc, err := wfs.ParseRawFeatureCollection(r) | |
163 if err != nil { | |
164 return fmt.Errorf("parsing GetFeature document failed: %v", err) | |
165 } | |
166 if rfc.CRS != nil { | |
167 crsName := rfc.CRS.Properties.Name | |
168 if epsg, err = wfs.CRSToEPSG(crsName); err != nil { | |
169 feedback.Error("Unsupported CRS: %d", crsName) | |
170 return err | |
171 } | |
172 } | |
173 | |
174 // No features -> ignore. | |
175 if rfc.Features == nil { | |
176 return nil | |
177 } | |
178 | |
179 feedback.Info("Using EPSG: %d", epsg) | |
180 | |
181 for _, feature := range rfc.Features { | |
182 if feature.Properties == nil || feature.Geometry.Coordinates == nil { | |
183 missingProperties++ | |
184 continue | |
185 } | |
186 | |
187 var props waterwayAreaProperties | |
188 | |
189 if err := json.Unmarshal(*feature.Properties, &props); err != nil { | |
190 badProperties++ | |
191 continue | |
192 } | |
193 | |
194 var catccl sql.NullInt64 | |
195 if props.Catccl != nil { | |
196 if value, err := strconv.ParseInt(*props.Catccl, 10, 64); err == nil { | |
197 catccl = sql.NullInt64{Int64: value, Valid: true} | |
198 } | |
199 } | |
200 var dirimp sql.NullInt64 | |
201 if props.Dirimp != nil { | |
202 if value, err := strconv.ParseInt(*props.Dirimp, 10, 64); err == nil { | |
203 dirimp = sql.NullInt64{Int64: value, Valid: true} | |
204 } | |
205 } | |
206 | |
207 switch feature.Geometry.Type { | |
208 case "Polygon": | |
209 var p polygon | |
210 if err := json.Unmarshal(*feature.Geometry.Coordinates, &p); err != nil { | |
211 return err | |
212 } | |
213 if _, err := insertStmt.ExecContext( | |
214 ctx, | |
215 p.asWKB(), | |
216 epsg, | |
217 catccl, | |
218 dirimp, | |
219 ); err != nil { | |
220 return err | |
221 } | |
222 features++ | |
223 default: | |
224 unsupported[feature.Geometry.Type]++ | |
225 } | |
226 } | |
227 return nil | |
228 }); err != nil { | |
229 feedback.Error("Downloading features failed: %v", err) | |
230 return nil, err | |
231 } | |
232 | |
233 if features == 0 { | |
234 err := errors.New("No features found") | |
235 feedback.Error("%v", err) | |
236 return nil, err | |
237 } | |
238 | |
239 if badProperties > 0 { | |
240 feedback.Warn("Bad properties: %d", badProperties) | |
241 } | |
242 | |
243 if missingProperties > 0 { | |
244 feedback.Warn("Missing properties: %d", missingProperties) | |
245 } | |
246 | |
247 if len(unsupported) != 0 { | |
248 feedback.Warn("Unsupported types found: %s", unsupported) | |
249 } | |
250 | |
251 if err = tx.Commit(); err == nil { | |
252 feedback.Info("Storing %d features took %s", | |
253 features, time.Since(start)) | |
254 } | |
255 | |
256 return nil, err | |
257 } |