Mercurial > gemma
annotate pkg/imports/queue.go @ 1000:14425e35e3c2 persistent-import-queue
Wait with start of import queue until configuration is fully loaded.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Mon, 22 Oct 2018 17:16:02 +0200 |
parents | 75e65599ea52 |
children | d789f19877f4 |
rev | line source |
---|---|
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
1 package imports |
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
2 |
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
3 import ( |
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
4 "context" |
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
5 "database/sql" |
992
a978b2b26a88
Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
991
diff
changeset
|
6 "fmt" |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
7 "log" |
992
a978b2b26a88
Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
991
diff
changeset
|
8 "runtime/debug" |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
9 "sync" |
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
10 |
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
11 "gemma.intevation.de/gemma/pkg/auth" |
1000
14425e35e3c2
Wait with start of import queue until configuration is fully loaded.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
998
diff
changeset
|
12 "gemma.intevation.de/gemma/pkg/config" |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
13 ) |
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
14 |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
15 type ( |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
16 Feedback interface { |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
17 Info(fmt string, args ...interface{}) |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
18 Warn(fmt string, args ...interface{}) |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
19 Error(fmt string, args ...interface{}) |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
20 } |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
21 |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
22 Job interface { |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
23 Do(*sql.Conn, Feedback) error |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
24 CleanUp() error |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
25 } |
988
7dfd3db94e6d
In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
987
diff
changeset
|
26 |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
27 JobKind string |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
28 |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
29 JobCreator func(kind JobKind, data string) (Job, error) |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
30 |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
31 idJob struct { |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
32 id int64 |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
33 kind JobKind |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
34 user string |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
35 data string |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
36 } |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
37 ) |
987
3841509f6e9e
Store job id alongside to job in job queue.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
979
diff
changeset
|
38 |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
39 var ( |
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
40 queueCond = sync.NewCond(new(sync.Mutex)) |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
41 |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
42 creatorsMu sync.Mutex |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
43 creators = map[JobKind]JobCreator{} |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
44 ) |
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
45 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
46 const ( |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
47 queueUser = "sys_admin" |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
48 |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
49 insertJobSQL = ` |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
50 INSERT INTO waterway.imports ( |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
51 kind, |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
52 username, |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
53 data |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
54 ) VALUES ( |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
55 $1, |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
56 $2, |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
57 $3 |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
58 ) RETURNING id` |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
59 |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
60 selectJobSQL = ` |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
61 SELECT |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
62 id, |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
63 kind, |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
64 username, |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
65 data |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
66 FROM waterway.imports |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
67 WHERE state = 'queued'::waterway.import_state AND enqueued IN ( |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
68 SELECT min(enqueued) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
69 FROM waterway.imports |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
70 WHERE state = 'queued'::waterway.import_state |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
71 ) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
72 LIMIT 1 |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
73 ` |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
74 updateStateSQL = ` |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
75 UPDATE waterway.imports SET state = $1::waterway.import_state |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
76 WHERE id = $2 |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
77 ` |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
78 logMessageSQL = ` |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
79 INSERT INTO waterway.import_logs ( |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
80 import_id, |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
81 kind, |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
82 msg |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
83 ) VALUES ( |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
84 $1, |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
85 $2::waterway.log_type, |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
86 $3 |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
87 )` |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
88 ) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
89 |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
90 func init() { |
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
91 go importLoop() |
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
92 } |
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
93 |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
94 func RegisterJobCreator(kind JobKind, jc JobCreator) { |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
95 log.Printf("info: register import job creator for kind '%s'\n", kind) |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
96 creatorsMu.Lock() |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
97 defer creatorsMu.Unlock() |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
98 creators[kind] = jc |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
99 } |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
100 |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
101 func jobCreator(kind JobKind) JobCreator { |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
102 creatorsMu.Lock() |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
103 defer creatorsMu.Unlock() |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
104 return creators[kind] |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
105 } |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
106 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
107 func AddJob(kind JobKind, user, data string) (int64, error) { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
108 ctx := context.Background() |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
109 queueCond.L.Lock() |
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
110 defer queueCond.L.Unlock() |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
111 var id int64 |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
112 err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
113 return conn.QueryRowContext(ctx, insertJobSQL, string(kind), user, data).Scan(&id) |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
114 }) |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
115 if err == nil { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
116 queueCond.Signal() |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
117 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
118 return id, err |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
119 } |
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
120 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
121 type logFeedback int64 |
988
7dfd3db94e6d
In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
987
diff
changeset
|
122 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
123 func (lf logFeedback) log(kind, format string, args ...interface{}) { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
124 ctx := context.Background() |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
125 err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
126 _, err := conn.ExecContext( |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
127 ctx, logMessageSQL, int64(lf), kind, fmt.Sprintf(format, args...)) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
128 return err |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
129 }) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
130 if err != nil { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
131 log.Printf("logging failed: %v\n", err) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
132 } |
988
7dfd3db94e6d
In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
987
diff
changeset
|
133 } |
7dfd3db94e6d
In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
987
diff
changeset
|
134 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
135 func (lf logFeedback) Info(format string, args ...interface{}) { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
136 lf.log("info", format, args...) |
988
7dfd3db94e6d
In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
987
diff
changeset
|
137 } |
7dfd3db94e6d
In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
987
diff
changeset
|
138 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
139 func (lf logFeedback) Warn(format string, args ...interface{}) { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
140 lf.log("warn", format, args...) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
141 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
142 |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
143 func (lf logFeedback) Error(format string, args ...interface{}) { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
144 lf.log("error", format, args...) |
988
7dfd3db94e6d
In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
987
diff
changeset
|
145 } |
7dfd3db94e6d
In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
987
diff
changeset
|
146 |
992
a978b2b26a88
Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
991
diff
changeset
|
147 func survive(fn func() error) func() error { |
a978b2b26a88
Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
991
diff
changeset
|
148 return func() error { |
a978b2b26a88
Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
991
diff
changeset
|
149 errCh := make(chan error) |
a978b2b26a88
Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
991
diff
changeset
|
150 go func() { |
a978b2b26a88
Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
991
diff
changeset
|
151 defer func() { |
a978b2b26a88
Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
991
diff
changeset
|
152 if err := recover(); err != nil { |
a978b2b26a88
Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
991
diff
changeset
|
153 errCh <- fmt.Errorf("%v: %s", |
a978b2b26a88
Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
991
diff
changeset
|
154 err, string(debug.Stack())) |
a978b2b26a88
Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
991
diff
changeset
|
155 } |
a978b2b26a88
Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
991
diff
changeset
|
156 }() |
a978b2b26a88
Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
991
diff
changeset
|
157 errCh <- fn() |
a978b2b26a88
Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
991
diff
changeset
|
158 }() |
a978b2b26a88
Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
991
diff
changeset
|
159 return <-errCh |
a978b2b26a88
Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
991
diff
changeset
|
160 } |
a978b2b26a88
Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
991
diff
changeset
|
161 } |
a978b2b26a88
Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
991
diff
changeset
|
162 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
163 func fetchJob() (*idJob, error) { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
164 var ji idJob |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
165 ctx := context.Background() |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
166 err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
167 tx, err := conn.BeginTx(ctx, nil) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
168 if err != nil { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
169 return err |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
170 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
171 defer tx.Rollback() |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
172 if err = tx.QueryRowContext(ctx, selectJobSQL).Scan( |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
173 &ji.id, &ji.kind, &ji.user, &ji.data); err != nil { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
174 return err |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
175 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
176 _, err = tx.ExecContext(ctx, updateStateSQL, "running", ji.id) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
177 if err == nil { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
178 err = tx.Commit() |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
179 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
180 return err |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
181 }) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
182 switch { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
183 case err == sql.ErrNoRows: |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
184 return nil, nil |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
185 case err != nil: |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
186 return nil, err |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
187 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
188 return &ji, nil |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
189 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
190 |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
191 func updateState(id int64, state string) error { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
192 ctx := context.Background() |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
193 return auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
194 _, err := conn.ExecContext(ctx, updateStateSQL, state, id) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
195 return err |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
196 }) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
197 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
198 |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
199 func errorAndFail(id int64, format string, args ...interface{}) error { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
200 ctx := context.Background() |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
201 err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
202 tx, err := conn.BeginTx(ctx, nil) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
203 if err != nil { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
204 return err |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
205 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
206 defer tx.Rollback() |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
207 _, err = conn.ExecContext( |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
208 ctx, logMessageSQL, id, "error", fmt.Sprintf(format, args...)) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
209 if err != nil { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
210 return err |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
211 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
212 _, err = conn.ExecContext( |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
213 ctx, updateStateSQL, "failed", id) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
214 if err == nil { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
215 err = tx.Commit() |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
216 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
217 return err |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
218 }) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
219 return err |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
220 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
221 |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
222 func importLoop() { |
1000
14425e35e3c2
Wait with start of import queue until configuration is fully loaded.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
998
diff
changeset
|
223 config.WaitReady() |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
224 for { |
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
225 queueCond.L.Lock() |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
226 |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
227 var idj *idJob |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
228 var err error |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
229 |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
230 for idj == nil { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
231 if idj, err = fetchJob(); err != nil { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
232 log.Printf("db error: %v\n", err) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
233 queueCond.Wait() |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
234 } else if idj == nil { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
235 queueCond.Wait() |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
236 } |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
237 } |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
238 |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
239 queueCond.L.Unlock() |
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
240 |
987
3841509f6e9e
Store job id alongside to job in job queue.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
979
diff
changeset
|
241 log.Printf("starting import job %d\n", idj.id) |
3841509f6e9e
Store job id alongside to job in job queue.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
979
diff
changeset
|
242 |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
243 jc := jobCreator(idj.kind) |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
244 if jc == nil { |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
245 errorAndFail(idj.id, "No creator for kind '%s' found", idj.kind) |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
246 continue |
988
7dfd3db94e6d
In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
987
diff
changeset
|
247 } |
7dfd3db94e6d
In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
987
diff
changeset
|
248 |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
249 job, err := jc(idj.kind, idj.data) |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
250 if err != nil { |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
251 errorAndFail(idj.id, "Faild to create job: %v", err) |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
252 continue |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
253 } |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
254 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
255 feedback := logFeedback(idj.id) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
256 |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
257 errDo := survive(func() error { |
992
a978b2b26a88
Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
991
diff
changeset
|
258 return auth.RunAs(idj.user, context.Background(), |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
259 func(conn *sql.Conn) error { return job.Do(conn, feedback) }) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
260 })() |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
261 if errDo != nil { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
262 feedback.Error("error do: %v\n", errDo) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
263 } |
1000
14425e35e3c2
Wait with start of import queue until configuration is fully loaded.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
998
diff
changeset
|
264 errCleanup := survive(job.CleanUp)() |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
265 if errCleanup != nil { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
266 feedback.Error("error cleanup: %v\n", errCleanup) |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
267 } |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
268 |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
269 if errDo != nil || errCleanup != nil { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
270 err = updateState(idj.id, "failed") |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
271 } else { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
272 err = updateState(idj.id, "successful") |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
273 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
274 if err != nil { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
275 log.Printf("setting state of job %d failed: %v\n", idj.id, err) |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
276 } |
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
277 } |
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
278 } |