Mercurial > gemma
comparison pkg/imports/gm.go @ 1637:dd31be75ce6d
Implemented gauge measurement import.
author | Raimund Renkert <raimund.renkert@intevation.de> |
---|---|
date | Thu, 20 Dec 2018 12:06:37 +0100 |
parents | |
children | 2a35bbbb4d93 |
comparison
equal
deleted
inserted
replaced
1636:37ee25bc2bbe | 1637:dd31be75ce6d |
---|---|
1 // This is Free Software under GNU Affero General Public License v >= 3.0 | |
2 // without warranty, see README.md and license for details. | |
3 // | |
4 // SPDX-License-Identifier: AGPL-3.0-or-later | |
5 // License-Filename: LICENSES/AGPL-3.0.txt | |
6 // | |
7 // Copyright (C) 2018 by via donau | |
8 // – Österreichische Wasserstraßen-Gesellschaft mbH | |
9 // Software engineering by Intevation GmbH | |
10 // | |
11 // Author(s): | |
12 // * Raimund Renkert <raimund.renkert@intevation.de> | |
13 package imports | |
14 | |
15 import ( | |
16 "context" | |
17 "database/sql" | |
18 "errors" | |
19 "time" | |
20 | |
21 "gemma.intevation.de/gemma/pkg/common" | |
22 "gemma.intevation.de/gemma/pkg/models" | |
23 "gemma.intevation.de/gemma/pkg/soap/nts" | |
24 ) | |
25 | |
// GaugeMeasurement is the configuration of a gauge measurement import job.
type GaugeMeasurement struct {
	// URL is the address handed to the NtS SOAP message service client.
	URL string `json:"url"`
	// Insecure is passed through to the SOAP client constructor.
	// NOTE(review): presumably disables TLS certificate checks — confirm.
	Insecure bool `json:"insecure"`
}
30 | |
31 const GMJobKind JobKind = "gm" | |
32 | |
const (
	// listGaugesSQL selects the ISRS location components of all gauges
	// located in the country of the current user.
	listGaugesSQL = `
SELECT
  (location).country_code,
  (location).locode,
  (location).fairway_section,
  (location).orc,
  (location).hectometre
FROM waterway.gauges
WHERE (location).country_code = users.current_user_country()`

	// hasGaugeMeasurementSQL yields a row for every stored measurement
	// of the gauge given as $1.
	hasGaugeMeasurementSQL = `
SELECT true FROM waterway.gauge_measurements WHERE fk_gauge_id = $1`

	// insertGMSQL stores a single gauge measurement and returns the id
	// of the new row. $1-$5 form the gauge reference, $6-$16 fill the
	// remaining columns in the order they are listed.
	insertGMSQL = `
INSERT INTO waterway.gauge_measurements (
  fk_gauge_id,
  measure_date,
  sender,
  language_code,
  date_issue,
  water_level,
  predicted,
  is_waterlevel,
  value_min,
  value_max,
  date_info,
  source_organization
) VALUES(
  ($1, $2, $3, $4, $5),
  $6,
  $7,
  $8,
  $9,
  $10,
  $11,
  $12,
  $13,
  $14,
  $15,
  $16
)
RETURNING id`
)
77 | |
78 type gmJobCreator struct{} | |
79 | |
80 func init() { | |
81 RegisterJobCreator(GMJobKind, gmJobCreator{}) | |
82 } | |
83 | |
84 func (gmJobCreator) Create(_ JobKind, data string) (Job, error) { | |
85 gm := new(GaugeMeasurement) | |
86 if err := common.FromJSONString(data, gm); err != nil { | |
87 return nil, err | |
88 } | |
89 return gm, nil | |
90 } | |
91 | |
92 func (gmJobCreator) Depends() []string { | |
93 return []string{ | |
94 "waterway.gauges", | |
95 "waterway.gauge_measurements", | |
96 } | |
97 } | |
98 | |
99 // StageDone moves the imported gauge measurement out of the staging area. | |
100 // Currently doing nothing. | |
101 func (gmJobCreator) StageDone( | |
102 ctx context.Context, | |
103 tx *sql.Tx, | |
104 id int64, | |
105 ) error { | |
106 return nil | |
107 } | |
108 | |
109 // CleanUp of a gauge measurement import is a NOP. | |
110 func (gm *GaugeMeasurement) CleanUp() error { return nil } | |
111 | |
112 // Do executes the actual bottleneck import. | |
113 func (gm *GaugeMeasurement) Do( | |
114 ctx context.Context, | |
115 importID int64, | |
116 conn *sql.Conn, | |
117 feedback Feedback, | |
118 ) (interface{}, error) { | |
119 | |
120 // Get available gauges from database for use as filter in SOAP request | |
121 var rows *sql.Rows | |
122 | |
123 rows, err := conn.QueryContext(ctx, listGaugesSQL) | |
124 if err != nil { | |
125 return nil, err | |
126 } | |
127 defer rows.Close() | |
128 | |
129 gauges := []models.GaugeMeasurement{} | |
130 | |
131 for rows.Next() { | |
132 var g models.GaugeMeasurement | |
133 if err = rows.Scan( | |
134 &g.Gauge.CountryCode, | |
135 &g.Gauge.LoCode, | |
136 &g.Gauge.FairwaySection, | |
137 &g.Gauge.Orc, | |
138 &g.Gauge.Hectometre, | |
139 ); err != nil { | |
140 return nil, err | |
141 } | |
142 gauges = append(gauges, g) | |
143 } | |
144 | |
145 if err = rows.Err(); err != nil { | |
146 return nil, err | |
147 } | |
148 | |
149 // TODO get date_issue for selected gauges | |
150 gids, err := gm.doForGM(ctx, gauges, conn, feedback) | |
151 if err != nil { | |
152 feedback.Error("Error processing %d gauges: %s", len(gauges), err) | |
153 } | |
154 if len(gids) == 0 { | |
155 feedback.Error("No new gauge measurements found") | |
156 return nil, errors.New("No new gauge measurements found") | |
157 } | |
158 // TODO: needs to be filled more useful. | |
159 summary := struct { | |
160 GaugeMeasuremets []string `json:"gaugeMeasurements"` | |
161 }{ | |
162 GaugeMeasuremets: gids, | |
163 } | |
164 return &summary, err | |
165 } | |
166 | |
167 func (gm *GaugeMeasurement) doForGM( | |
168 ctx context.Context, | |
169 gauges []models.GaugeMeasurement, | |
170 conn *sql.Conn, | |
171 feedback Feedback, | |
172 ) ([]string, error) { | |
173 start := time.Now() | |
174 | |
175 client := nts.NewINtSMessageService(gm.URL, gm.Insecure, nil) | |
176 | |
177 var idPairs []*nts.Id_pair | |
178 for _, g := range gauges { | |
179 isrs := g.Gauge.String() | |
180 isrsID := nts.Isrs_code_type(isrs) | |
181 idPairs = append(idPairs, &nts.Id_pair{ | |
182 Id: &isrsID, | |
183 }) | |
184 } | |
185 mt := nts.Message_type_typeWRM | |
186 req := &nts.Get_messages_query{ | |
187 Message_type: &mt, | |
188 Ids: idPairs, | |
189 } | |
190 resp, err := client.Get_messages(req) | |
191 if err != nil { | |
192 feedback.Error("%v", err) | |
193 return nil, err | |
194 } | |
195 | |
196 if resp.Result_message == nil { | |
197 err := errors.New("no Gauge Measurements found") | |
198 for i, e := range resp.Result_error { | |
199 feedback.Error("%d: %v", i, string(*e)) | |
200 } | |
201 return nil, err | |
202 } | |
203 | |
204 result := resp.Result_message | |
205 | |
206 tx, err := conn.BeginTx(ctx, nil) | |
207 if err != nil { | |
208 return nil, err | |
209 } | |
210 defer tx.Rollback() | |
211 | |
212 insertStmt, err := tx.PrepareContext(ctx, insertGMSQL) | |
213 if err != nil { | |
214 return nil, err | |
215 } | |
216 defer insertStmt.Close() | |
217 | |
218 var gid int64 | |
219 var gids []string | |
220 for _, msg := range result { | |
221 feedback.Info("Found %d gauges with measurements", len(msg.Wrm)) | |
222 for _, wrm := range msg.Wrm { | |
223 currIsrs, err := models.IsrsFromString(string(*wrm.Geo_object.Id)) | |
224 for _, measure := range wrm.Measure { | |
225 isWaterlevel := false | |
226 if *measure.Measure_code == nts.Measure_code_enumWAL { | |
227 isWaterlevel = true | |
228 } | |
229 err = insertStmt.QueryRowContext( | |
230 ctx, | |
231 currIsrs.CountryCode, | |
232 currIsrs.LoCode, | |
233 currIsrs.FairwaySection, | |
234 currIsrs.Orc, | |
235 currIsrs.Hectometre, | |
236 measure.Measuredate, | |
237 msg.Identification.From, | |
238 msg.Identification.Language_code, | |
239 msg.Identification.Date_issue, | |
240 measure.Value, | |
241 measure.Predicted, | |
242 isWaterlevel, | |
243 measure.Value_min, | |
244 measure.Value_max, | |
245 msg.Identification.Date_issue, | |
246 msg.Identification.Originator, | |
247 ).Scan(&gid) | |
248 if err != nil { | |
249 return nil, err | |
250 } | |
251 feedback.Info("Inserted '%s'", currIsrs) | |
252 } | |
253 gids = append(gids, currIsrs.String()) | |
254 } | |
255 } | |
256 feedback.Info("Storing gauge measurements took %s", time.Since(start)) | |
257 if err = tx.Commit(); err == nil { | |
258 feedback.Info("Import of gauge measurements was successful") | |
259 } | |
260 | |
261 return gids, nil | |
262 } |