Mercurial > gemma
comparison pkg/imports/gm.go @ 2242:786c3fb7efe1
Gauge measurements import: Re-factored to be re-usable for upcoming uploaded gauge measurements.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Wed, 13 Feb 2019 15:38:24 +0100 |
parents | 4ca41516115b |
children | 52093f82a786 |
comparison
equal
deleted
inserted
replaced
2241:5529e1f08dba | 2242:786c3fb7efe1 |
---|---|
15 | 15 |
16 import ( | 16 import ( |
17 "context" | 17 "context" |
18 "database/sql" | 18 "database/sql" |
19 "fmt" | 19 "fmt" |
20 "sort" | |
20 "strings" | 21 "strings" |
21 "time" | 22 "time" |
22 | 23 |
23 "gemma.intevation.de/gemma/pkg/models" | 24 "gemma.intevation.de/gemma/pkg/models" |
24 "gemma.intevation.de/gemma/pkg/soap/nts" | 25 "gemma.intevation.de/gemma/pkg/soap/nts" |
135 } | 136 } |
136 | 137 |
137 // CleanUp of a gauge measurement import is a NOP. | 138 // CleanUp of a gauge measurement import is a NOP. |
138 func (*GaugeMeasurement) CleanUp() error { return nil } | 139 func (*GaugeMeasurement) CleanUp() error { return nil } |
139 | 140 |
140 func loadGauges(ctx context.Context, tx *sql.Tx) ([]*gauge, error) { | |
141 | |
142 rows, err := tx.QueryContext(ctx, listGaugesSQL) | |
143 if err != nil { | |
144 return nil, err | |
145 } | |
146 defer rows.Close() | |
147 | |
148 var gauges []*gauge | |
149 | |
150 for rows.Next() { | |
151 var g gauge | |
152 if err = rows.Scan( | |
153 &g.location.CountryCode, | |
154 &g.location.LoCode, | |
155 &g.location.FairwaySection, | |
156 &g.location.Orc, | |
157 &g.location.Hectometre, | |
158 ); err != nil { | |
159 return nil, err | |
160 } | |
161 gauges = append(gauges, &g) | |
162 } | |
163 | |
164 if err = rows.Err(); err != nil { | |
165 return nil, err | |
166 } | |
167 | |
168 return gauges, nil | |
169 } | |
170 | |
171 // Do executes the actual bottleneck import. | 141 // Do executes the actual bottleneck import. |
172 func (gm *GaugeMeasurement) Do( | 142 func (gm *GaugeMeasurement) Do( |
173 ctx context.Context, | 143 ctx context.Context, |
174 importID int64, | 144 importID int64, |
175 conn *sql.Conn, | 145 conn *sql.Conn, |
176 feedback Feedback, | 146 feedback Feedback, |
177 ) (interface{}, error) { | 147 ) (interface{}, error) { |
178 | 148 |
149 fetch := func() ([]*nts.RIS_Message_Type, error) { | |
150 client := nts.NewINtSMessageService(gm.URL, gm.Insecure, nil) | |
151 | |
152 mt := nts.Message_type_typeWRM | |
153 | |
154 req := &nts.Get_messages_query{ | |
155 Message_type: &mt, | |
156 } | |
157 | |
158 resp, err := client.Get_messages(req) | |
159 if err != nil { | |
160 return nil, err | |
161 } | |
162 | |
163 result := resp.Result_message | |
164 if result == nil { | |
165 for _, e := range resp.Result_error { | |
166 if e != nil { | |
167 feedback.Error("Error code: %s", *e) | |
168 } else { | |
169 feedback.Error("Unknown error") | |
170 } | |
171 } | |
172 } | |
173 return result, nil | |
174 } | |
175 | |
176 return storeGaugeMeasurements( | |
177 ctx, | |
178 importID, | |
179 fetch, | |
180 conn, | |
181 feedback, | |
182 ) | |
183 } | |
184 | |
185 func loadGauges(ctx context.Context, tx *sql.Tx) ([]string, error) { | |
186 | |
187 rows, err := tx.QueryContext(ctx, listGaugesSQL) | |
188 if err != nil { | |
189 return nil, err | |
190 } | |
191 defer rows.Close() | |
192 | |
193 var gauges []string | |
194 | |
195 for rows.Next() { | |
196 var g models.Isrs | |
197 if err = rows.Scan( | |
198 &g.CountryCode, | |
199 &g.LoCode, | |
200 &g.FairwaySection, | |
201 &g.Orc, | |
202 &g.Hectometre, | |
203 ); err != nil { | |
204 return nil, err | |
205 } | |
206 gauges = append(gauges, g.String()) | |
207 } | |
208 | |
209 if err = rows.Err(); err != nil { | |
210 return nil, err | |
211 } | |
212 | |
213 sort.Strings(gauges) | |
214 | |
215 return gauges, nil | |
216 } | |
217 | |
218 func storeGaugeMeasurements( | |
219 ctx context.Context, | |
220 importID int64, | |
221 fetch func() ([]*nts.RIS_Message_Type, error), | |
222 conn *sql.Conn, | |
223 feedback Feedback, | |
224 ) (interface{}, error) { | |
225 | |
179 start := time.Now() | 226 start := time.Now() |
180 | 227 |
181 tx, err := conn.BeginTx(ctx, nil) | 228 tx, err := conn.BeginTx(ctx, nil) |
182 if err != nil { | 229 if err != nil { |
183 return nil, err | 230 return nil, err |
189 if err != nil { | 236 if err != nil { |
190 return nil, err | 237 return nil, err |
191 } | 238 } |
192 | 239 |
193 // TODO get date_issue for selected gauges | 240 // TODO get date_issue for selected gauges |
194 gids, err := gm.doForGM(ctx, gauges, tx, feedback) | 241 gids, err := doForGM(ctx, gauges, fetch, tx, feedback) |
195 if err != nil { | 242 if err != nil { |
196 feedback.Error("Error processing %d gauges: %v", len(gauges), err) | 243 feedback.Error("Error processing %d gauges: %v", len(gauges), err) |
197 return nil, err | 244 return nil, err |
198 } | 245 } |
199 | 246 |
243 | 290 |
244 fn := func(x float32) float32 { return scale * x } | 291 fn := func(x float32) float32 { return scale * x } |
245 return fn, nil | 292 return fn, nil |
246 } | 293 } |
247 | 294 |
248 func (gm *GaugeMeasurement) doForGM( | 295 func doForGM( |
249 ctx context.Context, | 296 ctx context.Context, |
250 gauges []*gauge, | 297 gauges []string, |
298 fetch func() ([]*nts.RIS_Message_Type, error), | |
251 tx *sql.Tx, | 299 tx *sql.Tx, |
252 feedback Feedback, | 300 feedback Feedback, |
253 ) ([]string, error) { | 301 ) ([]string, error) { |
254 | 302 |
255 client := nts.NewINtSMessageService(gm.URL, gm.Insecure, nil) | |
256 | |
257 mt := nts.Message_type_typeWRM | |
258 | |
259 insertStmt, err := tx.PrepareContext(ctx, insertGMSQL) | 303 insertStmt, err := tx.PrepareContext(ctx, insertGMSQL) |
260 if err != nil { | 304 if err != nil { |
261 return nil, err | 305 return nil, err |
262 } | 306 } |
263 defer insertStmt.Close() | 307 defer insertStmt.Close() |
264 | 308 |
309 // lookup to see if we have gauges in the database. | |
310 isKnown := func(s string) bool { | |
311 idx := sort.SearchStrings(gauges, s) | |
312 return idx < len(gauges) && gauges[idx] == s | |
313 } | |
314 | |
315 result, err := fetch() | |
316 if err != nil { | |
317 return nil, err | |
318 } | |
319 | |
265 var gids []string | 320 var gids []string |
266 for _, g := range gauges { | 321 for _, msg := range result { |
267 | 322 var gid int64 |
268 isrsID := nts.Isrs_code_type(g.location.String()) | 323 for _, wrm := range msg.Wrm { |
269 idPair := []*nts.Id_pair{{Id: &isrsID}} | 324 curr := string(*wrm.Geo_object.Id) |
270 | 325 currIsrs, err := models.IsrsFromString(curr) |
271 req := &nts.Get_messages_query{ | 326 if err != nil { |
272 Message_type: &mt, | 327 feedback.Warn("Invalid ISRS code %v", err) |
273 Ids: idPair, | 328 continue |
274 } | 329 } |
275 | 330 feedback.Info("Found measurements for %s", curr) |
276 resp, err := client.Get_messages(req) | 331 if !isKnown(curr) { |
277 if err != nil { | 332 feedback.Warn("Gauge '%s' is not in database.", curr) |
278 return nil, err | 333 continue |
279 } | 334 } |
280 | 335 |
281 if resp.Result_message == nil { | 336 var referenceCode string |
282 for _, e := range resp.Result_error { | 337 if wrm.Reference_code == nil { |
283 if e != nil { | 338 feedback.Info("'Reference_code' not specified. Assuming 'ZPG'") |
284 feedback.Error("No gauge measurements found for %s", g.location) | 339 referenceCode = "ZPG" |
340 } else { | |
341 referenceCode = string(*wrm.Reference_code) | |
342 } | |
343 for _, measure := range wrm.Measure { | |
344 var unit string | |
345 if measure.Unit == nil { | |
346 feedback.Info("'Unit' not specified. Assuming 'cm'") | |
347 unit = "cm" | |
285 } else { | 348 } else { |
286 feedback.Error("unknown") | 349 unit = string(*measure.Unit) |
287 } | 350 } |
288 } | 351 convert, err := rescale(unit) |
289 continue | |
290 } | |
291 result := resp.Result_message | |
292 | |
293 for _, msg := range result { | |
294 var gid int64 | |
295 feedback.Info("Found measurements for %s", g.location) | |
296 for _, wrm := range msg.Wrm { | |
297 currIsrs, err := models.IsrsFromString(string(*wrm.Geo_object.Id)) | |
298 if err != nil { | 352 if err != nil { |
299 feedback.Warn("Invalid ISRS code %v", err) | 353 return nil, err |
300 continue | |
301 } | 354 } |
302 var referenceCode string | 355 isWaterlevel := *measure.Measure_code == nts.Measure_code_enumWAL |
303 if wrm.Reference_code == nil { | 356 err = insertStmt.QueryRowContext( |
304 feedback.Info("'Reference_code' not specified. Assuming 'ZPG'") | 357 ctx, |
305 referenceCode = "ZPG" | 358 currIsrs.CountryCode, |
306 } else { | 359 currIsrs.LoCode, |
307 referenceCode = string(*wrm.Reference_code) | 360 currIsrs.FairwaySection, |
361 currIsrs.Orc, | |
362 currIsrs.Hectometre, | |
363 measure.Measuredate, | |
364 msg.Identification.From, | |
365 msg.Identification.Language_code, | |
366 msg.Identification.Country_code, | |
367 msg.Identification.Date_issue, | |
368 referenceCode, | |
369 convert(measure.Value), | |
370 measure.Predicted, | |
371 isWaterlevel, | |
372 convert(measure.Value_min), | |
373 convert(measure.Value_max), | |
374 msg.Identification.Date_issue, | |
375 msg.Identification.Originator, | |
376 true, // staging_done | |
377 ).Scan(&gid) | |
378 if err != nil { | |
379 return nil, err | |
308 } | 380 } |
309 for _, measure := range wrm.Measure { | 381 } |
310 var unit string | 382 feedback.Info("Inserted %d measurements for %s", |
311 if measure.Unit == nil { | 383 len(wrm.Measure), curr) |
312 feedback.Info("'Unit' not specified. Assuming 'cm'") | 384 gids = append(gids, curr) |
313 unit = "cm" | |
314 } else { | |
315 unit = string(*measure.Unit) | |
316 } | |
317 convert, err := rescale(unit) | |
318 if err != nil { | |
319 return nil, err | |
320 } | |
321 isWaterlevel := *measure.Measure_code == nts.Measure_code_enumWAL | |
322 err = insertStmt.QueryRowContext( | |
323 ctx, | |
324 currIsrs.CountryCode, | |
325 currIsrs.LoCode, | |
326 currIsrs.FairwaySection, | |
327 currIsrs.Orc, | |
328 currIsrs.Hectometre, | |
329 measure.Measuredate, | |
330 msg.Identification.From, | |
331 msg.Identification.Language_code, | |
332 msg.Identification.Country_code, | |
333 msg.Identification.Date_issue, | |
334 referenceCode, | |
335 convert(measure.Value), | |
336 measure.Predicted, | |
337 isWaterlevel, | |
338 convert(measure.Value_min), | |
339 convert(measure.Value_max), | |
340 msg.Identification.Date_issue, | |
341 msg.Identification.Originator, | |
342 true, // staging_done | |
343 ).Scan(&gid) | |
344 if err != nil { | |
345 return nil, err | |
346 } | |
347 } | |
348 feedback.Info("Inserted %d measurements for %s", | |
349 len(wrm.Measure), currIsrs) | |
350 gids = append(gids, currIsrs.String()) | |
351 } | |
352 } | 385 } |
353 } | 386 } |
354 return gids, nil | 387 return gids, nil |
355 } | 388 } |