changeset 776:f6c613fba757 beta

Celery is configured by the .ini files and run from paster now removed celeryconfig, added homebrew celery-pylons, added paster celeryd command, fixed tasks to use pylons configs, sqlalchemy sessions
author Marcin Kuzminski <marcin@python-works.com>
date Sat, 27 Nov 2010 01:27:24 +0100
parents aaf2fc59a39a
children aac24db58ce8
files celeryconfig.py development.ini production.ini rhodecode/config/deployment.ini_tmpl rhodecode/lib/celerylib/__init__.py rhodecode/lib/celerylib/tasks.py rhodecode/lib/celerypylons/__init__.py rhodecode/lib/celerypylons/commands.py rhodecode/lib/celerypylons/loader.py test.ini
diffstat 10 files changed, 453 insertions(+), 161 deletions(-) [+]
line wrap: on
line diff
--- a/celeryconfig.py	Fri Nov 26 23:49:19 2010 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,79 +0,0 @@
-# List of modules to import when celery starts.
-import sys
-import os
-import ConfigParser
-root = os.getcwd()
-
-PYLONS_CONFIG_NAME = 'production.ini'
-
-sys.path.append(root)
-config = ConfigParser.ConfigParser({'here':root})
-config.read('%s/%s' % (root, PYLONS_CONFIG_NAME))
-PYLONS_CONFIG = config
-
-CELERY_IMPORTS = ("rhodecode.lib.celerylib.tasks",)
-
-## Result store settings.
-CELERY_RESULT_BACKEND = "amqp"
-CELERY_AMQP_TASK_RESULT_EXPIRES = 18000  # 5 hours.
-
-#CELERY_RESULT_DBURI = dict(config.items('app:main'))['sqlalchemy.db1.url']
-CELERY_RESULT_SERIALIZER = 'json'
-
-
-BROKER_CONNECTION_MAX_RETRIES = 30
-
-## Broker settings.
-BROKER_HOST = "localhost"
-BROKER_PORT = 5672
-BROKER_VHOST = "rabbitmqhost"
-BROKER_USER = "rabbitmq"
-BROKER_PASSWORD = "qweqwe"
-
-## Worker settings
-## If you're doing mostly I/O you can have more processes,
-## but if mostly spending CPU, try to keep it close to the
-## number of CPUs on your machine. If not set, the number of CPUs/cores
-## available will be used.
-CELERYD_CONCURRENCY = 2
-# CELERYD_LOG_FILE = "celeryd.log"
-CELERYD_LOG_LEVEL = "DEBUG"
-CELERYD_MAX_TASKS_PER_CHILD = 3
-
-#Tasks will never be sent to the queue, but executed locally instead.
-CELERY_ALWAYS_EAGER = False
-if PYLONS_CONFIG_NAME == 'test.ini':
-    #auto eager for tests
-    CELERY_ALWAYS_EAGER = True
-
-#===============================================================================
-# EMAIL SETTINGS
-#===============================================================================
-pylons_email_config = dict(config.items('DEFAULT'))
-
-CELERY_SEND_TASK_ERROR_EMAILS = True
-
-#List of (name, email_address) tuples for the admins that should receive error e-mails.
-ADMINS = [('Administrator', pylons_email_config.get('email_to'))]
-
-#The e-mail address this worker sends e-mails from. Default is "celery@localhost".
-SERVER_EMAIL = pylons_email_config.get('error_email_from')
-
-#The mail server to use. Default is "localhost".
-MAIL_HOST = pylons_email_config.get('smtp_server')
-
-#Username (if required) to log on to the mail server with.
-MAIL_HOST_USER = pylons_email_config.get('smtp_username')
-
-#Password (if required) to log on to the mail server with.
-MAIL_HOST_PASSWORD = pylons_email_config.get('smtp_password')
-
-MAIL_PORT = pylons_email_config.get('smtp_port')
-
-
-#===============================================================================
-# INSTRUCTIONS FOR RABBITMQ
-#===============================================================================
-# rabbitmqctl add_user rabbitmq qweqwe
-# rabbitmqctl add_vhost rabbitmqhost
-# rabbitmqctl set_permissions -p rabbitmqhost rabbitmq ".*" ".*" ".*"
--- a/development.ini	Fri Nov 26 23:49:19 2010 +0100
+++ b/development.ini	Sat Nov 27 01:27:24 2010 +0100
@@ -1,6 +1,6 @@
 ################################################################################
 ################################################################################
-# rhodecode - Pylons environment configuration                                 #
+# RhodeCode - Pylons environment configuration                                 #
 #                                                                              # 
 # The %(here)s variable will be replaced with the parent directory of this file#
 ################################################################################
@@ -9,8 +9,8 @@
 debug = true
 ################################################################################
 ## Uncomment and replace with the address which should receive                ## 
-## any error reports after application crash								  ##
-## Additionally those settings will be used by rhodecode mailing system       ##
+## any error reports after application crash                                  ##
+## Additionally those settings will be used by RhodeCode mailing system       ##
 ################################################################################
 #email_to = admin@localhost
 #error_email_from = paste_error@localhost
@@ -19,15 +19,16 @@
 
 #smtp_server = mail.server.com
 #smtp_username = 
-#smtp_password =
+#smtp_password = 
 #smtp_port = 
-#smtp_use_tls = 
+#smtp_use_tls = false
+#smtp_use_ssl = true
 
 [server:main]
 ##nr of threads to spawn
 threadpool_workers = 5
 
-##max request before
+##max request before thread respawn
 threadpool_max_requests = 6
 
 ##option to use threads of process
@@ -46,6 +47,33 @@
 index_dir = %(here)s/data/index
 
 ####################################
+###        CELERY CONFIG        ####
+####################################
+use_celery = false
+broker.host = localhost
+broker.vhost = rabbitmqhost
+broker.port = 5672
+broker.user = rabbitmq
+broker.password = qweqwe
+
+celery.imports = rhodecode.lib.celerylib.tasks
+
+celery.result.backend = amqp
+celery.result.dburi = amqp://
+celery.result.serialier = json
+
+#celery.send.task.error.emails = true
+#celery.amqp.task.result.expires = 18000
+
+celeryd.concurrency = 2
+#celeryd.log.file = celeryd.log
+celeryd.log.level = debug
+celeryd.max.tasks.per.child = 3
+
+#tasks will never be sent to the queue, but executed locally instead.
+celery.always.eager = false
+
+####################################
 ###         BEAKER CACHE        ####
 ####################################
 beaker.cache.data_dir=/%(here)s/data/cache/data
@@ -61,9 +89,8 @@
 beaker.cache.long_term.type=memory
 beaker.cache.long_term.expire=36000
 
-
 beaker.cache.sql_cache_short.type=memory
-beaker.cache.sql_cache_short.expire=5
+beaker.cache.sql_cache_short.expire=10
 
 beaker.cache.sql_cache_med.type=memory
 beaker.cache.sql_cache_med.expire=360
@@ -75,7 +102,7 @@
 ###       BEAKER SESSION        ####
 ####################################
 ## Type of storage used for the session, current types are 
-## "dbm", "file", "memcached", "database", and "memory". 
+## dbm, file, memcached, database, and memory. 
 ## The storage uses the Container API 
 ##that is also used by the cache system.
 beaker.session.type = file
--- a/production.ini	Fri Nov 26 23:49:19 2010 +0100
+++ b/production.ini	Sat Nov 27 01:27:24 2010 +0100
@@ -47,6 +47,33 @@
 index_dir = %(here)s/data/index
 
 ####################################
+###        CELERY CONFIG        ####
+####################################
+use_celery = false
+broker.host = localhost
+broker.vhost = rabbitmqhost
+broker.port = 5672
+broker.user = rabbitmq
+broker.password = qweqwe
+
+celery.imports = rhodecode.lib.celerylib.tasks
+
+celery.result.backend = amqp
+celery.result.dburi = amqp://
+celery.result.serialier = json
+
+#celery.send.task.error.emails = true
+#celery.amqp.task.result.expires = 18000
+
+celeryd.concurrency = 2
+#celeryd.log.file = celeryd.log
+celeryd.log.level = debug
+celeryd.max.tasks.per.child = 3
+
+#tasks will never be sent to the queue, but executed locally instead.
+celery.always.eager = false
+
+####################################
 ###         BEAKER CACHE        ####
 ####################################
 beaker.cache.data_dir=/%(here)s/data/cache/data
--- a/rhodecode/config/deployment.ini_tmpl	Fri Nov 26 23:49:19 2010 +0100
+++ b/rhodecode/config/deployment.ini_tmpl	Sat Nov 27 01:27:24 2010 +0100
@@ -1,6 +1,6 @@
 ################################################################################
 ################################################################################
-# rhodecode - Pylons environment configuration                                 #
+# RhodeCode - Pylons environment configuration                                 #
 #                                                                              # 
 # The %(here)s variable will be replaced with the parent directory of this file#
 ################################################################################
@@ -10,7 +10,7 @@
 ################################################################################
 ## Uncomment and replace with the address which should receive                ## 
 ## any error reports after application crash                                  ##
-## Additionally those settings will be used by rhodecode mailing system       ##
+## Additionally those settings will be used by RhodeCode mailing system       ##
 ################################################################################
 #email_to = admin@localhost
 #error_email_from = paste_error@localhost
@@ -48,6 +48,33 @@
 app_instance_uuid = ${app_instance_uuid}
 
 ####################################
+###        CELERY CONFIG        ####
+####################################
+use_celery = false
+broker.host = localhost
+broker.vhost = rabbitmqhost
+broker.port = 5672
+broker.user = rabbitmq
+broker.password = qweqwe
+
+celery.imports = rhodecode.lib.celerylib.tasks
+
+celery.result.backend = amqp
+celery.result.dburi = amqp://
+celery.result.serialier = json
+
+#celery.send.task.error.emails = true
+#celery.amqp.task.result.expires = 18000
+
+celeryd.concurrency = 2
+#celeryd.log.file = celeryd.log
+celeryd.log.level = debug
+celeryd.max.tasks.per.child = 3
+
+#tasks will never be sent to the queue, but executed locally instead.
+celery.always.eager = false
+
+####################################
 ###         BEAKER CACHE        ####
 ####################################
 beaker.cache.data_dir=/%(here)s/data/cache/data
@@ -64,7 +91,7 @@
 beaker.cache.long_term.expire=36000
 
 beaker.cache.sql_cache_short.type=memory
-beaker.cache.sql_cache_short.expire=5
+beaker.cache.sql_cache_short.expire=10
 
 beaker.cache.sql_cache_med.type=memory
 beaker.cache.sql_cache_med.expire=360
--- a/rhodecode/lib/celerylib/__init__.py	Fri Nov 26 23:49:19 2010 +0100
+++ b/rhodecode/lib/celerylib/__init__.py	Sat Nov 27 01:27:24 2010 +0100
@@ -1,37 +1,47 @@
+import os
+import sys
+import socket
+import traceback
+import logging
+
 from rhodecode.lib.pidlock import DaemonLock, LockHeld
 from vcs.utils.lazy import LazyProperty
 from decorator import decorator
-import logging
-import os
-import sys
-import traceback
 from hashlib import md5
-import socket
+from pylons import  config
+
 log = logging.getLogger(__name__)
 
+def str2bool(v):
+    return v.lower() in ["yes", "true", "t", "1"] if v else None
+
+CELERY_ON = str2bool(config['app_conf'].get('use_celery'))
+
 class ResultWrapper(object):
     def __init__(self, task):
         self.task = task
-        
+
     @LazyProperty
     def result(self):
         return self.task
 
 def run_task(task, *args, **kwargs):
-    try:
-        t = task.delay(*args, **kwargs)
-        log.info('running task %s', t.task_id)
-        return t
-    except socket.error, e:
-        if  e.errno == 111:
-            log.debug('Unable to connect to celeryd. Sync execution')
-        else:
-            log.error(traceback.format_exc())    
-    except KeyError, e:
-            log.debug('Unable to connect to celeryd. Sync execution')
-    except Exception, e:
-        log.error(traceback.format_exc())
-    
+    if CELERY_ON:
+        try:
+            t = task.delay(*args, **kwargs)
+            log.info('running task %s:%s', t.task_id, task)
+            return t
+        except socket.error, e:
+            if  e.errno == 111:
+                log.debug('Unable to connect to celeryd. Sync execution')
+            else:
+                log.error(traceback.format_exc())
+        except KeyError, e:
+                log.debug('Unable to connect to celeryd. Sync execution')
+        except Exception, e:
+            log.error(traceback.format_exc())
+
+    log.debug('executing task %s in sync mode', task)
     return ResultWrapper(task(*args, **kwargs))
 
 
@@ -39,7 +49,7 @@
     def __wrapper(func, *fargs, **fkwargs):
         params = list(fargs)
         params.extend(['%s-%s' % ar for ar in fkwargs.items()])
-            
+
         lockkey = 'task_%s' % \
             md5(str(func.__name__) + '-' + \
                 '-'.join(map(str, params))).hexdigest()
@@ -51,14 +61,14 @@
             return ret
         except LockHeld:
             log.info('LockHeld')
-            return 'Task with key %s already running' % lockkey   
+            return 'Task with key %s already running' % lockkey
 
-    return decorator(__wrapper, func)      
-            
+    return decorator(__wrapper, func)
+
+
 
-        
-        
-    
-    
-    
-  
+
+
+
+
+
--- a/rhodecode/lib/celerylib/tasks.py	Fri Nov 26 23:49:19 2010 +0100
+++ b/rhodecode/lib/celerylib/tasks.py	Sat Nov 27 01:27:24 2010 +0100
@@ -2,16 +2,24 @@
 
 import os
 import traceback
+import beaker
 from time import mktime
-
 from operator import itemgetter
+
+from pylons import config
 from pylons.i18n.translation import _
-from rhodecode.lib.celerylib import run_task, locked_task
+
+from rhodecode.lib.celerylib import run_task, locked_task, str2bool
 from rhodecode.lib.helpers import person
 from rhodecode.lib.smtp_mailer import SmtpMailer
 from rhodecode.lib.utils import OrderedDict
+from rhodecode.model import init_model
+from rhodecode.model import meta
+from rhodecode.model.db import RhodeCodeUi
+
 from vcs.backends import get_repo
-from rhodecode.model.db import RhodeCodeUi
+
+from sqlalchemy import engine_from_config
 
 try:
     import json
@@ -19,31 +27,16 @@
     #python 2.5 compatibility
     import simplejson as json
 
-try:
-    from celeryconfig import PYLONS_CONFIG as config
-    celery_on = True
-except ImportError:
-    #if celeryconfig is not present let's just load our pylons
-    #config instead
-    from pylons import config
-    celery_on = False
-
-
 __all__ = ['whoosh_index', 'get_commits_stats',
            'reset_user_password', 'send_email']
 
+CELERY_ON = str2bool(config['app_conf'].get('use_celery'))
+
 def get_session():
-    if celery_on:
-        from sqlalchemy import engine_from_config
-        from sqlalchemy.orm import sessionmaker, scoped_session
-        engine = engine_from_config(dict(config.items('app:main')),
-                                    'sqlalchemy.db1.')
-        sa = scoped_session(sessionmaker(bind=engine))
-    else:
-        #If we don't use celery reuse our current application Session
-        from rhodecode.model.meta import Session
-        sa = Session()
-
+    if CELERY_ON:
+        engine = engine_from_config(config, 'sqlalchemy.db1.')
+        init_model(engine)
+    sa = meta.Session()
     return sa
 
 def get_repos_path():
@@ -56,7 +49,7 @@
 def whoosh_index(repo_location, full_index):
     log = whoosh_index.get_logger()
     from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon
-    index_location = dict(config.items('app:main'))['index_dir']
+    index_location = config['index_dir']
     WhooshIndexingDaemon(index_location=index_location,
                          repo_location=repo_location).run(full_index=full_index)
 
@@ -235,6 +228,7 @@
     except:
         log.error('Failed to update user password')
         log.error(traceback.format_exc())
+
     return True
 
 @task
@@ -249,14 +243,11 @@
     :param body: body of the mail
     """
     log = send_email.get_logger()
-    email_config = dict(config.items('DEFAULT'))
+    email_config = config
 
     if not recipients:
         recipients = [email_config.get('email_to')]
 
-    def str2bool(v):
-        return v.lower() in ["yes", "true", "t", "1"] if v else None
-
     mail_from = email_config.get('app_email_from')
     user = email_config.get('smtp_username')
     passwd = email_config.get('smtp_password')
@@ -293,12 +284,58 @@
     backend(str(repo_fork_path), create=True, src_url=str(repo_path))
 
 def __get_codes_stats(repo_name):
-    LANGUAGES_EXTENSIONS = ['action', 'adp', 'ashx', 'asmx',
-    'aspx', 'asx', 'axd', 'c', 'cfg', 'cfm', 'cpp', 'cs', 'diff', 'do', 'el',
-    'erl', 'h', 'java', 'js', 'jsp', 'jspx', 'lisp', 'lua', 'm', 'mako', 'ml',
-    'pas', 'patch', 'php', 'php3', 'php4', 'phtml', 'pm', 'py', 'rb', 'rst',
-    's', 'sh', 'tpl', 'txt', 'vim', 'wss', 'xhtml', 'xml', 'xsl', 'xslt', 'yaws']
-
+    LANGUAGES_EXTENSIONS_MAP = {'scm': 'Scheme', 'asmx': 'VbNetAspx', 'Rout':
+    'RConsole', 'rest': 'Rst', 'abap': 'ABAP', 'go': 'Go', 'phtml': 'HtmlPhp',
+    'ns2': 'Newspeak', 'xml': 'EvoqueXml', 'sh-session': 'BashSession', 'ads':
+    'Ada', 'clj': 'Clojure', 'll': 'Llvm', 'ebuild': 'Bash', 'adb': 'Ada',
+    'ada': 'Ada', 'c++-objdump': 'CppObjdump', 'aspx':
+    'VbNetAspx', 'ksh': 'Bash', 'coffee': 'CoffeeScript', 'vert': 'GLShader',
+    'Makefile.*': 'Makefile', 'di': 'D', 'dpatch': 'DarcsPatch', 'rake':
+    'Ruby', 'moo': 'MOOCode', 'erl-sh': 'ErlangShell', 'geo': 'GLShader',
+    'pov': 'Povray', 'bas': 'VbNet', 'bat': 'Batch', 'd': 'D', 'lisp':
+    'CommonLisp', 'h': 'C', 'rbx': 'Ruby', 'tcl': 'Tcl', 'c++': 'Cpp', 'md':
+    'MiniD', '.vimrc': 'Vim', 'xsd': 'Xml', 'ml': 'Ocaml', 'el': 'CommonLisp',
+    'befunge': 'Befunge', 'xsl': 'Xslt', 'pyx': 'Cython', 'cfm':
+    'ColdfusionHtml', 'evoque': 'Evoque', 'cfg': 'Ini', 'htm': 'Html',
+    'Makefile': 'Makefile', 'cfc': 'ColdfusionHtml', 'tex': 'Tex', 'cs':
+    'CSharp', 'mxml': 'Mxml', 'patch': 'Diff', 'apache.conf': 'ApacheConf',
+    'scala': 'Scala', 'applescript': 'AppleScript', 'GNUmakefile': 'Makefile',
+    'c-objdump': 'CObjdump', 'lua': 'Lua', 'apache2.conf': 'ApacheConf', 'rb':
+    'Ruby', 'gemspec': 'Ruby', 'rl': 'RagelObjectiveC', 'vala': 'Vala', 'tmpl':
+    'Cheetah', 'bf': 'Brainfuck', 'plt': 'Gnuplot', 'G': 'AntlrRuby', 'xslt':
+    'Xslt', 'flxh': 'Felix', 'asax': 'VbNetAspx', 'Rakefile': 'Ruby', 'S': 'S',
+    'wsdl': 'Xml', 'js': 'Javascript', 'autodelegate': 'Myghty', 'properties':
+    'Ini', 'bash': 'Bash', 'c': 'C', 'g': 'AntlrRuby', 'r3': 'Rebol', 's':
+    'Gas', 'ashx': 'VbNetAspx', 'cxx': 'Cpp', 'boo': 'Boo', 'prolog': 'Prolog',
+    'sqlite3-console': 'SqliteConsole', 'cl': 'CommonLisp', 'cc': 'Cpp', 'pot':
+    'Gettext', 'vim': 'Vim', 'pxi': 'Cython', 'yaml': 'Yaml', 'SConstruct':
+    'Python', 'diff': 'Diff', 'txt': 'Text', 'cw': 'Redcode', 'pxd': 'Cython',
+    'plot': 'Gnuplot', 'java': 'Java', 'hrl': 'Erlang', 'py': 'Python',
+    'makefile': 'Makefile', 'squid.conf': 'SquidConf', 'asm': 'Nasm', 'toc':
+    'Tex', 'kid': 'Genshi', 'rhtml': 'Rhtml', 'po': 'Gettext', 'pl': 'Prolog',
+    'pm': 'Perl', 'hx': 'Haxe', 'ascx': 'VbNetAspx', 'ooc': 'Ooc', 'asy':
+    'Asymptote', 'hs': 'Haskell', 'SConscript': 'Python', 'pytb':
+    'PythonTraceback', 'myt': 'Myghty', 'hh': 'Cpp', 'R': 'S', 'aux': 'Tex',
+    'rst': 'Rst', 'cpp-objdump': 'CppObjdump', 'lgt': 'Logtalk', 'rss': 'Xml',
+    'flx': 'Felix', 'b': 'Brainfuck', 'f': 'Fortran', 'rbw': 'Ruby',
+    '.htaccess': 'ApacheConf', 'cxx-objdump': 'CppObjdump', 'j': 'ObjectiveJ',
+    'mll': 'Ocaml', 'yml': 'Yaml', 'mu': 'MuPAD', 'r': 'Rebol', 'ASM': 'Nasm',
+    'erl': 'Erlang', 'mly': 'Ocaml', 'mo': 'Modelica', 'def': 'Modula2', 'ini':
+    'Ini', 'control': 'DebianControl', 'vb': 'VbNet', 'vapi': 'Vala', 'pro':
+    'Prolog', 'spt': 'Cheetah', 'mli': 'Ocaml', 'as': 'ActionScript3', 'cmd':
+    'Batch', 'cpp': 'Cpp', 'io': 'Io', 'tac': 'Python', 'haml': 'Haml', 'rkt':
+    'Racket', 'st':'Smalltalk', 'inc': 'Povray', 'pas': 'Delphi', 'cmake':
+    'CMake', 'csh':'Tcsh', 'hpp': 'Cpp', 'feature': 'Gherkin', 'html': 'Html',
+    'php':'Php', 'php3':'Php', 'php4':'Php', 'php5':'Php', 'xhtml': 'Html',
+    'hxx': 'Cpp', 'eclass': 'Bash', 'css': 'Css',
+    'frag': 'GLShader', 'd-objdump': 'DObjdump', 'weechatlog': 'IrcLogs',
+    'tcsh': 'Tcsh', 'objdump': 'Objdump', 'pyw': 'Python', 'h++': 'Cpp',
+    'py3tb': 'Python3Traceback', 'jsp': 'Jsp', 'sql': 'Sql', 'mak': 'Makefile',
+    'php': 'Php', 'mao': 'Mako', 'man': 'Groff', 'dylan': 'Dylan', 'sass':
+    'Sass', 'cfml': 'ColdfusionHtml', 'darcspatch': 'DarcsPatch', 'tpl':
+    'Smarty', 'm': 'ObjectiveC', 'f90': 'Fortran', 'mod': 'Modula2', 'sh':
+    'Bash', 'lhs': 'LiterateHaskell', 'sources.list': 'SourcesList', 'axd':
+    'VbNetAspx', 'sc': 'Python'}
 
     repos_path = get_repos_path()
     p = os.path.join(repos_path, repo_name)
@@ -308,12 +345,14 @@
 
     def aggregate(cs):
         for f in cs[2]:
-            k = f.mimetype
-            if f.extension in LANGUAGES_EXTENSIONS:
-                if code_stats.has_key(k):
-                    code_stats[k] += 1
+            ext = f.extension
+            key = LANGUAGES_EXTENSIONS_MAP.get(ext, ext)
+            key = key or ext
+            if ext in LANGUAGES_EXTENSIONS_MAP.keys():
+                if code_stats.has_key(key):
+                    code_stats[key] += 1
                 else:
-                    code_stats[k] = 1
+                    code_stats[key] = 1
 
     map(aggregate, tip.walk('/'))
 
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rhodecode/lib/celerypylons/__init__.py	Sat Nov 27 01:27:24 2010 +0100
@@ -0,0 +1,16 @@
+"""
+Automatically sets the environment variable `CELERY_LOADER` to
+`celerypylons.loader:PylonsLoader`.  This ensures the loader is
+specified when accessing the rest of this package, and allows celery
+to be installed in a webapp just by importing celerypylons::
+
+    import celerypylons
+
+"""
+import os
+import warnings
+
+CELERYPYLONS_LOADER = 'rhodecode.lib.celerypylons.loader.PylonsLoader'
+if os.environ.get('CELERY_LOADER', CELERYPYLONS_LOADER) != CELERYPYLONS_LOADER:
+    warnings.warn("'CELERY_LOADER' environment variable will be overridden by celery-pylons.")
+os.environ['CELERY_LOADER'] = CELERYPYLONS_LOADER
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rhodecode/lib/celerypylons/commands.py	Sat Nov 27 01:27:24 2010 +0100
@@ -0,0 +1,143 @@
+import os
+from paste.script.command import Command, BadCommand
+import paste.deploy
+from pylons import config
+
+
+__all__ = ['CeleryDaemonCommand', 'CeleryBeatCommand',
+           'CAMQPAdminCommand', 'CeleryEventCommand']
+
+
+class CeleryCommand(Command):
+    """
+    Abstract Base Class for celery commands.
+
+    The celery commands are somewhat aggressive about loading
+    celery.conf, and since our module sets the `CELERY_LOADER`
+    environment variable to our loader, we have to bootstrap a bit and
+    make sure we've had a chance to load the pylons config off of the
+    command line, otherwise everything fails.
+    """
+    min_args = 1
+    min_args_error = "Please provide a paster config file as an argument."
+    takes_config_file = 1
+    requires_config_file = True
+
+    def run(self, args):
+        """
+        Overrides Command.run
+        
+        Checks for a config file argument and loads it.
+        """
+        if len(args) < self.min_args:
+            raise BadCommand(
+                self.min_args_error % {'min_args': self.min_args,
+                                       'actual_args': len(args)})
+        # Decrement because we're going to lob off the first argument.
+        # @@ This is hacky
+        self.min_args -= 1
+        self.bootstrap_config(args[0])
+        self.update_parser()
+        return super(CeleryCommand, self).run(args[1:])
+
+    def update_parser(self):
+        """
+        Abstract method.  Allows for the class's parser to be updated
+        before the superclass's `run` method is called.  Necessary to
+        allow options/arguments to be passed through to the underlying
+        celery command.
+        """
+        raise NotImplementedError("Abstract Method.")
+
+    def bootstrap_config(self, conf):
+        """
+        Loads the pylons configuration.
+        """
+        path_to_ini_file = os.path.realpath(conf)
+        conf = paste.deploy.appconfig('config:' + path_to_ini_file)
+        config.init_app(conf.global_conf, conf.local_conf)
+
+
+class CeleryDaemonCommand(CeleryCommand):
+    """Start the celery worker
+
+    Starts the celery worker that uses a paste.deploy configuration
+    file.
+    """
+    usage = 'CONFIG_FILE [celeryd options...]'
+    summary = __doc__.splitlines()[0]
+    description = "".join(__doc__.splitlines()[2:])
+
+    parser = Command.standard_parser(quiet=True)
+
+    def update_parser(self):
+        from celery.bin import celeryd
+        for x in celeryd.WorkerCommand().get_options():
+            self.parser.add_option(x)
+
+    def command(self):
+        from celery.bin import celeryd
+        return celeryd.WorkerCommand().run(**vars(self.options))
+
+
+class CeleryBeatCommand(CeleryCommand):
+    """Start the celery beat server
+
+    Starts the celery beat server using a paste.deploy configuration
+    file.
+    """
+    usage = 'CONFIG_FILE [celerybeat options...]'
+    summary = __doc__.splitlines()[0]
+    description = "".join(__doc__.splitlines()[2:])
+
+    parser = Command.standard_parser(quiet=True)
+
+    def update_parser(self):
+        from celery.bin import celerybeat
+        for x in celerybeat.BeatCommand().get_options():
+            self.parser.add_option(x)
+
+    def command(self):
+        from celery.bin import celerybeat
+        return celerybeat.BeatCommand(**vars(self.options))
+
+class CAMQPAdminCommand(CeleryCommand):
+    """CAMQP Admin
+
+    CAMQP celery admin tool.
+    """
+    usage = 'CONFIG_FILE [camqadm options...]'
+    summary = __doc__.splitlines()[0]
+    description = "".join(__doc__.splitlines()[2:])
+
+    parser = Command.standard_parser(quiet=True)
+
+    def update_parser(self):
+        from celery.bin import camqadm
+        for x in camqadm.OPTION_LIST:
+            self.parser.add_option(x)
+
+    def command(self):
+        from celery.bin import camqadm
+        return camqadm.camqadm(*self.args, **vars(self.options))
+
+
+class CeleryEventCommand(CeleryCommand):
+    """Celery event commandd.
+
+    Capture celery events.
+    """
+    usage = 'CONFIG_FILE [celeryev options...]'
+    summary = __doc__.splitlines()[0]
+    description = "".join(__doc__.splitlines()[2:])
+
+    parser = Command.standard_parser(quiet=True)
+
+    def update_parser(self):
+        from celery.bin import celeryev
+        for x in celeryev.OPTION_LIST:
+            self.parser.add_option(x)
+
+    def command(self):
+        from celery.bin import celeryev
+        return celeryev.run_celeryev(**vars(self.options))
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rhodecode/lib/celerypylons/loader.py	Sat Nov 27 01:27:24 2010 +0100
@@ -0,0 +1,55 @@
+from celery.loaders.base import BaseLoader
+from pylons import config
+
+to_pylons = lambda x: x.replace('_', '.').lower()
+to_celery = lambda x: x.replace('.', '_').upper()
+
+LIST_PARAMS = """CELERY_IMPORTS ADMINS ROUTES""".split()
+
+
+class PylonsSettingsProxy(object):
+    """Pylons Settings Proxy
+
+    Proxies settings from pylons.config
+
+    """
+    def __getattr__(self, key):
+        pylons_key = to_pylons(key)
+        try:
+            value = config[pylons_key]
+            if key in LIST_PARAMS: return value.split()
+            return self.type_converter(value)
+        except KeyError:
+            raise AttributeError(pylons_key)
+
+    def __setattr__(self, key, value):
+        pylons_key = to_pylons(key)
+        config[pylons_key] = value
+
+
+    def type_converter(self, value):
+        #cast to int
+        if value.isdigit():
+            return int(value)
+
+        #cast to bool
+        if value.lower() in ['true', 'false']:
+            return value.lower() == 'true'
+
+        return value
+
+class PylonsLoader(BaseLoader):
+    """Pylons celery loader
+
+    Maps the celery config onto pylons.config
+
+    """
+    def read_configuration(self):
+        self.configured = True
+        return PylonsSettingsProxy()
+
+    def on_worker_init(self):
+        """
+        Import task modules.
+        """
+        self.import_default_modules()
--- a/test.ini	Fri Nov 26 23:49:19 2010 +0100
+++ b/test.ini	Sat Nov 27 01:27:24 2010 +0100
@@ -46,6 +46,33 @@
 index_dir = /tmp/index
 
 ####################################
+###        CELERY CONFIG        ####
+####################################
+use_celery = false
+broker.host = localhost
+broker.vhost = rabbitmqhost
+broker.port = 5672
+broker.user = rabbitmq
+broker.password = qweqwe
+
+celery.imports = rhodecode.lib.celerylib.tasks
+
+celery.result.backend = amqp
+celery.result.dburi = amqp://
+celery.result.serialier = json
+
+#celery.send.task.error.emails = true
+#celery.amqp.task.result.expires = 18000
+
+celeryd.concurrency = 2
+#celeryd.log.file = celeryd.log
+celeryd.log.level = debug
+celeryd.max.tasks.per.child = 3
+
+#tasks will never be sent to the queue, but executed locally instead.
+celery.always.eager = false
+
+####################################
 ###         BEAKER CACHE        ####
 ####################################
 beaker.cache.data_dir=/%(here)s/data/cache/data