comparison rhodecode/lib/celerylib/__init__.py @ 2031:82a88013a3fd

merge 1.3 into stable
author Marcin Kuzminski <marcin@python-works.com>
date Sun, 26 Feb 2012 17:25:09 +0200
parents afe8cfa32a0f 324ac367a4da
children dc2584ba5fbc
comparison
legend: equal | deleted | inserted | replaced
left column: 2005:ab0e122b38a7 (old) | right column: 2031:82a88013a3fd (new)
32 from pylons import config 32 from pylons import config
33 33
34 from hashlib import md5 34 from hashlib import md5
35 from decorator import decorator 35 from decorator import decorator
36 36
37 from vcs.utils.lazy import LazyProperty 37 from rhodecode.lib.vcs.utils.lazy import LazyProperty
38 38 from rhodecode import CELERY_ON
39 from rhodecode.lib import str2bool, safe_str 39 from rhodecode.lib import str2bool, safe_str
40 from rhodecode.lib.pidlock import DaemonLock, LockHeld 40 from rhodecode.lib.pidlock import DaemonLock, LockHeld
41 from rhodecode.model import init_model 41 from rhodecode.model import init_model
42 from rhodecode.model import meta 42 from rhodecode.model import meta
43 from rhodecode.model.db import Statistics, Repository, User 43 from rhodecode.model.db import Statistics, Repository, User
45 from sqlalchemy import engine_from_config 45 from sqlalchemy import engine_from_config
46 46
47 from celery.messaging import establish_connection 47 from celery.messaging import establish_connection
48 48
49 log = logging.getLogger(__name__) 49 log = logging.getLogger(__name__)
50
51 try:
52 CELERY_ON = str2bool(config['app_conf'].get('use_celery'))
53 except KeyError:
54 CELERY_ON = False
55 50
56 51
57 class ResultWrapper(object): 52 class ResultWrapper(object):
58 def __init__(self, task): 53 def __init__(self, task):
59 self.task = task 54 self.task = task
65 60
66 def run_task(task, *args, **kwargs): 61 def run_task(task, *args, **kwargs):
67 if CELERY_ON: 62 if CELERY_ON:
68 try: 63 try:
69 t = task.apply_async(args=args, kwargs=kwargs) 64 t = task.apply_async(args=args, kwargs=kwargs)
70 log.info('running task %s:%s', t.task_id, task) 65 log.info('running task %s:%s' % (t.task_id, task))
71 return t 66 return t
72 67
73 except socket.error, e: 68 except socket.error, e:
74 if isinstance(e, IOError) and e.errno == 111: 69 if isinstance(e, IOError) and e.errno == 111:
75 log.debug('Unable to connect to celeryd. Sync execution') 70 log.debug('Unable to connect to celeryd. Sync execution')
78 except KeyError, e: 73 except KeyError, e:
79 log.debug('Unable to connect to celeryd. Sync execution') 74 log.debug('Unable to connect to celeryd. Sync execution')
80 except Exception, e: 75 except Exception, e:
81 log.error(traceback.format_exc()) 76 log.error(traceback.format_exc())
82 77
83 log.debug('executing task %s in sync mode', task) 78 log.debug('executing task %s in sync mode' % task)
84 return ResultWrapper(task(*args, **kwargs)) 79 return ResultWrapper(task(*args, **kwargs))
85 80
86 81
87 def __get_lockkey(func, *fargs, **fkwargs): 82 def __get_lockkey(func, *fargs, **fkwargs):
88 params = list(fargs) 83 params = list(fargs)
98 def locked_task(func): 93 def locked_task(func):
99 def __wrapper(func, *fargs, **fkwargs): 94 def __wrapper(func, *fargs, **fkwargs):
100 lockkey = __get_lockkey(func, *fargs, **fkwargs) 95 lockkey = __get_lockkey(func, *fargs, **fkwargs)
101 lockkey_path = config['here'] 96 lockkey_path = config['here']
102 97
103 log.info('running task with lockkey %s', lockkey) 98 log.info('running task with lockkey %s' % lockkey)
104 try: 99 try:
105 l = DaemonLock(file_=jn(lockkey_path, lockkey)) 100 l = DaemonLock(file_=jn(lockkey_path, lockkey))
106 ret = func(*fargs, **fkwargs) 101 ret = func(*fargs, **fkwargs)
107 l.release() 102 l.release()
108 return ret 103 return ret