comparison rhodecode/lib/celerylib/__init__.py @ 1003:9037456bb17f beta

Another, better solution for establishing a connection with the messaging broker in Celery. This one doesn't require celeryconfig.py.
author Marcin Kuzminski <marcin@python-works.com>
date Tue, 08 Feb 2011 03:45:22 +0100
parents 110a00c181de
children 7fd45bf17d07
comparison
equal deleted inserted replaced
1002:3a7f5b1a19dd 1003:9037456bb17f
36 from vcs.utils.lazy import LazyProperty 36 from vcs.utils.lazy import LazyProperty
37 37
38 from rhodecode.lib import str2bool 38 from rhodecode.lib import str2bool
39 from rhodecode.lib.pidlock import DaemonLock, LockHeld 39 from rhodecode.lib.pidlock import DaemonLock, LockHeld
40 40
41 from celery.messaging import establish_connection
41 from pylons import config 42 from pylons import config
42 43
43 log = logging.getLogger(__name__) 44 log = logging.getLogger(__name__)
44 45
45 try: 46 try:
56 return self.task 57 return self.task
57 58
58 def run_task(task, *args, **kwargs): 59 def run_task(task, *args, **kwargs):
59 if CELERY_ON: 60 if CELERY_ON:
60 try: 61 try:
61 t = task.delay(*args, **kwargs) 62 kw = {
63 'hostname':config['app_conf'].get('broker.host'),
64 'userid':config['app_conf'].get('broker.user'),
65 'password':config['app_conf'].get('broker.password'),
66 'virtual_host':config['app_conf'].get('broker.vhost'),
67 'port':config['app_conf'].get('broker.port'),
68 }
69 conn = establish_connection(**kw)
70 publisher = task.get_publisher(connection=conn)
71 t = task.apply_async(args=args, kwargs=kwargs, publisher=publisher)
72
62 log.info('running task %s:%s', t.task_id, task) 73 log.info('running task %s:%s', t.task_id, task)
63 return t 74 return t
64 except socket.error, e: 75 except socket.error, e:
65 if e.errno == 111: 76 if e.errno == 111:
66 log.debug('Unable to connect to celeryd. Sync execution') 77 log.debug('Unable to connect to celeryd. Sync execution')