changeset 5539:90ba92820b26 aggregate-gm-import-logging

Merged default into aggregate-gm-import-logging branch.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Tue, 26 Oct 2021 00:05:28 +0200
parents ff95d3603e4d (diff) fcab36751490 (current diff)
children 4d815f295e57
files
diffstat 2 files changed, 447 insertions(+), 23 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/imports/gm.go	Mon Oct 25 16:19:13 2021 +0200
+++ b/pkg/imports/gm.go	Tue Oct 26 00:05:28 2021 +0200
@@ -321,6 +321,200 @@
 	return fn, nil
 }
 
+type gmLog struct {
+	gid                 string
+	unknown             bool
+	assumedZPG          bool
+	ignoredMeasureCodes []string
+	rescaleErrors       []string
+	missingValues       []string
+	assumedCM           int
+	badValues           int
+	measurements        int
+	predictions         int
+}
+
+type gmLogs []*gmLog
+
+func extend(haystack []string, needle string) []string {
+	for _, straw := range haystack {
+		if straw == needle {
+			return haystack
+		}
+	}
+	return append(haystack, needle)
+}
+
+func (gl *gmLog) addRescaleError(err error) {
+	gl.rescaleErrors = extend(gl.rescaleErrors, err.Error())
+}
+
+func (gl *gmLog) ignoreMeasureCode(mc nts.Measure_code_enum) {
+	gl.ignoredMeasureCodes = extend(gl.ignoredMeasureCodes, string(mc))
+}
+
+func (gls gmLogs) find(gid string) *gmLog {
+	for _, gl := range gls {
+		if gl.gid == gid {
+			return gl
+		}
+	}
+	return nil
+}
+
+func (gls gmLogs) logging(feedback Feedback) {
+
+	gls.logBool(
+		(*gmLog).getUnkown,
+		"Cannot find following gauges: ",
+		feedback.Warn)
+
+	gls.logBool(
+		(*gmLog).getAssumedZPG,
+		"'Reference_code' not specified. Assuming 'ZPG': ",
+		feedback.Warn)
+
+	gls.logInt(
+		(*gmLog).getAssumedCM,
+		"'Unit' not specified. Assuming 'cm': ",
+		feedback.Warn)
+
+	gls.logInt(
+		(*gmLog).getBadValues,
+		"Ignored measurements with value -99999: ",
+		feedback.Warn)
+
+	gls.logString(
+		(*gmLog).getMissingValues,
+		"Missing mandatory values: ",
+		feedback.Warn)
+
+	gls.logString(
+		(*gmLog).getRescaleErrors,
+		"Cannot convert units: ",
+		feedback.Error)
+
+	gls.logString(
+		(*gmLog).getRescaleErrors,
+		"Ignored measure codes: ",
+		feedback.Warn)
+
+	gls.logInt(
+		(*gmLog).getPredictions,
+		"New predictions: ",
+		feedback.Info)
+
+	gls.logInt(
+		(*gmLog).getMeasurements,
+		"New measurements: ",
+		feedback.Info)
+
+	gls.logBool(
+		(*gmLog).nothingChanged,
+		"No changes for: ",
+		feedback.Info)
+}
+
+func (gl *gmLog) getAssumedZPG() bool              { return gl.assumedZPG }
+func (gl *gmLog) getUnkown() bool                  { return gl.unknown }
+func (gl *gmLog) getIgnoredMeasureCodes() []string { return gl.ignoredMeasureCodes }
+func (gl *gmLog) getRescaleErrors() []string       { return gl.rescaleErrors }
+func (gl *gmLog) getMissingValues() []string       { return gl.missingValues }
+func (gl *gmLog) getAssumedCM() int                { return gl.assumedCM }
+func (gl *gmLog) getBadValues() int                { return gl.badValues }
+func (gl *gmLog) getPredictions() int              { return gl.predictions }
+func (gl *gmLog) getMeasurements() int             { return gl.measurements }
+func (gl *gmLog) nothingChanged() bool             { return gl.measurements == 0 && gl.predictions == 0 }
+
+func (gls gmLogs) logBool(
+	access func(*gmLog) bool,
+	header string,
+	log func(string, ...interface{}),
+) {
+	var sb strings.Builder
+	for _, gl := range gls {
+		if access(gl) {
+			if sb.Len() == 0 {
+				sb.WriteString(header)
+			} else {
+				sb.WriteString(", ")
+			}
+			sb.WriteString(gl.gid)
+		}
+	}
+	if sb.Len() > 0 {
+		log(sb.String())
+	}
+}
+
+func (gls gmLogs) logInt(
+	access func(*gmLog) int,
+	header string,
+	log func(string, ...interface{}),
+) {
+	var sb strings.Builder
+	for _, gl := range gls {
+		if v := access(gl); v > 0 {
+			if sb.Len() == 0 {
+				sb.WriteString(header)
+			} else {
+				sb.WriteString(", ")
+			}
+			fmt.Fprintf(&sb, "%s (%d)", gl.gid, v)
+		}
+	}
+	if sb.Len() > 0 {
+		log(sb.String())
+	}
+}
+
+func (gls gmLogs) logString(
+	access func(*gmLog) []string,
+	header string,
+	log func(string, ...interface{}),
+) {
+	var sb strings.Builder
+	for _, gl := range gls {
+		if s := access(gl); len(s) > 0 {
+			if sb.Len() == 0 {
+				sb.WriteString(header)
+			} else {
+				sb.WriteString(", ")
+			}
+			fmt.Fprintf(&sb, "%s (", gl.gid)
+			for i, v := range s {
+				if i > 0 {
+					sb.WriteString("; ")
+				}
+				sb.WriteString(v)
+			}
+			sb.WriteByte(')')
+		}
+	}
+	if sb.Len() > 0 {
+		log(sb.String())
+	}
+}
+
+// logFinder is a helper to search recently used logs
+// or create a new one if no log for a given gauge
+// existed before.
+func logFinder(logs *gmLogs) func(string) *gmLog {
+	var lastLog *gmLog
+	return func(gid string) *gmLog {
+		if lastLog != nil && lastLog.gid == gid {
+			return lastLog
+		}
+		if ll := logs.find(gid); ll != nil {
+			lastLog = ll
+			return ll
+		}
+		lastLog = &gmLog{gid: gid}
+		*logs = append(*logs, lastLog)
+		return lastLog
+	}
+}
+
 func doForGM(
 	ctx context.Context,
 	gauges []string,
@@ -353,6 +547,14 @@
 	}
 
 	var gids []string
+
+	// To prevent spamming the log actual logging
+	// is defered to be presented in an aggregated way.
+	var logs gmLogs
+	defer func() { logs.logging(feedback) }()
+
+	findLog := logFinder(&logs)
+
 	for _, msg := range result {
 		for _, wrm := range msg.Wrm {
 			curr := string(*wrm.Geo_object.Id)
@@ -362,45 +564,45 @@
 				feedback.Warn("Invalid ISRS code %v", err)
 				continue
 			}
-			feedback.Info("Found measurements/predictions for %s", curr)
+			logger := findLog(curr)
+			gids = append(gids, curr)
+
 			if !isKnown(curr) {
-				feedback.Warn("Cannot find gauge %q for import", curr)
+				logger.unknown = true
 				continue
 			}
 
 			var referenceCode string
 			if wrm.Reference_code == nil {
-				feedback.Info("'Reference_code' not specified. Assuming 'ZPG'")
+				logger.assumedZPG = true
 				referenceCode = "ZPG"
 			} else {
 				referenceCode = string(*wrm.Reference_code)
 			}
 
-			badValue := 0
-			newM, newP := 0, 0
 			for _, measure := range wrm.Measure {
 				var unit string
 				if *measure.Measure_code != nts.Measure_code_enumWAL {
-					feedback.Warn("Ignored message with measure_code %s",
-						*measure.Measure_code)
+					logger.ignoreMeasureCode(*measure.Measure_code)
 					continue
 				}
 				if measure.Unit == nil {
-					feedback.Info("'Unit' not specified. Assuming 'cm'")
+					logger.assumedCM++
 					unit = "cm"
 				} else {
 					unit = string(*measure.Unit)
 				}
 
 				if measure.Value == nil {
-					feedback.Warn("Missing mandatory value at %s. Ignored (bad service)",
-						measure.Measuredate.Format(time.RFC3339))
+					logger.missingValues = append(
+						logger.missingValues,
+						measure.Measuredate.Time.Format(time.RFC3339))
 					continue
 				}
 
 				convert, err := rescale(unit)
 				if err != nil {
-					feedback.Error(err.Error())
+					logger.addRescaleError(err)
 					continue
 				}
 				convert(measure.Value)
@@ -409,7 +611,7 @@
 
 				// -99999 is used by some gauges to signal an error
 				if *measure.Value == -99999 {
-					badValue++
+					logger.badValues++
 					continue
 				}
 
@@ -459,7 +661,7 @@
 					case err != nil:
 						feedback.Error(pgxutils.ReadableError{Err: err}.Error())
 					default:
-						newP++
+						logger.predictions++
 					}
 				} else {
 					err = insertGMStmt.QueryRowContext(
@@ -485,19 +687,10 @@
 					case err != nil:
 						feedback.Error(pgxutils.ReadableError{Err: err}.Error())
 					default:
-						newM++
+						logger.measurements++
 					}
 				}
 			}
-			if badValue > 0 {
-				feedback.Warn("Ignored %d measurements with value -99999",
-					badValue)
-			}
-			feedback.Info("Inserted %d measurements for %s",
-				newM, curr)
-			feedback.Info("Inserted %d predictions for %s",
-				newP, curr)
-			gids = append(gids, curr)
 		}
 	}
 	return gids, nil
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/schema/updates/1469/aggregate-gm-logs.sql	Tue Oct 26 00:05:28 2021 +0200
@@ -0,0 +1,231 @@
+DO $$
+DECLARE
+  logs CURSOR FOR SELECT
+       lo.* FROM import.imports im JOIN import.import_logs lo
+       ON lo.import_id = im.id
+       WHERE im.kind = 'gm'
+       ORDER BY lo.import_id, lo.time;
+  last_id integer;
+  last_time timestamp with time zone;
+  curr_gid text;
+  value text;
+  joined text;
+  num integer;
+BEGIN
+
+  CREATE TEMP TABLE filtered_logs (
+      import_id integer                  NOT NULL,
+      time      timestamp with time zone NOT NULL,
+      kind      log_type                 NOT NULL default 'info'::log_type,
+      msg       text                     NOT NULL
+  );
+
+  CREATE TEMP TABLE agg_tracker (
+      gid            text    NOT NULL UNIQUE,
+      unknown        boolean NOT NULL DEFAULT false,
+      assume_zpg     boolean NOT NULL DEFAULT false,
+      ign_meas_codes text ARRAY,
+      rescale_errors text ARRAY,
+      missing_values text ARRAY,
+      assume_cm      integer NOT NULL DEFAULT 0,
+      bad_values     integer NOT NULL DEFAULT 0,
+      measurements   integer NOT NULL DEFAULT 0,
+      predictions    integer NOT NULL DEFAULT 0
+  );
+  last_id := -1;
+
+  FOR line IN logs LOOP
+
+    IF last_id <> line.import_id THEN
+
+        -- unknown
+        SELECT INTO joined string_agg(gid, ', ')
+            FROM agg_tracker
+            WHERE unknown;
+
+        IF joined IS NOT NULL THEN
+            INSERT INTO filtered_logs VALUES (
+                last_id, last_time, 'warn'::log_type,
+                'Cannot find following gauges: ' || joined);
+        END IF;
+
+        -- assume zpg
+        SELECT INTO joined string_agg(gid, ', ')
+            FROM agg_tracker
+            WHERE assume_zpg;
+
+        IF joined IS NOT NULL THEN
+            INSERT INTO filtered_logs VALUES (
+                last_id, last_time, 'warn'::log_type,
+                '''Reference_code'' not specified. Assuming ''ZPG'': ' || joined);
+        END IF;
+
+        -- assume cm
+        SELECT INTO joined string_agg(
+            gid || ' (' || assume_cm || ')', ', ')
+            FROM agg_tracker
+            WHERE assume_cm > 0;
+
+        IF joined IS NOT NULL THEN
+            INSERT INTO filtered_logs VALUES (
+                last_id, last_time, 'warn'::log_type,
+                '''Unit'' not specified. Assuming ''cm'': ' || joined);
+        END IF;
+
+        -- bad values
+        SELECT INTO joined string_agg(
+            gid || ' (' || bad_values || ')', ', ')
+            FROM agg_tracker
+            WHERE bad_values > 0;
+
+        IF joined IS NOT NULL THEN
+            INSERT INTO filtered_logs VALUES (
+                last_id, last_time, 'warn'::log_type,
+                'Ignored measurements with value -99999: ' || joined);
+        END IF;
+
+
+        -- TODO: ""Missing mandatory values: "
+        -- TODO: "Cannot convert units: "
+
+        -- ignored measure codes
+        SELECT INTO joined string_agg(
+            gid || ' ('
+                || array_to_string(ARRAY(SELECT DISTINCT unnest(ign_meas_codes)), '; ')
+                || ')', ', ')
+            FROM agg_tracker
+            WHERE ign_meas_codes IS NOT NULL;
+
+        IF joined IS NOT NULL THEN
+            INSERT INTO filtered_logs VALUES (
+                last_id, last_time, 'warn'::log_type,
+                'Ignored measure codes: ' || joined);
+        END IF;
+
+        -- predictions
+        SELECT INTO joined string_agg(
+            gid || ' (' || predictions || ')', ', ')
+            FROM agg_tracker
+            WHERE predictions > 0;
+
+        IF joined IS NOT NULL THEN
+            INSERT INTO filtered_logs VALUES (
+                last_id, last_time, 'info'::log_type,
+                'New predictions: ' || joined);
+        END IF;
+
+        -- measurements
+        SELECT INTO joined string_agg(
+            gid || ' (' || measurements || ')', ', ')
+            FROM agg_tracker
+            WHERE measurements > 0;
+
+        IF joined IS NOT NULL THEN
+            INSERT INTO filtered_logs VALUES (
+                last_id, last_time, 'info'::log_type,
+                'New measurements: ' || joined);
+        END IF;
+
+        -- nothing changed
+        SELECT INTO joined string_agg(gid, ', ')
+            FROM agg_tracker
+            WHERE measurements = 0 and predictions = 0;
+
+        IF joined IS NOT NULL THEN
+            INSERT INTO filtered_logs VALUES (
+                last_id, last_time, 'info'::log_type,
+                'No changes for: ' || joined);
+        END IF;
+
+
+       TRUNCATE agg_tracker;
+       last_id := line.import_id;
+
+    ELSIF line.msg ~ '^Found measurements/predictions for ....................$' THEN
+       curr_gid := substring(
+          line.msg from '^Found measurements/predictions for (....................)$');
+
+    ELSIF line.msg ~ '^Inserted \d+ measurements for ....................$' THEN
+        num := substring(
+            line.msg from '^Inserted (\d+)')::integer;
+        value := substring(
+            line.msg from '^Inserted \d+ measurements for (....................)$');
+
+        INSERT INTO agg_tracker (gid, measurements) VALUES (value, num)
+          ON CONFLICT (gid)
+          DO UPDATE SET measurements = EXCLUDED.measurements + num;
+
+    ELSIF line.msg ~ '^Ignored \d+ measurements with value -99999$' THEN
+        num := substring(
+            line.msg from '^Ignored (\d+)')::integer;
+
+        INSERT INTO agg_tracker (gid, bad_values) VALUES (curr_gid, num)
+          ON CONFLICT (gid)
+          DO UPDATE SET bad_values = EXCLUDED.bad_values + num;
+
+    ELSIF line.msg ~ '^Inserted \d+ predictions for ....................$' THEN
+        num := substring(
+            line.msg from '^Inserted (\d+)')::integer;
+        value := substring(
+            line.msg from '^Inserted \d+ predictions for (....................)$');
+
+        INSERT INTO agg_tracker (gid, predictions) VALUES (value, num)
+          ON CONFLICT (gid)
+          DO UPDATE SET predictions = EXCLUDED.predictions + num;
+
+    ELSIF line.msg ~ '^''Reference_code'' not specified. Assuming ''ZPG''$' THEN
+
+        INSERT INTO agg_tracker (gid, assume_zpg) VALUES (curr_gid, true)
+          ON CONFLICT (gid)
+          DO UPDATE SET assume_zpg = true;
+
+    ELSIF line.msg ~ '^Cannot find gauge "...................." for import$' THEN
+        value := substring(
+            line.msg from '^Cannot find gauge "(....................)" for import$');
+
+        INSERT INTO agg_tracker (gid, unknown) VALUES (value, true)
+          ON CONFLICT (gid)
+          DO UPDATE SET unknown = true;
+
+    ELSIF line.msg ~ '^''Unit'' not specified. Assuming ''cm''$' THEN
+
+        INSERT INTO agg_tracker (gid, assume_cm) VALUES (curr_gid, 1)
+          ON CONFLICT (gid)
+          DO UPDATE SET assume_cm = EXCLUDED.assume_cm + 1;
+
+    ELSIF line.msg ~ '^Ignored message with measure_code .+' THEN
+        value := substring(
+            line.msg from '^Ignored message with measure_code (.+)$');
+
+        INSERT INTO agg_tracker (gid, ign_meas_codes) VALUES (curr_gid, array[value])
+          ON CONFLICT (gid)
+          DO UPDATE SET ign_meas_codes =
+            array_cat(agg_tracker.ign_meas_codes, EXCLUDED.ign_meas_codes);
+
+    ELSIF line.msg ~ '^Importing gauge measurements took ' THEN
+
+        -- TODO: Flush aggregation because its likely the last entry.
+    ELSE
+        -- TODO: Handle 'Missing mandatory value at %s. Ignored (bad service)'
+        -- TODO: Handle "unknown unit '%s'"
+        -- Not handled, copy through ..
+        INSERT INTO filtered_logs VALUES (line.import_id, line.time, line.kind, line.msg);
+
+    END IF;
+
+    last_time := line.time;
+  END LOOP;
+
+  -- TODO: Handle remains from agg_tracker
+
+  -- DELETE FROM import.import_logs WHERE import_id IN (
+  --  SELECT id FROM import.imports WHERE kind = 'gm')
+
+  -- INSERT INTO import.import_logs SELECT * FROM import.filtered_logs;
+
+  DROP TABLE filtered_logs;
+  DROP TABLE agg_tracker;
+END $$;
+-- VACUUM FULL;
+
+