view schema/updates/1469/aggregate-gm-logs.sql @ 5538:ff95d3603e4d aggregate-gm-import-logging

WIP: More aggregation.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Mon, 25 Oct 2021 23:05:45 +0200
parents dc2000b807bd
children 4d815f295e57
line wrap: on
line source

-- Condenses the log lines of 'gm' imports into per-import summary lines.
--
-- The log lines of every import of kind 'gm' are walked in
-- (import_id, time) order. Recognised per-gauge messages are accumulated in
-- the temp table agg_tracker; everything else is copied unchanged into the
-- temp table filtered_logs. Whenever the import id changes, and once more
-- when the cursor is exhausted, the accumulated state is written to
-- filtered_logs as a handful of summary lines and agg_tracker is emptied.
--
-- Writing filtered_logs back to import.import_logs is still disabled
-- (see the commented-out statements near the end).
DO $$
DECLARE
  logs CURSOR FOR
       SELECT lo.import_id, lo.time, lo.kind, lo.msg
       FROM import.imports AS im
       INNER JOIN import.import_logs AS lo
           ON lo.import_id = im.id
       WHERE im.kind = 'gm'
       ORDER BY lo.import_id, lo.time;
  line record;                        -- current log line
  at_end boolean;                     -- true once the cursor is exhausted
  import_changed boolean;             -- true when a flush is due
  last_id integer;                    -- import the tracker currently belongs to
  last_time timestamp with time zone; -- time of the previously seen line
  curr_gid text;                      -- gauge id of the last 'Found ...' line
  value text;
  joined text;
  num integer;
BEGIN

  CREATE TEMP TABLE filtered_logs (
      import_id integer                  NOT NULL,
      time      timestamp with time zone NOT NULL,
      kind      log_type                 NOT NULL default 'info'::log_type,
      msg       text                     NOT NULL
  );

  CREATE TEMP TABLE agg_tracker (
      gid            text    NOT NULL UNIQUE,
      unknown        boolean NOT NULL DEFAULT false,
      assume_zpg     boolean NOT NULL DEFAULT false,
      ign_meas_codes text ARRAY,
      rescale_errors text ARRAY,
      missing_values text ARRAY,
      assume_cm      integer NOT NULL DEFAULT 0,
      bad_values     integer NOT NULL DEFAULT 0,
      measurements   integer NOT NULL DEFAULT 0,
      predictions    integer NOT NULL DEFAULT 0
  );

  -- Sentinel that differs from the first import id read from the cursor,
  -- so the first line only initialises last_id (the tracker is still empty).
  last_id := -1;

  OPEN logs;
  LOOP
    FETCH logs INTO line;
    at_end := NOT FOUND;

    -- A flush is due when the import changes and also when the data ends,
    -- otherwise the aggregation of the last import would be lost.
    IF at_end THEN
        import_changed := true;
    ELSE
        import_changed := last_id <> line.import_id;
    END IF;

    IF import_changed THEN
        -- Summary lines are stamped with the previous import's id and the
        -- time of its last seen line.

        -- unknown
        SELECT string_agg(gid, ', ') INTO joined
            FROM agg_tracker
            WHERE unknown;

        IF joined IS NOT NULL THEN
            INSERT INTO filtered_logs (import_id, time, kind, msg) VALUES (
                last_id, last_time, 'warn'::log_type,
                'Cannot find following gauges: ' || joined);
        END IF;

        -- assume zpg
        SELECT string_agg(gid, ', ') INTO joined
            FROM agg_tracker
            WHERE assume_zpg;

        IF joined IS NOT NULL THEN
            INSERT INTO filtered_logs (import_id, time, kind, msg) VALUES (
                last_id, last_time, 'warn'::log_type,
                '''Reference_code'' not specified. Assuming ''ZPG'': ' || joined);
        END IF;

        -- assume cm
        SELECT string_agg(gid || ' (' || assume_cm || ')', ', ') INTO joined
            FROM agg_tracker
            WHERE assume_cm > 0;

        IF joined IS NOT NULL THEN
            INSERT INTO filtered_logs (import_id, time, kind, msg) VALUES (
                last_id, last_time, 'warn'::log_type,
                '''Unit'' not specified. Assuming ''cm'': ' || joined);
        END IF;

        -- bad values
        SELECT string_agg(gid || ' (' || bad_values || ')', ', ') INTO joined
            FROM agg_tracker
            WHERE bad_values > 0;

        IF joined IS NOT NULL THEN
            INSERT INTO filtered_logs (import_id, time, kind, msg) VALUES (
                last_id, last_time, 'warn'::log_type,
                'Ignored measurements with value -99999: ' || joined);
        END IF;

        -- TODO: "Missing mandatory values: "
        -- TODO: "Cannot convert units: "

        -- ignored measure codes: de-duplicated and sorted per gauge so the
        -- generated message is deterministic.
        SELECT string_agg(
            gid || ' ('
                || array_to_string(ARRAY(
                       SELECT DISTINCT code
                       FROM unnest(ign_meas_codes) AS code
                       ORDER BY code), '; ')
                || ')', ', ') INTO joined
            FROM agg_tracker
            WHERE ign_meas_codes IS NOT NULL;

        IF joined IS NOT NULL THEN
            INSERT INTO filtered_logs (import_id, time, kind, msg) VALUES (
                last_id, last_time, 'warn'::log_type,
                'Ignored measure codes: ' || joined);
        END IF;

        -- predictions
        SELECT string_agg(gid || ' (' || predictions || ')', ', ') INTO joined
            FROM agg_tracker
            WHERE predictions > 0;

        IF joined IS NOT NULL THEN
            INSERT INTO filtered_logs (import_id, time, kind, msg) VALUES (
                last_id, last_time, 'info'::log_type,
                'New predictions: ' || joined);
        END IF;

        -- measurements
        SELECT string_agg(gid || ' (' || measurements || ')', ', ') INTO joined
            FROM agg_tracker
            WHERE measurements > 0;

        IF joined IS NOT NULL THEN
            INSERT INTO filtered_logs (import_id, time, kind, msg) VALUES (
                last_id, last_time, 'info'::log_type,
                'New measurements: ' || joined);
        END IF;

        -- nothing changed
        SELECT string_agg(gid, ', ') INTO joined
            FROM agg_tracker
            WHERE measurements = 0 AND predictions = 0;

        IF joined IS NOT NULL THEN
            INSERT INTO filtered_logs (import_id, time, kind, msg) VALUES (
                last_id, last_time, 'info'::log_type,
                'No changes for: ' || joined);
        END IF;

        TRUNCATE agg_tracker;

        EXIT WHEN at_end;
        last_id := line.import_id;
    END IF;

    -- Classify the current line. This is deliberately a separate IF chain
    -- from the flush above so that the first line of an import is processed
    -- like any other line.
    IF line.msg ~ '^Found measurements/predictions for ....................$' THEN
        -- Remember the gauge; the messages below that carry no gauge id of
        -- their own are attributed to it.
        curr_gid := substring(
            line.msg from '^Found measurements/predictions for (....................)$');

    ELSIF line.msg ~ '^Inserted \d+ measurements for ....................$' THEN
        num := substring(
            line.msg from '^Inserted (\d+)')::integer;
        value := substring(
            line.msg from '^Inserted \d+ measurements for (....................)$');

        -- EXCLUDED holds the row proposed for insertion (i.e. num), so the
        -- running total has to come from the existing agg_tracker row.
        INSERT INTO agg_tracker (gid, measurements) VALUES (value, num)
          ON CONFLICT (gid)
          DO UPDATE SET measurements =
            agg_tracker.measurements + EXCLUDED.measurements;

    ELSIF line.msg ~ '^Ignored \d+ measurements with value -99999$' THEN
        num := substring(
            line.msg from '^Ignored (\d+)')::integer;

        INSERT INTO agg_tracker (gid, bad_values) VALUES (curr_gid, num)
          ON CONFLICT (gid)
          DO UPDATE SET bad_values =
            agg_tracker.bad_values + EXCLUDED.bad_values;

    ELSIF line.msg ~ '^Inserted \d+ predictions for ....................$' THEN
        num := substring(
            line.msg from '^Inserted (\d+)')::integer;
        value := substring(
            line.msg from '^Inserted \d+ predictions for (....................)$');

        INSERT INTO agg_tracker (gid, predictions) VALUES (value, num)
          ON CONFLICT (gid)
          DO UPDATE SET predictions =
            agg_tracker.predictions + EXCLUDED.predictions;

    ELSIF line.msg ~ '^''Reference_code'' not specified. Assuming ''ZPG''$' THEN

        INSERT INTO agg_tracker (gid, assume_zpg) VALUES (curr_gid, true)
          ON CONFLICT (gid)
          DO UPDATE SET assume_zpg = true;

    ELSIF line.msg ~ '^Cannot find gauge "...................." for import$' THEN
        value := substring(
            line.msg from '^Cannot find gauge "(....................)" for import$');

        INSERT INTO agg_tracker (gid, unknown) VALUES (value, true)
          ON CONFLICT (gid)
          DO UPDATE SET unknown = true;

    ELSIF line.msg ~ '^''Unit'' not specified. Assuming ''cm''$' THEN

        INSERT INTO agg_tracker (gid, assume_cm) VALUES (curr_gid, 1)
          ON CONFLICT (gid)
          DO UPDATE SET assume_cm =
            agg_tracker.assume_cm + EXCLUDED.assume_cm;

    ELSIF line.msg ~ '^Ignored message with measure_code .+' THEN
        value := substring(
            line.msg from '^Ignored message with measure_code (.+)$');

        INSERT INTO agg_tracker (gid, ign_meas_codes) VALUES (curr_gid, array[value])
          ON CONFLICT (gid)
          DO UPDATE SET ign_meas_codes =
            array_cat(agg_tracker.ign_meas_codes, EXCLUDED.ign_meas_codes);

    ELSIF line.msg ~ '^Importing gauge measurements took ' THEN
        -- This line is not copied to filtered_logs. The aggregation itself
        -- is flushed on import change / end of data above.
        NULL;

    ELSE
        -- TODO: Handle 'Missing mandatory value at %s. Ignored (bad service)'
        -- TODO: Handle "unknown unit '%s'"
        -- Not handled, copy through ..
        INSERT INTO filtered_logs (import_id, time, kind, msg)
            VALUES (line.import_id, line.time, line.kind, line.msg);

    END IF;

    last_time := line.time;
  END LOOP;
  CLOSE logs;

  -- DELETE FROM import.import_logs WHERE import_id IN (
  --  SELECT id FROM import.imports WHERE kind = 'gm')

  -- INSERT INTO import.import_logs SELECT * FROM import.filtered_logs;

  DROP TABLE filtered_logs;
  DROP TABLE agg_tracker;
END $$;
-- VACUUM FULL;