annotate pkg/imports/queue.go @ 1005:fcf016ebdef4 persistent-import-queue

Re-enqueue import jobs in state running if the the gemma server starts. These are jobs that where started but did not finish before the server died before.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Mon, 22 Oct 2018 18:14:16 +0200
parents d789f19877f4
children 8f23ec811afb
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
1 package imports
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
2
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
3 import (
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
4 "context"
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
5 "database/sql"
992
a978b2b26a88 Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 991
diff changeset
6 "fmt"
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
7 "log"
992
a978b2b26a88 Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 991
diff changeset
8 "runtime/debug"
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
9 "sync"
1003
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
10 "time"
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
11
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
12 "gemma.intevation.de/gemma/pkg/auth"
1000
14425e35e3c2 Wait with start of import queue until configuration is fully loaded.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 998
diff changeset
13 "gemma.intevation.de/gemma/pkg/config"
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
14 )
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
15
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
16 type (
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
17 Feedback interface {
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
18 Info(fmt string, args ...interface{})
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
19 Warn(fmt string, args ...interface{})
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
20 Error(fmt string, args ...interface{})
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
21 }
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
22
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
23 Job interface {
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
24 Do(*sql.Conn, Feedback) error
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
25 CleanUp() error
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
26 }
988
7dfd3db94e6d In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 987
diff changeset
27
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
28 JobKind string
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
29
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
30 JobCreator func(kind JobKind, data string) (Job, error)
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
31
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
32 idJob struct {
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
33 id int64
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
34 kind JobKind
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
35 user string
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
36 data string
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
37 }
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
38 )
987
3841509f6e9e Store job id alongside to job in job queue.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 979
diff changeset
39
1003
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
40 const pollDuration = time.Second * 10
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
41
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
42 var (
1003
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
43 signalChan = make(chan struct{})
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
44
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
45 creatorsMu sync.Mutex
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
46 creators = map[JobKind]JobCreator{}
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
47 )
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
48
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
49 const (
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
50 queueUser = "sys_admin"
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
51
1005
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
52 reEnqueueRunningSQL = `
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
53 UPDATE waterway.imports SET state = 'queued'::waterway.import_state
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
54 WHERE state = 'running'::waterway.import_state
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
55 `
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
56
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
57 insertJobSQL = `
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
58 INSERT INTO waterway.imports (
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
59 kind,
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
60 username,
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
61 data
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
62 ) VALUES (
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
63 $1,
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
64 $2,
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
65 $3
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
66 ) RETURNING id`
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
67
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
68 selectJobSQL = `
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
69 SELECT
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
70 id,
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
71 kind,
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
72 username,
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
73 data
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
74 FROM waterway.imports
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
75 WHERE state = 'queued'::waterway.import_state AND enqueued IN (
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
76 SELECT min(enqueued)
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
77 FROM waterway.imports
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
78 WHERE state = 'queued'::waterway.import_state
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
79 )
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
80 LIMIT 1
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
81 `
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
82 updateStateSQL = `
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
83 UPDATE waterway.imports SET state = $1::waterway.import_state
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
84 WHERE id = $2
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
85 `
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
86 logMessageSQL = `
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
87 INSERT INTO waterway.import_logs (
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
88 import_id,
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
89 kind,
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
90 msg
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
91 ) VALUES (
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
92 $1,
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
93 $2::waterway.log_type,
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
94 $3
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
95 )`
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
96 )
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
97
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
98 func init() {
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
99 go importLoop()
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
100 }
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
101
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
102 func RegisterJobCreator(kind JobKind, jc JobCreator) {
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
103 log.Printf("info: register import job creator for kind '%s'\n", kind)
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
104 creatorsMu.Lock()
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
105 defer creatorsMu.Unlock()
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
106 creators[kind] = jc
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
107 }
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
108
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
109 func jobCreator(kind JobKind) JobCreator {
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
110 creatorsMu.Lock()
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
111 defer creatorsMu.Unlock()
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
112 return creators[kind]
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
113 }
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
114
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
115 func AddJob(kind JobKind, user, data string) (int64, error) {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
116 ctx := context.Background()
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
117 var id int64
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
118 err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
119 return conn.QueryRowContext(ctx, insertJobSQL, string(kind), user, data).Scan(&id)
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
120 })
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
121 if err == nil {
1003
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
122 select {
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
123 case signalChan <- struct{}{}:
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
124 default:
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
125 }
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
126 }
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
127 return id, err
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
128 }
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
129
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
130 type logFeedback int64
988
7dfd3db94e6d In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 987
diff changeset
131
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
132 func (lf logFeedback) log(kind, format string, args ...interface{}) {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
133 ctx := context.Background()
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
134 err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
135 _, err := conn.ExecContext(
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
136 ctx, logMessageSQL, int64(lf), kind, fmt.Sprintf(format, args...))
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
137 return err
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
138 })
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
139 if err != nil {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
140 log.Printf("logging failed: %v\n", err)
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
141 }
988
7dfd3db94e6d In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 987
diff changeset
142 }
7dfd3db94e6d In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 987
diff changeset
143
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
144 func (lf logFeedback) Info(format string, args ...interface{}) {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
145 lf.log("info", format, args...)
988
7dfd3db94e6d In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 987
diff changeset
146 }
7dfd3db94e6d In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 987
diff changeset
147
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
148 func (lf logFeedback) Warn(format string, args ...interface{}) {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
149 lf.log("warn", format, args...)
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
150 }
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
151
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
152 func (lf logFeedback) Error(format string, args ...interface{}) {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
153 lf.log("error", format, args...)
988
7dfd3db94e6d In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 987
diff changeset
154 }
7dfd3db94e6d In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 987
diff changeset
155
992
a978b2b26a88 Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 991
diff changeset
156 func survive(fn func() error) func() error {
a978b2b26a88 Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 991
diff changeset
157 return func() error {
a978b2b26a88 Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 991
diff changeset
158 errCh := make(chan error)
a978b2b26a88 Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 991
diff changeset
159 go func() {
a978b2b26a88 Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 991
diff changeset
160 defer func() {
a978b2b26a88 Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 991
diff changeset
161 if err := recover(); err != nil {
a978b2b26a88 Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 991
diff changeset
162 errCh <- fmt.Errorf("%v: %s",
a978b2b26a88 Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 991
diff changeset
163 err, string(debug.Stack()))
a978b2b26a88 Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 991
diff changeset
164 }
a978b2b26a88 Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 991
diff changeset
165 }()
a978b2b26a88 Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 991
diff changeset
166 errCh <- fn()
a978b2b26a88 Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 991
diff changeset
167 }()
a978b2b26a88 Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 991
diff changeset
168 return <-errCh
a978b2b26a88 Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 991
diff changeset
169 }
a978b2b26a88 Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 991
diff changeset
170 }
a978b2b26a88 Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 991
diff changeset
171
1005
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
172 func reEnqueueRunning() error {
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
173 ctx := context.Background()
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
174 return auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error {
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
175 _, err := conn.ExecContext(ctx, reEnqueueRunningSQL)
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
176 return err
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
177 })
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
178 }
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
179
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
180 func fetchJob() (*idJob, error) {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
181 var ji idJob
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
182 ctx := context.Background()
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
183 err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
184 tx, err := conn.BeginTx(ctx, nil)
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
185 if err != nil {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
186 return err
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
187 }
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
188 defer tx.Rollback()
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
189 if err = tx.QueryRowContext(ctx, selectJobSQL).Scan(
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
190 &ji.id, &ji.kind, &ji.user, &ji.data); err != nil {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
191 return err
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
192 }
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
193 _, err = tx.ExecContext(ctx, updateStateSQL, "running", ji.id)
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
194 if err == nil {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
195 err = tx.Commit()
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
196 }
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
197 return err
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
198 })
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
199 switch {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
200 case err == sql.ErrNoRows:
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
201 return nil, nil
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
202 case err != nil:
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
203 return nil, err
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
204 }
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
205 return &ji, nil
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
206 }
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
207
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
208 func updateState(id int64, state string) error {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
209 ctx := context.Background()
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
210 return auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
211 _, err := conn.ExecContext(ctx, updateStateSQL, state, id)
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
212 return err
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
213 })
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
214 }
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
215
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
216 func errorAndFail(id int64, format string, args ...interface{}) error {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
217 ctx := context.Background()
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
218 err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
219 tx, err := conn.BeginTx(ctx, nil)
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
220 if err != nil {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
221 return err
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
222 }
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
223 defer tx.Rollback()
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
224 _, err = conn.ExecContext(
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
225 ctx, logMessageSQL, id, "error", fmt.Sprintf(format, args...))
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
226 if err != nil {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
227 return err
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
228 }
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
229 _, err = conn.ExecContext(
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
230 ctx, updateStateSQL, "failed", id)
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
231 if err == nil {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
232 err = tx.Commit()
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
233 }
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
234 return err
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
235 })
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
236 return err
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
237 }
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
238
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
239 func importLoop() {
1000
14425e35e3c2 Wait with start of import queue until configuration is fully loaded.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 998
diff changeset
240 config.WaitReady()
1005
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
241 // re-enqueue the jobs that are in state running.
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
242 // They where in progess when the server went down.
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
243 if err := reEnqueueRunning(); err != nil {
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
244 log.Printf("re-enquing failed: %v", err)
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
245 }
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
246
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
247 for {
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
248 var idj *idJob
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
249 var err error
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
250
1003
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
251 for {
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
252 if idj, err = fetchJob(); err != nil {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
253 log.Printf("db error: %v\n", err)
1003
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
254 }
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
255 if idj != nil {
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
256 break
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
257 }
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
258 select {
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
259 case <-signalChan:
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
260 case <-time.After(pollDuration):
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
261 }
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
262 }
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
263
987
3841509f6e9e Store job id alongside to job in job queue.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 979
diff changeset
264 log.Printf("starting import job %d\n", idj.id)
3841509f6e9e Store job id alongside to job in job queue.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 979
diff changeset
265
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
266 jc := jobCreator(idj.kind)
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
267 if jc == nil {
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
268 errorAndFail(idj.id, "No creator for kind '%s' found", idj.kind)
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
269 continue
988
7dfd3db94e6d In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 987
diff changeset
270 }
7dfd3db94e6d In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 987
diff changeset
271
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
272 job, err := jc(idj.kind, idj.data)
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
273 if err != nil {
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
274 errorAndFail(idj.id, "Faild to create job: %v", err)
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
275 continue
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
276 }
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
277
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
278 feedback := logFeedback(idj.id)
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
279
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
280 errDo := survive(func() error {
992
a978b2b26a88 Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 991
diff changeset
281 return auth.RunAs(idj.user, context.Background(),
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
282 func(conn *sql.Conn) error { return job.Do(conn, feedback) })
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
283 })()
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
284 if errDo != nil {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
285 feedback.Error("error do: %v\n", errDo)
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
286 }
1000
14425e35e3c2 Wait with start of import queue until configuration is fully loaded.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 998
diff changeset
287 errCleanup := survive(job.CleanUp)()
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
288 if errCleanup != nil {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
289 feedback.Error("error cleanup: %v\n", errCleanup)
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
290 }
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
291
1003
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
292 var state string
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
293 if errDo != nil || errCleanup != nil {
1003
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
294 state = "failed"
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
295 } else {
1003
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
296 state = "successful"
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
297 }
1003
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
298 if err := updateState(idj.id, state); err != nil {
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
299 log.Printf("setting state of job %d failed: %v\n", idj.id, err)
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
300 }
1003
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
301 log.Printf("job %d finished: %s\n", idj.id, state)
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
302 }
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
303 }