# HG changeset patch # User Marcin Kuzminski # Date 1297130241 -3600 # Node ID 3a7f5b1a19ddff273490770a3d6d7b240ce90958 # Parent 94e0541a5283662a12930f5530bc47bc91075017 made rhodecode work with celery 2.2, made some tasks optimizations(forget results) added celeryconfig.py with just the definitions of hosts, it seams just this is needed to get celery working nice, all other config options are taken from .ini files. This is a temp workaround until i get the proper soltuion to this problem. diff -r 94e0541a5283 -r 3a7f5b1a19dd celeryconfig.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/celeryconfig.py Tue Feb 08 02:57:21 2011 +0100 @@ -0,0 +1,4 @@ +## Broker settings. +BROKER_VHOST = "rabbitmqhost" +BROKER_USER = "rabbitmq" +BROKER_PASSWORD = "qweqwe" diff -r 94e0541a5283 -r 3a7f5b1a19dd rhodecode/lib/celerylib/tasks.py --- a/rhodecode/lib/celerylib/tasks.py Mon Feb 07 21:40:57 2011 +0100 +++ b/rhodecode/lib/celerylib/tasks.py Tue Feb 08 02:57:21 2011 +0100 @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- """ rhodecode.lib.celerylib.tasks - ~~~~~~~~~~~~~~ + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ RhodeCode task modules, containing all task that suppose to be run by celery daemon @@ -29,6 +29,8 @@ import os import traceback +import logging + from time import mktime from operator import itemgetter @@ -72,21 +74,25 @@ q = sa.query(RhodeCodeUi).filter(RhodeCodeUi.ui_key == '/').one() return q.ui_value -@task +@task(ignore_result=True) @locked_task def whoosh_index(repo_location, full_index): - log = whoosh_index.get_logger() + #log = whoosh_index.get_logger() from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon index_location = config['index_dir'] WhooshIndexingDaemon(index_location=index_location, repo_location=repo_location, sa=get_session())\ .run(full_index=full_index) -@task +@task(ignore_result=True) @locked_task def get_commits_stats(repo_name, ts_min_y, ts_max_y): + try: + log = get_commits_stats.get_logger() + except: + log = logging.getLogger(__name__) + from rhodecode.model.db import Statistics, Repository - log = get_commits_stats.get_logger() #for js data compatibilty author_key_cleaner = lambda k: person(k).replace('"', "") @@ -218,9 +224,13 @@ return True -@task +@task(ignore_result=True) def reset_user_password(user_email): - log = reset_user_password.get_logger() + try: + log = reset_user_password.get_logger() + except: + log = logging.getLogger(__name__) + from rhodecode.lib import auth from rhodecode.model.db import User @@ -254,7 +264,7 @@ return True -@task +@task(ignore_result=True) def send_email(recipients, subject, body): """ Sends an email with defined parameters from the .ini files. @@ -265,7 +275,11 @@ :param subject: subject of the mail :param body: body of the mail """ - log = send_email.get_logger() + try: + log = send_email.get_logger() + except: + log = logging.getLogger(__name__) + email_config = config if not recipients: @@ -289,11 +303,16 @@ return False return True -@task +@task(ignore_result=True) def create_repo_fork(form_data, cur_user): + try: + log = create_repo_fork.get_logger() + except: + log = logging.getLogger(__name__) + from rhodecode.model.repo import RepoModel from vcs import get_backend - log = create_repo_fork.get_logger() + repo_model = RepoModel(get_session()) repo_model.create(form_data, cur_user, just_db=True, fork=True) repo_name = form_data['repo_name'] diff -r 94e0541a5283 -r 3a7f5b1a19dd rhodecode/lib/celerypylons/commands.py --- a/rhodecode/lib/celerypylons/commands.py Mon Feb 07 21:40:57 2011 +0100 +++ b/rhodecode/lib/celerypylons/commands.py Tue Feb 08 02:57:21 2011 +0100 @@ -1,11 +1,35 @@ from rhodecode.lib.utils import BasePasterCommand, Command - +from celery.app import app_or_default +from celery.bin import camqadm, celerybeat, celeryd, celeryev __all__ = ['CeleryDaemonCommand', 'CeleryBeatCommand', 'CAMQPAdminCommand', 'CeleryEventCommand'] -class CeleryDaemonCommand(BasePasterCommand): +class CeleryCommand(BasePasterCommand): + """Abstract class implements run methods needed for celery + + Starts the celery worker that uses a paste.deploy configuration + file. + """ + + def update_parser(self): + """ + Abstract method. Allows for the class's parser to be updated + before the superclass's `run` method is called. Necessary to + allow options/arguments to be passed through to the underlying + celery command. + """ + + cmd = self.celery_command(app_or_default()) + for x in cmd.get_options(): + self.parser.add_option(x) + + def command(self): + cmd = self.celery_command(app_or_default()) + return cmd.run(**vars(self.options)) + +class CeleryDaemonCommand(CeleryCommand): """Start the celery worker Starts the celery worker that uses a paste.deploy configuration @@ -16,18 +40,10 @@ description = "".join(__doc__.splitlines()[2:]) parser = Command.standard_parser(quiet=True) - - def update_parser(self): - from celery.bin import celeryd - for x in celeryd.WorkerCommand().get_options(): - self.parser.add_option(x) - - def command(self): - from celery.bin import celeryd - return celeryd.WorkerCommand().run(**vars(self.options)) + celery_command = celeryd.WorkerCommand -class CeleryBeatCommand(BasePasterCommand): +class CeleryBeatCommand(CeleryCommand): """Start the celery beat server Starts the celery beat server using a paste.deploy configuration @@ -38,17 +54,10 @@ description = "".join(__doc__.splitlines()[2:]) parser = Command.standard_parser(quiet=True) - - def update_parser(self): - from celery.bin import celerybeat - for x in celerybeat.BeatCommand().get_options(): - self.parser.add_option(x) + celery_command = celerybeat.BeatCommand - def command(self): - from celery.bin import celerybeat - return celerybeat.BeatCommand(**vars(self.options)) -class CAMQPAdminCommand(BasePasterCommand): +class CAMQPAdminCommand(CeleryCommand): """CAMQP Admin CAMQP celery admin tool. @@ -58,19 +67,10 @@ description = "".join(__doc__.splitlines()[2:]) parser = Command.standard_parser(quiet=True) - - def update_parser(self): - from celery.bin import camqadm - for x in camqadm.OPTION_LIST: - self.parser.add_option(x) + celery_command = camqadm.AMQPAdminCommand - def command(self): - from celery.bin import camqadm - return camqadm.camqadm(*self.args, **vars(self.options)) - - -class CeleryEventCommand(BasePasterCommand): - """Celery event commandd. +class CeleryEventCommand(CeleryCommand): + """Celery event command. Capture celery events. """ @@ -79,12 +79,4 @@ description = "".join(__doc__.splitlines()[2:]) parser = Command.standard_parser(quiet=True) - - def update_parser(self): - from celery.bin import celeryev - for x in celeryev.OPTION_LIST: - self.parser.add_option(x) - - def command(self): - from celery.bin import celeryev - return celeryev.run_celeryev(**vars(self.options)) + celery_command = celeryev.EvCommand diff -r 94e0541a5283 -r 3a7f5b1a19dd rhodecode/lib/celerypylons/loader.py --- a/rhodecode/lib/celerypylons/loader.py Mon Feb 07 21:40:57 2011 +0100 +++ b/rhodecode/lib/celerypylons/loader.py Tue Feb 08 02:57:21 2011 +0100 @@ -17,15 +17,29 @@ pylons_key = to_pylons(key) try: value = config[pylons_key] - if key in LIST_PARAMS: return value.split() + if key in LIST_PARAMS:return value.split() return self.type_converter(value) except KeyError: raise AttributeError(pylons_key) + def get(self, key): + try: + return self.__getattr__(key) + except AttributeError: + return None + + def __getitem__(self, key): + try: + return self.__getattr__(key) + except AttributeError: + raise KeyError() + def __setattr__(self, key, value): pylons_key = to_pylons(key) config[pylons_key] = value + def __setitem__(self, key, value): + self.__setattr__(key, value) def type_converter(self, value): #cast to int @@ -35,7 +49,6 @@ #cast to bool if value.lower() in ['true', 'false']: return value.lower() == 'true' - return value class PylonsLoader(BaseLoader): diff -r 94e0541a5283 -r 3a7f5b1a19dd setup.py --- a/setup.py Mon Feb 07 21:40:57 2011 +0100 +++ b/setup.py Tue Feb 08 02:57:21 2011 +0100 @@ -12,7 +12,7 @@ "pygments>=1.4", "mercurial>=1.7.3", "whoosh>=1.3.4", - "celery>=2.1.4", + "celery>=2.2.2", "py-bcrypt", "babel", ]