comparison schema/updates/1469/aggregate-gm-logs.sql @ 5541:29804c8e817d aggregate-gm-import-logging

WIP: Handle rest correctly.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Tue, 26 Oct 2021 02:02:23 +0200
parents 4d815f295e57
children 0acb06fc77e6
comparison
equal deleted inserted replaced
5540:4d815f295e57 5541:29804c8e817d
9 last_time timestamp with time zone; 9 last_time timestamp with time zone;
10 curr_gid text; 10 curr_gid text;
11 value text; 11 value text;
12 joined text; 12 joined text;
13 num integer; 13 num integer;
14 line record;
15 hold record;
16 done boolean;
14 BEGIN 17 BEGIN
15 18
16 CREATE TEMP TABLE filtered_logs ( 19 CREATE TEMP TABLE filtered_logs (
17 import_id integer NOT NULL, 20 import_id integer NOT NULL,
18 time timestamp with time zone NOT NULL, 21 time timestamp with time zone NOT NULL,
30 assume_cm integer NOT NULL DEFAULT 0, 33 assume_cm integer NOT NULL DEFAULT 0,
31 bad_values integer NOT NULL DEFAULT 0, 34 bad_values integer NOT NULL DEFAULT 0,
32 measurements integer NOT NULL DEFAULT 0, 35 measurements integer NOT NULL DEFAULT 0,
33 predictions integer NOT NULL DEFAULT 0 36 predictions integer NOT NULL DEFAULT 0
34 ); 37 );
38
35 last_id := -1; 39 last_id := -1;
36 40 done := false;
37 FOR line IN logs LOOP 41 hold := NULL;
42
43 OPEN logs;
44
45 LOOP
46
47 FETCH logs INTO line;
48
49 IF NOT FOUND THEN
50 done := true;
51 END IF;
38 52
39 -- RAISE NOTICE '%', line.msg; 53 -- RAISE NOTICE '%', line.msg;
40 54
41 IF last_id <> line.import_id THEN 55 IF done OR last_id <> line.import_id THEN
42 56
43 -- unknown 57 -- unknown
44 SELECT INTO joined string_agg(gid, ', ') 58 SELECT INTO joined string_agg(gid, ', ')
45 FROM agg_tracker 59 FROM agg_tracker
46 WHERE unknown; 60 WHERE unknown;
83 IF joined IS NOT NULL THEN 97 IF joined IS NOT NULL THEN
84 INSERT INTO filtered_logs VALUES ( 98 INSERT INTO filtered_logs VALUES (
85 last_id, last_time, 'warn'::log_type, 99 last_id, last_time, 'warn'::log_type,
86 'Ignored measurements with value -99999: ' || joined); 100 'Ignored measurements with value -99999: ' || joined);
87 END IF; 101 END IF;
88
89 102
90 -- missing mandatory values 103 -- missing mandatory values
91 SELECT INTO joined string_agg( 104 SELECT INTO joined string_agg(
92 gid || ' (' 105 gid || ' ('
93 || array_to_string(ARRAY(SELECT DISTINCT unnest(missing_values)), '; ') 106 || array_to_string(ARRAY(SELECT DISTINCT unnest(missing_values)), '; ')
162 INSERT INTO filtered_logs VALUES ( 175 INSERT INTO filtered_logs VALUES (
163 last_id, last_time, 'info'::log_type, 176 last_id, last_time, 'info'::log_type,
164 'No changes for: ' || joined); 177 'No changes for: ' || joined);
165 END IF; 178 END IF;
166 179
180 IF hold is NOT NULL THEN
181 INSERT INTO filtered_logs VALUES (hold.import_id, hold.time, hold.kind, hold.msg);
182 hold := NULL;
183 END IF;
184
185 IF done THEN
186 EXIT;
187 END IF;
167 188
168 -- reset aggregate table 189 -- reset aggregate table
169 TRUNCATE agg_tracker; 190 TRUNCATE agg_tracker;
170 last_id := line.import_id; 191 last_id := line.import_id;
171 192
181 202
182 INSERT INTO agg_tracker (gid, measurements) VALUES (value, num) 203 INSERT INTO agg_tracker (gid, measurements) VALUES (value, num)
183 ON CONFLICT (gid) 204 ON CONFLICT (gid)
184 DO UPDATE SET measurements = EXCLUDED.measurements + num; 205 DO UPDATE SET measurements = EXCLUDED.measurements + num;
185 206
207 ELSIF line.msg ~ '^Inserted \d+ predictions for ....................$' THEN
208 num := substring(
209 line.msg from '^Inserted (\d+)')::integer;
210 value := substring(
211 line.msg from '^Inserted \d+ predictions for (....................)$');
212
213 INSERT INTO agg_tracker (gid, predictions) VALUES (value, num)
214 ON CONFLICT (gid)
215 DO UPDATE SET predictions = EXCLUDED.predictions + num;
216
217 ELSIF line.msg ~ '^''Reference_code'' not specified. Assuming ''ZPG''$' THEN
218
219 INSERT INTO agg_tracker (gid, assume_zpg) VALUES (curr_gid, true)
220 ON CONFLICT (gid)
221 DO UPDATE SET assume_zpg = true;
222
186 ELSIF line.msg ~ '^Ignored \d+ measurements with value -99999$' THEN 223 ELSIF line.msg ~ '^Ignored \d+ measurements with value -99999$' THEN
187 num := substring( 224 num := substring(
188 line.msg from '^Ignored (\d+)')::integer; 225 line.msg from '^Ignored (\d+)')::integer;
189 226
190 INSERT INTO agg_tracker (gid, bad_values) VALUES (curr_gid, num) 227 INSERT INTO agg_tracker (gid, bad_values) VALUES (curr_gid, num)
191 ON CONFLICT (gid) 228 ON CONFLICT (gid)
192 DO UPDATE SET bad_values = EXCLUDED.bad_values + num; 229 DO UPDATE SET bad_values = EXCLUDED.bad_values + num;
193 230
194 ELSIF line.msg ~ '^Inserted \d+ predictions for ....................$' THEN
195 num := substring(
196 line.msg from '^Inserted (\d+)')::integer;
197 value := substring(
198 line.msg from '^Inserted \d+ predictions for (....................)$');
199
200 INSERT INTO agg_tracker (gid, predictions) VALUES (value, num)
201 ON CONFLICT (gid)
202 DO UPDATE SET predictions = EXCLUDED.predictions + num;
203
204 ELSIF line.msg ~ '^''Reference_code'' not specified. Assuming ''ZPG''$' THEN
205
206 INSERT INTO agg_tracker (gid, assume_zpg) VALUES (curr_gid, true)
207 ON CONFLICT (gid)
208 DO UPDATE SET assume_zpg = true;
209
210 ELSIF line.msg ~ '^Cannot find gauge "...................." for import$' THEN 231 ELSIF line.msg ~ '^Cannot find gauge "...................." for import$' THEN
211 value := substring( 232 value := substring(
212 line.msg from '^Cannot find gauge "(....................)" for import$'); 233 line.msg from '^Cannot find gauge "(....................)" for import$');
213 234
214 INSERT INTO agg_tracker (gid, unknown) VALUES (value, true) 235 INSERT INTO agg_tracker (gid, unknown) VALUES (value, true)
248 ON CONFLICT (gid) 269 ON CONFLICT (gid)
249 DO UPDATE SET rescale_errors = 270 DO UPDATE SET rescale_errors =
250 array_cat(agg_tracker.rescale_errors, EXCLUDED.rescale_errors); 271 array_cat(agg_tracker.rescale_errors, EXCLUDED.rescale_errors);
251 272
252 ELSIF line.msg ~ '^Importing gauge measurements took ' THEN 273 ELSIF line.msg ~ '^Importing gauge measurements took ' THEN
253 274 -- Likely the last entry of this import.
254 -- TODO: Flush aggregation because it's likely the last entry. 275 hold := line;
255 ELSE 276 ELSE
256 -- Not handled, copy through .. 277 -- Not handled, copy through ..
257 RAISE NOTICE '%', line.msg; 278 -- RAISE NOTICE '%', line.msg;
258 INSERT INTO filtered_logs VALUES (line.import_id, line.time, line.kind, line.msg); 279 INSERT INTO filtered_logs VALUES (line.import_id, line.time, line.kind, line.msg);
259 280
260 END IF; 281 END IF;
261 282
262 last_time := line.time; 283 last_time := line.time;
263 END LOOP; 284 END LOOP;
264
265 -- TODO: Handle remains from agg_tracker
266 285
267 -- DELETE FROM import.import_logs WHERE import_id IN ( 286 -- DELETE FROM import.import_logs WHERE import_id IN (
268 -- SELECT id FROM import.imports WHERE kind = 'gm') 287 -- SELECT id FROM import.imports WHERE kind = 'gm')
269 288
270 -- INSERT INTO import.import_logs SELECT * FROM import.filtered_logs; 289 -- INSERT INTO import.import_logs SELECT * FROM import.filtered_logs;