Mercurial > gemma
changeset 5539:90ba92820b26 aggregate-gm-import-logging
Merged default into aggregate-gm-import-logging branch.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Tue, 26 Oct 2021 00:05:28 +0200 |
parents | ff95d3603e4d (diff) fcab36751490 (current diff) |
children | 4d815f295e57 |
files | |
diffstat | 2 files changed, 447 insertions(+), 23 deletions(-) [+] |
line wrap: on
line diff
--- a/pkg/imports/gm.go	Mon Oct 25 16:19:13 2021 +0200
+++ b/pkg/imports/gm.go	Tue Oct 26 00:05:28 2021 +0200
@@ -321,6 +321,200 @@
 	return fn, nil
 }
 
+type gmLog struct {
+	gid                 string
+	unknown             bool
+	assumedZPG          bool
+	ignoredMeasureCodes []string
+	rescaleErrors       []string
+	missingValues       []string
+	assumedCM           int
+	badValues           int
+	measurements        int
+	predictions         int
+}
+
+type gmLogs []*gmLog
+
+func extend(haystack []string, needle string) []string {
+	for _, straw := range haystack {
+		if straw == needle {
+			return haystack
+		}
+	}
+	return append(haystack, needle)
+}
+
+func (gl *gmLog) addRescaleError(err error) {
+	gl.rescaleErrors = extend(gl.rescaleErrors, err.Error())
+}
+
+func (gl *gmLog) ignoreMeasureCode(mc nts.Measure_code_enum) {
+	gl.ignoredMeasureCodes = extend(gl.ignoredMeasureCodes, string(mc))
+}
+
+func (gls gmLogs) find(gid string) *gmLog {
+	for _, gl := range gls {
+		if gl.gid == gid {
+			return gl
+		}
+	}
+	return nil
+}
+
+func (gls gmLogs) logging(feedback Feedback) {
+
+	gls.logBool(
+		(*gmLog).getUnkown,
+		"Cannot find following gauges: ",
+		feedback.Warn)
+
+	gls.logBool(
+		(*gmLog).getAssumedZPG,
+		"'Reference_code' not specified. Assuming 'ZPG': ",
+		feedback.Warn)
+
+	gls.logInt(
+		(*gmLog).getAssumedCM,
+		"'Unit' not specified. Assuming 'cm': ",
+		feedback.Warn)
+
+	gls.logInt(
+		(*gmLog).getBadValues,
+		"Ignored measurements with value -99999: ",
+		feedback.Warn)
+
+	gls.logString(
+		(*gmLog).getMissingValues,
+		"Missing mandatory values: ",
+		feedback.Warn)
+
+	gls.logString(
+		(*gmLog).getRescaleErrors,
+		"Cannot convert units: ",
+		feedback.Error)
+
+	gls.logString(
+		(*gmLog).getIgnoredMeasureCodes,
+		"Ignored measure codes: ",
+		feedback.Warn)
+
+	gls.logInt(
+		(*gmLog).getPredictions,
+		"New predictions: ",
+		feedback.Info)
+
+	gls.logInt(
+		(*gmLog).getMeasurements,
+		"New measurements: ",
+		feedback.Info)
+
+	gls.logBool(
+		(*gmLog).nothingChanged,
+		"No changes for: ",
+		feedback.Info)
+}
+
+func (gl *gmLog) getAssumedZPG() bool { return gl.assumedZPG }
+func (gl *gmLog) getUnkown() bool { return gl.unknown }
+func (gl *gmLog) getIgnoredMeasureCodes() []string { return gl.ignoredMeasureCodes }
+func (gl *gmLog) getRescaleErrors() []string { return gl.rescaleErrors }
+func (gl *gmLog) getMissingValues() []string { return gl.missingValues }
+func (gl *gmLog) getAssumedCM() int { return gl.assumedCM }
+func (gl *gmLog) getBadValues() int { return gl.badValues }
+func (gl *gmLog) getPredictions() int { return gl.predictions }
+func (gl *gmLog) getMeasurements() int { return gl.measurements }
+func (gl *gmLog) nothingChanged() bool { return gl.measurements == 0 && gl.predictions == 0 }
+
+func (gls gmLogs) logBool(
+	access func(*gmLog) bool,
+	header string,
+	log func(string, ...interface{}),
+) {
+	var sb strings.Builder
+	for _, gl := range gls {
+		if access(gl) {
+			if sb.Len() == 0 {
+				sb.WriteString(header)
+			} else {
+				sb.WriteString(", ")
+			}
+			sb.WriteString(gl.gid)
+		}
+	}
+	if sb.Len() > 0 {
+		log("%s", sb.String()) // never use collected text as a format string
+	}
+}
+
+func (gls gmLogs) logInt(
+	access func(*gmLog) int,
+	header string,
+	log func(string, ...interface{}),
+) {
+	var sb strings.Builder
+	for _, gl := range gls {
+		if v := access(gl); v > 0 {
+			if sb.Len() == 0 {
+				sb.WriteString(header)
+			} else {
+				sb.WriteString(", ")
+			}
+			fmt.Fprintf(&sb, "%s (%d)", gl.gid, v)
+		}
+	}
+	if sb.Len() > 0 {
+		log("%s", sb.String()) // never use collected text as a format string
+	}
+}
+
+func (gls gmLogs) logString(
+	access func(*gmLog) []string,
+	header string,
+	log func(string, ...interface{}),
+) {
+	var sb strings.Builder
+	for _, gl := range gls {
+		if s := access(gl); len(s) > 0 {
+			if sb.Len() == 0 {
+				sb.WriteString(header)
+			} else {
+				sb.WriteString(", ")
+			}
+			fmt.Fprintf(&sb, "%s (", gl.gid)
+			for i, v := range s {
+				if i > 0 {
+					sb.WriteString("; ")
+				}
+				sb.WriteString(v)
+			}
+			sb.WriteByte(')')
+		}
+	}
+	if sb.Len() > 0 {
+		log("%s", sb.String()) // error texts may contain '%' (e.g. unit names)
+	}
+}
+
+// logFinder is a helper to search recently used logs
+// or create a new one if no log for a given gauge
+// existed before.
+func logFinder(logs *gmLogs) func(string) *gmLog {
+	var lastLog *gmLog
+	return func(gid string) *gmLog {
+		if lastLog != nil && lastLog.gid == gid {
+			return lastLog
+		}
+		if ll := logs.find(gid); ll != nil {
+			lastLog = ll
+			return ll
+		}
+		lastLog = &gmLog{gid: gid}
+		*logs = append(*logs, lastLog)
+		return lastLog
+	}
+}
+
 func doForGM(
 	ctx context.Context,
 	gauges []string,
@@ -353,6 +547,14 @@
 	}
 
 	var gids []string
+
+	// To prevent spamming the log actual logging
+	// is deferred to be presented in an aggregated way.
+	var logs gmLogs
+	defer func() { logs.logging(feedback) }()
+
+	findLog := logFinder(&logs)
+
 	for _, msg := range result {
 		for _, wrm := range msg.Wrm {
 			curr := string(*wrm.Geo_object.Id)
@@ -362,45 +564,45 @@
 				feedback.Warn("Invalid ISRS code %v", err)
 				continue
 			}
-			feedback.Info("Found measurements/predictions for %s", curr)
+			logger := findLog(curr)
+			gids = append(gids, curr)
+
 			if !isKnown(curr) {
-				feedback.Warn("Cannot find gauge %q for import", curr)
+				logger.unknown = true
 				continue
 			}
 
 			var referenceCode string
 			if wrm.Reference_code == nil {
-				feedback.Info("'Reference_code' not specified. Assuming 'ZPG'")
+				logger.assumedZPG = true
 				referenceCode = "ZPG"
 			} else {
 				referenceCode = string(*wrm.Reference_code)
 			}
 
-			badValue := 0
-			newM, newP := 0, 0
 			for _, measure := range wrm.Measure {
 				var unit string
 				if *measure.Measure_code != nts.Measure_code_enumWAL {
-					feedback.Warn("Ignored message with measure_code %s",
-						*measure.Measure_code)
+					logger.ignoreMeasureCode(*measure.Measure_code)
 					continue
 				}
 				if measure.Unit == nil {
-					feedback.Info("'Unit' not specified. Assuming 'cm'")
+					logger.assumedCM++
 					unit = "cm"
 				} else {
 					unit = string(*measure.Unit)
 				}
 
 				if measure.Value == nil {
-					feedback.Warn("Missing mandatory value at %s. Ignored (bad service)",
-						measure.Measuredate.Format(time.RFC3339))
+					logger.missingValues = append(
+						logger.missingValues,
+						measure.Measuredate.Time.Format(time.RFC3339))
 					continue
 				}
 
 				convert, err := rescale(unit)
 				if err != nil {
-					feedback.Error(err.Error())
+					logger.addRescaleError(err)
 					continue
 				}
 				convert(measure.Value)
@@ -409,7 +611,7 @@
 
 				// -99999 is used by some gauges to signal an error
 				if *measure.Value == -99999 {
-					badValue++
+					logger.badValues++
 					continue
 				}
 
@@ -459,7 +661,7 @@
 					case err != nil:
 						feedback.Error(pgxutils.ReadableError{Err: err}.Error())
 					default:
-						newP++
+						logger.predictions++
 					}
 				} else {
 					err = insertGMStmt.QueryRowContext(
@@ -485,19 +687,10 @@
 					case err != nil:
 						feedback.Error(pgxutils.ReadableError{Err: err}.Error())
 					default:
-						newM++
+						logger.measurements++
 					}
 				}
 			}
-			if badValue > 0 {
-				feedback.Warn("Ignored %d measurements with value -99999",
-					badValue)
-			}
-			feedback.Info("Inserted %d measurements for %s",
-				newM, curr)
-			feedback.Info("Inserted %d predictions for %s",
-				newP, curr)
-			gids = append(gids, curr)
 		}
 	}
 	return gids, nil
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/schema/updates/1469/aggregate-gm-logs.sql	Tue Oct 26 00:05:28 2021 +0200
@@ -0,0 +1,231 @@
+DO $$
+DECLARE
+    logs CURSOR FOR SELECT
+        lo.* FROM import.imports im JOIN import.import_logs lo
+        ON lo.import_id = im.id
+        WHERE im.kind = 'gm'
+        ORDER BY lo.import_id, lo.time;
+    last_id integer;
+    last_time timestamp with time zone;
+    curr_gid text;
+    value text;
+    joined text;
+    num integer;
+BEGIN
+
+    CREATE TEMP TABLE filtered_logs (
+        import_id integer NOT NULL,
+        time timestamp with time zone NOT NULL,
+        kind log_type NOT NULL default 'info'::log_type,
+        msg text NOT NULL
+    );
+
+    CREATE TEMP TABLE agg_tracker (
+        gid text NOT NULL UNIQUE,
+        unknown boolean NOT NULL DEFAULT false,
+        assume_zpg boolean NOT NULL DEFAULT false,
+        ign_meas_codes text ARRAY,
+        rescale_errors text ARRAY,
+        missing_values text ARRAY,
+        assume_cm integer NOT NULL DEFAULT 0,
+        bad_values integer NOT NULL DEFAULT 0,
+        measurements integer NOT NULL DEFAULT 0,
+        predictions integer NOT NULL DEFAULT 0
+    );
+    last_id := -1;
+
+    FOR line IN logs LOOP
+
+        IF last_id <> line.import_id THEN
+
+            -- unknown
+            SELECT INTO joined string_agg(gid, ', ')
+            FROM agg_tracker
+            WHERE unknown;
+
+            IF joined IS NOT NULL THEN
+                INSERT INTO filtered_logs VALUES (
+                    last_id, last_time, 'warn'::log_type,
+                    'Cannot find following gauges: ' || joined);
+            END IF;
+
+            -- assume zpg
+            SELECT INTO joined string_agg(gid, ', ')
+            FROM agg_tracker
+            WHERE assume_zpg;
+
+            IF joined IS NOT NULL THEN
+                INSERT INTO filtered_logs VALUES (
+                    last_id, last_time, 'warn'::log_type,
+                    '''Reference_code'' not specified. Assuming ''ZPG'': ' || joined);
+            END IF;
+
+            -- assume cm
+            SELECT INTO joined string_agg(
+                gid || ' (' || assume_cm || ')', ', ')
+            FROM agg_tracker
+            WHERE assume_cm > 0;
+
+            IF joined IS NOT NULL THEN
+                INSERT INTO filtered_logs VALUES (
+                    last_id, last_time, 'warn'::log_type,
+                    '''Unit'' not specified. Assuming ''cm'': ' || joined);
+            END IF;
+
+            -- bad values
+            SELECT INTO joined string_agg(
+                gid || ' (' || bad_values || ')', ', ')
+            FROM agg_tracker
+            WHERE bad_values > 0;
+
+            IF joined IS NOT NULL THEN
+                INSERT INTO filtered_logs VALUES (
+                    last_id, last_time, 'warn'::log_type,
+                    'Ignored measurements with value -99999: ' || joined);
+            END IF;
+
+
+            -- TODO: "Missing mandatory values: "
+            -- TODO: "Cannot convert units: "
+
+            -- ignored measure codes
+            SELECT INTO joined string_agg(
+                gid || ' ('
+                || array_to_string(ARRAY(SELECT DISTINCT unnest(ign_meas_codes)), '; ')
+                || ')', ', ')
+            FROM agg_tracker
+            WHERE ign_meas_codes IS NOT NULL;
+
+            IF joined IS NOT NULL THEN
+                INSERT INTO filtered_logs VALUES (
+                    last_id, last_time, 'warn'::log_type,
+                    'Ignored measure codes: ' || joined);
+            END IF;
+
+            -- predictions
+            SELECT INTO joined string_agg(
+                gid || ' (' || predictions || ')', ', ')
+            FROM agg_tracker
+            WHERE predictions > 0;
+
+            IF joined IS NOT NULL THEN
+                INSERT INTO filtered_logs VALUES (
+                    last_id, last_time, 'info'::log_type,
+                    'New predictions: ' || joined);
+            END IF;
+
+            -- measurements
+            SELECT INTO joined string_agg(
+                gid || ' (' || measurements || ')', ', ')
+            FROM agg_tracker
+            WHERE measurements > 0;
+
+            IF joined IS NOT NULL THEN
+                INSERT INTO filtered_logs VALUES (
+                    last_id, last_time, 'info'::log_type,
+                    'New measurements: ' || joined);
+            END IF;
+
+            -- nothing changed
+            SELECT INTO joined string_agg(gid, ', ')
+            FROM agg_tracker
+            WHERE measurements = 0 and predictions = 0;
+
+            IF joined IS NOT NULL THEN
+                INSERT INTO filtered_logs VALUES (
+                    last_id, last_time, 'info'::log_type,
+                    'No changes for: ' || joined);
+            END IF;
+
+
+            TRUNCATE agg_tracker;
+            last_id := line.import_id;
+        END IF; -- flush only; the line that triggered it is still processed below
+        IF line.msg ~ '^Found measurements/predictions for ....................$' THEN
+            curr_gid := substring(
+                line.msg from '^Found measurements/predictions for (....................)$');
+
+        ELSIF line.msg ~ '^Inserted \d+ measurements for ....................$' THEN
+            num := substring(
+                line.msg from '^Inserted (\d+)')::integer;
+            value := substring(
+                line.msg from '^Inserted \d+ measurements for (....................)$');
+
+            INSERT INTO agg_tracker (gid, measurements) VALUES (value, num)
+            ON CONFLICT (gid)
+            DO UPDATE SET measurements = agg_tracker.measurements + EXCLUDED.measurements;
+
+        ELSIF line.msg ~ '^Ignored \d+ measurements with value -99999$' THEN
+            num := substring(
+                line.msg from '^Ignored (\d+)')::integer;
+
+            INSERT INTO agg_tracker (gid, bad_values) VALUES (curr_gid, num)
+            ON CONFLICT (gid)
+            DO UPDATE SET bad_values = agg_tracker.bad_values + EXCLUDED.bad_values;
+
+        ELSIF line.msg ~ '^Inserted \d+ predictions for ....................$' THEN
+            num := substring(
+                line.msg from '^Inserted (\d+)')::integer;
+            value := substring(
+                line.msg from '^Inserted \d+ predictions for (....................)$');
+
+            INSERT INTO agg_tracker (gid, predictions) VALUES (value, num)
+            ON CONFLICT (gid)
+            DO UPDATE SET predictions = agg_tracker.predictions + EXCLUDED.predictions;
+
+        ELSIF line.msg ~ '^''Reference_code'' not specified. Assuming ''ZPG''$' THEN
+
+            INSERT INTO agg_tracker (gid, assume_zpg) VALUES (curr_gid, true)
+            ON CONFLICT (gid)
+            DO UPDATE SET assume_zpg = true;
+
+        ELSIF line.msg ~ '^Cannot find gauge "...................." for import$' THEN
+            value := substring(
+                line.msg from '^Cannot find gauge "(....................)" for import$');
+
+            INSERT INTO agg_tracker (gid, unknown) VALUES (value, true)
+            ON CONFLICT (gid)
+            DO UPDATE SET unknown = true;
+
+        ELSIF line.msg ~ '^''Unit'' not specified. Assuming ''cm''$' THEN
+
+            INSERT INTO agg_tracker (gid, assume_cm) VALUES (curr_gid, 1)
+            ON CONFLICT (gid)
+            DO UPDATE SET assume_cm = agg_tracker.assume_cm + EXCLUDED.assume_cm;
+
+        ELSIF line.msg ~ '^Ignored message with measure_code .+' THEN
+            value := substring(
+                line.msg from '^Ignored message with measure_code (.+)$');
+
+            INSERT INTO agg_tracker (gid, ign_meas_codes) VALUES (curr_gid, array[value])
+            ON CONFLICT (gid)
+            DO UPDATE SET ign_meas_codes =
+                array_cat(agg_tracker.ign_meas_codes, EXCLUDED.ign_meas_codes);
+
+        ELSIF line.msg ~ '^Importing gauge measurements took ' THEN
+
+            -- TODO: Flush aggregation because it's likely the last entry.
+        ELSE
+            -- TODO: Handle 'Missing mandatory value at %s. Ignored (bad service)'
+            -- TODO: Handle "unknown unit '%s'"
+            -- Not handled, copy through ..
+            INSERT INTO filtered_logs VALUES (line.import_id, line.time, line.kind, line.msg);
+
+        END IF;
+
+        last_time := line.time;
+    END LOOP;
+
+    -- TODO: Handle remains from agg_tracker
+
+    -- DELETE FROM import.import_logs WHERE import_id IN (
+    --    SELECT id FROM import.imports WHERE kind = 'gm')
+
+    -- INSERT INTO import.import_logs SELECT * FROM import.filtered_logs;
+
+    DROP TABLE filtered_logs;
+    DROP TABLE agg_tracker;
+END $$;
+-- VACUUM FULL;
+
+