comparison pkg/imports/gm.go @ 5528:133dc5b3076a aggregate-gm-import-logging

WIP: Started to log GM imports in a more agregated way.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Sun, 24 Oct 2021 16:54:00 +0200
parents 5f47eeea988d
children f95f268a83bd
comparison
equal deleted inserted replaced
5524:35966741e45e 5528:133dc5b3076a
319 } 319 }
320 } 320 }
321 return fn, nil 321 return fn, nil
322 } 322 }
323 323
324 type gmLog struct {
325 isrs *models.Isrs
326 unknown bool
327 assumedZPG bool
328 ignoredMeasureCodes []nts.Measure_code_enum
329 assumedCM int
330 missingValues []time.Time
331 badValue int
332 newM int
333 newP int
334 }
335
336 type gmLogs []*gmLog
337
338 func (gl *gmLog) ignoreMeasureCode(mc nts.Measure_code_enum) {
339 for _, m := range gl.ignoredMeasureCodes {
340 if m == mc {
341 return
342 }
343 }
344 gl.ignoredMeasureCodes = append(gl.ignoredMeasureCodes, mc)
345 }
346
347 func (gls gmLogs) logging(feedback Feedback) {
348 gls.logUnknown(feedback)
349 gls.logAssumedZPG(feedback)
350 // TODO: assumed CM
351 // TODO: bad values
352 // TODO: missing values
353 // TODO: new predictions
354 // TODO: new measurements
355 }
356
357 func (gls gmLogs) logUnknown(feedback Feedback) {
358 var sb strings.Builder
359 for _, gl := range gls {
360 if gl.assumedZPG {
361 if sb.Len() == 0 {
362 sb.WriteString("Cannot find following gauges: ")
363 } else {
364 sb.WriteString(", ")
365 }
366 sb.WriteString(gl.isrs.String())
367 }
368 }
369 if sb.Len() > 0 {
370 feedback.Warn(sb.String())
371 }
372 }
373
374 func (gls gmLogs) logAssumedZPG(feedback Feedback) {
375 var sb strings.Builder
376 for _, gl := range gls {
377 if gl.assumedZPG {
378 if sb.Len() == 0 {
379 sb.WriteString("'Reference_code' not specified. Assuming 'ZPG': ")
380 } else {
381 sb.WriteString(", ")
382 }
383 sb.WriteString(gl.isrs.String())
384 }
385 }
386 if sb.Len() > 0 {
387 feedback.Info(sb.String())
388 }
389 }
390
324 func doForGM( 391 func doForGM(
325 ctx context.Context, 392 ctx context.Context,
326 gauges []string, 393 gauges []string,
327 fetch func() ([]*nts.RIS_Message_Type, error), 394 fetch func() ([]*nts.RIS_Message_Type, error),
328 conn *sql.Conn, 395 conn *sql.Conn,
351 if err != nil { 418 if err != nil {
352 return nil, err 419 return nil, err
353 } 420 }
354 421
355 var gids []string 422 var gids []string
423
424 // To prevent spamming the log actual logging
425 // is defered to be presented in an aggregated way.
426 var logs gmLogs
427 defer logs.logging(feedback)
428
356 for _, msg := range result { 429 for _, msg := range result {
357 for _, wrm := range msg.Wrm { 430 for _, wrm := range msg.Wrm {
358 curr := string(*wrm.Geo_object.Id) 431 curr := string(*wrm.Geo_object.Id)
359 curr = strings.TrimSpace(curr) 432 curr = strings.TrimSpace(curr)
360 currIsrs, err := models.IsrsFromString(curr) 433 currIsrs, err := models.IsrsFromString(curr)
361 if err != nil { 434 if err != nil {
362 feedback.Warn("Invalid ISRS code %v", err) 435 feedback.Warn("Invalid ISRS code %v", err)
363 continue 436 continue
364 } 437 }
365 feedback.Info("Found measurements/predictions for %s", curr) 438 log := &gmLog{isrs: currIsrs}
439 logs = append(logs, log)
440 gids = append(gids, curr)
441
442 //feedback.Info("Found measurements/predictions for %s", curr)
366 if !isKnown(curr) { 443 if !isKnown(curr) {
367 feedback.Warn("Cannot find gauge %q for import", curr) 444 log.unknown = true
445 // feedback.Warn("Cannot find gauge %q for import", curr)
368 continue 446 continue
369 } 447 }
370 448
371 var referenceCode string 449 var referenceCode string
372 if wrm.Reference_code == nil { 450 if wrm.Reference_code == nil {
373 feedback.Info("'Reference_code' not specified. Assuming 'ZPG'") 451 //feedback.Info("'Reference_code' not specified. Assuming 'ZPG'")
452 log.assumedZPG = true
374 referenceCode = "ZPG" 453 referenceCode = "ZPG"
375 } else { 454 } else {
376 referenceCode = string(*wrm.Reference_code) 455 referenceCode = string(*wrm.Reference_code)
377 } 456 }
378 457
379 badValue := 0 458 //badValue := 0
380 newM, newP := 0, 0 459 //newM, newP := 0, 0
381 for _, measure := range wrm.Measure { 460 for _, measure := range wrm.Measure {
382 var unit string 461 var unit string
383 if *measure.Measure_code != nts.Measure_code_enumWAL { 462 if *measure.Measure_code != nts.Measure_code_enumWAL {
384 feedback.Warn("Ignored message with measure_code %s", 463 log.ignoreMeasureCode(*measure.Measure_code)
385 *measure.Measure_code) 464 //feedback.Warn("Ignored message with measure_code %s",
465 // *measure.Measure_code)
386 continue 466 continue
387 } 467 }
388 if measure.Unit == nil { 468 if measure.Unit == nil {
389 feedback.Info("'Unit' not specified. Assuming 'cm'") 469 //feedback.Info("'Unit' not specified. Assuming 'cm'")
470 log.assumedCM++
390 unit = "cm" 471 unit = "cm"
391 } else { 472 } else {
392 unit = string(*measure.Unit) 473 unit = string(*measure.Unit)
393 } 474 }
394 475
395 if measure.Value == nil { 476 if measure.Value == nil {
396 feedback.Warn("Missing mandatory value at %s. Ignored (bad service)", 477 log.missingValues = append(log.missingValues, measure.Measuredate.Time)
397 measure.Measuredate.Format(time.RFC3339)) 478 //feedback.Warn("Missing mandatory value at %s. Ignored (bad service)",
479 // measure.Measuredate.Format(time.RFC3339))
398 continue 480 continue
399 } 481 }
400 482
401 convert, err := rescale(unit) 483 convert, err := rescale(unit)
402 if err != nil { 484 if err != nil {
485 // TODO: log rescale error
403 feedback.Error(err.Error()) 486 feedback.Error(err.Error())
404 continue 487 continue
405 } 488 }
406 convert(measure.Value) 489 convert(measure.Value)
407 convert(measure.Value_min) 490 convert(measure.Value_min)
408 convert(measure.Value_max) 491 convert(measure.Value_max)
409 492
410 // -99999 is used by some gauges to signal an error 493 // -99999 is used by some gauges to signal an error
411 if *measure.Value == -99999 { 494 if *measure.Value == -99999 {
412 badValue++ 495 log.badValue++
413 continue 496 continue
414 } 497 }
415 498
416 var dummy int 499 var dummy int
417 if measure.Predicted { 500 if measure.Predicted {
457 case err == sql.ErrNoRows: 540 case err == sql.ErrNoRows:
458 // thats expected, nothing to do 541 // thats expected, nothing to do
459 case err != nil: 542 case err != nil:
460 feedback.Error(pgxutils.ReadableError{Err: err}.Error()) 543 feedback.Error(pgxutils.ReadableError{Err: err}.Error())
461 default: 544 default:
462 newP++ 545 log.newP++
463 } 546 }
464 } else { 547 } else {
465 err = insertGMStmt.QueryRowContext( 548 err = insertGMStmt.QueryRowContext(
466 ctx, 549 ctx,
467 currIsrs.CountryCode, 550 currIsrs.CountryCode,
483 case err == sql.ErrNoRows: 566 case err == sql.ErrNoRows:
484 // thats expected, nothing to do 567 // thats expected, nothing to do
485 case err != nil: 568 case err != nil:
486 feedback.Error(pgxutils.ReadableError{Err: err}.Error()) 569 feedback.Error(pgxutils.ReadableError{Err: err}.Error())
487 default: 570 default:
488 newM++ 571 log.newM++
489 } 572 }
490 } 573 }
491 } 574 }
492 if badValue > 0 { 575 //if badValue > 0 {
493 feedback.Warn("Ignored %d measurements with value -99999", 576 // feedback.Warn("Ignored %d measurements with value -99999",
494 badValue) 577 // badValue)
495 } 578 //}
496 feedback.Info("Inserted %d measurements for %s", 579 //feedback.Info("Inserted %d measurements for %s",
497 newM, curr) 580 // newM, curr)
498 feedback.Info("Inserted %d predictions for %s", 581 //feedback.Info("Inserted %d predictions for %s",
499 newP, curr) 582 // newP, curr)
500 gids = append(gids, curr)
501 } 583 }
502 } 584 }
503 return gids, nil 585 return gids, nil
504 } 586 }