Mercurial > kallithea
comparison rhodecode/lib/celerylib/__init__.py @ 1003:9037456bb17f beta
Another, better solution for establishing a connection with the messaging broker in Celery.
This one doesn't require celeryconfig.py.
author | Marcin Kuzminski <marcin@python-works.com> |
---|---|
date | Tue, 08 Feb 2011 03:45:22 +0100 |
parents | 110a00c181de |
children | 7fd45bf17d07 |
comparison
equal
deleted
inserted
replaced
1002:3a7f5b1a19dd | 1003:9037456bb17f |
---|---|
36 from vcs.utils.lazy import LazyProperty | 36 from vcs.utils.lazy import LazyProperty |
37 | 37 |
38 from rhodecode.lib import str2bool | 38 from rhodecode.lib import str2bool |
39 from rhodecode.lib.pidlock import DaemonLock, LockHeld | 39 from rhodecode.lib.pidlock import DaemonLock, LockHeld |
40 | 40 |
41 from celery.messaging import establish_connection | |
41 from pylons import config | 42 from pylons import config |
42 | 43 |
43 log = logging.getLogger(__name__) | 44 log = logging.getLogger(__name__) |
44 | 45 |
45 try: | 46 try: |
56 return self.task | 57 return self.task |
57 | 58 |
58 def run_task(task, *args, **kwargs): | 59 def run_task(task, *args, **kwargs): |
59 if CELERY_ON: | 60 if CELERY_ON: |
60 try: | 61 try: |
61 t = task.delay(*args, **kwargs) | 62 kw = { |
63 'hostname':config['app_conf'].get('broker.host'), | |
64 'userid':config['app_conf'].get('broker.user'), | |
65 'password':config['app_conf'].get('broker.password'), | |
66 'virtual_host':config['app_conf'].get('broker.vhost'), | |
67 'port':config['app_conf'].get('broker.port'), | |
68 } | |
69 conn = establish_connection(**kw) | |
70 publisher = task.get_publisher(connection=conn) | |
71 t = task.apply_async(args=args, kwargs=kwargs, publisher=publisher) | |
72 | |
62 log.info('running task %s:%s', t.task_id, task) | 73 log.info('running task %s:%s', t.task_id, task) |
63 return t | 74 return t |
64 except socket.error, e: | 75 except socket.error, e: |
65 if e.errno == 111: | 76 if e.errno == 111: |
66 log.debug('Unable to connect to celeryd. Sync execution') | 77 log.debug('Unable to connect to celeryd. Sync execution') |