Mercurial > gemma
comparison pkg/scheduler/scheduler.go @ 1654:85386ad17d34
Scheduled imports: Don't track the user in the running scheduler.
The user is stored in the database already.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Thu, 20 Dec 2018 18:39:19 +0100 |
parents | f39957ea08aa |
children | 6caf5cd6249e |
comparison
equal
deleted
inserted
replaced
1653:06f08458d666 | 1654:85386ad17d34 |
---|---|
22 ) | 22 ) |
23 | 23 |
24 // ErrNoSuchAction if no fitting action was found. | 24 // ErrNoSuchAction if no fitting action was found. |
25 var ErrNoSuchAction = errors.New("No such action") | 25 var ErrNoSuchAction = errors.New("No such action") |
26 | 26 |
27 // Action is called with a user and a configuration id. | 27 // Action is called with a configuration id. |
28 type Action func(user string, cfgID int64) | 28 type Action func(cfgID int64) |
29 | 29 |
30 type userAction struct { | 30 type userAction struct { |
31 scheduler *scheduler | 31 scheduler *scheduler |
32 user string | |
33 name string | 32 name string |
34 cfgID int64 | 33 cfgID int64 |
35 } | 34 } |
36 | 35 |
37 type scheduler struct { | 36 type scheduler struct { |
41 } | 40 } |
42 | 41 |
43 // Run implements cron.Job. | 42 // Run implements cron.Job. |
44 func (ua *userAction) Run() { | 43 func (ua *userAction) Run() { |
45 if a := ua.scheduler.action(ua.name); a != nil { | 44 if a := ua.scheduler.action(ua.name); a != nil { |
46 a(ua.user, ua.cfgID) | 45 a(ua.cfgID) |
47 } else { | 46 } else { |
48 log.Printf("warn: scheduled action '%s' not found.", ua.name) | 47 log.Printf("warn: scheduled action '%s' not found.", ua.name) |
49 } | 48 } |
50 } | 49 } |
51 | 50 |
63 func UnregisterAction(name string) { | 62 func UnregisterAction(name string) { |
64 global.unregisterAction(name) | 63 global.unregisterAction(name) |
65 } | 64 } |
66 | 65 |
67 // BoundAction is a complete set of infos for | 66 // BoundAction is a complete set of infos for |
68 // an action to be bound to a user, schedule and | 67 // an action to be bound to a schedule and |
69 // configuration id. | 68 // configuration id. |
70 type BoundAction struct { | 69 type BoundAction struct { |
71 Name string | 70 Name string |
72 Spec string | 71 Spec string |
73 User string | |
74 CfgID int64 | 72 CfgID int64 |
75 } | 73 } |
76 | 74 |
77 // BootActions setup the global scheduler with a set | 75 // BootActions setup the global scheduler with a set |
78 // of bound actions delivered by the next function. | 76 // of bound actions delivered by the next function. |
97 if err != nil { | 95 if err != nil { |
98 return err | 96 return err |
99 } | 97 } |
100 job := &userAction{ | 98 job := &userAction{ |
101 scheduler: s, | 99 scheduler: s, |
102 user: ba.User, | |
103 name: ba.Name, | |
104 cfgID: ba.CfgID, | 100 cfgID: ba.CfgID, |
105 } | 101 } |
106 cr.Schedule(schedule, job) | 102 cr.Schedule(schedule, job) |
107 } | 103 } |
108 | 104 |
114 cr.Start() | 110 cr.Start() |
115 | 111 |
116 return nil | 112 return nil |
117 } | 113 } |
118 | 114 |
119 // BindAction binds a named action to a user, a cron spec and | 115 // BindAction binds a named action to a cron spec and |
120 // a configuration id. | 116 // a configuration id. |
121 func BindAction(name, spec, user string, cfgID int64) error { | 117 func BindAction(name, spec string, cfgID int64) error { |
122 return global.bindAction(name, spec, user, cfgID) | 118 return global.bindAction(name, spec, cfgID) |
123 } | 119 } |
124 | 120 |
125 // UnbindAction unbinds a named action from a user and | 121 // UnbindAction unbinds a named action from a user and |
126 // a configuration id. | 122 // a configuration id. |
127 func UnbindAction(name, user string, cfgID int64) { | 123 func UnbindAction(name string, cfgID int64) { |
128 global.unbindAction(name, user, cfgID) | 124 global.unbindAction(name, cfgID) |
129 } | 125 } |
130 | 126 |
131 // UnbindByID unbinds all schedules with a given id. | 127 // UnbindByID unbinds all schedules with a given id. |
132 func UnbindByID(cfgID int64) { | 128 func UnbindByID(cfgID int64) { |
133 global.unbindByID(cfgID) | 129 global.unbindByID(cfgID) |
134 } | 130 } |
135 | 131 |
136 // UnbindUser unbinds all schedules for a given user. | 132 // UnbindUser unbinds all schedules for a given user. |
137 func UnbindUser(user string) { | 133 func UnbindByIDs(ids map[int64]struct{}) { |
138 global.unbindUser(user) | 134 global.unbindByIDs(ids) |
139 } | 135 } |
140 | 136 |
141 // HasAction asks if there is an action with a given name. | 137 // HasAction asks if there is an action with a given name. |
142 func HasAction(name string) bool { | 138 func HasAction(name string) bool { |
143 return global.hasAction(name) | 139 return global.hasAction(name) |
147 s.mu.Lock() | 143 s.mu.Lock() |
148 defer s.mu.Unlock() | 144 defer s.mu.Unlock() |
149 return s.actions[name] != nil | 145 return s.actions[name] != nil |
150 } | 146 } |
151 | 147 |
152 func (s *scheduler) unbindUser(user string) { | 148 func (s *scheduler) unbindByIDs(ids map[int64]struct{}) { |
153 s.mu.Lock() | 149 s.mu.Lock() |
154 defer s.mu.Unlock() | 150 defer s.mu.Unlock() |
155 | 151 |
156 entries := s.cr.Entries() | 152 entries := s.cr.Entries() |
157 | 153 |
160 } | 156 } |
161 | 157 |
162 var found bool | 158 var found bool |
163 for _, entry := range entries { | 159 for _, entry := range entries { |
164 ua := entry.Job.(*userAction) | 160 ua := entry.Job.(*userAction) |
165 if ua.user == user { | 161 if _, found = ids[ua.cfgID]; found { |
166 found = true | |
167 break | 162 break |
168 } | 163 } |
169 } | 164 } |
170 if !found { | 165 if !found { |
171 return | 166 return |
173 | 168 |
174 s.cr.Stop() | 169 s.cr.Stop() |
175 s.cr = cron.New() | 170 s.cr = cron.New() |
176 for _, entry := range entries { | 171 for _, entry := range entries { |
177 ua := entry.Job.(*userAction) | 172 ua := entry.Job.(*userAction) |
178 if ua.user != user { | 173 if _, found := ids[ua.cfgID]; !found { |
179 s.cr.Schedule(entry.Schedule, entry.Job) | 174 s.cr.Schedule(entry.Schedule, entry.Job) |
180 } | 175 } |
181 } | 176 } |
182 s.cr.Start() | 177 s.cr.Start() |
183 } | 178 } |
210 } | 205 } |
211 } | 206 } |
212 s.cr.Start() | 207 s.cr.Start() |
213 } | 208 } |
214 | 209 |
215 func (s *scheduler) unbindAction(name, user string, cfgID int64) { | 210 func (s *scheduler) unbindAction(name string, cfgID int64) { |
216 s.mu.Lock() | 211 s.mu.Lock() |
217 defer s.mu.Unlock() | 212 defer s.mu.Unlock() |
218 | 213 |
219 entries := s.cr.Entries() | 214 entries := s.cr.Entries() |
220 | 215 |
221 var found *userAction | 216 var found *userAction |
222 for _, entry := range entries { | 217 for _, entry := range entries { |
223 ua := entry.Job.(*userAction) | 218 ua := entry.Job.(*userAction) |
224 if ua.name == name && ua.user == user && cfgID == ua.cfgID { | 219 if ua.name == name && cfgID == ua.cfgID { |
225 // Already have such a user/action/cfg tuple -> re-schedule. | 220 // Already have such a action/cfg tuple -> re-schedule. |
226 found = ua | 221 found = ua |
227 break | 222 break |
228 } | 223 } |
229 } | 224 } |
230 | 225 |
241 } | 236 } |
242 } | 237 } |
243 s.cr.Start() | 238 s.cr.Start() |
244 } | 239 } |
245 | 240 |
246 func (s *scheduler) bindAction(name, spec, user string, cfgID int64) error { | 241 func (s *scheduler) bindAction(name, spec string, cfgID int64) error { |
247 | 242 |
248 schedule, err := cron.Parse(spec) | 243 schedule, err := cron.Parse(spec) |
249 if err != nil { | 244 if err != nil { |
250 return err | 245 return err |
251 } | 246 } |
256 entries := s.cr.Entries() | 251 entries := s.cr.Entries() |
257 | 252 |
258 var found *userAction | 253 var found *userAction |
259 for _, entry := range entries { | 254 for _, entry := range entries { |
260 ua := entry.Job.(*userAction) | 255 ua := entry.Job.(*userAction) |
261 if ua.name == name && ua.user == user && cfgID == ua.cfgID { | 256 if ua.name == name && cfgID == ua.cfgID { |
262 // Already have such a user/action/cfg tuple -> re-schedule. | 257 // Already have such a user/action/cfg tuple -> re-schedule. |
263 found = ua | 258 found = ua |
264 break | 259 break |
265 } | 260 } |
266 } | 261 } |
267 | 262 |
268 if found == nil { | 263 if found == nil { |
269 // Add to current plan. | 264 // Add to current plan. |
270 job := &userAction{scheduler: s, user: user, name: name, cfgID: cfgID} | 265 job := &userAction{scheduler: s, name: name, cfgID: cfgID} |
271 s.cr.Schedule(schedule, job) | 266 s.cr.Schedule(schedule, job) |
272 } else { | 267 } else { |
273 // If found re-build all. | 268 // If found re-build all. |
274 s.cr.Stop() | 269 s.cr.Stop() |
275 s.cr = cron.New() | 270 s.cr = cron.New() |