Mercurial > kallithea
comparison rhodecode/lib/celerylib/__init__.py @ 2031:82a88013a3fd
merge 1.3 into stable
author | Marcin Kuzminski <marcin@python-works.com> |
---|---|
date | Sun, 26 Feb 2012 17:25:09 +0200 |
parents | afe8cfa32a0f 324ac367a4da |
children | dc2584ba5fbc |
comparison
equal
deleted
inserted
replaced
2005:ab0e122b38a7 | 2031:82a88013a3fd |
---|---|
32 from pylons import config | 32 from pylons import config |
33 | 33 |
34 from hashlib import md5 | 34 from hashlib import md5 |
35 from decorator import decorator | 35 from decorator import decorator |
36 | 36 |
37 from vcs.utils.lazy import LazyProperty | 37 from rhodecode.lib.vcs.utils.lazy import LazyProperty |
38 | 38 from rhodecode import CELERY_ON |
39 from rhodecode.lib import str2bool, safe_str | 39 from rhodecode.lib import str2bool, safe_str |
40 from rhodecode.lib.pidlock import DaemonLock, LockHeld | 40 from rhodecode.lib.pidlock import DaemonLock, LockHeld |
41 from rhodecode.model import init_model | 41 from rhodecode.model import init_model |
42 from rhodecode.model import meta | 42 from rhodecode.model import meta |
43 from rhodecode.model.db import Statistics, Repository, User | 43 from rhodecode.model.db import Statistics, Repository, User |
45 from sqlalchemy import engine_from_config | 45 from sqlalchemy import engine_from_config |
46 | 46 |
47 from celery.messaging import establish_connection | 47 from celery.messaging import establish_connection |
48 | 48 |
49 log = logging.getLogger(__name__) | 49 log = logging.getLogger(__name__) |
50 | |
51 try: | |
52 CELERY_ON = str2bool(config['app_conf'].get('use_celery')) | |
53 except KeyError: | |
54 CELERY_ON = False | |
55 | 50 |
56 | 51 |
57 class ResultWrapper(object): | 52 class ResultWrapper(object): |
58 def __init__(self, task): | 53 def __init__(self, task): |
59 self.task = task | 54 self.task = task |
65 | 60 |
66 def run_task(task, *args, **kwargs): | 61 def run_task(task, *args, **kwargs): |
67 if CELERY_ON: | 62 if CELERY_ON: |
68 try: | 63 try: |
69 t = task.apply_async(args=args, kwargs=kwargs) | 64 t = task.apply_async(args=args, kwargs=kwargs) |
70 log.info('running task %s:%s', t.task_id, task) | 65 log.info('running task %s:%s' % (t.task_id, task)) |
71 return t | 66 return t |
72 | 67 |
73 except socket.error, e: | 68 except socket.error, e: |
74 if isinstance(e, IOError) and e.errno == 111: | 69 if isinstance(e, IOError) and e.errno == 111: |
75 log.debug('Unable to connect to celeryd. Sync execution') | 70 log.debug('Unable to connect to celeryd. Sync execution') |
78 except KeyError, e: | 73 except KeyError, e: |
79 log.debug('Unable to connect to celeryd. Sync execution') | 74 log.debug('Unable to connect to celeryd. Sync execution') |
80 except Exception, e: | 75 except Exception, e: |
81 log.error(traceback.format_exc()) | 76 log.error(traceback.format_exc()) |
82 | 77 |
83 log.debug('executing task %s in sync mode', task) | 78 log.debug('executing task %s in sync mode' % task) |
84 return ResultWrapper(task(*args, **kwargs)) | 79 return ResultWrapper(task(*args, **kwargs)) |
85 | 80 |
86 | 81 |
87 def __get_lockkey(func, *fargs, **fkwargs): | 82 def __get_lockkey(func, *fargs, **fkwargs): |
88 params = list(fargs) | 83 params = list(fargs) |
98 def locked_task(func): | 93 def locked_task(func): |
99 def __wrapper(func, *fargs, **fkwargs): | 94 def __wrapper(func, *fargs, **fkwargs): |
100 lockkey = __get_lockkey(func, *fargs, **fkwargs) | 95 lockkey = __get_lockkey(func, *fargs, **fkwargs) |
101 lockkey_path = config['here'] | 96 lockkey_path = config['here'] |
102 | 97 |
103 log.info('running task with lockkey %s', lockkey) | 98 log.info('running task with lockkey %s' % lockkey) |
104 try: | 99 try: |
105 l = DaemonLock(file_=jn(lockkey_path, lockkey)) | 100 l = DaemonLock(file_=jn(lockkey_path, lockkey)) |
106 ret = func(*fargs, **fkwargs) | 101 ret = func(*fargs, **fkwargs) |
107 l.release() | 102 l.release() |
108 return ret | 103 return ret |