Mercurial > gemma
comparison pkg/imports/gm.go @ 3277:232fc90e6ee2
Disentangle gauge measurements and predictions
Representing both in one table has made it necessary to make the
distinction in many places, such as statements, definitions of partial
indexes and application code. In at least one place in the AGM
import, the distinction in application code was made too late, and
measurements matching an approved measurement could have been missed.
author | Tom Gottfried <tom@intevation.de> |
---|---|
date | Wed, 15 May 2019 19:08:49 +0200 |
parents | 3dee5cf16a58 |
children | 831193935739 |
comparison
equal
deleted
inserted
replaced
3276:75db3199f76e | 3277:232fc90e6ee2 |
---|---|
22 "strings" | 22 "strings" |
23 "time" | 23 "time" |
24 | 24 |
25 "gemma.intevation.de/gemma/pkg/models" | 25 "gemma.intevation.de/gemma/pkg/models" |
26 "gemma.intevation.de/gemma/pkg/soap/nts" | 26 "gemma.intevation.de/gemma/pkg/soap/nts" |
27 "github.com/jackc/pgx/pgtype" | |
27 ) | 28 ) |
28 | 29 |
29 // GaugeMeasurement is an import job to import | 30 // GaugeMeasurement is an import job to import |
30 // gauges measurement data from a NtS SOAP service. | 31 // gauges measurement data from a NtS SOAP service. |
31 type GaugeMeasurement struct { | 32 type GaugeMeasurement struct { |
51 language_code, | 52 language_code, |
52 country_code, | 53 country_code, |
53 date_issue, | 54 date_issue, |
54 reference_code, | 55 reference_code, |
55 water_level, | 56 water_level, |
56 predicted, | |
57 is_waterlevel, | 57 is_waterlevel, |
58 value_min, | |
59 value_max, | |
60 date_info, | 58 date_info, |
61 source_organization, | 59 source_organization, |
62 staging_done | 60 staging_done |
63 ) VALUES( | 61 ) VALUES( |
64 ($1, $2, $3, $4, $5), | 62 ($1, $2, $3, $4, $5), |
70 $11, | 68 $11, |
71 $12, | 69 $12, |
72 $13, | 70 $13, |
73 $14, | 71 $14, |
74 $15, | 72 $15, |
75 $16, | 73 true |
76 $17, | |
77 $18, | |
78 $19 | |
79 ) | 74 ) |
80 ON CONFLICT DO NOTHING | 75 ON CONFLICT DO NOTHING |
81 RETURNING id | 76 RETURNING 1 |
77 ` | |
78 | |
79 insertGPSQL = ` | |
80 INSERT INTO waterway.gauge_predictions ( | |
81 fk_gauge_id, | |
82 measure_date, | |
83 sender, | |
84 language_code, | |
85 country_code, | |
86 date_issue, | |
87 reference_code, | |
88 water_level, | |
89 is_waterlevel, | |
90 conf_interval, | |
91 date_info, | |
92 source_organization | |
93 ) VALUES( | |
94 ($1, $2, $3, $4, $5), | |
95 $6, | |
96 $7, | |
97 $8, | |
98 $9, | |
99 $10, | |
100 $11, | |
101 $12, | |
102 $13, | |
103 $14, | |
104 $15, | |
105 $16 | |
106 ) | |
107 ON CONFLICT DO NOTHING | |
108 RETURNING 1 | |
82 ` | 109 ` |
83 ) | 110 ) |
84 | 111 |
85 type gmJobCreator struct{} | 112 type gmJobCreator struct{} |
86 | 113 |
222 fetch func() ([]*nts.RIS_Message_Type, error), | 249 fetch func() ([]*nts.RIS_Message_Type, error), |
223 conn *sql.Conn, | 250 conn *sql.Conn, |
224 feedback Feedback, | 251 feedback Feedback, |
225 ) ([]string, error) { | 252 ) ([]string, error) { |
226 | 253 |
227 insertStmt, err := conn.PrepareContext(ctx, insertGMSQL) | 254 insertGPStmt, err := conn.PrepareContext(ctx, insertGPSQL) |
228 if err != nil { | 255 if err != nil { |
229 return nil, err | 256 return nil, err |
230 } | 257 } |
231 defer insertStmt.Close() | 258 defer insertGPStmt.Close() |
259 | |
260 insertGMStmt, err := conn.PrepareContext(ctx, insertGMSQL) | |
261 if err != nil { | |
262 return nil, err | |
263 } | |
264 defer insertGMStmt.Close() | |
232 | 265 |
233 result, err := fetch() | 266 result, err := fetch() |
234 if err != nil { | 267 if err != nil { |
235 return nil, err | 268 return nil, err |
236 } | 269 } |
237 | 270 |
238 var gids []string | 271 var gids []string |
239 for _, msg := range result { | 272 for _, msg := range result { |
240 var gid int64 | 273 var dummy int |
241 for _, wrm := range msg.Wrm { | 274 for _, wrm := range msg.Wrm { |
242 curr := string(*wrm.Geo_object.Id) | 275 curr := string(*wrm.Geo_object.Id) |
243 currIsrs, err := models.IsrsFromString(curr) | 276 currIsrs, err := models.IsrsFromString(curr) |
244 if err != nil { | 277 if err != nil { |
245 feedback.Warn("Invalid ISRS code %v", err) | 278 feedback.Warn("Invalid ISRS code %v", err) |
246 continue | 279 continue |
247 } | 280 } |
248 feedback.Info("Found measurements for %s", curr) | 281 feedback.Info("Found measurements/predictions for %s", curr) |
249 | 282 |
250 var referenceCode string | 283 var referenceCode string |
251 if wrm.Reference_code == nil { | 284 if wrm.Reference_code == nil { |
252 feedback.Info("'Reference_code' not specified. Assuming 'ZPG'") | 285 feedback.Info("'Reference_code' not specified. Assuming 'ZPG'") |
253 referenceCode = "ZPG" | 286 referenceCode = "ZPG" |
254 } else { | 287 } else { |
255 referenceCode = string(*wrm.Reference_code) | 288 referenceCode = string(*wrm.Reference_code) |
256 } | 289 } |
257 | 290 |
258 var newCnt int = 0 | 291 newM, newP := 0, 0 |
259 for _, measure := range wrm.Measure { | 292 for _, measure := range wrm.Measure { |
260 var unit string | 293 var unit string |
261 if measure.Unit == nil { | 294 if measure.Unit == nil { |
262 feedback.Info("'Unit' not specified. Assuming 'cm'") | 295 feedback.Info("'Unit' not specified. Assuming 'cm'") |
263 unit = "cm" | 296 unit = "cm" |
271 convert(measure.Value) | 304 convert(measure.Value) |
272 convert(measure.Value_min) | 305 convert(measure.Value_min) |
273 convert(measure.Value_max) | 306 convert(measure.Value_max) |
274 | 307 |
275 isWaterlevel := *measure.Measure_code == nts.Measure_code_enumWAL | 308 isWaterlevel := *measure.Measure_code == nts.Measure_code_enumWAL |
276 err = insertStmt.QueryRowContext( | 309 |
277 ctx, | 310 if measure.Predicted { |
278 currIsrs.CountryCode, | 311 var confInterval pgtype.Numrange |
279 currIsrs.LoCode, | 312 if measure.Value_min != nil && measure.Value_max != nil { |
280 currIsrs.FairwaySection, | 313 var valueMin, valueMax pgtype.Numeric |
281 currIsrs.Orc, | 314 valueMin.Set(measure.Value_min) |
282 currIsrs.Hectometre, | 315 valueMax.Set(measure.Value_max) |
283 measure.Measuredate, | 316 confInterval = pgtype.Numrange{ |
284 msg.Identification.From, | 317 Lower: valueMin, |
285 msg.Identification.Language_code, | 318 Upper: valueMax, |
286 msg.Identification.Country_code, | 319 LowerType: pgtype.Inclusive, |
287 msg.Identification.Date_issue, | 320 UpperType: pgtype.Inclusive, |
288 referenceCode, | 321 Status: pgtype.Present, |
289 measure.Value, | 322 } |
290 measure.Predicted, | 323 } |
291 isWaterlevel, | 324 err = insertGPStmt.QueryRowContext( |
292 measure.Value_min, | 325 ctx, |
293 measure.Value_max, | 326 currIsrs.CountryCode, |
294 msg.Identification.Date_issue, | 327 currIsrs.LoCode, |
295 msg.Identification.Originator, | 328 currIsrs.FairwaySection, |
296 true, // staging_done | 329 currIsrs.Orc, |
297 ).Scan(&gid) | 330 currIsrs.Hectometre, |
298 switch { | 331 measure.Measuredate, |
299 case err == sql.ErrNoRows: | 332 msg.Identification.From, |
300 // thats expected, nothing to do | 333 msg.Identification.Language_code, |
301 case err != nil: | 334 msg.Identification.Country_code, |
302 feedback.Warn(handleError(err).Error()) | 335 msg.Identification.Date_issue, |
303 default: | 336 referenceCode, |
304 newCnt++ | 337 measure.Value, |
338 isWaterlevel, | |
339 &confInterval, | |
340 msg.Identification.Date_issue, | |
341 msg.Identification.Originator, | |
342 ).Scan(&dummy) | |
343 switch { | |
344 case err == sql.ErrNoRows: | |
345 // thats expected, nothing to do | |
346 case err != nil: | |
347 feedback.Warn(handleError(err).Error()) | |
348 default: | |
349 newP++ | |
350 } | |
351 } else { | |
352 err = insertGMStmt.QueryRowContext( | |
353 ctx, | |
354 currIsrs.CountryCode, | |
355 currIsrs.LoCode, | |
356 currIsrs.FairwaySection, | |
357 currIsrs.Orc, | |
358 currIsrs.Hectometre, | |
359 measure.Measuredate, | |
360 msg.Identification.From, | |
361 msg.Identification.Language_code, | |
362 msg.Identification.Country_code, | |
363 msg.Identification.Date_issue, | |
364 referenceCode, | |
365 measure.Value, | |
366 isWaterlevel, | |
367 msg.Identification.Date_issue, | |
368 msg.Identification.Originator, | |
369 ).Scan(&dummy) | |
370 switch { | |
371 case err == sql.ErrNoRows: | |
372 // thats expected, nothing to do | |
373 case err != nil: | |
374 feedback.Warn(handleError(err).Error()) | |
375 default: | |
376 newM++ | |
377 } | |
305 } | 378 } |
306 } | 379 } |
307 feedback.Info("Inserted %d measurements for %s", | 380 feedback.Info("Inserted %d measurements for %s", |
308 newCnt, curr) | 381 newM, curr) |
382 feedback.Info("Inserted %d predictions for %s", | |
383 newP, curr) | |
309 gids = append(gids, curr) | 384 gids = append(gids, curr) |
310 } | 385 } |
311 } | 386 } |
312 return gids, nil | 387 return gids, nil |
313 } | 388 } |