comparison pkg/imports/queue.go @ 1392:0e1d89241cda

Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Wed, 28 Nov 2018 09:52:34 +0100
parents d753ce6cf588
children 74b0df86b6e6
comparison
equal deleted inserted replaced
1391:801ae5f4bc5b 1392:0e1d89241cda
14 package imports 14 package imports
15 15
16 import ( 16 import (
17 "context" 17 "context"
18 "database/sql" 18 "database/sql"
19 "encoding/json"
19 "fmt" 20 "fmt"
20 "log" 21 "log"
21 "runtime/debug" 22 "runtime/debug"
23 "strings"
22 "sync" 24 "sync"
23 "time" 25 "time"
24 26
25 "github.com/jackc/pgx/pgtype" 27 "github.com/jackc/pgx/pgtype"
26 28
34 Warn(fmt string, args ...interface{}) 36 Warn(fmt string, args ...interface{})
35 Error(fmt string, args ...interface{}) 37 Error(fmt string, args ...interface{})
36 } 38 }
37 39
38 Job interface { 40 Job interface {
39 Do(context.Context, int64, *sql.Conn, Feedback) error 41 Do(context.Context, int64, *sql.Conn, Feedback) (interface{}, error)
40 CleanUp() error 42 CleanUp() error
41 } 43 }
42 44
43 JobKind string 45 JobKind string
44 46
112 SELECT min(enqueued) 114 SELECT min(enqueued)
113 FROM waterway.imports 115 FROM waterway.imports
114 WHERE state = 'queued'::waterway.import_state AND 116 WHERE state = 'queued'::waterway.import_state AND
115 kind = ANY($1) 117 kind = ANY($1)
116 ) 118 )
117 LIMIT 1 119 LIMIT 1`
118 ` 120
119 updateStateSQL = ` 121 updateStateSQL = `
120 UPDATE waterway.imports SET state = $1::waterway.import_state 122 UPDATE waterway.imports SET state = $1::waterway.import_state
121 WHERE id = $2 123 WHERE id = $2`
122 ` 124
125 updateStateSummarySQL = `
126 UPDATE waterway.imports SET
127 state = $1::waterway.import_state,
128 summary = $2
129 WHERE id = $3`
130
123 logMessageSQL = ` 131 logMessageSQL = `
124 INSERT INTO waterway.import_logs ( 132 INSERT INTO waterway.import_logs (
125 import_id, 133 import_id,
126 kind, 134 kind,
127 msg 135 msg
271 return nil, err 279 return nil, err
272 } 280 }
273 return &ji, nil 281 return &ji, nil
274 } 282 }
275 283
276 func updateState(id int64, state string) error { 284 func updateStateSummary(
277 ctx := context.Background() 285 ctx context.Context,
286 id int64,
287 state string,
288 summary interface{},
289 ) error {
290 var s sql.NullString
291 if summary != nil {
292 var b strings.Builder
293 if err := json.NewEncoder(&b).Encode(summary); err != nil {
294 return err
295 }
296 s = sql.NullString{String: b.String(), Valid: true}
297 }
278 return auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error { 298 return auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error {
279 _, err := conn.ExecContext(ctx, updateStateSQL, state, id) 299 _, err := conn.ExecContext(ctx, updateStateSummarySQL, state, s, id)
280 return err 300 return err
281 }) 301 })
282 } 302 }
283 303
284 func errorAndFail(id int64, format string, args ...interface{}) error { 304 func errorAndFail(id int64, format string, args ...interface{}) error {
368 388
369 feedback := logFeedback(idj.id) 389 feedback := logFeedback(idj.id)
370 390
371 feedback.Info("import #%d started", idj.id) 391 feedback.Info("import #%d started", idj.id)
372 392
393 ctx := context.Background()
394 var summary interface{}
395
373 errDo := survive(func() error { 396 errDo := survive(func() error {
374 ctx := context.Background()
375 return auth.RunAs(ctx, idj.user, 397 return auth.RunAs(ctx, idj.user,
376 func(conn *sql.Conn) error { 398 func(conn *sql.Conn) error {
377 return job.Do(ctx, idj.id, conn, feedback) 399 var err error
400 summary, err = job.Do(ctx, idj.id, conn, feedback)
401 return err
378 }) 402 })
379 })() 403 })()
380 if errDo != nil { 404 if errDo != nil {
381 feedback.Error("error do: %v", errDo) 405 feedback.Error("error do: %v", errDo)
382 } 406 }
389 if errDo != nil || errCleanup != nil { 413 if errDo != nil || errCleanup != nil {
390 state = "failed" 414 state = "failed"
391 } else { 415 } else {
392 state = "pending" 416 state = "pending"
393 } 417 }
394 if err := updateState(idj.id, state); err != nil { 418 if err := updateStateSummary(ctx, idj.id, state, summary); err != nil {
395 log.Printf("setting state of job %d failed: %v\n", idj.id, err) 419 log.Printf("setting state of job %d failed: %v\n", idj.id, err)
396 } 420 }
397 log.Printf("import #%d finished: %s\n", idj.id, state) 421 log.Printf("import #%d finished: %s\n", idj.id, state)
398 }(jc, idj) 422 }(jc, idj)
399 } 423 }