Mercurial > gemma
view schema/updates/1469/aggregate-gm-logs.sql @ 5538:ff95d3603e4d aggregate-gm-import-logging
WIP: More aggregation.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Mon, 25 Oct 2021 23:05:45 +0200 |
parents | dc2000b807bd |
children | 4d815f295e57 |
line wrap: on
line source
DO $$ DECLARE logs CURSOR FOR SELECT lo.* FROM import.imports im JOIN import.import_logs lo ON lo.import_id = im.id WHERE im.kind = 'gm' ORDER BY lo.import_id, lo.time; last_id integer; last_time timestamp with time zone; curr_gid text; value text; joined text; num integer; BEGIN CREATE TEMP TABLE filtered_logs ( import_id integer NOT NULL, time timestamp with time zone NOT NULL, kind log_type NOT NULL default 'info'::log_type, msg text NOT NULL ); CREATE TEMP TABLE agg_tracker ( gid text NOT NULL UNIQUE, unknown boolean NOT NULL DEFAULT false, assume_zpg boolean NOT NULL DEFAULT false, ign_meas_codes text ARRAY, rescale_errors text ARRAY, missing_values text ARRAY, assume_cm integer NOT NULL DEFAULT 0, bad_values integer NOT NULL DEFAULT 0, measurements integer NOT NULL DEFAULT 0, predictions integer NOT NULL DEFAULT 0 ); last_id := -1; FOR line IN logs LOOP IF last_id <> line.import_id THEN -- unknown SELECT INTO joined string_agg(gid, ', ') FROM agg_tracker WHERE unknown; IF joined IS NOT NULL THEN INSERT INTO filtered_logs VALUES ( last_id, last_time, 'warn'::log_type, 'Cannot find following gauges: ' || joined); END IF; -- assume zpg SELECT INTO joined string_agg(gid, ', ') FROM agg_tracker WHERE assume_zpg; IF joined IS NOT NULL THEN INSERT INTO filtered_logs VALUES ( last_id, last_time, 'warn'::log_type, '''Reference_code'' not specified. Assuming ''ZPG'': ' || joined); END IF; -- assume cm SELECT INTO joined string_agg( gid || ' (' || assume_cm || ')', ', ') FROM agg_tracker WHERE assume_cm > 0; IF joined IS NOT NULL THEN INSERT INTO filtered_logs VALUES ( last_id, last_time, 'warn'::log_type, '''Unit'' not specified. Assuming ''cm'': ' || joined); END IF; -- bad values SELECT INTO joined string_agg( gid || ' (' || bad_values || ')', ', ') FROM agg_tracker WHERE bad_values > 0; IF joined IS NOT NULL THEN INSERT INTO filtered_logs VALUES ( last_id, last_time, 'warn'::log_type, 'Ignored measurements with value -99999: ' || joined); END IF; -- TODO: ""Missing mandatory values: " -- TODO: "Cannot convert units: " -- ignored measure codes SELECT INTO joined string_agg( gid || ' (' || array_to_string(ARRAY(SELECT DISTINCT unnest(ign_meas_codes)), '; ') || ')', ', ') FROM agg_tracker WHERE ign_meas_codes IS NOT NULL; IF joined IS NOT NULL THEN INSERT INTO filtered_logs VALUES ( last_id, last_time, 'warn'::log_type, 'Ignored measure codes: ' || joined); END IF; -- predictions SELECT INTO joined string_agg( gid || ' (' || predictions || ')', ', ') FROM agg_tracker WHERE predictions > 0; IF joined IS NOT NULL THEN INSERT INTO filtered_logs VALUES ( last_id, last_time, 'info'::log_type, 'New predictions: ' || joined); END IF; -- measurements SELECT INTO joined string_agg( gid || ' (' || measurements || ')', ', ') FROM agg_tracker WHERE measurements > 0; IF joined IS NOT NULL THEN INSERT INTO filtered_logs VALUES ( last_id, last_time, 'info'::log_type, 'New measurements: ' || joined); END IF; -- nothing changed SELECT INTO joined string_agg(gid, ', ') FROM agg_tracker WHERE measurements = 0 and predictions = 0; IF joined IS NOT NULL THEN INSERT INTO filtered_logs VALUES ( last_id, last_time, 'info'::log_type, 'No changes for: ' || joined); END IF; TRUNCATE agg_tracker; last_id := line.import_id; ELSIF line.msg ~ '^Found measurements/predictions for ....................$' THEN curr_gid := substring( line.msg from '^Found measurements/predictions for (....................)$'); ELSIF line.msg ~ '^Inserted \d+ measurements for ....................$' THEN num := substring( line.msg from '^Inserted (\d+)')::integer; value := substring( line.msg from '^Inserted \d+ measurements for (....................)$'); INSERT INTO agg_tracker (gid, measurements) VALUES (value, num) ON CONFLICT (gid) DO UPDATE SET measurements = EXCLUDED.measurements + num; ELSIF line.msg ~ '^Ignored \d+ measurements with value -99999$' THEN num := substring( line.msg from '^Ignored (\d+)')::integer; INSERT INTO agg_tracker (gid, bad_values) VALUES (curr_gid, num) ON CONFLICT (gid) DO UPDATE SET bad_values = EXCLUDED.bad_values + num; ELSIF line.msg ~ '^Inserted \d+ predictions for ....................$' THEN num := substring( line.msg from '^Inserted (\d+)')::integer; value := substring( line.msg from '^Inserted \d+ predictions for (....................)$'); INSERT INTO agg_tracker (gid, predictions) VALUES (value, num) ON CONFLICT (gid) DO UPDATE SET predictions = EXCLUDED.predictions + num; ELSIF line.msg ~ '^''Reference_code'' not specified. Assuming ''ZPG''$' THEN INSERT INTO agg_tracker (gid, assume_zpg) VALUES (curr_gid, true) ON CONFLICT (gid) DO UPDATE SET assume_zpg = true; ELSIF line.msg ~ '^Cannot find gauge "...................." for import$' THEN value := substring( line.msg from '^Cannot find gauge "(....................)" for import$'); INSERT INTO agg_tracker (gid, unknown) VALUES (value, true) ON CONFLICT (gid) DO UPDATE SET unknown = true; ELSIF line.msg ~ '^''Unit'' not specified. Assuming ''cm''$' THEN INSERT INTO agg_tracker (gid, assume_cm) VALUES (curr_gid, 1) ON CONFLICT (gid) DO UPDATE SET assume_cm = EXCLUDED.assume_cm + 1; ELSIF line.msg ~ '^Ignored message with measure_code .+' THEN value := substring( line.msg from '^Ignored message with measure_code (.+)$'); INSERT INTO agg_tracker (gid, ign_meas_codes) VALUES (curr_gid, array[value]) ON CONFLICT (gid) DO UPDATE SET ign_meas_codes = array_cat(agg_tracker.ign_meas_codes, EXCLUDED.ign_meas_codes); ELSIF line.msg ~ '^Importing gauge measurements took ' THEN -- TODO: Flush aggregation because its likely the last entry. ELSE -- TODO: Handle 'Missing mandatory value at %s. Ignored (bad service)' -- TODO: Handle "unknown unit '%s'" -- Not handled, copy through .. INSERT INTO filtered_logs VALUES (line.import_id, line.time, line.kind, line.msg); END IF; last_time := line.time; END LOOP; -- TODO: Handle remains from agg_tracker -- DELETE FROM import.import_logs WHERE import_id IN ( -- SELECT id FROM import.imports WHERE kind = 'gm') -- INSERT INTO import.import_logs SELECT * FROM import.filtered_logs; DROP TABLE filtered_logs; DROP TABLE agg_tracker; END $$; -- VACUUM FULL;