comparison pkg/scheduler/scheduler.go @ 1654:85386ad17d34

Scheduled imports: Don't track the user in the running scheduler. The user is stored in the database already.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Thu, 20 Dec 2018 18:39:19 +0100
parents f39957ea08aa
children 6caf5cd6249e
comparison
equal deleted inserted replaced
1653:06f08458d666 1654:85386ad17d34
22 ) 22 )
23 23
24 // ErrNoSuchAction if no fitting action was found. 24 // ErrNoSuchAction if no fitting action was found.
25 var ErrNoSuchAction = errors.New("No such action") 25 var ErrNoSuchAction = errors.New("No such action")
26 26
27 // Action is called with a user and a configuration id. 27 // Action is called with a configuration id.
28 type Action func(user string, cfgID int64) 28 type Action func(cfgID int64)
29 29
30 type userAction struct { 30 type userAction struct {
31 scheduler *scheduler 31 scheduler *scheduler
32 user string
33 name string 32 name string
34 cfgID int64 33 cfgID int64
35 } 34 }
36 35
37 type scheduler struct { 36 type scheduler struct {
41 } 40 }
42 41
43 // Run implements cron.Job. 42 // Run implements cron.Job.
44 func (ua *userAction) Run() { 43 func (ua *userAction) Run() {
45 if a := ua.scheduler.action(ua.name); a != nil { 44 if a := ua.scheduler.action(ua.name); a != nil {
46 a(ua.user, ua.cfgID) 45 a(ua.cfgID)
47 } else { 46 } else {
48 log.Printf("warn: scheduled action '%s' not found.", ua.name) 47 log.Printf("warn: scheduled action '%s' not found.", ua.name)
49 } 48 }
50 } 49 }
51 50
63 func UnregisterAction(name string) { 62 func UnregisterAction(name string) {
64 global.unregisterAction(name) 63 global.unregisterAction(name)
65 } 64 }
66 65
67 // BoundAction is a complete set of infos for 66 // BoundAction is a complete set of infos for
68 // an action to be bound to a user, schedule and 67 // an action to be bound to a schedule and
69 // configuration id. 68 // configuration id.
70 type BoundAction struct { 69 type BoundAction struct {
71 Name string 70 Name string
72 Spec string 71 Spec string
73 User string
74 CfgID int64 72 CfgID int64
75 } 73 }
76 74
77 // BootActions setup the global scheduler with a set 75 // BootActions setup the global scheduler with a set
78 // of bound actions delivered by the next function. 76 // of bound actions delivered by the next function.
97 if err != nil { 95 if err != nil {
98 return err 96 return err
99 } 97 }
100 job := &userAction{ 98 job := &userAction{
101 scheduler: s, 99 scheduler: s,
102 user: ba.User,
103 name: ba.Name,
104 cfgID: ba.CfgID, 100 cfgID: ba.CfgID,
105 } 101 }
106 cr.Schedule(schedule, job) 102 cr.Schedule(schedule, job)
107 } 103 }
108 104
114 cr.Start() 110 cr.Start()
115 111
116 return nil 112 return nil
117 } 113 }
118 114
119 // BindAction binds a named action to a user, a cron spec and 115 // BindAction binds a named action to a cron spec and
120 // a configuration id. 116 // a configuration id.
121 func BindAction(name, spec, user string, cfgID int64) error { 117 func BindAction(name, spec string, cfgID int64) error {
122 return global.bindAction(name, spec, user, cfgID) 118 return global.bindAction(name, spec, cfgID)
123 } 119 }
124 120
125 // UnbindAction unbinds a named action from a user and 121 // UnbindAction unbinds a named action from a user and
126 // a configuration id. 122 // a configuration id.
127 func UnbindAction(name, user string, cfgID int64) { 123 func UnbindAction(name string, cfgID int64) {
128 global.unbindAction(name, user, cfgID) 124 global.unbindAction(name, cfgID)
129 } 125 }
130 126
131 // UnbindByID unbinds all schedules with a given id. 127 // UnbindByID unbinds all schedules with a given id.
132 func UnbindByID(cfgID int64) { 128 func UnbindByID(cfgID int64) {
133 global.unbindByID(cfgID) 129 global.unbindByID(cfgID)
134 } 130 }
135 131
136 // UnbindUser unbinds all schedules for a given user. 132 // UnbindUser unbinds all schedules for a given user.
137 func UnbindUser(user string) { 133 func UnbindByIDs(ids map[int64]struct{}) {
138 global.unbindUser(user) 134 global.unbindByIDs(ids)
139 } 135 }
140 136
141 // HasAction asks if there is an action with a given name. 137 // HasAction asks if there is an action with a given name.
142 func HasAction(name string) bool { 138 func HasAction(name string) bool {
143 return global.hasAction(name) 139 return global.hasAction(name)
147 s.mu.Lock() 143 s.mu.Lock()
148 defer s.mu.Unlock() 144 defer s.mu.Unlock()
149 return s.actions[name] != nil 145 return s.actions[name] != nil
150 } 146 }
151 147
152 func (s *scheduler) unbindUser(user string) { 148 func (s *scheduler) unbindByIDs(ids map[int64]struct{}) {
153 s.mu.Lock() 149 s.mu.Lock()
154 defer s.mu.Unlock() 150 defer s.mu.Unlock()
155 151
156 entries := s.cr.Entries() 152 entries := s.cr.Entries()
157 153
160 } 156 }
161 157
162 var found bool 158 var found bool
163 for _, entry := range entries { 159 for _, entry := range entries {
164 ua := entry.Job.(*userAction) 160 ua := entry.Job.(*userAction)
165 if ua.user == user { 161 if _, found = ids[ua.cfgID]; found {
166 found = true
167 break 162 break
168 } 163 }
169 } 164 }
170 if !found { 165 if !found {
171 return 166 return
173 168
174 s.cr.Stop() 169 s.cr.Stop()
175 s.cr = cron.New() 170 s.cr = cron.New()
176 for _, entry := range entries { 171 for _, entry := range entries {
177 ua := entry.Job.(*userAction) 172 ua := entry.Job.(*userAction)
178 if ua.user != user { 173 if _, found := ids[ua.cfgID]; !found {
179 s.cr.Schedule(entry.Schedule, entry.Job) 174 s.cr.Schedule(entry.Schedule, entry.Job)
180 } 175 }
181 } 176 }
182 s.cr.Start() 177 s.cr.Start()
183 } 178 }
210 } 205 }
211 } 206 }
212 s.cr.Start() 207 s.cr.Start()
213 } 208 }
214 209
215 func (s *scheduler) unbindAction(name, user string, cfgID int64) { 210 func (s *scheduler) unbindAction(name string, cfgID int64) {
216 s.mu.Lock() 211 s.mu.Lock()
217 defer s.mu.Unlock() 212 defer s.mu.Unlock()
218 213
219 entries := s.cr.Entries() 214 entries := s.cr.Entries()
220 215
221 var found *userAction 216 var found *userAction
222 for _, entry := range entries { 217 for _, entry := range entries {
223 ua := entry.Job.(*userAction) 218 ua := entry.Job.(*userAction)
224 if ua.name == name && ua.user == user && cfgID == ua.cfgID { 219 if ua.name == name && cfgID == ua.cfgID {
225 // Already have such a user/action/cfg tuple -> re-schedule. 220 // Already have such a action/cfg tuple -> re-schedule.
226 found = ua 221 found = ua
227 break 222 break
228 } 223 }
229 } 224 }
230 225
241 } 236 }
242 } 237 }
243 s.cr.Start() 238 s.cr.Start()
244 } 239 }
245 240
246 func (s *scheduler) bindAction(name, spec, user string, cfgID int64) error { 241 func (s *scheduler) bindAction(name, spec string, cfgID int64) error {
247 242
248 schedule, err := cron.Parse(spec) 243 schedule, err := cron.Parse(spec)
249 if err != nil { 244 if err != nil {
250 return err 245 return err
251 } 246 }
256 entries := s.cr.Entries() 251 entries := s.cr.Entries()
257 252
258 var found *userAction 253 var found *userAction
259 for _, entry := range entries { 254 for _, entry := range entries {
260 ua := entry.Job.(*userAction) 255 ua := entry.Job.(*userAction)
261 if ua.name == name && ua.user == user && cfgID == ua.cfgID { 256 if ua.name == name && cfgID == ua.cfgID {
262 // Already have such a user/action/cfg tuple -> re-schedule. 257 // Already have such a user/action/cfg tuple -> re-schedule.
263 found = ua 258 found = ua
264 break 259 break
265 } 260 }
266 } 261 }
267 262
268 if found == nil { 263 if found == nil {
269 // Add to current plan. 264 // Add to current plan.
270 job := &userAction{scheduler: s, user: user, name: name, cfgID: cfgID} 265 job := &userAction{scheduler: s, name: name, cfgID: cfgID}
271 s.cr.Schedule(schedule, job) 266 s.cr.Schedule(schedule, job)
272 } else { 267 } else {
273 // If found re-build all. 268 // If found re-build all.
274 s.cr.Stop() 269 s.cr.Stop()
275 s.cr = cron.New() 270 s.cr = cron.New()