comparison pkg/imports/wx.go @ 1681:4d6ce621379e

Waterway axis import: Completed, but the final commit unexpectedly resulted in rollback. TODO: Fix this bug.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Thu, 27 Dec 2018 17:08:09 +0100
parents de8089944b19
children 3c99d599503a
comparison
equal deleted inserted replaced
1680:de8089944b19 1681:4d6ce621379e
12 // * Sascha L. Teichmann <sascha.teichmann@intevation.de> 12 // * Sascha L. Teichmann <sascha.teichmann@intevation.de>
13 13
14 package imports 14 package imports
15 15
16 import ( 16 import (
17 "bytes"
17 "context" 18 "context"
18 "database/sql" 19 "database/sql"
20 "encoding/binary"
19 "encoding/json" 21 "encoding/json"
22 "errors"
20 "fmt" 23 "fmt"
21 "io" 24 "io"
22 "log" 25 "math"
23 "strings" 26 "strings"
24 "time" 27 "time"
25 28
26 "gemma.intevation.de/gemma/pkg/common" 29 "gemma.intevation.de/gemma/pkg/common"
27 "gemma.intevation.de/gemma/pkg/wfs" 30 "gemma.intevation.de/gemma/pkg/wfs"
69 72
70 type waterwayAxisProperties struct { 73 type waterwayAxisProperties struct {
71 ObjNam string `json:"hydro_objnam"` 74 ObjNam string `json:"hydro_objnam"`
72 NObjNnm *string `json:"hydro_nobjnm"` 75 NObjNnm *string `json:"hydro_nobjnm"`
73 } 76 }
77
78 type line [][]float64
79
80 const wkbLineString uint32 = 2
81
82 func (l line) asWKB() []byte {
83
84 size := 1 + 4 + 4 + len(l)*(1+4+2*8)
85
86 buf := bytes.NewBuffer(make([]byte, 0, size))
87
88 binary.Write(buf, binary.LittleEndian, wkbNDR)
89 binary.Write(buf, binary.LittleEndian, wkbLineString)
90 binary.Write(buf, binary.LittleEndian, uint32(len(l)))
91
92 for i := range l {
93 c := l[i]
94 var lat, lon float64
95 if len(c) > 0 {
96 lat = c[0]
97 }
98 if len(c) > 1 {
99 lon = c[1]
100 }
101 binary.Write(buf, binary.LittleEndian, math.Float64bits(lat))
102 binary.Write(buf, binary.LittleEndian, math.Float64bits(lon))
103 }
104
105 return buf.Bytes()
106 }
107
108 const (
109 deleteWaterwayAxisSQL = `DELETE FROM waterway.waterway_axis`
110 insertWaterwayAxisSQL = `
111 INSERT INTO waterway.waterway_axis (wtwaxs, objnam, nobjnam)
112 VALUES (
113 ST_Transform(ST_GeomFromWKB($1, $2::integer), 4326)::geography,
114 $3,
115 $4
116 )`
117 )
74 118
75 // Do executes the actual waterway exis import. 119 // Do executes the actual waterway exis import.
76 func (wx *WaterwayAxis) Do( 120 func (wx *WaterwayAxis) Do(
77 ctx context.Context, 121 ctx context.Context,
78 importID int64, 122 importID int64,
96 err := fmt.Errorf("Unknown feature type '%s'", wx.FeatureType) 140 err := fmt.Errorf("Unknown feature type '%s'", wx.FeatureType)
97 feedback.Error("%v", err) 141 feedback.Error("%v", err)
98 return nil, err 142 return nil, err
99 } 143 }
100 144
145 epsg, err := wfs.CRSToEPSG(ft.DefaultCRS)
146 if err != nil {
147 feedback.Error("Unsupported CRS name '%s'", ft.DefaultCRS)
148 return nil, err
149 }
150
101 urls, err := wfs.GetFeaturesGET( 151 urls, err := wfs.GetFeaturesGET(
102 caps, wx.FeatureType, "application/json", wx.SortBy) 152 caps, wx.FeatureType, "application/json", wx.SortBy)
103 if err != nil { 153 if err != nil {
104 feedback.Error("Cannot create GetFeature URLs. %v", err) 154 feedback.Error("Cannot create GetFeature URLs. %v", err)
105 return nil, err 155 return nil, err
106 } 156 }
107 157
158 tx, err := conn.BeginTx(ctx, nil)
159 if err != nil {
160 return nil, err
161 }
162 defer tx.Rollback()
163
164 insertStmt, err := tx.PrepareContext(ctx, insertWaterwayAxisSQL)
165 if err != nil {
166 return nil, err
167 }
168 defer insertStmt.Close()
169
170 // Delete the old features.
171 if _, err := tx.ExecContext(ctx, deleteWaterwayAxisSQL); err != nil {
172 return nil, err
173 }
174
108 var ( 175 var (
109 crsName string
110 unsupportedTypes = map[string]int{} 176 unsupportedTypes = map[string]int{}
111 missingProperties int 177 missingProperties int
112 badProperties int 178 badProperties int
179 features int
113 ) 180 )
114 181
115 if err := wfs.DownloadURLs(urls, func(r io.Reader) error { 182 if err := wfs.DownloadURLs(urls, func(r io.Reader) error {
116 rfc, err := wfs.ParseRawFeatureCollection(r) 183 rfc, err := wfs.ParseRawFeatureCollection(r)
117 if err != nil { 184 if err != nil {
118 return err 185 return err
119 } 186 }
120 if crsName != "" && rfc.CRS != nil { 187 if rfc.CRS != nil {
121 crsName = rfc.CRS.Properties.Name 188 crsName := rfc.CRS.Properties.Name
189 if epsg, err = wfs.CRSToEPSG(crsName); err != nil {
190 feedback.Error("Unsupported CRS: %d", crsName)
191 return err
192 }
122 } 193 }
123 194
124 // No features -> ignore. 195 // No features -> ignore.
125 if rfc.Features == nil { 196 if rfc.Features == nil {
126 return nil 197 return nil
127 } 198 }
128 199
129 for _, feature := range rfc.Features { 200 for _, feature := range rfc.Features {
130 if feature.Properties == nil { 201 if feature.Properties == nil || feature.Geometry.Coordinates == nil {
131 missingProperties++ 202 missingProperties++
132 continue 203 continue
133 } 204 }
134 205
135 var props waterwayAxisProperties 206 var props waterwayAxisProperties
137 if err := json.Unmarshal(*feature.Properties, &props); err != nil { 208 if err := json.Unmarshal(*feature.Properties, &props); err != nil {
138 badProperties++ 209 badProperties++
139 continue 210 continue
140 } 211 }
141 212
142 log.Printf("properties: %s\n", props.ObjNam) 213 var nobjnam sql.NullString
214 if props.NObjNnm != nil {
215 nobjnam = sql.NullString{String: *props.NObjNnm, Valid: true}
216 }
143 217
144 switch feature.Geometry.Type { 218 switch feature.Geometry.Type {
145 case "LineString": 219 case "LineString":
146 // TODO: Parse concrete features. 220 var l line
221 if err := json.Unmarshal(*feature.Geometry.Coordinates, &l); err != nil {
222 return err
223 }
224 if _, err := insertStmt.ExecContext(
225 ctx,
226 l.asWKB(),
227 epsg,
228 props.ObjNam,
229 nobjnam,
230 ); err != nil {
231 return err
232 }
233 features++
147 case "MultiLineString": 234 case "MultiLineString":
148 // TODO: Parse concrete features. 235 var ls []line
236 if err := json.Unmarshal(*feature.Geometry.Coordinates, &ls); err != nil {
237 return err
238 }
239 for _, l := range ls {
240 if _, err := insertStmt.ExecContext(
241 ctx,
242 l.asWKB(),
243 epsg,
244 props.ObjNam,
245 nobjnam,
246 ); err != nil {
247 return err
248 }
249 features++
250 }
149 default: 251 default:
150 unsupportedTypes[feature.Geometry.Type]++ 252 unsupportedTypes[feature.Geometry.Type]++
151 } 253 }
152 } 254 }
153 return nil 255 return nil
154 }); err != nil { 256 }); err != nil {
155 feedback.Error("Downloading features failed: %v", err) 257 feedback.Error("Downloading features failed: %v", err)
258 return nil, err
259 }
260
261 if features == 0 {
262 err := errors.New("No features found")
263 feedback.Error("%v", err)
156 return nil, err 264 return nil, err
157 } 265 }
158 266
159 if badProperties > 0 { 267 if badProperties > 0 {
160 feedback.Warn("Bad properties: %d", badProperties) 268 feedback.Warn("Bad properties: %d", badProperties)
173 b.WriteString(fmt.Sprintf("%s: %d", t, c)) 281 b.WriteString(fmt.Sprintf("%s: %d", t, c))
174 } 282 }
175 feedback.Warn("Unsupported types found: %s", b.String()) 283 feedback.Warn("Unsupported types found: %s", b.String())
176 } 284 }
177 285
178 if crsName == "" { 286 if err = tx.Commit(); err == nil {
179 crsName = ft.DefaultCRS 287 feedback.Info("Storing %d features took %s",
180 } 288 features, time.Since(start))
181 289 }
182 epsg, err := wfs.CRSToEPSG(crsName) 290
183 if err != nil { 291 return nil, err
184 feedback.Error("Unsupported CRS name '%s'", crsName) 292 }
185 return nil, err
186 }
187
188 feedback.Info("using ESPG: %d", epsg)
189
190 // TODO: Store extracted features.
191
192 feedback.Info("Storing took %s", time.Since(start))
193
194 return nil, nil
195 }