# HG changeset patch # User Sascha L. Teichmann # Date 1635195945 -7200 # Node ID ff95d3603e4dcbbdf2eaab07a7e81badff19af67 # Parent 2e5e51288a6cef3a90095394ce74b8c7410dec90 WIP: More aggregation. diff -r 2e5e51288a6c -r ff95d3603e4d schema/updates/1469/aggregate-gm-logs.sql --- a/schema/updates/1469/aggregate-gm-logs.sql Mon Oct 25 21:35:54 2021 +0200 +++ b/schema/updates/1469/aggregate-gm-logs.sql Mon Oct 25 23:05:45 2021 +0200 @@ -8,7 +8,7 @@ last_id integer; last_time timestamp with time zone; curr_gid text; - gauge text; + value text; joined text; num integer; BEGIN @@ -85,9 +85,22 @@ END IF; - -- TODO: ""Missing mandatory values: " - -- TODO: "Cannot convert units: " - -- TODO: "Ignored measure codes:" + -- TODO: ""Missing mandatory values: " + -- TODO: "Cannot convert units: " + + -- ignored measure codes + SELECT INTO joined string_agg( + gid || ' (' + || array_to_string(ARRAY(SELECT DISTINCT unnest(ign_meas_codes)), '; ') + || ')', ', ') + FROM agg_tracker + WHERE ign_meas_codes IS NOT NULL; + + IF joined IS NOT NULL THEN + INSERT INTO filtered_logs VALUES ( + last_id, last_time, 'warn'::log_type, + 'Ignored measure codes: ' || joined); + END IF; -- predictions SELECT INTO joined string_agg( @@ -135,10 +148,10 @@ ELSIF line.msg ~ '^Inserted \d+ measurements for ....................$' THEN num := substring( line.msg from '^Inserted (\d+)')::integer; - gauge := substring( + value := substring( line.msg from '^Inserted \d+ measurements for (....................)$'); - INSERT INTO agg_tracker (gid, measurements) VALUES (gauge, num) + INSERT INTO agg_tracker (gid, measurements) VALUES (value, num) ON CONFLICT (gid) DO UPDATE SET measurements = EXCLUDED.measurements + num; @@ -153,39 +166,47 @@ ELSIF line.msg ~ '^Inserted \d+ predictions for ....................$' THEN num := substring( line.msg from '^Inserted (\d+)')::integer; - gauge := substring( + value := substring( line.msg from '^Inserted \d+ predictions for (....................)$'); - INSERT INTO agg_tracker (gid, predictions) VALUES (gauge, num) + INSERT INTO agg_tracker (gid, predictions) VALUES (value, num) ON CONFLICT (gid) DO UPDATE SET predictions = EXCLUDED.predictions + num; ELSIF line.msg ~ '^''Reference_code'' not specified. Assuming ''ZPG''$' THEN - INSERT INTO agg_tracker (gid, assume_zpg) VALUES (gauge, true) + INSERT INTO agg_tracker (gid, assume_zpg) VALUES (curr_gid, true) ON CONFLICT (gid) DO UPDATE SET assume_zpg = true; ELSIF line.msg ~ '^Cannot find gauge "...................." for import$' THEN - gauge := substring( + value := substring( line.msg from '^Cannot find gauge "(....................)" for import$'); - INSERT INTO agg_tracker (gid, unknown) VALUES (gauge, true) + INSERT INTO agg_tracker (gid, unknown) VALUES (value, true) ON CONFLICT (gid) DO UPDATE SET unknown = true; ELSIF line.msg ~ '^''Unit'' not specified. Assuming ''cm''$' THEN - INSERT INTO agg_tracker (gid, assume_cm) VALUES (gauge, 1) + INSERT INTO agg_tracker (gid, assume_cm) VALUES (curr_gid, 1) ON CONFLICT (gid) DO UPDATE SET assume_cm = EXCLUDED.assume_cm + 1; + ELSIF line.msg ~ '^Ignored message with measure_code .+' THEN + value := substring( + line.msg from '^Ignored message with measure_code (.+)$'); + + INSERT INTO agg_tracker (gid, ign_meas_codes) VALUES (curr_gid, array[value]) + ON CONFLICT (gid) + DO UPDATE SET ign_meas_codes = + array_cat(agg_tracker.ign_meas_codes, EXCLUDED.ign_meas_codes); + ELSIF line.msg ~ '^Importing gauge measurements took ' THEN -- TODO: Flush aggregation because its likely the last entry. ELSE -- TODO: Handle 'Missing mandatory value at %s. Ignored (bad service)' - -- TODO: Handle 'Ignored message with measure_code %s' -- TODO: Handle "unknown unit '%s'" -- Not handled, copy through .. INSERT INTO filtered_logs VALUES (line.import_id, line.time, line.kind, line.msg);