Mercurial > gemma
comparison pkg/imports/isr.go @ 4214:49564382ffff
Added an import queue job to recalculate the contour lines of the sounding results if the heights have changed.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Fri, 16 Aug 2019 13:15:34 +0200 |
parents | |
children | e453d3bf7663 |
comparison
equal
deleted
inserted
replaced
4197:5d7ce7f926eb | 4214:49564382ffff |
---|---|
1 // This is Free Software under GNU Affero General Public License v >= 3.0 | |
2 // without warranty, see README.md and license for details. | |
3 // | |
4 // SPDX-License-Identifier: AGPL-3.0-or-later | |
5 // License-Filename: LICENSES/AGPL-3.0.txt | |
6 // | |
7 // Copyright (C) 2018 by via donau | |
8 // – Österreichische Wasserstraßen-Gesellschaft mbH | |
9 // Software engineering by Intevation GmbH | |
10 // | |
11 // Author(s): | |
12 // * Sascha L. Teichmann <sascha.teichmann@intevation.de> | |
13 | |
14 package imports | |
15 | |
16 import ( | |
17 "context" | |
18 "database/sql" | |
19 "time" | |
20 | |
21 "gemma.intevation.de/gemma/pkg/octree" | |
22 ) | |
23 | |
// IsoRefresh is the import job that refreshes the iso lines
// of the sounding results.
type IsoRefresh struct {
	// ClassBreaks is the textual class breaks configuration. It is
	// parsed with octree.ParseClassBreaks when the job is executed.
	ClassBreaks string `json:"class-breaks"`
}
27 | |
28 // ISRJobKind is the unique name of this import job type. | |
29 const ISRJobKind JobKind = "isr" | |
30 | |
31 func init() { RegisterJobCreator(ISRJobKind, isrJobCreator{}) } | |
32 | |
// isrJobCreator is the stateless creator type for iso line refresh jobs.
type isrJobCreator struct{}
34 | |
35 func (isrJobCreator) Description() string { return "refresh iso lines" } | |
36 | |
37 func (isrJobCreator) AutoAccept() bool { return true } | |
38 | |
39 func (isrJobCreator) Create() Job { return new(IsoRefresh) } | |
40 | |
41 func (isrJobCreator) StageDone(context.Context, *sql.Tx, int64) error { | |
42 return nil | |
43 } | |
44 | |
45 func (isrJobCreator) Depends() [2][]string { | |
46 return [2][]string{ | |
47 {"sounding_results", "sounding_results_contour_lines"}, | |
48 {}, | |
49 } | |
50 } | |
51 | |
const (
	// fetchSoundingResultsIDsSQL selects the bottleneck id and the id
	// of every sounding result, ordered by the bottleneck id.
	fetchSoundingResultsIDsSQL = `
SELECT bottleneck_id, id
FROM waterway.sounding_results
ORDER BY bottleneck_id
`
	// deleteContourLinesSQL removes the contour lines belonging to
	// the sounding result with the id given as $1.
	deleteContourLinesSQL = `
DELETE FROM waterway.sounding_results_contour_lines
WHERE sounding_result_id = $1
`
)
63 | |
64 func (isr *IsoRefresh) CleanUp() error { return nil } | |
65 | |
// bottleneckSoundingResults groups the ids of the sounding results
// belonging to one bottleneck.
type bottleneckSoundingResults struct {
	// bn is the id of the bottleneck.
	bn int64
	// srs are the ids of the sounding results of the bottleneck.
	srs []int64
}
70 | |
71 func fetchBottleneckResults( | |
72 ctx context.Context, | |
73 conn *sql.Conn, | |
74 ) ([]bottleneckSoundingResults, error) { | |
75 | |
76 rows, err := conn.QueryContext(ctx, fetchSoundingResultsIDsSQL) | |
77 if err != nil { | |
78 return nil, err | |
79 } | |
80 defer rows.Close() | |
81 | |
82 var ids []bottleneckSoundingResults | |
83 | |
84 for rows.Next() { | |
85 var bn, sr int64 | |
86 if err := rows.Scan(&bn, sr); err != nil { | |
87 return nil, err | |
88 } | |
89 if len(ids) > 0 { | |
90 if ids[len(ids)-1].bn != bn { | |
91 ids = append(ids, bottleneckSoundingResults{ | |
92 bn: bn, | |
93 srs: []int64{sr}, | |
94 }) | |
95 } else { | |
96 ids[len(ids)-1].srs = append(ids[len(ids)-1].srs, sr) | |
97 } | |
98 } else { | |
99 ids = []bottleneckSoundingResults{ | |
100 {bn: bn, srs: []int64{sr}}, | |
101 } | |
102 } | |
103 } | |
104 if err := rows.Err(); err != nil { | |
105 return nil, err | |
106 } | |
107 return ids, nil | |
108 } | |
109 | |
110 // Do executes the actual refreshing of the iso lines. | |
111 func (isr *IsoRefresh) Do( | |
112 ctx context.Context, | |
113 importID int64, | |
114 conn *sql.Conn, | |
115 feedback Feedback, | |
116 ) (interface{}, error) { | |
117 | |
118 start := time.Now() | |
119 defer func() { | |
120 feedback.Info( | |
121 "Processing all sounding results took %v.", | |
122 time.Since(start)) | |
123 }() | |
124 | |
125 heights, err := octree.ParseClassBreaks(isr.ClassBreaks) | |
126 if err != nil { | |
127 return nil, err | |
128 } | |
129 | |
130 bns, err := fetchBottleneckResults(ctx, conn) | |
131 if err != nil { | |
132 return nil, err | |
133 } | |
134 | |
135 for i := range bns { | |
136 start := time.Now() | |
137 err := isr.processBottleneck( | |
138 ctx, conn, | |
139 heights, | |
140 &bns[i], | |
141 ) | |
142 feedback.Info("Processing bottleneck with ID %d took %v", | |
143 bns[i].bn, | |
144 time.Since(start)) | |
145 if err != nil { | |
146 return nil, err | |
147 } | |
148 } | |
149 | |
150 return nil, nil | |
151 } | |
152 | |
153 func (isr *IsoRefresh) processBottleneck( | |
154 ctx context.Context, | |
155 conn *sql.Conn, | |
156 heights []float64, | |
157 bn *bottleneckSoundingResults, | |
158 ) error { | |
159 // Do one transaction per bottleneck. | |
160 tx, err := conn.BeginTx(ctx, nil) | |
161 if err != nil { | |
162 return nil | |
163 } | |
164 defer tx.Rollback() | |
165 | |
166 insertStmt, err := tx.Prepare(insertContourSQL) | |
167 if err != nil { | |
168 return err | |
169 } | |
170 | |
171 // For all sounding results in bottleneck. | |
172 for _, sr := range bn.srs { | |
173 tree, err := octree.FetchOctreeDirectly(ctx, tx, sr) | |
174 if err != nil { | |
175 return err | |
176 } | |
177 hs := octree.ExtrapolateClassBreaks(heights, tree.Min.Z, tree.Max.Z) | |
178 | |
179 // Delete the old contour lines. | |
180 if _, err := tx.ExecContext(ctx, deleteContourLinesSQL, sr); err != nil { | |
181 return err | |
182 } | |
183 | |
184 octree.DoContours(tree, hs, func(res *octree.ContourResult) { | |
185 if err == nil && len(res.Lines) > 0 { | |
186 _, err = insertStmt.ExecContext( | |
187 ctx, | |
188 sr, res.Height, tree.EPSG, | |
189 res.Lines.AsWKB2D(), | |
190 contourTolerance) | |
191 } | |
192 }) | |
193 if err != nil { | |
194 return err | |
195 } | |
196 } | |
197 | |
198 return nil | |
199 } |