comparison pkg/imports/isr.go @ 4214:49564382ffff

Added an import queue job to recalculate the contour lines of the sounding results if the heights have changed.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Fri, 16 Aug 2019 13:15:34 +0200
parents
children e453d3bf7663
comparison
equal deleted inserted replaced
4197:5d7ce7f926eb 4214:49564382ffff
1 // This is Free Software under GNU Affero General Public License v >= 3.0
2 // without warranty, see README.md and license for details.
3 //
4 // SPDX-License-Identifier: AGPL-3.0-or-later
5 // License-Filename: LICENSES/AGPL-3.0.txt
6 //
7 // Copyright (C) 2018 by via donau
8 // – Österreichische Wasserstraßen-Gesellschaft mbH
9 // Software engineering by Intevation GmbH
10 //
11 // Author(s):
12 // * Sascha L. Teichmann <sascha.teichmann@intevation.de>
13
14 package imports
15
16 import (
17 "context"
18 "database/sql"
19 "time"
20
21 "gemma.intevation.de/gemma/pkg/octree"
22 )
23
// IsoRefresh is the import job which recalculates the contour (iso) lines
// of the sounding results.
type IsoRefresh struct {
	// ClassBreaks is the textual class breaks configuration. Do parses it
	// with octree.ParseClassBreaks. In JSON it is stored under the key
	// "class-breaks".
	ClassBreaks string `json:"class-breaks"`
}
27
28 // ISRJobKind is the unique name of this import job type.
29 const ISRJobKind JobKind = "isr"
30
31 func init() { RegisterJobCreator(ISRJobKind, isrJobCreator{}) }
32
33 type isrJobCreator struct{}
34
35 func (isrJobCreator) Description() string { return "refresh iso lines" }
36
37 func (isrJobCreator) AutoAccept() bool { return true }
38
39 func (isrJobCreator) Create() Job { return new(IsoRefresh) }
40
41 func (isrJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
42 return nil
43 }
44
45 func (isrJobCreator) Depends() [2][]string {
46 return [2][]string{
47 {"sounding_results", "sounding_results_contour_lines"},
48 {},
49 }
50 }
51
// fetchSoundingResultsIDsSQL selects the (bottleneck_id, id) pairs of all
// sounding results ordered by bottleneck_id, so that the results of one
// bottleneck arrive as consecutive rows.
const fetchSoundingResultsIDsSQL = `
SELECT bottleneck_id, id
FROM waterway.sounding_results
ORDER BY bottleneck_id
`

// deleteContourLinesSQL deletes the stored contour lines of the sounding
// result whose id is passed as $1.
const deleteContourLinesSQL = `
DELETE FROM waterway.sounding_results_contour_lines
WHERE sounding_result_id = $1
`
63
64 func (isr *IsoRefresh) CleanUp() error { return nil }
65
// bottleneckSoundingResults groups the ids of the sounding results
// which belong to the same bottleneck.
type bottleneckSoundingResults struct {
	// bn is the id of the bottleneck.
	bn int64
	// srs are the ids of the sounding results of the bottleneck.
	srs []int64
}
70
71 func fetchBottleneckResults(
72 ctx context.Context,
73 conn *sql.Conn,
74 ) ([]bottleneckSoundingResults, error) {
75
76 rows, err := conn.QueryContext(ctx, fetchSoundingResultsIDsSQL)
77 if err != nil {
78 return nil, err
79 }
80 defer rows.Close()
81
82 var ids []bottleneckSoundingResults
83
84 for rows.Next() {
85 var bn, sr int64
86 if err := rows.Scan(&bn, sr); err != nil {
87 return nil, err
88 }
89 if len(ids) > 0 {
90 if ids[len(ids)-1].bn != bn {
91 ids = append(ids, bottleneckSoundingResults{
92 bn: bn,
93 srs: []int64{sr},
94 })
95 } else {
96 ids[len(ids)-1].srs = append(ids[len(ids)-1].srs, sr)
97 }
98 } else {
99 ids = []bottleneckSoundingResults{
100 {bn: bn, srs: []int64{sr}},
101 }
102 }
103 }
104 if err := rows.Err(); err != nil {
105 return nil, err
106 }
107 return ids, nil
108 }
109
110 // Do executes the actual refreshing of the iso lines.
111 func (isr *IsoRefresh) Do(
112 ctx context.Context,
113 importID int64,
114 conn *sql.Conn,
115 feedback Feedback,
116 ) (interface{}, error) {
117
118 start := time.Now()
119 defer func() {
120 feedback.Info(
121 "Processing all sounding results took %v.",
122 time.Since(start))
123 }()
124
125 heights, err := octree.ParseClassBreaks(isr.ClassBreaks)
126 if err != nil {
127 return nil, err
128 }
129
130 bns, err := fetchBottleneckResults(ctx, conn)
131 if err != nil {
132 return nil, err
133 }
134
135 for i := range bns {
136 start := time.Now()
137 err := isr.processBottleneck(
138 ctx, conn,
139 heights,
140 &bns[i],
141 )
142 feedback.Info("Processing bottleneck with ID %d took %v",
143 bns[i].bn,
144 time.Since(start))
145 if err != nil {
146 return nil, err
147 }
148 }
149
150 return nil, nil
151 }
152
153 func (isr *IsoRefresh) processBottleneck(
154 ctx context.Context,
155 conn *sql.Conn,
156 heights []float64,
157 bn *bottleneckSoundingResults,
158 ) error {
159 // Do one transaction per bottleneck.
160 tx, err := conn.BeginTx(ctx, nil)
161 if err != nil {
162 return nil
163 }
164 defer tx.Rollback()
165
166 insertStmt, err := tx.Prepare(insertContourSQL)
167 if err != nil {
168 return err
169 }
170
171 // For all sounding results in bottleneck.
172 for _, sr := range bn.srs {
173 tree, err := octree.FetchOctreeDirectly(ctx, tx, sr)
174 if err != nil {
175 return err
176 }
177 hs := octree.ExtrapolateClassBreaks(heights, tree.Min.Z, tree.Max.Z)
178
179 // Delete the old contour lines.
180 if _, err := tx.ExecContext(ctx, deleteContourLinesSQL, sr); err != nil {
181 return err
182 }
183
184 octree.DoContours(tree, hs, func(res *octree.ContourResult) {
185 if err == nil && len(res.Lines) > 0 {
186 _, err = insertStmt.ExecContext(
187 ctx,
188 sr, res.Height, tree.EPSG,
189 res.Lines.AsWKB2D(),
190 contourTolerance)
191 }
192 })
193 if err != nil {
194 return err
195 }
196 }
197
198 return nil
199 }