comparison pkg/imports/gm.go @ 3277:232fc90e6ee2

Disentangle gauge measurements and predictions Representing both in one table has led to the necessity to make the distinction at many places such as statements, definitions of partial indexes and application code. At least in one place in the AGM import the distinction in application code was too late and measurements matching an approved measurement could have been missed.
author Tom Gottfried <tom@intevation.de>
date Wed, 15 May 2019 19:08:49 +0200
parents 3dee5cf16a58
children 831193935739
comparison
equal deleted inserted replaced
3276:75db3199f76e 3277:232fc90e6ee2
22 "strings" 22 "strings"
23 "time" 23 "time"
24 24
25 "gemma.intevation.de/gemma/pkg/models" 25 "gemma.intevation.de/gemma/pkg/models"
26 "gemma.intevation.de/gemma/pkg/soap/nts" 26 "gemma.intevation.de/gemma/pkg/soap/nts"
27 "github.com/jackc/pgx/pgtype"
27 ) 28 )
28 29
29 // GaugeMeasurement is an import job to import 30 // GaugeMeasurement is an import job to import
30 // gauges measurement data from a NtS SOAP service. 31 // gauges measurement data from a NtS SOAP service.
31 type GaugeMeasurement struct { 32 type GaugeMeasurement struct {
51 language_code, 52 language_code,
52 country_code, 53 country_code,
53 date_issue, 54 date_issue,
54 reference_code, 55 reference_code,
55 water_level, 56 water_level,
56 predicted,
57 is_waterlevel, 57 is_waterlevel,
58 value_min,
59 value_max,
60 date_info, 58 date_info,
61 source_organization, 59 source_organization,
62 staging_done 60 staging_done
63 ) VALUES( 61 ) VALUES(
64 ($1, $2, $3, $4, $5), 62 ($1, $2, $3, $4, $5),
70 $11, 68 $11,
71 $12, 69 $12,
72 $13, 70 $13,
73 $14, 71 $14,
74 $15, 72 $15,
75 $16, 73 true
76 $17,
77 $18,
78 $19
79 ) 74 )
80 ON CONFLICT DO NOTHING 75 ON CONFLICT DO NOTHING
81 RETURNING id 76 RETURNING 1
77 `
78
79 insertGPSQL = `
80 INSERT INTO waterway.gauge_predictions (
81 fk_gauge_id,
82 measure_date,
83 sender,
84 language_code,
85 country_code,
86 date_issue,
87 reference_code,
88 water_level,
89 is_waterlevel,
90 conf_interval,
91 date_info,
92 source_organization
93 ) VALUES(
94 ($1, $2, $3, $4, $5),
95 $6,
96 $7,
97 $8,
98 $9,
99 $10,
100 $11,
101 $12,
102 $13,
103 $14,
104 $15,
105 $16
106 )
107 ON CONFLICT DO NOTHING
108 RETURNING 1
82 ` 109 `
83 ) 110 )
84 111
85 type gmJobCreator struct{} 112 type gmJobCreator struct{}
86 113
222 fetch func() ([]*nts.RIS_Message_Type, error), 249 fetch func() ([]*nts.RIS_Message_Type, error),
223 conn *sql.Conn, 250 conn *sql.Conn,
224 feedback Feedback, 251 feedback Feedback,
225 ) ([]string, error) { 252 ) ([]string, error) {
226 253
227 insertStmt, err := conn.PrepareContext(ctx, insertGMSQL) 254 insertGPStmt, err := conn.PrepareContext(ctx, insertGPSQL)
228 if err != nil { 255 if err != nil {
229 return nil, err 256 return nil, err
230 } 257 }
231 defer insertStmt.Close() 258 defer insertGPStmt.Close()
259
260 insertGMStmt, err := conn.PrepareContext(ctx, insertGMSQL)
261 if err != nil {
262 return nil, err
263 }
264 defer insertGMStmt.Close()
232 265
233 result, err := fetch() 266 result, err := fetch()
234 if err != nil { 267 if err != nil {
235 return nil, err 268 return nil, err
236 } 269 }
237 270
238 var gids []string 271 var gids []string
239 for _, msg := range result { 272 for _, msg := range result {
240 var gid int64 273 var dummy int
241 for _, wrm := range msg.Wrm { 274 for _, wrm := range msg.Wrm {
242 curr := string(*wrm.Geo_object.Id) 275 curr := string(*wrm.Geo_object.Id)
243 currIsrs, err := models.IsrsFromString(curr) 276 currIsrs, err := models.IsrsFromString(curr)
244 if err != nil { 277 if err != nil {
245 feedback.Warn("Invalid ISRS code %v", err) 278 feedback.Warn("Invalid ISRS code %v", err)
246 continue 279 continue
247 } 280 }
248 feedback.Info("Found measurements for %s", curr) 281 feedback.Info("Found measurements/predictions for %s", curr)
249 282
250 var referenceCode string 283 var referenceCode string
251 if wrm.Reference_code == nil { 284 if wrm.Reference_code == nil {
252 feedback.Info("'Reference_code' not specified. Assuming 'ZPG'") 285 feedback.Info("'Reference_code' not specified. Assuming 'ZPG'")
253 referenceCode = "ZPG" 286 referenceCode = "ZPG"
254 } else { 287 } else {
255 referenceCode = string(*wrm.Reference_code) 288 referenceCode = string(*wrm.Reference_code)
256 } 289 }
257 290
258 var newCnt int = 0 291 newM, newP := 0, 0
259 for _, measure := range wrm.Measure { 292 for _, measure := range wrm.Measure {
260 var unit string 293 var unit string
261 if measure.Unit == nil { 294 if measure.Unit == nil {
262 feedback.Info("'Unit' not specified. Assuming 'cm'") 295 feedback.Info("'Unit' not specified. Assuming 'cm'")
263 unit = "cm" 296 unit = "cm"
271 convert(measure.Value) 304 convert(measure.Value)
272 convert(measure.Value_min) 305 convert(measure.Value_min)
273 convert(measure.Value_max) 306 convert(measure.Value_max)
274 307
275 isWaterlevel := *measure.Measure_code == nts.Measure_code_enumWAL 308 isWaterlevel := *measure.Measure_code == nts.Measure_code_enumWAL
276 err = insertStmt.QueryRowContext( 309
277 ctx, 310 if measure.Predicted {
278 currIsrs.CountryCode, 311 var confInterval pgtype.Numrange
279 currIsrs.LoCode, 312 if measure.Value_min != nil && measure.Value_max != nil {
280 currIsrs.FairwaySection, 313 var valueMin, valueMax pgtype.Numeric
281 currIsrs.Orc, 314 valueMin.Set(measure.Value_min)
282 currIsrs.Hectometre, 315 valueMax.Set(measure.Value_max)
283 measure.Measuredate, 316 confInterval = pgtype.Numrange{
284 msg.Identification.From, 317 Lower: valueMin,
285 msg.Identification.Language_code, 318 Upper: valueMax,
286 msg.Identification.Country_code, 319 LowerType: pgtype.Inclusive,
287 msg.Identification.Date_issue, 320 UpperType: pgtype.Inclusive,
288 referenceCode, 321 Status: pgtype.Present,
289 measure.Value, 322 }
290 measure.Predicted, 323 }
291 isWaterlevel, 324 err = insertGPStmt.QueryRowContext(
292 measure.Value_min, 325 ctx,
293 measure.Value_max, 326 currIsrs.CountryCode,
294 msg.Identification.Date_issue, 327 currIsrs.LoCode,
295 msg.Identification.Originator, 328 currIsrs.FairwaySection,
296 true, // staging_done 329 currIsrs.Orc,
297 ).Scan(&gid) 330 currIsrs.Hectometre,
298 switch { 331 measure.Measuredate,
299 case err == sql.ErrNoRows: 332 msg.Identification.From,
300 // thats expected, nothing to do 333 msg.Identification.Language_code,
301 case err != nil: 334 msg.Identification.Country_code,
302 feedback.Warn(handleError(err).Error()) 335 msg.Identification.Date_issue,
303 default: 336 referenceCode,
304 newCnt++ 337 measure.Value,
338 isWaterlevel,
339 &confInterval,
340 msg.Identification.Date_issue,
341 msg.Identification.Originator,
342 ).Scan(&dummy)
343 switch {
344 case err == sql.ErrNoRows:
345 // thats expected, nothing to do
346 case err != nil:
347 feedback.Warn(handleError(err).Error())
348 default:
349 newP++
350 }
351 } else {
352 err = insertGMStmt.QueryRowContext(
353 ctx,
354 currIsrs.CountryCode,
355 currIsrs.LoCode,
356 currIsrs.FairwaySection,
357 currIsrs.Orc,
358 currIsrs.Hectometre,
359 measure.Measuredate,
360 msg.Identification.From,
361 msg.Identification.Language_code,
362 msg.Identification.Country_code,
363 msg.Identification.Date_issue,
364 referenceCode,
365 measure.Value,
366 isWaterlevel,
367 msg.Identification.Date_issue,
368 msg.Identification.Originator,
369 ).Scan(&dummy)
370 switch {
371 case err == sql.ErrNoRows:
372 // thats expected, nothing to do
373 case err != nil:
374 feedback.Warn(handleError(err).Error())
375 default:
376 newM++
377 }
305 } 378 }
306 } 379 }
307 feedback.Info("Inserted %d measurements for %s", 380 feedback.Info("Inserted %d measurements for %s",
308 newCnt, curr) 381 newM, curr)
382 feedback.Info("Inserted %d predictions for %s",
383 newP, curr)
309 gids = append(gids, curr) 384 gids = append(gids, curr)
310 } 385 }
311 } 386 }
312 return gids, nil 387 return gids, nil
313 } 388 }