comparison pkg/imports/isr.go @ 5421:c9da747d4109 marking-single-beam

Extended ISR import job to handle marking scans as well.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Thu, 08 Jul 2021 01:46:03 +0200
parents 34bc6041e61e
children b8d5f1cd15fb
comparison
equal deleted inserted replaced
5420:851c14d57680 5421:c9da747d4109
14 package imports 14 package imports
15 15
16 import ( 16 import (
17 "context" 17 "context"
18 "database/sql" 18 "database/sql"
19 "log"
19 "time" 20 "time"
20 21
21 "gemma.intevation.de/gemma/pkg/common"
22 "gemma.intevation.de/gemma/pkg/mesh" 22 "gemma.intevation.de/gemma/pkg/mesh"
23 "gemma.intevation.de/gemma/pkg/models"
23 ) 24 )
24 25
25 // IsoRefresh is an import job to refresh the pre-calculated 26 // IsoRefresh is an import job to refresh the pre-calculated
26 // iso morphology data when the class breaks were changed. 27 // iso morphology data when the class breaks were changed.
27 type IsoRefresh struct { 28 type IsoRefresh struct {
50 return srJobCreator{}.Depends() 51 return srJobCreator{}.Depends()
51 } 52 }
52 53
53 const ( 54 const (
54 fetchSoundingResultsIDsSQL = ` 55 fetchSoundingResultsIDsSQL = `
55 SELECT bottleneck_id, id 56 SELECT
57 bottleneck_id,
58 id,
59 surtyp
56 FROM waterway.sounding_results 60 FROM waterway.sounding_results
57 ORDER BY bottleneck_id 61 ORDER BY bottleneck_id`
58 ` 62
59 deleteIsoAreasSQL = ` 63 deleteIsoAreasSQL = `
60 DELETE FROM waterway.sounding_results_iso_areas 64 DELETE FROM waterway.sounding_results_iso_areas
61 WHERE sounding_result_id = $1 65 WHERE sounding_result_id = $1`
62 ` 66
67 fetchMarkingPointsSQL = `
68 SELECT ST_AsBinary(ST_Transform(points, 4326))
69 FROM waterway.sounding_results_marking_points
70 WHERE sounding_result_id = $1`
71
72 deleteMarkingPointsSQL = `
73 DELETE FROM waterway.sounding_results_marking_points
74 WHERE sounding_result_id = $1`
63 ) 75 )
64 76
65 // CleanUp of a iso refresh import is a NOP. 77 // CleanUp of a iso refresh import is a NOP.
66 func (isr *IsoRefresh) CleanUp() error { return nil } 78 func (isr *IsoRefresh) CleanUp() error { return nil }
67 79
68 type bottleneckSoundingResults struct { 80 type (
69 bn string 81 scanResult struct {
70 srs []int64 82 id int64
71 } 83 surveyType models.SurveyType
84 }
85 bottleneckSoundingResults struct {
86 bn string
87 srs []scanResult
88 }
89 )
72 90
73 func fetchBottleneckResults( 91 func fetchBottleneckResults(
74 ctx context.Context, 92 ctx context.Context,
75 conn *sql.Conn, 93 conn *sql.Conn,
76 ) ([]bottleneckSoundingResults, error) { 94 ) ([]bottleneckSoundingResults, error) {
79 if err != nil { 97 if err != nil {
80 return nil, err 98 return nil, err
81 } 99 }
82 defer rows.Close() 100 defer rows.Close()
83 101
84 var ids []bottleneckSoundingResults 102 var bsrs []bottleneckSoundingResults
85 103
86 for rows.Next() { 104 for rows.Next() {
87 var bn string 105 var (
88 var sr int64 106 bn string
89 if err := rows.Scan(&bn, &sr); err != nil { 107 id int64
108 st string
109 )
110 if err := rows.Scan(&bn, &id, &st); err != nil {
90 return nil, err 111 return nil, err
91 } 112 }
92 if len(ids) > 0 { 113 sr := scanResult{id: id, surveyType: models.SurveyType(st)}
93 if ids[len(ids)-1].bn != bn { 114
94 ids = append(ids, bottleneckSoundingResults{ 115 if l := len(bsrs); l == 0 || bsrs[l-1].bn != bn {
95 bn: bn, 116 bsrs = append(bsrs, bottleneckSoundingResults{
96 srs: []int64{sr}, 117 bn: bn,
97 }) 118 srs: []scanResult{sr},
98 } else { 119 })
99 ids[len(ids)-1].srs = append(ids[len(ids)-1].srs, sr)
100 }
101 } else { 120 } else {
102 ids = []bottleneckSoundingResults{ 121 bsrs[l-1].srs = append(bsrs[l-1].srs, sr)
103 {bn: bn, srs: []int64{sr}},
104 }
105 } 122 }
106 } 123 }
107 if err := rows.Err(); err != nil { 124 if err := rows.Err(); err != nil {
108 return nil, err 125 return nil, err
109 } 126 }
110 return ids, nil 127 return bsrs, nil
111 } 128 }
112 129
113 // Do executes the actual refreshing of the iso areas. 130 // Do executes the actual refreshing of the iso areas.
114 func (isr *IsoRefresh) Do( 131 func (isr *IsoRefresh) Do(
115 ctx context.Context, 132 ctx context.Context,
165 if err != nil { 182 if err != nil {
166 return nil 183 return nil
167 } 184 }
168 defer tx.Rollback() 185 defer tx.Rollback()
169 186
170 insertAreasStmt, err := tx.Prepare(insertIsoAreasSQL) 187 var (
171 if err != nil { 188 insertAreasStmt,
172 return err 189 deleteAreasStmt,
173 } 190 fetchMarkingPointsStmt,
174 defer insertAreasStmt.Close() 191 deleteMarkingPointsStmt,
192 insertMarkingPointsStmt *sql.Stmt
193 )
194
195 for _, x := range []struct {
196 stmt **sql.Stmt
197 query string
198 }{
199 {&insertAreasStmt, insertIsoAreasSQL},
200 {&deleteAreasStmt, deleteIsoAreasSQL},
201 {&fetchMarkingPointsStmt, fetchMarkingPointsSQL},
202 {&deleteMarkingPointsStmt, deleteMarkingPointsSQL},
203 {&insertMarkingPointsStmt, insertMarkingPointsSQL},
204 } {
205 if *x.stmt, err = tx.PrepareContext(ctx, x.query); err != nil {
206 return err
207 }
208 defer (*x.stmt).Close()
209 }
175 210
176 // For all sounding results in bottleneck. 211 // For all sounding results in bottleneck.
177 for _, sr := range bn.srs { 212 for _, sr := range bn.srs {
178 tree, err := mesh.FetchMeshDirectly(ctx, tx, sr) 213 switch sr.surveyType {
179 if err != nil { 214 case models.SurveyTypeMarking:
180 return err 215
181 } 216 // Read all points back in.
182 hs := heights.ExtrapolateClassBreaks(tree.Min().Z, tree.Max().Z) 217
183 hs = common.DedupFloat64s(hs) 218 var points mesh.MultiPointZ
184 219
185 // Delete the old iso areas. 220 if err := func() error {
186 if _, err := tx.ExecContext(ctx, deleteIsoAreasSQL, sr); err != nil { 221 rows, err := fetchMarkingPointsStmt.QueryContext(ctx, sr.id)
187 return err 222 if err != nil {
188 } 223 return err
189 224 }
190 // Calculate and store the iso areas. 225 defer rows.Close()
191 box := mesh.Box2D{ 226
192 X1: tree.Min().X, 227 for rows.Next() {
193 Y1: tree.Min().Y, 228 var buf []byte
194 X2: tree.Max().X, 229 if err := rows.Scan(&buf); err != nil {
195 Y2: tree.Max().Y, 230 return err
196 } 231 }
197 232 var npoints mesh.MultiPointZ
198 raster := mesh.NewRaster(box, isoCellSize) 233 if err := npoints.FromWKB(buf); err != nil {
199 raster.Rasterize(tree.Value) 234 return err
200 areas := raster.Trace(hs) 235 }
201 236 points = append(points, npoints...)
202 for i, a := range areas { 237 }
203 if len(a) == 0 { 238 return rows.Err()
204 continue 239 }(); err != nil {
205 }
206 if _, err := insertAreasStmt.ExecContext(
207 ctx,
208 sr, hs[i], tree.EPSG(),
209 a.AsWKB(),
210 contourTolerance,
211 ); err != nil {
212 return err 240 return err
213 } 241 }
242
243 // Delete old points
244 if _, err := deleteMarkingPointsStmt.ExecContext(ctx, sr.id); err != nil {
245 return err
246 }
247
248 // Re-classify points.
249 classes := heights.Classify(points.All())
250
251 // Should not happen ... Z values over the top.
252 if n := len(classes) - 1; n > 1 && len(classes[n]) > 0 {
253 // Place the over the top values to the class below.
254 classes[n-1] = append(classes[n-1], classes[n]...)
255 classes[n] = nil
256 classes = classes[:n]
257 }
258
259 // Re-insert points
260 for i, class := range classes {
261 // Ignore empty classes
262 if len(class) == 0 {
263 continue
264 }
265 if _, err := insertMarkingPointsStmt.ExecContext(
266 ctx, sr.id, heights[i], 4326, class.AsWKB(),
267 ); err != nil {
268 return err
269 }
270 }
271
272 case models.SurveyTypeMultiBeam, models.SurveyTypeSingleBeam:
273 tree, err := mesh.FetchMeshDirectly(ctx, tx, sr.id)
274 if err != nil {
275 return err
276 }
277 hs := heights.ExtrapolateClassBreaks(tree.Min().Z, tree.Max().Z).Dedup()
278
279 // Delete the old iso areas.
280 if _, err := deleteAreasStmt.ExecContext(ctx, sr); err != nil {
281 return err
282 }
283
284 // Calculate and store the iso areas.
285 box := mesh.Box2D{
286 X1: tree.Min().X,
287 Y1: tree.Min().Y,
288 X2: tree.Max().X,
289 Y2: tree.Max().Y,
290 }
291
292 raster := mesh.NewRaster(box, isoCellSize)
293 raster.Rasterize(tree.Value)
294 areas := raster.Trace(hs)
295
296 for i, a := range areas {
297 if len(a) == 0 {
298 continue
299 }
300 if _, err := insertAreasStmt.ExecContext(
301 ctx,
302 sr, hs[i], tree.EPSG(),
303 a.AsWKB(),
304 contourTolerance,
305 ); err != nil {
306 return err
307 }
308 }
309 default:
310 log.Printf("error: unknown survey type '%s'\n", sr.surveyType)
214 } 311 }
215 } 312 }
216 313
217 return tx.Commit() 314 return tx.Commit()
218 } 315 }