Mercurial > gemma
comparison pkg/imports/wx.go @ 1681:4d6ce621379e
Waterway axis import: Completed, but the final commit unexpectedly resulted in rollback.
TODO: Fix this bug.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Thu, 27 Dec 2018 17:08:09 +0100 |
parents | de8089944b19 |
children | 3c99d599503a |
comparison
equal
deleted
inserted
replaced
1680:de8089944b19 | 1681:4d6ce621379e |
---|---|
12 // * Sascha L. Teichmann <sascha.teichmann@intevation.de> | 12 // * Sascha L. Teichmann <sascha.teichmann@intevation.de> |
13 | 13 |
14 package imports | 14 package imports |
15 | 15 |
16 import ( | 16 import ( |
17 "bytes" | |
17 "context" | 18 "context" |
18 "database/sql" | 19 "database/sql" |
20 "encoding/binary" | |
19 "encoding/json" | 21 "encoding/json" |
22 "errors" | |
20 "fmt" | 23 "fmt" |
21 "io" | 24 "io" |
22 "log" | 25 "math" |
23 "strings" | 26 "strings" |
24 "time" | 27 "time" |
25 | 28 |
26 "gemma.intevation.de/gemma/pkg/common" | 29 "gemma.intevation.de/gemma/pkg/common" |
27 "gemma.intevation.de/gemma/pkg/wfs" | 30 "gemma.intevation.de/gemma/pkg/wfs" |
69 | 72 |
70 type waterwayAxisProperties struct { | 73 type waterwayAxisProperties struct { |
71 ObjNam string `json:"hydro_objnam"` | 74 ObjNam string `json:"hydro_objnam"` |
72 NObjNnm *string `json:"hydro_nobjnm"` | 75 NObjNnm *string `json:"hydro_nobjnm"` |
73 } | 76 } |
77 | |
78 type line [][]float64 | |
79 | |
80 const wkbLineString uint32 = 2 | |
81 | |
82 func (l line) asWKB() []byte { | |
83 | |
84 size := 1 + 4 + 4 + len(l)*(1+4+2*8) | |
85 | |
86 buf := bytes.NewBuffer(make([]byte, 0, size)) | |
87 | |
88 binary.Write(buf, binary.LittleEndian, wkbNDR) | |
89 binary.Write(buf, binary.LittleEndian, wkbLineString) | |
90 binary.Write(buf, binary.LittleEndian, uint32(len(l))) | |
91 | |
92 for i := range l { | |
93 c := l[i] | |
94 var lat, lon float64 | |
95 if len(c) > 0 { | |
96 lat = c[0] | |
97 } | |
98 if len(c) > 1 { | |
99 lon = c[1] | |
100 } | |
101 binary.Write(buf, binary.LittleEndian, math.Float64bits(lat)) | |
102 binary.Write(buf, binary.LittleEndian, math.Float64bits(lon)) | |
103 } | |
104 | |
105 return buf.Bytes() | |
106 } | |
107 | |
108 const ( | |
109 deleteWaterwayAxisSQL = `DELETE FROM waterway.waterway_axis` | |
110 insertWaterwayAxisSQL = ` | |
111 INSERT INTO waterway.waterway_axis (wtwaxs, objnam, nobjnam) | |
112 VALUES ( | |
113 ST_Transform(ST_GeomFromWKB($1, $2::integer), 4326)::geography, | |
114 $3, | |
115 $4 | |
116 )` | |
117 ) | |
74 | 118 |
75 // Do executes the actual waterway axis import. | 119 // Do executes the actual waterway axis import. |
76 func (wx *WaterwayAxis) Do( | 120 func (wx *WaterwayAxis) Do( |
77 ctx context.Context, | 121 ctx context.Context, |
78 importID int64, | 122 importID int64, |
96 err := fmt.Errorf("Unknown feature type '%s'", wx.FeatureType) | 140 err := fmt.Errorf("Unknown feature type '%s'", wx.FeatureType) |
97 feedback.Error("%v", err) | 141 feedback.Error("%v", err) |
98 return nil, err | 142 return nil, err |
99 } | 143 } |
100 | 144 |
145 epsg, err := wfs.CRSToEPSG(ft.DefaultCRS) | |
146 if err != nil { | |
147 feedback.Error("Unsupported CRS name '%s'", ft.DefaultCRS) | |
148 return nil, err | |
149 } | |
150 | |
101 urls, err := wfs.GetFeaturesGET( | 151 urls, err := wfs.GetFeaturesGET( |
102 caps, wx.FeatureType, "application/json", wx.SortBy) | 152 caps, wx.FeatureType, "application/json", wx.SortBy) |
103 if err != nil { | 153 if err != nil { |
104 feedback.Error("Cannot create GetFeature URLs. %v", err) | 154 feedback.Error("Cannot create GetFeature URLs. %v", err) |
105 return nil, err | 155 return nil, err |
106 } | 156 } |
107 | 157 |
158 tx, err := conn.BeginTx(ctx, nil) | |
159 if err != nil { | |
160 return nil, err | |
161 } | |
162 defer tx.Rollback() | |
163 | |
164 insertStmt, err := tx.PrepareContext(ctx, insertWaterwayAxisSQL) | |
165 if err != nil { | |
166 return nil, err | |
167 } | |
168 defer insertStmt.Close() | |
169 | |
170 // Delete the old features. | |
171 if _, err := tx.ExecContext(ctx, deleteWaterwayAxisSQL); err != nil { | |
172 return nil, err | |
173 } | |
174 | |
108 var ( | 175 var ( |
109 crsName string | |
110 unsupportedTypes = map[string]int{} | 176 unsupportedTypes = map[string]int{} |
111 missingProperties int | 177 missingProperties int |
112 badProperties int | 178 badProperties int |
179 features int | |
113 ) | 180 ) |
114 | 181 |
115 if err := wfs.DownloadURLs(urls, func(r io.Reader) error { | 182 if err := wfs.DownloadURLs(urls, func(r io.Reader) error { |
116 rfc, err := wfs.ParseRawFeatureCollection(r) | 183 rfc, err := wfs.ParseRawFeatureCollection(r) |
117 if err != nil { | 184 if err != nil { |
118 return err | 185 return err |
119 } | 186 } |
120 if crsName != "" && rfc.CRS != nil { | 187 if rfc.CRS != nil { |
121 crsName = rfc.CRS.Properties.Name | 188 crsName := rfc.CRS.Properties.Name |
189 if epsg, err = wfs.CRSToEPSG(crsName); err != nil { | |
190 feedback.Error("Unsupported CRS: %d", crsName) | |
191 return err | |
192 } | |
122 } | 193 } |
123 | 194 |
124 // No features -> ignore. | 195 // No features -> ignore. |
125 if rfc.Features == nil { | 196 if rfc.Features == nil { |
126 return nil | 197 return nil |
127 } | 198 } |
128 | 199 |
129 for _, feature := range rfc.Features { | 200 for _, feature := range rfc.Features { |
130 if feature.Properties == nil { | 201 if feature.Properties == nil || feature.Geometry.Coordinates == nil { |
131 missingProperties++ | 202 missingProperties++ |
132 continue | 203 continue |
133 } | 204 } |
134 | 205 |
135 var props waterwayAxisProperties | 206 var props waterwayAxisProperties |
137 if err := json.Unmarshal(*feature.Properties, &props); err != nil { | 208 if err := json.Unmarshal(*feature.Properties, &props); err != nil { |
138 badProperties++ | 209 badProperties++ |
139 continue | 210 continue |
140 } | 211 } |
141 | 212 |
142 log.Printf("properties: %s\n", props.ObjNam) | 213 var nobjnam sql.NullString |
214 if props.NObjNnm != nil { | |
215 nobjnam = sql.NullString{String: *props.NObjNnm, Valid: true} | |
216 } | |
143 | 217 |
144 switch feature.Geometry.Type { | 218 switch feature.Geometry.Type { |
145 case "LineString": | 219 case "LineString": |
146 // TODO: Parse concrete features. | 220 var l line |
221 if err := json.Unmarshal(*feature.Geometry.Coordinates, &l); err != nil { | |
222 return err | |
223 } | |
224 if _, err := insertStmt.ExecContext( | |
225 ctx, | |
226 l.asWKB(), | |
227 epsg, | |
228 props.ObjNam, | |
229 nobjnam, | |
230 ); err != nil { | |
231 return err | |
232 } | |
233 features++ | |
147 case "MultiLineString": | 234 case "MultiLineString": |
148 // TODO: Parse concrete features. | 235 var ls []line |
236 if err := json.Unmarshal(*feature.Geometry.Coordinates, &ls); err != nil { | |
237 return err | |
238 } | |
239 for _, l := range ls { | |
240 if _, err := insertStmt.ExecContext( | |
241 ctx, | |
242 l.asWKB(), | |
243 epsg, | |
244 props.ObjNam, | |
245 nobjnam, | |
246 ); err != nil { | |
247 return err | |
248 } | |
249 features++ | |
250 } | |
149 default: | 251 default: |
150 unsupportedTypes[feature.Geometry.Type]++ | 252 unsupportedTypes[feature.Geometry.Type]++ |
151 } | 253 } |
152 } | 254 } |
153 return nil | 255 return nil |
154 }); err != nil { | 256 }); err != nil { |
155 feedback.Error("Downloading features failed: %v", err) | 257 feedback.Error("Downloading features failed: %v", err) |
258 return nil, err | |
259 } | |
260 | |
261 if features == 0 { | |
262 err := errors.New("No features found") | |
263 feedback.Error("%v", err) | |
156 return nil, err | 264 return nil, err |
157 } | 265 } |
158 | 266 |
159 if badProperties > 0 { | 267 if badProperties > 0 { |
160 feedback.Warn("Bad properties: %d", badProperties) | 268 feedback.Warn("Bad properties: %d", badProperties) |
173 b.WriteString(fmt.Sprintf("%s: %d", t, c)) | 281 b.WriteString(fmt.Sprintf("%s: %d", t, c)) |
174 } | 282 } |
175 feedback.Warn("Unsupported types found: %s", b.String()) | 283 feedback.Warn("Unsupported types found: %s", b.String()) |
176 } | 284 } |
177 | 285 |
178 if crsName == "" { | 286 if err = tx.Commit(); err == nil { |
179 crsName = ft.DefaultCRS | 287 feedback.Info("Storing %d features took %s", |
180 } | 288 features, time.Since(start)) |
181 | 289 } |
182 epsg, err := wfs.CRSToEPSG(crsName) | 290 |
183 if err != nil { | 291 return nil, err |
184 feedback.Error("Unsupported CRS name '%s'", crsName) | 292 } |
185 return nil, err | |
186 } | |
187 | |
188 feedback.Info("using ESPG: %d", epsg) | |
189 | |
190 // TODO: Store extracted features. | |
191 | |
192 feedback.Info("Storing took %s", time.Since(start)) | |
193 | |
194 return nil, nil | |
195 } |