comparison rhodecode/lib/celerypylons/commands.py @ 1002:3a7f5b1a19dd beta

made rhodecode work with celery 2.2, made some tasks optimizations(forget results) added celeryconfig.py with just the definitions of hosts, it seams just this is needed to get celery working nice, all other config options are taken from .ini files. This is a temp workaround until i get the proper soltuion to this problem.
author Marcin Kuzminski <marcin@python-works.com>
date Tue, 08 Feb 2011 02:57:21 +0100
parents 277427ac29a9
children 13d6da57b0cf
comparison
equal deleted inserted replaced
1001:94e0541a5283 1002:3a7f5b1a19dd
1 from rhodecode.lib.utils import BasePasterCommand, Command 1 from rhodecode.lib.utils import BasePasterCommand, Command
2 2 from celery.app import app_or_default
3 from celery.bin import camqadm, celerybeat, celeryd, celeryev
3 4
4 __all__ = ['CeleryDaemonCommand', 'CeleryBeatCommand', 5 __all__ = ['CeleryDaemonCommand', 'CeleryBeatCommand',
5 'CAMQPAdminCommand', 'CeleryEventCommand'] 6 'CAMQPAdminCommand', 'CeleryEventCommand']
6 7
7 8
8 class CeleryDaemonCommand(BasePasterCommand): 9 class CeleryCommand(BasePasterCommand):
10 """Abstract class implements run methods needed for celery
11
12 Starts the celery worker that uses a paste.deploy configuration
13 file.
14 """
15
16 def update_parser(self):
17 """
18 Abstract method. Allows for the class's parser to be updated
19 before the superclass's `run` method is called. Necessary to
20 allow options/arguments to be passed through to the underlying
21 celery command.
22 """
23
24 cmd = self.celery_command(app_or_default())
25 for x in cmd.get_options():
26 self.parser.add_option(x)
27
28 def command(self):
29 cmd = self.celery_command(app_or_default())
30 return cmd.run(**vars(self.options))
31
32 class CeleryDaemonCommand(CeleryCommand):
9 """Start the celery worker 33 """Start the celery worker
10 34
11 Starts the celery worker that uses a paste.deploy configuration 35 Starts the celery worker that uses a paste.deploy configuration
12 file. 36 file.
13 """ 37 """
14 usage = 'CONFIG_FILE [celeryd options...]' 38 usage = 'CONFIG_FILE [celeryd options...]'
15 summary = __doc__.splitlines()[0] 39 summary = __doc__.splitlines()[0]
16 description = "".join(__doc__.splitlines()[2:]) 40 description = "".join(__doc__.splitlines()[2:])
17 41
18 parser = Command.standard_parser(quiet=True) 42 parser = Command.standard_parser(quiet=True)
19 43 celery_command = celeryd.WorkerCommand
20 def update_parser(self):
21 from celery.bin import celeryd
22 for x in celeryd.WorkerCommand().get_options():
23 self.parser.add_option(x)
24
25 def command(self):
26 from celery.bin import celeryd
27 return celeryd.WorkerCommand().run(**vars(self.options))
28 44
29 45
30 class CeleryBeatCommand(BasePasterCommand): 46 class CeleryBeatCommand(CeleryCommand):
31 """Start the celery beat server 47 """Start the celery beat server
32 48
33 Starts the celery beat server using a paste.deploy configuration 49 Starts the celery beat server using a paste.deploy configuration
34 file. 50 file.
35 """ 51 """
36 usage = 'CONFIG_FILE [celerybeat options...]' 52 usage = 'CONFIG_FILE [celerybeat options...]'
37 summary = __doc__.splitlines()[0] 53 summary = __doc__.splitlines()[0]
38 description = "".join(__doc__.splitlines()[2:]) 54 description = "".join(__doc__.splitlines()[2:])
39 55
40 parser = Command.standard_parser(quiet=True) 56 parser = Command.standard_parser(quiet=True)
57 celery_command = celerybeat.BeatCommand
41 58
42 def update_parser(self):
43 from celery.bin import celerybeat
44 for x in celerybeat.BeatCommand().get_options():
45 self.parser.add_option(x)
46 59
47 def command(self): 60 class CAMQPAdminCommand(CeleryCommand):
48 from celery.bin import celerybeat
49 return celerybeat.BeatCommand(**vars(self.options))
50
51 class CAMQPAdminCommand(BasePasterCommand):
52 """CAMQP Admin 61 """CAMQP Admin
53 62
54 CAMQP celery admin tool. 63 CAMQP celery admin tool.
55 """ 64 """
56 usage = 'CONFIG_FILE [camqadm options...]' 65 usage = 'CONFIG_FILE [camqadm options...]'
57 summary = __doc__.splitlines()[0] 66 summary = __doc__.splitlines()[0]
58 description = "".join(__doc__.splitlines()[2:]) 67 description = "".join(__doc__.splitlines()[2:])
59 68
60 parser = Command.standard_parser(quiet=True) 69 parser = Command.standard_parser(quiet=True)
70 celery_command = camqadm.AMQPAdminCommand
61 71
62 def update_parser(self): 72 class CeleryEventCommand(CeleryCommand):
63 from celery.bin import camqadm 73 """Celery event command.
64 for x in camqadm.OPTION_LIST:
65 self.parser.add_option(x)
66
67 def command(self):
68 from celery.bin import camqadm
69 return camqadm.camqadm(*self.args, **vars(self.options))
70
71
72 class CeleryEventCommand(BasePasterCommand):
73 """Celery event commandd.
74 74
75 Capture celery events. 75 Capture celery events.
76 """ 76 """
77 usage = 'CONFIG_FILE [celeryev options...]' 77 usage = 'CONFIG_FILE [celeryev options...]'
78 summary = __doc__.splitlines()[0] 78 summary = __doc__.splitlines()[0]
79 description = "".join(__doc__.splitlines()[2:]) 79 description = "".join(__doc__.splitlines()[2:])
80 80
81 parser = Command.standard_parser(quiet=True) 81 parser = Command.standard_parser(quiet=True)
82 82 celery_command = celeryev.EvCommand
83 def update_parser(self):
84 from celery.bin import celeryev
85 for x in celeryev.OPTION_LIST:
86 self.parser.add_option(x)
87
88 def command(self):
89 from celery.bin import celeryev
90 return celeryev.run_celeryev(**vars(self.options))