Mercurial > gemma
comparison pkg/imports/gm.go @ 5528:133dc5b3076a aggregate-gm-import-logging
WIP: Started to log GM imports in a more agregated way.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Sun, 24 Oct 2021 16:54:00 +0200 |
parents | 5f47eeea988d |
children | f95f268a83bd |
comparison
equal
deleted
inserted
replaced
5524:35966741e45e | 5528:133dc5b3076a |
---|---|
319 } | 319 } |
320 } | 320 } |
321 return fn, nil | 321 return fn, nil |
322 } | 322 } |
323 | 323 |
324 type gmLog struct { | |
325 isrs *models.Isrs | |
326 unknown bool | |
327 assumedZPG bool | |
328 ignoredMeasureCodes []nts.Measure_code_enum | |
329 assumedCM int | |
330 missingValues []time.Time | |
331 badValue int | |
332 newM int | |
333 newP int | |
334 } | |
335 | |
336 type gmLogs []*gmLog | |
337 | |
338 func (gl *gmLog) ignoreMeasureCode(mc nts.Measure_code_enum) { | |
339 for _, m := range gl.ignoredMeasureCodes { | |
340 if m == mc { | |
341 return | |
342 } | |
343 } | |
344 gl.ignoredMeasureCodes = append(gl.ignoredMeasureCodes, mc) | |
345 } | |
346 | |
347 func (gls gmLogs) logging(feedback Feedback) { | |
348 gls.logUnknown(feedback) | |
349 gls.logAssumedZPG(feedback) | |
350 // TODO: assumed CM | |
351 // TODO: bad values | |
352 // TODO: missing values | |
353 // TODO: new predictions | |
354 // TODO: new measurements | |
355 } | |
356 | |
357 func (gls gmLogs) logUnknown(feedback Feedback) { | |
358 var sb strings.Builder | |
359 for _, gl := range gls { | |
360 if gl.assumedZPG { | |
361 if sb.Len() == 0 { | |
362 sb.WriteString("Cannot find following gauges: ") | |
363 } else { | |
364 sb.WriteString(", ") | |
365 } | |
366 sb.WriteString(gl.isrs.String()) | |
367 } | |
368 } | |
369 if sb.Len() > 0 { | |
370 feedback.Warn(sb.String()) | |
371 } | |
372 } | |
373 | |
374 func (gls gmLogs) logAssumedZPG(feedback Feedback) { | |
375 var sb strings.Builder | |
376 for _, gl := range gls { | |
377 if gl.assumedZPG { | |
378 if sb.Len() == 0 { | |
379 sb.WriteString("'Reference_code' not specified. Assuming 'ZPG': ") | |
380 } else { | |
381 sb.WriteString(", ") | |
382 } | |
383 sb.WriteString(gl.isrs.String()) | |
384 } | |
385 } | |
386 if sb.Len() > 0 { | |
387 feedback.Info(sb.String()) | |
388 } | |
389 } | |
390 | |
324 func doForGM( | 391 func doForGM( |
325 ctx context.Context, | 392 ctx context.Context, |
326 gauges []string, | 393 gauges []string, |
327 fetch func() ([]*nts.RIS_Message_Type, error), | 394 fetch func() ([]*nts.RIS_Message_Type, error), |
328 conn *sql.Conn, | 395 conn *sql.Conn, |
351 if err != nil { | 418 if err != nil { |
352 return nil, err | 419 return nil, err |
353 } | 420 } |
354 | 421 |
355 var gids []string | 422 var gids []string |
423 | |
424 // To prevent spamming the log actual logging | |
425 // is defered to be presented in an aggregated way. | |
426 var logs gmLogs | |
427 defer logs.logging(feedback) | |
428 | |
356 for _, msg := range result { | 429 for _, msg := range result { |
357 for _, wrm := range msg.Wrm { | 430 for _, wrm := range msg.Wrm { |
358 curr := string(*wrm.Geo_object.Id) | 431 curr := string(*wrm.Geo_object.Id) |
359 curr = strings.TrimSpace(curr) | 432 curr = strings.TrimSpace(curr) |
360 currIsrs, err := models.IsrsFromString(curr) | 433 currIsrs, err := models.IsrsFromString(curr) |
361 if err != nil { | 434 if err != nil { |
362 feedback.Warn("Invalid ISRS code %v", err) | 435 feedback.Warn("Invalid ISRS code %v", err) |
363 continue | 436 continue |
364 } | 437 } |
365 feedback.Info("Found measurements/predictions for %s", curr) | 438 log := &gmLog{isrs: currIsrs} |
439 logs = append(logs, log) | |
440 gids = append(gids, curr) | |
441 | |
442 //feedback.Info("Found measurements/predictions for %s", curr) | |
366 if !isKnown(curr) { | 443 if !isKnown(curr) { |
367 feedback.Warn("Cannot find gauge %q for import", curr) | 444 log.unknown = true |
445 // feedback.Warn("Cannot find gauge %q for import", curr) | |
368 continue | 446 continue |
369 } | 447 } |
370 | 448 |
371 var referenceCode string | 449 var referenceCode string |
372 if wrm.Reference_code == nil { | 450 if wrm.Reference_code == nil { |
373 feedback.Info("'Reference_code' not specified. Assuming 'ZPG'") | 451 //feedback.Info("'Reference_code' not specified. Assuming 'ZPG'") |
452 log.assumedZPG = true | |
374 referenceCode = "ZPG" | 453 referenceCode = "ZPG" |
375 } else { | 454 } else { |
376 referenceCode = string(*wrm.Reference_code) | 455 referenceCode = string(*wrm.Reference_code) |
377 } | 456 } |
378 | 457 |
379 badValue := 0 | 458 //badValue := 0 |
380 newM, newP := 0, 0 | 459 //newM, newP := 0, 0 |
381 for _, measure := range wrm.Measure { | 460 for _, measure := range wrm.Measure { |
382 var unit string | 461 var unit string |
383 if *measure.Measure_code != nts.Measure_code_enumWAL { | 462 if *measure.Measure_code != nts.Measure_code_enumWAL { |
384 feedback.Warn("Ignored message with measure_code %s", | 463 log.ignoreMeasureCode(*measure.Measure_code) |
385 *measure.Measure_code) | 464 //feedback.Warn("Ignored message with measure_code %s", |
465 // *measure.Measure_code) | |
386 continue | 466 continue |
387 } | 467 } |
388 if measure.Unit == nil { | 468 if measure.Unit == nil { |
389 feedback.Info("'Unit' not specified. Assuming 'cm'") | 469 //feedback.Info("'Unit' not specified. Assuming 'cm'") |
470 log.assumedCM++ | |
390 unit = "cm" | 471 unit = "cm" |
391 } else { | 472 } else { |
392 unit = string(*measure.Unit) | 473 unit = string(*measure.Unit) |
393 } | 474 } |
394 | 475 |
395 if measure.Value == nil { | 476 if measure.Value == nil { |
396 feedback.Warn("Missing mandatory value at %s. Ignored (bad service)", | 477 log.missingValues = append(log.missingValues, measure.Measuredate.Time) |
397 measure.Measuredate.Format(time.RFC3339)) | 478 //feedback.Warn("Missing mandatory value at %s. Ignored (bad service)", |
479 // measure.Measuredate.Format(time.RFC3339)) | |
398 continue | 480 continue |
399 } | 481 } |
400 | 482 |
401 convert, err := rescale(unit) | 483 convert, err := rescale(unit) |
402 if err != nil { | 484 if err != nil { |
485 // TODO: log rescale error | |
403 feedback.Error(err.Error()) | 486 feedback.Error(err.Error()) |
404 continue | 487 continue |
405 } | 488 } |
406 convert(measure.Value) | 489 convert(measure.Value) |
407 convert(measure.Value_min) | 490 convert(measure.Value_min) |
408 convert(measure.Value_max) | 491 convert(measure.Value_max) |
409 | 492 |
410 // -99999 is used by some gauges to signal an error | 493 // -99999 is used by some gauges to signal an error |
411 if *measure.Value == -99999 { | 494 if *measure.Value == -99999 { |
412 badValue++ | 495 log.badValue++ |
413 continue | 496 continue |
414 } | 497 } |
415 | 498 |
416 var dummy int | 499 var dummy int |
417 if measure.Predicted { | 500 if measure.Predicted { |
457 case err == sql.ErrNoRows: | 540 case err == sql.ErrNoRows: |
458 // thats expected, nothing to do | 541 // thats expected, nothing to do |
459 case err != nil: | 542 case err != nil: |
460 feedback.Error(pgxutils.ReadableError{Err: err}.Error()) | 543 feedback.Error(pgxutils.ReadableError{Err: err}.Error()) |
461 default: | 544 default: |
462 newP++ | 545 log.newP++ |
463 } | 546 } |
464 } else { | 547 } else { |
465 err = insertGMStmt.QueryRowContext( | 548 err = insertGMStmt.QueryRowContext( |
466 ctx, | 549 ctx, |
467 currIsrs.CountryCode, | 550 currIsrs.CountryCode, |
483 case err == sql.ErrNoRows: | 566 case err == sql.ErrNoRows: |
484 // thats expected, nothing to do | 567 // thats expected, nothing to do |
485 case err != nil: | 568 case err != nil: |
486 feedback.Error(pgxutils.ReadableError{Err: err}.Error()) | 569 feedback.Error(pgxutils.ReadableError{Err: err}.Error()) |
487 default: | 570 default: |
488 newM++ | 571 log.newM++ |
489 } | 572 } |
490 } | 573 } |
491 } | 574 } |
492 if badValue > 0 { | 575 //if badValue > 0 { |
493 feedback.Warn("Ignored %d measurements with value -99999", | 576 // feedback.Warn("Ignored %d measurements with value -99999", |
494 badValue) | 577 // badValue) |
495 } | 578 //} |
496 feedback.Info("Inserted %d measurements for %s", | 579 //feedback.Info("Inserted %d measurements for %s", |
497 newM, curr) | 580 // newM, curr) |
498 feedback.Info("Inserted %d predictions for %s", | 581 //feedback.Info("Inserted %d predictions for %s", |
499 newP, curr) | 582 // newP, curr) |
500 gids = append(gids, curr) | |
501 } | 583 } |
502 } | 584 } |
503 return gids, nil | 585 return gids, nil |
504 } | 586 } |