comparison pkg/controllers/importqueue.go @ 5560:f2204f91d286

Join the log lines of imports to the log exports to recover data from them. Used in SR export to extract information that where in the meta json but now are only found in the log.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Wed, 09 Feb 2022 18:34:40 +0100
parents 5f47eeea988d
children b91716d2acc6
comparison
equal deleted inserted replaced
5559:ce9a9a1bf92f 5560:f2204f91d286
63 SELECT 63 SELECT
64 imports.id AS id, 64 imports.id AS id,
65 state::varchar, 65 state::varchar,
66 enqueued, 66 enqueued,
67 changed, 67 changed,
68 kind, 68 imports.kind,
69 username, 69 username,
70 (SELECT country FROM users.list_users lu 70 (SELECT country FROM users.list_users lu
71 WHERE lu.username = import.imports.username) AS country, 71 WHERE lu.username = import.imports.username) AS country,
72 signer, 72 signer,
73 EXISTS(SELECT 1 FROM import.import_logs 73 EXISTS(SELECT 1 FROM import.import_logs
74 WHERE kind = 'warn'::log_type and import_id = imports.id) AS has_warnings, 74 WHERE kind = 'warn'::log_type and import_id = imports.id) AS has_warnings,
75 data 75 data,
76 FROM import.imports 76 il.msg
77 FROM import.imports RIGHT JOIN import.import_logs il
78 ON import.imports.id = il.import_id
77 WHERE 79 WHERE
78 ` 80 `
79 selectEnqueuedSQL = ` 81 selectEnqueuedSQL = `
80 SELECT enqueued FROM import.imports 82 SELECT enqueued FROM import.imports
81 WHERE 83 WHERE
120 a = append(a, term) 122 a = append(a, term)
121 b = append(b, term) 123 b = append(b, term)
122 } 124 }
123 125
124 // Always filter review jobs. They are only for internal use. 126 // Always filter review jobs. They are only for internal use.
125 cond(` NOT kind LIKE '%%` + imports.ReviewJobSuffix + `'`) 127 cond(` NOT imports.kind LIKE '%%` + imports.ReviewJobSuffix + `'`)
126 128
127 if query := req.FormValue("query"); query != "" { 129 if query := req.FormValue("query"); query != "" {
128 query = "%" + query + "%" 130 query = "%" + query + "%"
129 cond(` (kind ILIKE $%d OR username ILIKE $%d OR signer ILIKE $%d OR `+ 131 cond(` (imports.kind ILIKE $%d OR username ILIKE $%d OR signer ILIKE $%d OR `+
130 `id IN (SELECT import_id FROM import.import_logs WHERE msg ILIKE $%d)) `, 132 `id IN (SELECT import_id FROM import.import_logs WHERE msg ILIKE $%d)) `,
131 query, query, query, query) 133 query, query, query, query)
132 } 134 }
133 135
134 if cc := req.FormValue("cc"); cc != "" { 136 if cc := req.FormValue("cc"); cc != "" {
144 cond(" state = ANY($%d) ", states) 146 cond(" state = ANY($%d) ", states)
145 } 147 }
146 148
147 if ks := req.FormValue("kinds"); ks != "" { 149 if ks := req.FormValue("kinds"); ks != "" {
148 kinds := toTextArray(ks, imports.ImportKindNames()) 150 kinds := toTextArray(ks, imports.ImportKindNames())
149 cond(" kind = ANY($%d) ", kinds) 151 cond(" imports.kind = ANY($%d) ", kinds)
150 } 152 }
151 153
152 if idss := req.FormValue("ids"); idss != "" { 154 if idss := req.FormValue("ids"); idss != "" {
153 ids := toInt8Array(idss) 155 ids := toInt8Array(idss)
154 cond(" id = ANY($%d) ", ids) 156 cond(" id = ANY($%d) ", ids)
303 return "" 305 return ""
304 } 306 }
305 307
306 // Extract some meta infos from the import. 308 // Extract some meta infos from the import.
307 type Description interface { 309 type Description interface {
308 Description() (string, error) 310 Description([]string) (string, error)
309 } 311 }
312
313 type dataset struct {
314 id int64
315 state string
316 enqueued time.Time
317 changed time.Time
318 kind string
319 user string
320 country string
321 signer sql.NullString
322 warnings bool
323 data string
324 msgs []string
325 }
326
327 // Log unsupported description interfaces per kind only once.
328 unsupported := make(map[string]bool)
329
330 store := func(ds *dataset) error {
331 if ds == nil {
332 return nil
333 }
334
335 var description string
336
337 // Do some introspection on the job to be more verbose.
338 if jc := imports.FindJobCreator(imports.JobKind(ds.kind)); jc != nil {
339 job := jc.Create()
340 if err := common.FromJSONString(ds.data, job); err != nil {
341 log.Errorf("%v\n", err)
342 } else if desc, ok := job.(Description); ok {
343 description, err = desc.Description(ds.msgs)
344 if err != nil {
345 log.Errorf("%v\n", err)
346 }
347 description = strings.Replace(description, ",", "|", -1)
348 } else {
349 if !unsupported[ds.kind] {
350 unsupported[ds.kind] = true
351 log.Debugf("%s: description not supported\n", ds.kind)
352 }
353 }
354 }
355
356 record[0] = strconv.FormatInt(ds.id, 10)
357 record[1] = ds.kind
358 record[2] = ds.enqueued.UTC().Format(common.TimeFormat)
359 record[3] = ds.changed.UTC().Format(common.TimeFormat)
360 record[4] = ds.user
361 record[5] = ds.country
362 record[6] = stringString(ds.signer)
363 record[7] = ds.state
364 record[8] = strconv.FormatBool(ds.warnings)
365 record[9] = description
366
367 return out.Write(record)
368 }
369
370 var last *dataset
310 371
311 for rows.Next() { 372 for rows.Next() {
312 var ( 373 var (
313 id int64 374 curr dataset
314 state string 375 msg sql.NullString
315 enqueued time.Time
316 changed time.Time
317 kind string
318 user string
319 country string
320 signer sql.NullString
321 warnings bool
322 data string
323 description string
324 ) 376 )
325 if err = rows.Scan( 377 if err = rows.Scan(
326 &id, 378 &curr.id,
327 &state, 379 &curr.state,
328 &enqueued, 380 &curr.enqueued,
329 &changed, 381 &curr.changed,
330 &kind, 382 &curr.kind,
331 &user, 383 &curr.user,
332 &country, 384 &curr.country,
333 &signer, 385 &curr.signer,
334 &warnings, 386 &curr.warnings,
335 &data, 387 &curr.data,
388 &msg,
336 ); err != nil { 389 ); err != nil {
337 return 390 return
338 } 391 }
339 392
340 // Do some introspection on the job to be more verbose. 393 if last != nil && last.id == curr.id {
341 if jc := imports.FindJobCreator(imports.JobKind(kind)); jc != nil { 394 if msg.Valid {
342 job := jc.Create() 395 last.msgs = append(last.msgs, msg.String)
343 if err := common.FromJSONString(data, job); err != nil {
344 log.Errorf("%v\n", err)
345 } else if desc, ok := job.(Description); ok {
346 if description, err = desc.Description(); err != nil {
347 log.Errorf("%v\n", err)
348 }
349 } 396 }
350 } 397 continue
351 398 }
352 record[0] = strconv.FormatInt(id, 10) 399 if msg.Valid {
353 record[1] = kind 400 curr.msgs = append(curr.msgs, msg.String)
354 record[2] = enqueued.UTC().Format(common.TimeFormat) 401 }
355 record[3] = changed.UTC().Format(common.TimeFormat) 402
356 record[4] = user 403 if err := store(last); err != nil {
357 record[5] = country
358 record[6] = stringString(signer)
359 record[7] = state
360 record[8] = strconv.FormatBool(warnings)
361 record[9] = strings.Replace(description, ",", "|", -1)
362
363 if err := out.Write(record); err != nil {
364 log.Errorf("%v\n", err) 404 log.Errorf("%v\n", err)
365 return 405 return
366 } 406 }
407 last = &curr
408 }
409
410 if err := store(last); err != nil {
411 log.Errorf("%v\n", err)
412 return
367 } 413 }
368 414
369 out.Flush() 415 out.Flush()
370 if err := out.Error(); err != nil { 416 if err := out.Error(); err != nil {
371 log.Errorf("%v\n", err) 417 log.Errorf("%v\n", err)