Mercurial > gemma
comparison schema/updates/1469/aggregate-gm-logs.sql @ 5541:29804c8e817d aggregate-gm-import-logging
WIP: Handle rest correctly.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Tue, 26 Oct 2021 02:02:23 +0200 |
parents | 4d815f295e57 |
children | 0acb06fc77e6 |
comparison
equal
deleted
inserted
replaced
5540:4d815f295e57 | 5541:29804c8e817d |
---|---|
9 last_time timestamp with time zone; | 9 last_time timestamp with time zone; |
10 curr_gid text; | 10 curr_gid text; |
11 value text; | 11 value text; |
12 joined text; | 12 joined text; |
13 num integer; | 13 num integer; |
14 line record; | |
15 hold record; | |
16 done boolean; | |
14 BEGIN | 17 BEGIN |
15 | 18 |
16 CREATE TEMP TABLE filtered_logs ( | 19 CREATE TEMP TABLE filtered_logs ( |
17 import_id integer NOT NULL, | 20 import_id integer NOT NULL, |
18 time timestamp with time zone NOT NULL, | 21 time timestamp with time zone NOT NULL, |
30 assume_cm integer NOT NULL DEFAULT 0, | 33 assume_cm integer NOT NULL DEFAULT 0, |
31 bad_values integer NOT NULL DEFAULT 0, | 34 bad_values integer NOT NULL DEFAULT 0, |
32 measurements integer NOT NULL DEFAULT 0, | 35 measurements integer NOT NULL DEFAULT 0, |
33 predictions integer NOT NULL DEFAULT 0 | 36 predictions integer NOT NULL DEFAULT 0 |
34 ); | 37 ); |
38 | |
35 last_id := -1; | 39 last_id := -1; |
36 | 40 done := false; |
37 FOR line IN logs LOOP | 41 hold := NULL; |
42 | |
43 OPEN logs; | |
44 | |
45 LOOP | |
46 | |
47 FETCH logs INTO line; | |
48 | |
49 IF NOT FOUND THEN | |
50 done := true; | |
51 END IF; | |
38 | 52 |
39 -- RAISE NOTICE '%', line.msg; | 53 -- RAISE NOTICE '%', line.msg; |
40 | 54 |
41 IF last_id <> line.import_id THEN | 55 IF done OR last_id <> line.import_id THEN |
42 | 56 |
43 -- unknown | 57 -- unknown |
44 SELECT INTO joined string_agg(gid, ', ') | 58 SELECT INTO joined string_agg(gid, ', ') |
45 FROM agg_tracker | 59 FROM agg_tracker |
46 WHERE unknown; | 60 WHERE unknown; |
83 IF joined IS NOT NULL THEN | 97 IF joined IS NOT NULL THEN |
84 INSERT INTO filtered_logs VALUES ( | 98 INSERT INTO filtered_logs VALUES ( |
85 last_id, last_time, 'warn'::log_type, | 99 last_id, last_time, 'warn'::log_type, |
86 'Ignored measurements with value -99999: ' || joined); | 100 'Ignored measurements with value -99999: ' || joined); |
87 END IF; | 101 END IF; |
88 | |
89 | 102 |
90 -- missing mandatory values | 103 -- missing mandatory values |
91 SELECT INTO joined string_agg( | 104 SELECT INTO joined string_agg( |
92 gid || ' (' | 105 gid || ' (' |
93 || array_to_string(ARRAY(SELECT DISTINCT unnest(missing_values)), '; ') | 106 || array_to_string(ARRAY(SELECT DISTINCT unnest(missing_values)), '; ') |
162 INSERT INTO filtered_logs VALUES ( | 175 INSERT INTO filtered_logs VALUES ( |
163 last_id, last_time, 'info'::log_type, | 176 last_id, last_time, 'info'::log_type, |
164 'No changes for: ' || joined); | 177 'No changes for: ' || joined); |
165 END IF; | 178 END IF; |
166 | 179 |
180 IF hold is NOT NULL THEN | |
181 INSERT INTO filtered_logs VALUES (hold.import_id, hold.time, hold.kind, hold.msg); | |
182 hold := NULL; | |
183 END IF; | |
184 | |
185 IF done THEN | |
186 EXIT; | |
187 END IF; | |
167 | 188 |
168 -- reset aggregate table | 189 -- reset aggregate table |
169 TRUNCATE agg_tracker; | 190 TRUNCATE agg_tracker; |
170 last_id := line.import_id; | 191 last_id := line.import_id; |
171 | 192 |
181 | 202 |
182 INSERT INTO agg_tracker (gid, measurements) VALUES (value, num) | 203 INSERT INTO agg_tracker (gid, measurements) VALUES (value, num) |
183 ON CONFLICT (gid) | 204 ON CONFLICT (gid) |
184 DO UPDATE SET measurements = EXCLUDED.measurements + num; | 205 DO UPDATE SET measurements = EXCLUDED.measurements + num; |
185 | 206 |
207 ELSIF line.msg ~ '^Inserted \d+ predictions for ....................$' THEN | |
208 num := substring( | |
209 line.msg from '^Inserted (\d+)')::integer; | |
210 value := substring( | |
211 line.msg from '^Inserted \d+ predictions for (....................)$'); | |
212 | |
213 INSERT INTO agg_tracker (gid, predictions) VALUES (value, num) | |
214 ON CONFLICT (gid) | |
215 DO UPDATE SET predictions = EXCLUDED.predictions + num; | |
216 | |
217 ELSIF line.msg ~ '^''Reference_code'' not specified. Assuming ''ZPG''$' THEN | |
218 | |
219 INSERT INTO agg_tracker (gid, assume_zpg) VALUES (curr_gid, true) | |
220 ON CONFLICT (gid) | |
221 DO UPDATE SET assume_zpg = true; | |
222 | |
186 ELSIF line.msg ~ '^Ignored \d+ measurements with value -99999$' THEN | 223 ELSIF line.msg ~ '^Ignored \d+ measurements with value -99999$' THEN |
187 num := substring( | 224 num := substring( |
188 line.msg from '^Ignored (\d+)')::integer; | 225 line.msg from '^Ignored (\d+)')::integer; |
189 | 226 |
190 INSERT INTO agg_tracker (gid, bad_values) VALUES (curr_gid, num) | 227 INSERT INTO agg_tracker (gid, bad_values) VALUES (curr_gid, num) |
191 ON CONFLICT (gid) | 228 ON CONFLICT (gid) |
192 DO UPDATE SET bad_values = EXCLUDED.bad_values + num; | 229 DO UPDATE SET bad_values = EXCLUDED.bad_values + num; |
193 | 230 |
194 ELSIF line.msg ~ '^Inserted \d+ predictions for ....................$' THEN | |
195 num := substring( | |
196 line.msg from '^Inserted (\d+)')::integer; | |
197 value := substring( | |
198 line.msg from '^Inserted \d+ predictions for (....................)$'); | |
199 | |
200 INSERT INTO agg_tracker (gid, predictions) VALUES (value, num) | |
201 ON CONFLICT (gid) | |
202 DO UPDATE SET predictions = EXCLUDED.predictions + num; | |
203 | |
204 ELSIF line.msg ~ '^''Reference_code'' not specified. Assuming ''ZPG''$' THEN | |
205 | |
206 INSERT INTO agg_tracker (gid, assume_zpg) VALUES (curr_gid, true) | |
207 ON CONFLICT (gid) | |
208 DO UPDATE SET assume_zpg = true; | |
209 | |
210 ELSIF line.msg ~ '^Cannot find gauge "...................." for import$' THEN | 231 ELSIF line.msg ~ '^Cannot find gauge "...................." for import$' THEN |
211 value := substring( | 232 value := substring( |
212 line.msg from '^Cannot find gauge "(....................)" for import$'); | 233 line.msg from '^Cannot find gauge "(....................)" for import$'); |
213 | 234 |
214 INSERT INTO agg_tracker (gid, unknown) VALUES (value, true) | 235 INSERT INTO agg_tracker (gid, unknown) VALUES (value, true) |
248 ON CONFLICT (gid) | 269 ON CONFLICT (gid) |
249 DO UPDATE SET rescale_errors = | 270 DO UPDATE SET rescale_errors = |
250 array_cat(agg_tracker.rescale_errors, EXCLUDED.rescale_errors); | 271 array_cat(agg_tracker.rescale_errors, EXCLUDED.rescale_errors); |
251 | 272 |
252 ELSIF line.msg ~ '^Importing gauge measurements took ' THEN | 273 ELSIF line.msg ~ '^Importing gauge measurements took ' THEN |
253 | 274 -- Likely the last entry of this import. |
254 -- TODO: Flush aggregation because it's likely the last entry. | 275 hold := line; |
255 ELSE | 276 ELSE |
256 -- Not handled, copy through .. | 277 -- Not handled, copy through .. |
257 RAISE NOTICE '%', line.msg; | 278 -- RAISE NOTICE '%', line.msg; |
258 INSERT INTO filtered_logs VALUES (line.import_id, line.time, line.kind, line.msg); | 279 INSERT INTO filtered_logs VALUES (line.import_id, line.time, line.kind, line.msg); |
259 | 280 |
260 END IF; | 281 END IF; |
261 | 282 |
262 last_time := line.time; | 283 last_time := line.time; |
263 END LOOP; | 284 END LOOP; |
264 | |
265 -- TODO: Handle remains from agg_tracker | |
266 | 285 |
267 -- DELETE FROM import.import_logs WHERE import_id IN ( | 286 -- DELETE FROM import.import_logs WHERE import_id IN ( |
268 -- SELECT id FROM import.imports WHERE kind = 'gm') | 287 -- SELECT id FROM import.imports WHERE kind = 'gm') |
269 | 288 |
270 -- INSERT INTO import.import_logs SELECT * FROM import.filtered_logs; | 289 -- INSERT INTO import.import_logs SELECT * FROM import.filtered_logs; |