Mercurial > gemma
diff pkg/imports/queue.go @ 1392:0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Wed, 28 Nov 2018 09:52:34 +0100 |
parents | d753ce6cf588 |
children | 74b0df86b6e6 |
line wrap: on
line diff
--- a/pkg/imports/queue.go Wed Nov 28 08:11:23 2018 +0100 +++ b/pkg/imports/queue.go Wed Nov 28 09:52:34 2018 +0100 @@ -16,9 +16,11 @@ import ( "context" "database/sql" + "encoding/json" "fmt" "log" "runtime/debug" + "strings" "sync" "time" @@ -36,7 +38,7 @@ } Job interface { - Do(context.Context, int64, *sql.Conn, Feedback) error + Do(context.Context, int64, *sql.Conn, Feedback) (interface{}, error) CleanUp() error } @@ -114,12 +116,18 @@ WHERE state = 'queued'::waterway.import_state AND kind = ANY($1) ) -LIMIT 1 -` +LIMIT 1` + updateStateSQL = ` UPDATE waterway.imports SET state = $1::waterway.import_state -WHERE id = $2 -` +WHERE id = $2` + + updateStateSummarySQL = ` +UPDATE waterway.imports SET + state = $1::waterway.import_state, + summary = $2 +WHERE id = $3` + logMessageSQL = ` INSERT INTO waterway.import_logs ( import_id, @@ -273,10 +281,22 @@ return &ji, nil } -func updateState(id int64, state string) error { - ctx := context.Background() +func updateStateSummary( + ctx context.Context, + id int64, + state string, + summary interface{}, +) error { + var s sql.NullString + if summary != nil { + var b strings.Builder + if err := json.NewEncoder(&b).Encode(summary); err != nil { + return err + } + s = sql.NullString{String: b.String(), Valid: true} + } return auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error { - _, err := conn.ExecContext(ctx, updateStateSQL, state, id) + _, err := conn.ExecContext(ctx, updateStateSummarySQL, state, s, id) return err }) } @@ -370,11 +390,15 @@ feedback.Info("import #%d started", idj.id) + ctx := context.Background() + var summary interface{} + errDo := survive(func() error { - ctx := context.Background() return auth.RunAs(ctx, idj.user, func(conn *sql.Conn) error { - return job.Do(ctx, idj.id, conn, feedback) + var err error + summary, err = job.Do(ctx, idj.id, conn, feedback) + return err }) })() if errDo != nil { @@ -391,7 +415,7 @@ } else { state = "pending" } - if err := updateState(idj.id, state); err != nil { + if err := updateStateSummary(ctx, idj.id, state, summary); err != nil { log.Printf("setting state of job %d failed: %v\n", idj.id, err) } log.Printf("import #%d finished: %s\n", idj.id, state)