view schema/updates/1469/aggregate-gm-logs.sql @ 5541:29804c8e817d aggregate-gm-import-logging

WIP: Handle rest correctly.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Tue, 26 Oct 2021 02:02:23 +0200
parents 4d815f295e57
children 0acb06fc77e6
line wrap: on
line source

-- Condense the log entries of all imports of kind 'gm'.
--
-- The logs are walked in (import_id, time) order. Per-gauge messages
-- (inserted measurements/predictions, assumed unit/reference code, ignored
-- -99999 values, ignored measure codes, missing mandatory values, unknown
-- units, unknown gauges) are collected in the temporary table agg_tracker.
-- Whenever the import id changes (and once more at the end), they are
-- emitted as one summary line per message category into the temporary
-- table filtered_logs. Every other message is copied through unchanged.
--
-- NOTE(review): still a dry run. The condensed logs are only counted and
-- then dropped; see the commented-out DELETE/INSERT near the end.
DO $$
DECLARE
  logs NO SCROLL CURSOR FOR
    SELECT lo.*
    FROM import.imports AS im
    INNER JOIN import.import_logs AS lo
      ON lo.import_id = im.id
    WHERE im.kind = 'gm'
    ORDER BY lo.import_id, lo.time;
  last_id   integer;                   -- import being aggregated; NULL before the first row
  last_time timestamp with time zone;  -- time of the previously processed log line
  curr_gid  text;                      -- gauge named by the last "Found measurements/predictions for" line
  value     text;                      -- value captured from the current message
  num       integer;
  line      record;
  hold      record;                    -- held-back "Importing gauge measurements took ..." line
  has_hold  boolean;                   -- true while hold contains a line still to be emitted
  done      boolean;
BEGIN

  CREATE TEMP TABLE filtered_logs (
      import_id integer                  NOT NULL,
      time      timestamp with time zone NOT NULL,
      kind      log_type                 NOT NULL default 'info'::log_type,
      msg       text                     NOT NULL
  );

  -- One row per gauge id seen in the current import.
  CREATE TEMP TABLE agg_tracker (
      gid            text    NOT NULL UNIQUE,
      unknown        boolean NOT NULL DEFAULT false,
      assume_zpg     boolean NOT NULL DEFAULT false,
      ign_meas_codes text ARRAY,
      rescale_errors text ARRAY,
      missing_values text ARRAY,
      assume_cm      integer NOT NULL DEFAULT 0,
      bad_values     integer NOT NULL DEFAULT 0,
      measurements   integer NOT NULL DEFAULT 0,
      predictions    integer NOT NULL DEFAULT 0
  );

  last_id  := NULL;
  curr_gid := NULL;
  has_hold := false;

  OPEN logs;

  LOOP

    FETCH logs INTO line;
    done := NOT FOUND;

    -- Import boundary or end of data: emit the summary of the previous
    -- import BEFORE looking at the current line. The current line is then
    -- classified normally below, so the first line of an import is not lost.
    IF done OR line.import_id IS DISTINCT FROM last_id THEN

      IF last_id IS NOT NULL THEN
        -- Each statement inserts at most one summary row. HAVING count(*) > 0
        -- suppresses the row when no gauge falls into the category.

        -- unknown gauges
        INSERT INTO filtered_logs (import_id, time, kind, msg)
          SELECT last_id, last_time, 'warn'::log_type,
                 'Cannot find following gauges: '
                 || string_agg(gid, ', ' ORDER BY gid)
          FROM agg_tracker
          WHERE unknown
          HAVING count(*) > 0;

        -- assume zpg
        INSERT INTO filtered_logs (import_id, time, kind, msg)
          SELECT last_id, last_time, 'warn'::log_type,
                 '''Reference_code'' not specified. Assuming ''ZPG'': '
                 || string_agg(gid, ', ' ORDER BY gid)
          FROM agg_tracker
          WHERE assume_zpg
          HAVING count(*) > 0;

        -- assume cm
        INSERT INTO filtered_logs (import_id, time, kind, msg)
          SELECT last_id, last_time, 'warn'::log_type,
                 '''Unit'' not specified. Assuming ''cm'': '
                 || string_agg(gid || ' (' || assume_cm || ')', ', ' ORDER BY gid)
          FROM agg_tracker
          WHERE assume_cm > 0
          HAVING count(*) > 0;

        -- bad values
        INSERT INTO filtered_logs (import_id, time, kind, msg)
          SELECT last_id, last_time, 'warn'::log_type,
                 'Ignored measurements with value -99999: '
                 || string_agg(gid || ' (' || bad_values || ')', ', ' ORDER BY gid)
          FROM agg_tracker
          WHERE bad_values > 0
          HAVING count(*) > 0;

        -- missing mandatory values (deduplicated and sorted per gauge)
        INSERT INTO filtered_logs (import_id, time, kind, msg)
          SELECT last_id, last_time, 'warn'::log_type,
                 'Missing mandatory values: '
                 || string_agg(
                      gid || ' ('
                        || array_to_string(
                             ARRAY(SELECT DISTINCT mv
                                   FROM unnest(missing_values) AS u(mv)
                                   ORDER BY mv),
                             '; ')
                        || ')',
                      ', ' ORDER BY gid)
          FROM agg_tracker
          WHERE missing_values IS NOT NULL
          HAVING count(*) > 0;

        -- convert units (deduplicated and sorted per gauge)
        INSERT INTO filtered_logs (import_id, time, kind, msg)
          SELECT last_id, last_time, 'error'::log_type,
                 'Cannot convert units: '
                 || string_agg(
                      gid || ' ('
                        || array_to_string(
                             ARRAY(SELECT DISTINCT re
                                   FROM unnest(rescale_errors) AS u(re)
                                   ORDER BY re),
                             '; ')
                        || ')',
                      ', ' ORDER BY gid)
          FROM agg_tracker
          WHERE rescale_errors IS NOT NULL
          HAVING count(*) > 0;

        -- ignored measure codes (deduplicated and sorted per gauge)
        INSERT INTO filtered_logs (import_id, time, kind, msg)
          SELECT last_id, last_time, 'warn'::log_type,
                 'Ignored measure codes: '
                 || string_agg(
                      gid || ' ('
                        || array_to_string(
                             ARRAY(SELECT DISTINCT mc
                                   FROM unnest(ign_meas_codes) AS u(mc)
                                   ORDER BY mc),
                             '; ')
                        || ')',
                      ', ' ORDER BY gid)
          FROM agg_tracker
          WHERE ign_meas_codes IS NOT NULL
          HAVING count(*) > 0;

        -- predictions
        INSERT INTO filtered_logs (import_id, time, kind, msg)
          SELECT last_id, last_time, 'info'::log_type,
                 'New predictions: '
                 || string_agg(gid || ' (' || predictions || ')', ', ' ORDER BY gid)
          FROM agg_tracker
          WHERE predictions > 0
          HAVING count(*) > 0;

        -- measurements
        INSERT INTO filtered_logs (import_id, time, kind, msg)
          SELECT last_id, last_time, 'info'::log_type,
                 'New measurements: '
                 || string_agg(gid || ' (' || measurements || ')', ', ' ORDER BY gid)
          FROM agg_tracker
          WHERE measurements > 0
          HAVING count(*) > 0;

        -- nothing changed
        -- NOTE(review): this also lists gauges flagged as unknown above;
        -- confirm whether they should be excluded here.
        INSERT INTO filtered_logs (import_id, time, kind, msg)
          SELECT last_id, last_time, 'info'::log_type,
                 'No changes for: '
                 || string_agg(gid, ', ' ORDER BY gid)
          FROM agg_tracker
          WHERE measurements = 0 AND predictions = 0
          HAVING count(*) > 0;

        -- The held "took ..." line goes after the summary lines.
        IF has_hold THEN
          INSERT INTO filtered_logs (import_id, time, kind, msg)
            VALUES (hold.import_id, hold.time, hold.kind, hold.msg);
          has_hold := false;
        END IF;
      END IF;

      EXIT WHEN done;

      -- Start aggregating the next import. A gauge announced in the previous
      -- import must not be credited with messages of this one.
      TRUNCATE agg_tracker;
      last_id  := line.import_id;
      curr_gid := NULL;
    END IF;

    -- RAISE NOTICE '%', line.msg;

    -- Classify the current line. Per-gauge messages need a current gauge.
    -- Without one they fall through to the copy-through ELSE branch instead
    -- of violating agg_tracker.gid NOT NULL.
    IF line.msg ~ '^Found measurements/predictions for ....................$' THEN
      curr_gid := substring(
        line.msg from '^Found measurements/predictions for (....................)$');

    ELSIF line.msg ~ '^Inserted \d+ measurements for ....................$' THEN
      num := substring(
        line.msg from '^Inserted (\d+)')::integer;
      value := substring(
        line.msg from '^Inserted \d+ measurements for (....................)$');

      INSERT INTO agg_tracker (gid, measurements) VALUES (value, num)
        ON CONFLICT (gid)
        DO UPDATE SET measurements = agg_tracker.measurements + EXCLUDED.measurements;

    ELSIF line.msg ~ '^Inserted \d+ predictions for ....................$' THEN
      num := substring(
        line.msg from '^Inserted (\d+)')::integer;
      value := substring(
        line.msg from '^Inserted \d+ predictions for (....................)$');

      INSERT INTO agg_tracker (gid, predictions) VALUES (value, num)
        ON CONFLICT (gid)
        DO UPDATE SET predictions = agg_tracker.predictions + EXCLUDED.predictions;

    ELSIF curr_gid IS NOT NULL
          AND line.msg ~ '^''Reference_code'' not specified. Assuming ''ZPG''$' THEN

      INSERT INTO agg_tracker (gid, assume_zpg) VALUES (curr_gid, true)
        ON CONFLICT (gid)
        DO UPDATE SET assume_zpg = true;

    ELSIF curr_gid IS NOT NULL
          AND line.msg ~ '^Ignored \d+ measurements with value -99999$' THEN
      num := substring(
        line.msg from '^Ignored (\d+)')::integer;

      INSERT INTO agg_tracker (gid, bad_values) VALUES (curr_gid, num)
        ON CONFLICT (gid)
        DO UPDATE SET bad_values = agg_tracker.bad_values + EXCLUDED.bad_values;

    ELSIF line.msg ~ '^Cannot find gauge "...................." for import$' THEN
      value := substring(
        line.msg from '^Cannot find gauge "(....................)" for import$');

      INSERT INTO agg_tracker (gid, unknown) VALUES (value, true)
        ON CONFLICT (gid)
        DO UPDATE SET unknown = true;

    ELSIF curr_gid IS NOT NULL
          AND line.msg ~ '^''Unit'' not specified. Assuming ''cm''$' THEN

      INSERT INTO agg_tracker (gid, assume_cm) VALUES (curr_gid, 1)
        ON CONFLICT (gid)
        DO UPDATE SET assume_cm = agg_tracker.assume_cm + 1;

    ELSIF curr_gid IS NOT NULL
          AND line.msg ~ '^Ignored message with measure_code .+' THEN
      value := substring(
        line.msg from '^Ignored message with measure_code (.+)$');

      INSERT INTO agg_tracker (gid, ign_meas_codes) VALUES (curr_gid, array[value])
        ON CONFLICT (gid)
        DO UPDATE SET ign_meas_codes =
          array_cat(agg_tracker.ign_meas_codes, EXCLUDED.ign_meas_codes);

    ELSIF curr_gid IS NOT NULL
          AND line.msg ~ '^Missing mandatory value at [^.]+\.' THEN
      value := substring(
        line.msg from '^Missing mandatory value at ([^.]+)\.');

      INSERT INTO agg_tracker (gid, missing_values) VALUES (curr_gid, array[value])
        ON CONFLICT (gid)
        DO UPDATE SET missing_values =
          array_cat(agg_tracker.missing_values, EXCLUDED.missing_values);

    ELSIF curr_gid IS NOT NULL
          AND line.msg ~ '^unknown unit ''[^'']*''' THEN
      value := substring(
        line.msg from '^unknown unit ''([^'']*)''');

      INSERT INTO agg_tracker (gid, rescale_errors) VALUES (curr_gid, array[value])
        ON CONFLICT (gid)
        DO UPDATE SET rescale_errors =
          array_cat(agg_tracker.rescale_errors, EXCLUDED.rescale_errors);

    ELSIF line.msg ~ '^Importing gauge measurements took ' THEN
      -- Likely the last entry of this import. Hold it back so it is emitted
      -- after the summary lines. If one is already held, copy that one
      -- through instead of silently dropping it.
      IF has_hold THEN
        INSERT INTO filtered_logs (import_id, time, kind, msg)
          VALUES (hold.import_id, hold.time, hold.kind, hold.msg);
      END IF;
      hold     := line;
      has_hold := true;

    ELSE
      -- Not handled, copy through.
      INSERT INTO filtered_logs (import_id, time, kind, msg)
        VALUES (line.import_id, line.time, line.kind, line.msg);
    END IF;

    last_time := line.time;
  END LOOP;

  CLOSE logs;

  -- NOTE(review): the rewrite is not applied yet. filtered_logs is a
  -- temporary table (it does not live in schema import), so the final
  -- step would read from it directly:
  -- DELETE FROM import.import_logs WHERE import_id IN (
  --   SELECT id FROM import.imports WHERE kind = 'gm');
  --
  -- INSERT INTO import.import_logs (import_id, time, kind, msg)
  --   SELECT import_id, time, kind, msg FROM filtered_logs;

  SELECT INTO num count(*) FROM filtered_logs;

  RAISE NOTICE 'number of new gm log message entries: %', num;

  DROP TABLE filtered_logs;
  DROP TABLE agg_tracker;
END $$;
-- VACUUM FULL;