Mercurial > gemma
comparison pkg/controllers/importqueue.go @ 5560:f2204f91d286
Join the log lines of imports to the log exports to recover data from them.
Used in SR export to extract information that where in the meta json
but now are only found in the log.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Wed, 09 Feb 2022 18:34:40 +0100 |
parents | 5f47eeea988d |
children | b91716d2acc6 |
comparison
equal
deleted
inserted
replaced
5559:ce9a9a1bf92f | 5560:f2204f91d286 |
---|---|
63 SELECT | 63 SELECT |
64 imports.id AS id, | 64 imports.id AS id, |
65 state::varchar, | 65 state::varchar, |
66 enqueued, | 66 enqueued, |
67 changed, | 67 changed, |
68 kind, | 68 imports.kind, |
69 username, | 69 username, |
70 (SELECT country FROM users.list_users lu | 70 (SELECT country FROM users.list_users lu |
71 WHERE lu.username = import.imports.username) AS country, | 71 WHERE lu.username = import.imports.username) AS country, |
72 signer, | 72 signer, |
73 EXISTS(SELECT 1 FROM import.import_logs | 73 EXISTS(SELECT 1 FROM import.import_logs |
74 WHERE kind = 'warn'::log_type and import_id = imports.id) AS has_warnings, | 74 WHERE kind = 'warn'::log_type and import_id = imports.id) AS has_warnings, |
75 data | 75 data, |
76 FROM import.imports | 76 il.msg |
77 FROM import.imports RIGHT JOIN import.import_logs il | |
78 ON import.imports.id = il.import_id | |
77 WHERE | 79 WHERE |
78 ` | 80 ` |
79 selectEnqueuedSQL = ` | 81 selectEnqueuedSQL = ` |
80 SELECT enqueued FROM import.imports | 82 SELECT enqueued FROM import.imports |
81 WHERE | 83 WHERE |
120 a = append(a, term) | 122 a = append(a, term) |
121 b = append(b, term) | 123 b = append(b, term) |
122 } | 124 } |
123 | 125 |
124 // Always filter review jobs. They are only for internal use. | 126 // Always filter review jobs. They are only for internal use. |
125 cond(` NOT kind LIKE '%%` + imports.ReviewJobSuffix + `'`) | 127 cond(` NOT imports.kind LIKE '%%` + imports.ReviewJobSuffix + `'`) |
126 | 128 |
127 if query := req.FormValue("query"); query != "" { | 129 if query := req.FormValue("query"); query != "" { |
128 query = "%" + query + "%" | 130 query = "%" + query + "%" |
129 cond(` (kind ILIKE $%d OR username ILIKE $%d OR signer ILIKE $%d OR `+ | 131 cond(` (imports.kind ILIKE $%d OR username ILIKE $%d OR signer ILIKE $%d OR `+ |
130 `id IN (SELECT import_id FROM import.import_logs WHERE msg ILIKE $%d)) `, | 132 `id IN (SELECT import_id FROM import.import_logs WHERE msg ILIKE $%d)) `, |
131 query, query, query, query) | 133 query, query, query, query) |
132 } | 134 } |
133 | 135 |
134 if cc := req.FormValue("cc"); cc != "" { | 136 if cc := req.FormValue("cc"); cc != "" { |
144 cond(" state = ANY($%d) ", states) | 146 cond(" state = ANY($%d) ", states) |
145 } | 147 } |
146 | 148 |
147 if ks := req.FormValue("kinds"); ks != "" { | 149 if ks := req.FormValue("kinds"); ks != "" { |
148 kinds := toTextArray(ks, imports.ImportKindNames()) | 150 kinds := toTextArray(ks, imports.ImportKindNames()) |
149 cond(" kind = ANY($%d) ", kinds) | 151 cond(" imports.kind = ANY($%d) ", kinds) |
150 } | 152 } |
151 | 153 |
152 if idss := req.FormValue("ids"); idss != "" { | 154 if idss := req.FormValue("ids"); idss != "" { |
153 ids := toInt8Array(idss) | 155 ids := toInt8Array(idss) |
154 cond(" id = ANY($%d) ", ids) | 156 cond(" id = ANY($%d) ", ids) |
303 return "" | 305 return "" |
304 } | 306 } |
305 | 307 |
306 // Extract some meta infos from the import. | 308 // Extract some meta infos from the import. |
307 type Description interface { | 309 type Description interface { |
308 Description() (string, error) | 310 Description([]string) (string, error) |
309 } | 311 } |
312 | |
313 type dataset struct { | |
314 id int64 | |
315 state string | |
316 enqueued time.Time | |
317 changed time.Time | |
318 kind string | |
319 user string | |
320 country string | |
321 signer sql.NullString | |
322 warnings bool | |
323 data string | |
324 msgs []string | |
325 } | |
326 | |
327 // Log unsupported description interfaces per kind only once. | |
328 unsupported := make(map[string]bool) | |
329 | |
330 store := func(ds *dataset) error { | |
331 if ds == nil { | |
332 return nil | |
333 } | |
334 | |
335 var description string | |
336 | |
337 // Do some introspection on the job to be more verbose. | |
338 if jc := imports.FindJobCreator(imports.JobKind(ds.kind)); jc != nil { | |
339 job := jc.Create() | |
340 if err := common.FromJSONString(ds.data, job); err != nil { | |
341 log.Errorf("%v\n", err) | |
342 } else if desc, ok := job.(Description); ok { | |
343 description, err = desc.Description(ds.msgs) | |
344 if err != nil { | |
345 log.Errorf("%v\n", err) | |
346 } | |
347 description = strings.Replace(description, ",", "|", -1) | |
348 } else { | |
349 if !unsupported[ds.kind] { | |
350 unsupported[ds.kind] = true | |
351 log.Debugf("%s: description not supported\n", ds.kind) | |
352 } | |
353 } | |
354 } | |
355 | |
356 record[0] = strconv.FormatInt(ds.id, 10) | |
357 record[1] = ds.kind | |
358 record[2] = ds.enqueued.UTC().Format(common.TimeFormat) | |
359 record[3] = ds.changed.UTC().Format(common.TimeFormat) | |
360 record[4] = ds.user | |
361 record[5] = ds.country | |
362 record[6] = stringString(ds.signer) | |
363 record[7] = ds.state | |
364 record[8] = strconv.FormatBool(ds.warnings) | |
365 record[9] = description | |
366 | |
367 return out.Write(record) | |
368 } | |
369 | |
370 var last *dataset | |
310 | 371 |
311 for rows.Next() { | 372 for rows.Next() { |
312 var ( | 373 var ( |
313 id int64 | 374 curr dataset |
314 state string | 375 msg sql.NullString |
315 enqueued time.Time | |
316 changed time.Time | |
317 kind string | |
318 user string | |
319 country string | |
320 signer sql.NullString | |
321 warnings bool | |
322 data string | |
323 description string | |
324 ) | 376 ) |
325 if err = rows.Scan( | 377 if err = rows.Scan( |
326 &id, | 378 &curr.id, |
327 &state, | 379 &curr.state, |
328 &enqueued, | 380 &curr.enqueued, |
329 &changed, | 381 &curr.changed, |
330 &kind, | 382 &curr.kind, |
331 &user, | 383 &curr.user, |
332 &country, | 384 &curr.country, |
333 &signer, | 385 &curr.signer, |
334 &warnings, | 386 &curr.warnings, |
335 &data, | 387 &curr.data, |
388 &msg, | |
336 ); err != nil { | 389 ); err != nil { |
337 return | 390 return |
338 } | 391 } |
339 | 392 |
340 // Do some introspection on the job to be more verbose. | 393 if last != nil && last.id == curr.id { |
341 if jc := imports.FindJobCreator(imports.JobKind(kind)); jc != nil { | 394 if msg.Valid { |
342 job := jc.Create() | 395 last.msgs = append(last.msgs, msg.String) |
343 if err := common.FromJSONString(data, job); err != nil { | |
344 log.Errorf("%v\n", err) | |
345 } else if desc, ok := job.(Description); ok { | |
346 if description, err = desc.Description(); err != nil { | |
347 log.Errorf("%v\n", err) | |
348 } | |
349 } | 396 } |
350 } | 397 continue |
351 | 398 } |
352 record[0] = strconv.FormatInt(id, 10) | 399 if msg.Valid { |
353 record[1] = kind | 400 curr.msgs = append(curr.msgs, msg.String) |
354 record[2] = enqueued.UTC().Format(common.TimeFormat) | 401 } |
355 record[3] = changed.UTC().Format(common.TimeFormat) | 402 |
356 record[4] = user | 403 if err := store(last); err != nil { |
357 record[5] = country | |
358 record[6] = stringString(signer) | |
359 record[7] = state | |
360 record[8] = strconv.FormatBool(warnings) | |
361 record[9] = strings.Replace(description, ",", "|", -1) | |
362 | |
363 if err := out.Write(record); err != nil { | |
364 log.Errorf("%v\n", err) | 404 log.Errorf("%v\n", err) |
365 return | 405 return |
366 } | 406 } |
407 last = &curr | |
408 } | |
409 | |
410 if err := store(last); err != nil { | |
411 log.Errorf("%v\n", err) | |
412 return | |
367 } | 413 } |
368 | 414 |
369 out.Flush() | 415 out.Flush() |
370 if err := out.Error(); err != nil { | 416 if err := out.Error(); err != nil { |
371 log.Errorf("%v\n", err) | 417 log.Errorf("%v\n", err) |