comparison pkg/imports/wa.go @ 1785:614c6c766691

Waterway area import: Implemented.
author Sascha L. Teichmann <teichmann@intevation.de>
date Sat, 12 Jan 2019 19:53:31 +0100
parents
children 09349ca27dd7
comparison
equal deleted inserted replaced
1784:724758455a4e 1785:614c6c766691
1 // This is Free Software under GNU Affero General Public License v >= 3.0
2 // without warranty, see README.md and license for details.
3 //
4 // SPDX-License-Identifier: AGPL-3.0-or-later
5 // License-Filename: LICENSES/AGPL-3.0.txt
6 //
7 // Copyright (C) 2018 by via donau
8 // – Österreichische Wasserstraßen-Gesellschaft mbH
9 // Software engineering by Intevation GmbH
10 //
11 // Author(s):
12 // * Sascha L. Teichmann <sascha.teichmann@intevation.de>
13
14 package imports
15
16 import (
17 "context"
18 "database/sql"
19 "encoding/json"
20 "errors"
21 "fmt"
22 "io"
23 "strconv"
24 "time"
25
26 "gemma.intevation.de/gemma/pkg/common"
27 "gemma.intevation.de/gemma/pkg/wfs"
28 )
29
30 // WaterwayArea is an import job to import
31 // the waterway area in form of polygon geometries
32 // and attribute data from a WFS service.
33 type WaterwayArea struct {
34 // URL the GetCapabilities URL of the WFS service.
35 URL string `json:"url"`
36 // FeatureType selects the feature type of the WFS service.
37 FeatureType string `json:"feature-type"`
38 // SortBy works around misconfigured services to
39 // establish a sort order to get the features.
40 SortBy string `json:"sort-by"`
41 }
42
43 // WAJobKind is the import queue type identifier.
44 const WAJobKind JobKind = "wa"
45
46 type waJobCreator struct{}
47
48 func init() {
49 RegisterJobCreator(WAJobKind, waJobCreator{})
50 }
51
52 func (waJobCreator) Description() string { return "waterway area" }
53
54 func (waJobCreator) AutoAccept() bool { return true }
55
56 func (waJobCreator) Create(_ JobKind, data string) (Job, error) {
57 wa := new(WaterwayArea)
58 if err := common.FromJSONString(data, wa); err != nil {
59 return nil, err
60 }
61 return wa, nil
62 }
63
64 func (waJobCreator) Depends() []string {
65 return []string{
66 "waterway_area",
67 }
68 }
69
70 // StageDone is a NOP for waterway area imports.
71 func (waJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
72 return nil
73 }
74
75 // CleanUp for waterway area imports is a NOP.
76 func (*WaterwayArea) CleanUp() error { return nil }
77
78 type waterwayAreaProperties struct {
79 Catccl *string `json:"ienc_catccl"`
80 Dirimp *string `json:"ienc_dirimp"`
81 }
82
83 const (
84 deleteWaterwayAreaSQL = `DELETE FROM waterway.waterway_area`
85 insertWaterwayAreaSQL = `
86 INSERT INTO waterway.waterway_area (area, catccl, dirimp)
87 VALUES (
88 ST_Transform(ST_GeomFromWKB($1, $2::integer), 4326)::geography,
89 $3,
90 $4
91 )`
92 )
93
94 // Do executes the actual waterway axis import.
95 func (wx *WaterwayArea) Do(
96 ctx context.Context,
97 importID int64,
98 conn *sql.Conn,
99 feedback Feedback,
100 ) (interface{}, error) {
101
102 start := time.Now()
103
104 feedback.Info("Import waterway area")
105
106 feedback.Info("Loading capabilities from %s", wx.URL)
107 caps, err := wfs.GetCapabilities(wx.URL)
108 if err != nil {
109 feedback.Error("Loading capabilities failed: %v", err)
110 return nil, err
111 }
112
113 ft := caps.FindFeatureType(wx.FeatureType)
114 if ft == nil {
115 return nil, fmt.Errorf("Unknown feature type '%s'", wx.FeatureType)
116 }
117
118 feedback.Info("Found feature type '%s", wx.FeatureType)
119
120 epsg, err := wfs.CRSToEPSG(ft.DefaultCRS)
121 if err != nil {
122 feedback.Error("Unsupported CRS name '%s'", ft.DefaultCRS)
123 return nil, err
124 }
125
126 if wx.SortBy != "" {
127 feedback.Info("Features will be sorted by '%s'", wx.SortBy)
128 }
129
130 urls, err := wfs.GetFeaturesGET(
131 caps, wx.FeatureType, "application/json", wx.SortBy)
132 if err != nil {
133 feedback.Error("Cannot create GetFeature URLs. %v", err)
134 return nil, err
135 }
136
137 tx, err := conn.BeginTx(ctx, nil)
138 if err != nil {
139 return nil, err
140 }
141 defer tx.Rollback()
142
143 insertStmt, err := tx.PrepareContext(ctx, insertWaterwayAreaSQL)
144 if err != nil {
145 return nil, err
146 }
147 defer insertStmt.Close()
148
149 // Delete the old features.
150 if _, err := tx.ExecContext(ctx, deleteWaterwayAreaSQL); err != nil {
151 return nil, err
152 }
153
154 var (
155 unsupported = stringCounter{}
156 missingProperties int
157 badProperties int
158 features int
159 )
160
161 if err := wfs.DownloadURLs(urls, func(r io.Reader) error {
162 rfc, err := wfs.ParseRawFeatureCollection(r)
163 if err != nil {
164 return fmt.Errorf("parsing GetFeature document failed: %v", err)
165 }
166 if rfc.CRS != nil {
167 crsName := rfc.CRS.Properties.Name
168 if epsg, err = wfs.CRSToEPSG(crsName); err != nil {
169 feedback.Error("Unsupported CRS: %d", crsName)
170 return err
171 }
172 }
173
174 // No features -> ignore.
175 if rfc.Features == nil {
176 return nil
177 }
178
179 feedback.Info("Using EPSG: %d", epsg)
180
181 for _, feature := range rfc.Features {
182 if feature.Properties == nil || feature.Geometry.Coordinates == nil {
183 missingProperties++
184 continue
185 }
186
187 var props waterwayAreaProperties
188
189 if err := json.Unmarshal(*feature.Properties, &props); err != nil {
190 badProperties++
191 continue
192 }
193
194 var catccl sql.NullInt64
195 if props.Catccl != nil {
196 if value, err := strconv.ParseInt(*props.Catccl, 10, 64); err == nil {
197 catccl = sql.NullInt64{Int64: value, Valid: true}
198 }
199 }
200 var dirimp sql.NullInt64
201 if props.Dirimp != nil {
202 if value, err := strconv.ParseInt(*props.Dirimp, 10, 64); err == nil {
203 dirimp = sql.NullInt64{Int64: value, Valid: true}
204 }
205 }
206
207 switch feature.Geometry.Type {
208 case "Polygon":
209 var p polygon
210 if err := json.Unmarshal(*feature.Geometry.Coordinates, &p); err != nil {
211 return err
212 }
213 if _, err := insertStmt.ExecContext(
214 ctx,
215 p.asWKB(),
216 epsg,
217 catccl,
218 dirimp,
219 ); err != nil {
220 return err
221 }
222 features++
223 default:
224 unsupported[feature.Geometry.Type]++
225 }
226 }
227 return nil
228 }); err != nil {
229 feedback.Error("Downloading features failed: %v", err)
230 return nil, err
231 }
232
233 if features == 0 {
234 err := errors.New("No features found")
235 feedback.Error("%v", err)
236 return nil, err
237 }
238
239 if badProperties > 0 {
240 feedback.Warn("Bad properties: %d", badProperties)
241 }
242
243 if missingProperties > 0 {
244 feedback.Warn("Missing properties: %d", missingProperties)
245 }
246
247 if len(unsupported) != 0 {
248 feedback.Warn("Unsupported types found: %s", unsupported)
249 }
250
251 if err = tx.Commit(); err == nil {
252 feedback.Info("Storing %d features took %s",
253 features, time.Since(start))
254 }
255
256 return nil, err
257 }