changeset 1002:3a7f5b1a19dd beta

made rhodecode work with celery 2.2, made some tasks optimizations(forget results) added celeryconfig.py with just the definitions of hosts, it seams just this is needed to get celery working nice, all other config options are taken from .ini files. This is a temp workaround until i get the proper soltuion to this problem.
author Marcin Kuzminski <marcin@python-works.com>
date Tue, 08 Feb 2011 02:57:21 +0100
parents 94e0541a5283
children 9037456bb17f
files celeryconfig.py rhodecode/lib/celerylib/tasks.py rhodecode/lib/celerypylons/commands.py rhodecode/lib/celerypylons/loader.py setup.py
diffstat 5 files changed, 84 insertions(+), 56 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/celeryconfig.py	Tue Feb 08 02:57:21 2011 +0100
@@ -0,0 +1,4 @@
+## Broker settings.
+BROKER_VHOST = "rabbitmqhost"
+BROKER_USER = "rabbitmq"
+BROKER_PASSWORD = "qweqwe"
--- a/rhodecode/lib/celerylib/tasks.py	Mon Feb 07 21:40:57 2011 +0100
+++ b/rhodecode/lib/celerylib/tasks.py	Tue Feb 08 02:57:21 2011 +0100
@@ -1,7 +1,7 @@
 # -*- coding: utf-8 -*-
 """
     rhodecode.lib.celerylib.tasks
-    ~~~~~~~~~~~~~~
+    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
     RhodeCode task modules, containing all task that suppose to be run
     by celery daemon
@@ -29,6 +29,8 @@
 
 import os
 import traceback
+import logging
+
 from time import mktime
 from operator import itemgetter
 
@@ -72,21 +74,25 @@
     q = sa.query(RhodeCodeUi).filter(RhodeCodeUi.ui_key == '/').one()
     return q.ui_value
 
-@task
+@task(ignore_result=True)
 @locked_task
 def whoosh_index(repo_location, full_index):
-    log = whoosh_index.get_logger()
+    #log = whoosh_index.get_logger()
     from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon
     index_location = config['index_dir']
     WhooshIndexingDaemon(index_location=index_location,
                          repo_location=repo_location, sa=get_session())\
                          .run(full_index=full_index)
 
-@task
+@task(ignore_result=True)
 @locked_task
 def get_commits_stats(repo_name, ts_min_y, ts_max_y):
+    try:
+        log = get_commits_stats.get_logger()
+    except:
+        log = logging.getLogger(__name__)
+
     from rhodecode.model.db import Statistics, Repository
-    log = get_commits_stats.get_logger()
 
     #for js data compatibilty
     author_key_cleaner = lambda k: person(k).replace('"', "")
@@ -218,9 +224,13 @@
 
     return True
 
-@task
+@task(ignore_result=True)
 def reset_user_password(user_email):
-    log = reset_user_password.get_logger()
+    try:
+        log = reset_user_password.get_logger()
+    except:
+        log = logging.getLogger(__name__)
+
     from rhodecode.lib import auth
     from rhodecode.model.db import User
 
@@ -254,7 +264,7 @@
 
     return True
 
-@task
+@task(ignore_result=True)
 def send_email(recipients, subject, body):
     """
     Sends an email with defined parameters from the .ini files.
@@ -265,7 +275,11 @@
     :param subject: subject of the mail
     :param body: body of the mail
     """
-    log = send_email.get_logger()
+    try:
+        log = send_email.get_logger()
+    except:
+        log = logging.getLogger(__name__)
+
     email_config = config
 
     if not recipients:
@@ -289,11 +303,16 @@
         return False
     return True
 
-@task
+@task(ignore_result=True)
 def create_repo_fork(form_data, cur_user):
+    try:
+        log = create_repo_fork.get_logger()
+    except:
+        log = logging.getLogger(__name__)
+
     from rhodecode.model.repo import RepoModel
     from vcs import get_backend
-    log = create_repo_fork.get_logger()
+
     repo_model = RepoModel(get_session())
     repo_model.create(form_data, cur_user, just_db=True, fork=True)
     repo_name = form_data['repo_name']
--- a/rhodecode/lib/celerypylons/commands.py	Mon Feb 07 21:40:57 2011 +0100
+++ b/rhodecode/lib/celerypylons/commands.py	Tue Feb 08 02:57:21 2011 +0100
@@ -1,11 +1,35 @@
 from rhodecode.lib.utils import BasePasterCommand, Command
-
+from celery.app import app_or_default
+from celery.bin import camqadm, celerybeat, celeryd, celeryev
 
 __all__ = ['CeleryDaemonCommand', 'CeleryBeatCommand',
            'CAMQPAdminCommand', 'CeleryEventCommand']
 
 
-class CeleryDaemonCommand(BasePasterCommand):
+class CeleryCommand(BasePasterCommand):
+    """Abstract class implements run methods needed for celery
+
+    Starts the celery worker that uses a paste.deploy configuration
+    file.
+    """
+
+    def update_parser(self):
+        """
+        Abstract method.  Allows for the class's parser to be updated
+        before the superclass's `run` method is called.  Necessary to
+        allow options/arguments to be passed through to the underlying
+        celery command.
+        """
+
+        cmd = self.celery_command(app_or_default())
+        for x in cmd.get_options():
+            self.parser.add_option(x)
+
+    def command(self):
+        cmd = self.celery_command(app_or_default())
+        return cmd.run(**vars(self.options))
+
+class CeleryDaemonCommand(CeleryCommand):
     """Start the celery worker
 
     Starts the celery worker that uses a paste.deploy configuration
@@ -16,18 +40,10 @@
     description = "".join(__doc__.splitlines()[2:])
 
     parser = Command.standard_parser(quiet=True)
-
-    def update_parser(self):
-        from celery.bin import celeryd
-        for x in celeryd.WorkerCommand().get_options():
-            self.parser.add_option(x)
-
-    def command(self):
-        from celery.bin import celeryd
-        return celeryd.WorkerCommand().run(**vars(self.options))
+    celery_command = celeryd.WorkerCommand
 
 
-class CeleryBeatCommand(BasePasterCommand):
+class CeleryBeatCommand(CeleryCommand):
     """Start the celery beat server
 
     Starts the celery beat server using a paste.deploy configuration
@@ -38,17 +54,10 @@
     description = "".join(__doc__.splitlines()[2:])
 
     parser = Command.standard_parser(quiet=True)
-
-    def update_parser(self):
-        from celery.bin import celerybeat
-        for x in celerybeat.BeatCommand().get_options():
-            self.parser.add_option(x)
+    celery_command = celerybeat.BeatCommand
 
-    def command(self):
-        from celery.bin import celerybeat
-        return celerybeat.BeatCommand(**vars(self.options))
 
-class CAMQPAdminCommand(BasePasterCommand):
+class CAMQPAdminCommand(CeleryCommand):
     """CAMQP Admin
 
     CAMQP celery admin tool.
@@ -58,19 +67,10 @@
     description = "".join(__doc__.splitlines()[2:])
 
     parser = Command.standard_parser(quiet=True)
-
-    def update_parser(self):
-        from celery.bin import camqadm
-        for x in camqadm.OPTION_LIST:
-            self.parser.add_option(x)
+    celery_command = camqadm.AMQPAdminCommand
 
-    def command(self):
-        from celery.bin import camqadm
-        return camqadm.camqadm(*self.args, **vars(self.options))
-
-
-class CeleryEventCommand(BasePasterCommand):
-    """Celery event commandd.
+class CeleryEventCommand(CeleryCommand):
+    """Celery event command.
 
     Capture celery events.
     """
@@ -79,12 +79,4 @@
     description = "".join(__doc__.splitlines()[2:])
 
     parser = Command.standard_parser(quiet=True)
-
-    def update_parser(self):
-        from celery.bin import celeryev
-        for x in celeryev.OPTION_LIST:
-            self.parser.add_option(x)
-
-    def command(self):
-        from celery.bin import celeryev
-        return celeryev.run_celeryev(**vars(self.options))
+    celery_command = celeryev.EvCommand
--- a/rhodecode/lib/celerypylons/loader.py	Mon Feb 07 21:40:57 2011 +0100
+++ b/rhodecode/lib/celerypylons/loader.py	Tue Feb 08 02:57:21 2011 +0100
@@ -17,15 +17,29 @@
         pylons_key = to_pylons(key)
         try:
             value = config[pylons_key]
-            if key in LIST_PARAMS: return value.split()
+            if key in LIST_PARAMS:return value.split()
             return self.type_converter(value)
         except KeyError:
             raise AttributeError(pylons_key)
 
+    def get(self, key):
+        try:
+            return self.__getattr__(key)
+        except AttributeError:
+            return None
+
+    def __getitem__(self, key):
+        try:
+            return self.__getattr__(key)
+        except AttributeError:
+            raise KeyError()
+
     def __setattr__(self, key, value):
         pylons_key = to_pylons(key)
         config[pylons_key] = value
 
+    def __setitem__(self, key, value):
+        self.__setattr__(key, value)
 
     def type_converter(self, value):
         #cast to int
@@ -35,7 +49,6 @@
         #cast to bool
         if value.lower() in ['true', 'false']:
             return value.lower() == 'true'
-
         return value
 
 class PylonsLoader(BaseLoader):
--- a/setup.py	Mon Feb 07 21:40:57 2011 +0100
+++ b/setup.py	Tue Feb 08 02:57:21 2011 +0100
@@ -12,7 +12,7 @@
         "pygments>=1.4",
         "mercurial>=1.7.3",
         "whoosh>=1.3.4",
-        "celery>=2.1.4",
+        "celery>=2.2.2",
         "py-bcrypt",
         "babel",
     ]