Mercurial > gemma
comparison pkg/imports/queue.go @ 1392:0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Wed, 28 Nov 2018 09:52:34 +0100 |
parents | d753ce6cf588 |
children | 74b0df86b6e6 |
comparison
equal
deleted
inserted
replaced
1391:801ae5f4bc5b | 1392:0e1d89241cda |
---|---|
14 package imports | 14 package imports |
15 | 15 |
16 import ( | 16 import ( |
17 "context" | 17 "context" |
18 "database/sql" | 18 "database/sql" |
19 "encoding/json" | |
19 "fmt" | 20 "fmt" |
20 "log" | 21 "log" |
21 "runtime/debug" | 22 "runtime/debug" |
23 "strings" | |
22 "sync" | 24 "sync" |
23 "time" | 25 "time" |
24 | 26 |
25 "github.com/jackc/pgx/pgtype" | 27 "github.com/jackc/pgx/pgtype" |
26 | 28 |
34 Warn(fmt string, args ...interface{}) | 36 Warn(fmt string, args ...interface{}) |
35 Error(fmt string, args ...interface{}) | 37 Error(fmt string, args ...interface{}) |
36 } | 38 } |
37 | 39 |
38 Job interface { | 40 Job interface { |
39 Do(context.Context, int64, *sql.Conn, Feedback) error | 41 Do(context.Context, int64, *sql.Conn, Feedback) (interface{}, error) |
40 CleanUp() error | 42 CleanUp() error |
41 } | 43 } |
42 | 44 |
43 JobKind string | 45 JobKind string |
44 | 46 |
112 SELECT min(enqueued) | 114 SELECT min(enqueued) |
113 FROM waterway.imports | 115 FROM waterway.imports |
114 WHERE state = 'queued'::waterway.import_state AND | 116 WHERE state = 'queued'::waterway.import_state AND |
115 kind = ANY($1) | 117 kind = ANY($1) |
116 ) | 118 ) |
117 LIMIT 1 | 119 LIMIT 1` |
118 ` | 120 |
119 updateStateSQL = ` | 121 updateStateSQL = ` |
120 UPDATE waterway.imports SET state = $1::waterway.import_state | 122 UPDATE waterway.imports SET state = $1::waterway.import_state |
121 WHERE id = $2 | 123 WHERE id = $2` |
122 ` | 124 |
125 updateStateSummarySQL = ` | |
126 UPDATE waterway.imports SET | |
127 state = $1::waterway.import_state, | |
128 summary = $2 | |
129 WHERE id = $3` | |
130 | |
123 logMessageSQL = ` | 131 logMessageSQL = ` |
124 INSERT INTO waterway.import_logs ( | 132 INSERT INTO waterway.import_logs ( |
125 import_id, | 133 import_id, |
126 kind, | 134 kind, |
127 msg | 135 msg |
271 return nil, err | 279 return nil, err |
272 } | 280 } |
273 return &ji, nil | 281 return &ji, nil |
274 } | 282 } |
275 | 283 |
276 func updateState(id int64, state string) error { | 284 func updateStateSummary( |
277 ctx := context.Background() | 285 ctx context.Context, |
286 id int64, | |
287 state string, | |
288 summary interface{}, | |
289 ) error { | |
290 var s sql.NullString | |
291 if summary != nil { | |
292 var b strings.Builder | |
293 if err := json.NewEncoder(&b).Encode(summary); err != nil { | |
294 return err | |
295 } | |
296 s = sql.NullString{String: b.String(), Valid: true} | |
297 } | |
278 return auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error { | 298 return auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error { |
279 _, err := conn.ExecContext(ctx, updateStateSQL, state, id) | 299 _, err := conn.ExecContext(ctx, updateStateSummarySQL, state, s, id) |
280 return err | 300 return err |
281 }) | 301 }) |
282 } | 302 } |
283 | 303 |
284 func errorAndFail(id int64, format string, args ...interface{}) error { | 304 func errorAndFail(id int64, format string, args ...interface{}) error { |
368 | 388 |
369 feedback := logFeedback(idj.id) | 389 feedback := logFeedback(idj.id) |
370 | 390 |
371 feedback.Info("import #%d started", idj.id) | 391 feedback.Info("import #%d started", idj.id) |
372 | 392 |
393 ctx := context.Background() | |
394 var summary interface{} | |
395 | |
373 errDo := survive(func() error { | 396 errDo := survive(func() error { |
374 ctx := context.Background() | |
375 return auth.RunAs(ctx, idj.user, | 397 return auth.RunAs(ctx, idj.user, |
376 func(conn *sql.Conn) error { | 398 func(conn *sql.Conn) error { |
377 return job.Do(ctx, idj.id, conn, feedback) | 399 var err error |
400 summary, err = job.Do(ctx, idj.id, conn, feedback) | |
401 return err | |
378 }) | 402 }) |
379 })() | 403 })() |
380 if errDo != nil { | 404 if errDo != nil { |
381 feedback.Error("error do: %v", errDo) | 405 feedback.Error("error do: %v", errDo) |
382 } | 406 } |
389 if errDo != nil || errCleanup != nil { | 413 if errDo != nil || errCleanup != nil { |
390 state = "failed" | 414 state = "failed" |
391 } else { | 415 } else { |
392 state = "pending" | 416 state = "pending" |
393 } | 417 } |
394 if err := updateState(idj.id, state); err != nil { | 418 if err := updateStateSummary(ctx, idj.id, state, summary); err != nil { |
395 log.Printf("setting state of job %d failed: %v\n", idj.id, err) | 419 log.Printf("setting state of job %d failed: %v\n", idj.id, err) |
396 } | 420 } |
397 log.Printf("import #%d finished: %s\n", idj.id, state) | 421 log.Printf("import #%d finished: %s\n", idj.id, state) |
398 }(jc, idj) | 422 }(jc, idj) |
399 } | 423 } |