comparison pkg/imports/dma.go @ 1862:427f86518097

Added distance marks importer in backend.
author Raimund Renkert <raimund.renkert@intevation.de>
date Thu, 17 Jan 2019 13:16:23 +0100
parents
children 2b72f5e005aa
comparison
equal deleted inserted replaced
1861:5083a1d19a4b 1862:427f86518097
1 // This is Free Software under GNU Affero General Public License v >= 3.0
2 // without warranty, see README.md and license for details.
3 //
4 // SPDX-License-Identifier: AGPL-3.0-or-later
5 // License-Filename: LICENSES/AGPL-3.0.txt
6 //
7 // Copyright (C) 2018 by via donau
8 // – Österreichische Wasserstraßen-Gesellschaft mbH
9 // Software engineering by Intevation GmbH
10 //
11 // Author(s):
12 // * Raimund Renkert <raimund.renkert@intevation.de>
13
14 package imports
15
16 import (
17 "context"
18 "database/sql"
19 "encoding/json"
20 "errors"
21 "fmt"
22 "io"
23 "time"
24
25 "gemma.intevation.de/gemma/pkg/common"
26 "gemma.intevation.de/gemma/pkg/wfs"
27 )
28
// DistanceMarksAshore is an import job to import
// distance marks in form of point geometries
// and attribute data from a WFS service.
type DistanceMarksAshore struct {
	// URL is the GetCapabilities URL of the WFS service.
	URL string `json:"url"`
	// FeatureType is the name of the feature type
	// to fetch from the WFS service.
	FeatureType string `json:"feature-type"`
	// SortBy is the key the requested features are sorted by.
	SortBy string `json:"sort-by"`
}
40
41 // DMAJobKind is the import queue type identifier.
42 const DMAJobKind JobKind = "dma"
43
44 type dmaJobCreator struct{}
45
46 func init() {
47 RegisterJobCreator(DMAJobKind, dmaJobCreator{})
48 }
49
50 func (dmaJobCreator) Description() string { return "distance marks" }
51
52 func (dmaJobCreator) AutoAccept() bool { return true }
53
54 func (dmaJobCreator) Create(_ JobKind, data string) (Job, error) {
55 dma := new(DistanceMarksAshore)
56 if err := common.FromJSONString(data, dma); err != nil {
57 return nil, err
58 }
59 return dma, nil
60 }
61
62 func (dmaJobCreator) Depends() []string {
63 return []string{
64 "distance_marks",
65 }
66 }
67
68 // StageDone is a NOP for distance marks imports.
69 func (dmaJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
70 return nil
71 }
72
73 // CleanUp for distance marks imports is a NOP.
74 func (*DistanceMarksAshore) CleanUp() error { return nil }
75
// distanceMarksAshoreProperties receives the properties
// of a single feature decoded from its JSON representation.
type distanceMarksAshoreProperties struct {
	// HydroCatdis is taken from the "hydro_catdis" property.
	HydroCatdis int `json:"hydro_catdis"`
}
79
// deleteDistanceMarksSQL deletes all distance marks whose geometry
// is covered by the responsibility area of the country of the
// current user. Area and geometry are both transformed to the
// best_utm projection of the area before they are compared.
const deleteDistanceMarksSQL = `
WITH resp AS (
SELECT best_utm(area::geometry) AS t,
ST_Transform(area::geometry, best_utm(area::geometry)) AS a
FROM users.responsibility_areas
WHERE country = users.current_user_country()
)
DELETE FROM waterway.distance_marks
WHERE ST_Covers(
(SELECT a FROM resp),
ST_Transform(geom::geometry, (SELECT t FROM resp)))
`

// insertDistanceMarksSQL inserts a distance mark.
// $1 is the geometry as WKB, $2 the EPSG code of that geometry
// and $3 the catdis value. The geometry is intersected with the
// responsibility area of the country of the current user and only
// the non NULL parts of the result are stored as EPSG:4326 geography.
const insertDistanceMarksSQL = `
WITH resp AS (
SELECT best_utm(area::geometry) AS t,
ST_Transform(area::geometry, best_utm(area::geometry)) AS a
FROM users.responsibility_areas
WHERE country = users.current_user_country()
)
INSERT INTO waterway.distance_marks (geom, catdis)
SELECT ST_Transform(clipped.geom, 4326)::geography, $3 FROM (
SELECT (ST_Dump(
ST_Intersection(
(SELECT a FROM resp),
ST_Transform(
ST_GeomFromWKB($1, $2::integer),
(SELECT t FROM resp)
)
)
)).geom AS geom
) AS clipped
WHERE clipped.geom IS NOT NULL
`
115
116 // Do executes the actual fairway dimension import.
117 func (dma *DistanceMarksAshore) Do(
118 ctx context.Context,
119 importID int64,
120 conn *sql.Conn,
121 feedback Feedback,
122 ) (interface{}, error) {
123
124 start := time.Now()
125
126 feedback.Info("Import distance marks")
127
128 feedback.Info("Loading capabilities from %s", dma.URL)
129 caps, err := wfs.GetCapabilities(dma.URL)
130 if err != nil {
131 feedback.Error("Loading capabilities failed: %v", err)
132 return nil, err
133 }
134
135 ft := caps.FindFeatureType(dma.FeatureType)
136 if ft == nil {
137 return nil, fmt.Errorf("Unknown feature type '%s'", dma.FeatureType)
138 }
139
140 feedback.Info("Found feature type '%s'", dma.FeatureType)
141
142 epsg, err := wfs.CRSToEPSG(ft.DefaultCRS)
143 if err != nil {
144 feedback.Error("Unsupported CRS name '%s'", ft.DefaultCRS)
145 return nil, err
146 }
147
148 urls, err := wfs.GetFeaturesGET(
149 caps, dma.FeatureType, "application/json", dma.SortBy)
150 if err != nil {
151 feedback.Error("Cannot create GetFeature URLs. %v", err)
152 return nil, err
153 }
154
155 tx, err := conn.BeginTx(ctx, nil)
156 if err != nil {
157 return nil, err
158 }
159 defer tx.Rollback()
160
161 insertStmt, err := tx.PrepareContext(ctx, insertDistanceMarksSQL)
162 if err != nil {
163 return nil, err
164 }
165 defer insertStmt.Close()
166
167 // Delete the old features.
168 if _, err := tx.ExecContext(ctx, deleteDistanceMarksSQL); err != nil {
169 return nil, err
170 }
171
172 var (
173 unsupported = stringCounter{}
174 missingProperties int
175 badProperties int
176 features int
177 )
178
179 if err := wfs.DownloadURLs(urls, func(r io.Reader) error {
180 rfc, err := wfs.ParseRawFeatureCollection(r)
181 if err != nil {
182 return fmt.Errorf("parsing GetFeature document failed: %v", err)
183 }
184 if rfc.CRS != nil {
185 crsName := rfc.CRS.Properties.Name
186 if epsg, err = wfs.CRSToEPSG(crsName); err != nil {
187 feedback.Error("Unsupported CRS: %d", crsName)
188 return err
189 }
190 }
191
192 // No features -> ignore.
193 if rfc.Features == nil {
194 return nil
195 }
196
197 feedback.Info("Using EPSG: %d", epsg)
198
199 for _, feature := range rfc.Features {
200 if feature.Geometry.Coordinates == nil {
201 missingProperties++
202 continue
203 }
204
205 var props distanceMarksAshoreProperties
206
207 if err := json.Unmarshal(*feature.Properties, &props); err != nil {
208 badProperties++
209 continue
210 }
211 switch feature.Geometry.Type {
212 case "Point":
213 var p pointSlice
214 if err := json.Unmarshal(*feature.Geometry.Coordinates, &p); err != nil {
215 return err
216 }
217 if _, err := insertStmt.ExecContext(
218 ctx,
219 p.asWKB(),
220 epsg,
221 props.HydroCatdis,
222 ); err != nil {
223 feedback.Error("error: %s", err)
224 return err
225 }
226 features++
227 default:
228 unsupported[feature.Geometry.Type]++
229 }
230 }
231 return nil
232 }); err != nil {
233 feedback.Error("Downloading features failed: %v", err)
234 return nil, err
235 }
236
237 if badProperties > 0 {
238 feedback.Warn("Bad properties: %d", badProperties)
239 }
240
241 if missingProperties > 0 {
242 feedback.Warn("Missing properties: %d", missingProperties)
243 }
244
245 if len(unsupported) != 0 {
246 feedback.Warn("Unsupported types found: %s", unsupported)
247 }
248
249 if features == 0 {
250 err := errors.New("No features found")
251 feedback.Error("%v", err)
252 return nil, err
253 }
254
255 if err = tx.Commit(); err == nil {
256 feedback.Info("Storing %d features took %s",
257 features, time.Since(start))
258 }
259
260 return nil, err
261 }