diff rhodecode/lib/celerypylons/commands.py @ 776:f6c613fba757 beta

Celery is configured by the .ini files and run from paster now removed celeryconfig, added homebrew celery-pylons, added paster celeryd command, fixed tasks to use pylons configs, sqlalchemy sessions
author Marcin Kuzminski <marcin@python-works.com>
date Sat, 27 Nov 2010 01:27:24 +0100
parents
children 277427ac29a9
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rhodecode/lib/celerypylons/commands.py	Sat Nov 27 01:27:24 2010 +0100
@@ -0,0 +1,143 @@
+import os
+from paste.script.command import Command, BadCommand
+import paste.deploy
+from pylons import config
+
+
+__all__ = ['CeleryDaemonCommand', 'CeleryBeatCommand',
+           'CAMQPAdminCommand', 'CeleryEventCommand']
+
+
+class CeleryCommand(Command):
+    """
+    Abstract Base Class for celery commands.
+
+    The celery commands are somewhat aggressive about loading
+    celery.conf, and since our module sets the `CELERY_LOADER`
+    environment variable to our loader, we have to bootstrap a bit and
+    make sure we've had a chance to load the pylons config off of the
+    command line, otherwise everything fails.
+    """
+    min_args = 1
+    min_args_error = "Please provide a paster config file as an argument."
+    takes_config_file = 1
+    requires_config_file = True
+
+    def run(self, args):
+        """
+        Overrides Command.run
+        
+        Checks for a config file argument and loads it.
+        """
+        if len(args) < self.min_args:
+            raise BadCommand(
+                self.min_args_error % {'min_args': self.min_args,
+                                       'actual_args': len(args)})
+        # Decrement because we're going to lob off the first argument.
+        # @@ This is hacky
+        self.min_args -= 1
+        self.bootstrap_config(args[0])
+        self.update_parser()
+        return super(CeleryCommand, self).run(args[1:])
+
+    def update_parser(self):
+        """
+        Abstract method.  Allows for the class's parser to be updated
+        before the superclass's `run` method is called.  Necessary to
+        allow options/arguments to be passed through to the underlying
+        celery command.
+        """
+        raise NotImplementedError("Abstract Method.")
+
+    def bootstrap_config(self, conf):
+        """
+        Loads the pylons configuration.
+        """
+        path_to_ini_file = os.path.realpath(conf)
+        conf = paste.deploy.appconfig('config:' + path_to_ini_file)
+        config.init_app(conf.global_conf, conf.local_conf)
+
+
+class CeleryDaemonCommand(CeleryCommand):
+    """Start the celery worker
+
+    Starts the celery worker that uses a paste.deploy configuration
+    file.
+    """
+    usage = 'CONFIG_FILE [celeryd options...]'
+    summary = __doc__.splitlines()[0]
+    description = "".join(__doc__.splitlines()[2:])
+
+    parser = Command.standard_parser(quiet=True)
+
+    def update_parser(self):
+        from celery.bin import celeryd
+        for x in celeryd.WorkerCommand().get_options():
+            self.parser.add_option(x)
+
+    def command(self):
+        from celery.bin import celeryd
+        return celeryd.WorkerCommand().run(**vars(self.options))
+
+
+class CeleryBeatCommand(CeleryCommand):
+    """Start the celery beat server
+
+    Starts the celery beat server using a paste.deploy configuration
+    file.
+    """
+    usage = 'CONFIG_FILE [celerybeat options...]'
+    summary = __doc__.splitlines()[0]
+    description = "".join(__doc__.splitlines()[2:])
+
+    parser = Command.standard_parser(quiet=True)
+
+    def update_parser(self):
+        from celery.bin import celerybeat
+        for x in celerybeat.BeatCommand().get_options():
+            self.parser.add_option(x)
+
+    def command(self):
+        from celery.bin import celerybeat
+        return celerybeat.BeatCommand(**vars(self.options))
+
+class CAMQPAdminCommand(CeleryCommand):
+    """CAMQP Admin
+
+    CAMQP celery admin tool.
+    """
+    usage = 'CONFIG_FILE [camqadm options...]'
+    summary = __doc__.splitlines()[0]
+    description = "".join(__doc__.splitlines()[2:])
+
+    parser = Command.standard_parser(quiet=True)
+
+    def update_parser(self):
+        from celery.bin import camqadm
+        for x in camqadm.OPTION_LIST:
+            self.parser.add_option(x)
+
+    def command(self):
+        from celery.bin import camqadm
+        return camqadm.camqadm(*self.args, **vars(self.options))
+
+
+class CeleryEventCommand(CeleryCommand):
+    """Celery event commandd.
+
+    Capture celery events.
+    """
+    usage = 'CONFIG_FILE [celeryev options...]'
+    summary = __doc__.splitlines()[0]
+    description = "".join(__doc__.splitlines()[2:])
+
+    parser = Command.standard_parser(quiet=True)
+
+    def update_parser(self):
+        from celery.bin import celeryev
+        for x in celeryev.OPTION_LIST:
+            self.parser.add_option(x)
+
+    def command(self):
+        from celery.bin import celeryev
+        return celeryev.run_celeryev(**vars(self.options))