comparison rhodecode/lib/celerylib/__init__.py @ 776:f6c613fba757 beta

Celery is configured by the .ini files and run from paster now removed celeryconfig, added homebrew celery-pylons, added paster celeryd command, fixed tasks to use pylons configs, sqlalchemy sessions
author Marcin Kuzminski <marcin@python-works.com>
date Sat, 27 Nov 2010 01:27:24 +0100
parents 14559eb34003
children 71113f64b2d8
comparison
equal deleted inserted replaced
775:aaf2fc59a39a 776:f6c613fba757
1 import os
2 import sys
3 import socket
4 import traceback
5 import logging
6
1 from rhodecode.lib.pidlock import DaemonLock, LockHeld 7 from rhodecode.lib.pidlock import DaemonLock, LockHeld
2 from vcs.utils.lazy import LazyProperty 8 from vcs.utils.lazy import LazyProperty
3 from decorator import decorator 9 from decorator import decorator
4 import logging
5 import os
6 import sys
7 import traceback
8 from hashlib import md5 10 from hashlib import md5
9 import socket 11 from pylons import config
12
10 log = logging.getLogger(__name__) 13 log = logging.getLogger(__name__)
14
15 def str2bool(v):
16 return v.lower() in ["yes", "true", "t", "1"] if v else None
17
18 CELERY_ON = str2bool(config['app_conf'].get('use_celery'))
11 19
12 class ResultWrapper(object): 20 class ResultWrapper(object):
13 def __init__(self, task): 21 def __init__(self, task):
14 self.task = task 22 self.task = task
15 23
16 @LazyProperty 24 @LazyProperty
17 def result(self): 25 def result(self):
18 return self.task 26 return self.task
19 27
20 def run_task(task, *args, **kwargs): 28 def run_task(task, *args, **kwargs):
21 try: 29 if CELERY_ON:
22 t = task.delay(*args, **kwargs) 30 try:
23 log.info('running task %s', t.task_id) 31 t = task.delay(*args, **kwargs)
24 return t 32 log.info('running task %s:%s', t.task_id, task)
25 except socket.error, e: 33 return t
26 if e.errno == 111: 34 except socket.error, e:
27 log.debug('Unable to connect to celeryd. Sync execution') 35 if e.errno == 111:
28 else: 36 log.debug('Unable to connect to celeryd. Sync execution')
29 log.error(traceback.format_exc()) 37 else:
30 except KeyError, e: 38 log.error(traceback.format_exc())
31 log.debug('Unable to connect to celeryd. Sync execution') 39 except KeyError, e:
32 except Exception, e: 40 log.debug('Unable to connect to celeryd. Sync execution')
33 log.error(traceback.format_exc()) 41 except Exception, e:
34 42 log.error(traceback.format_exc())
43
44 log.debug('executing task %s in sync mode', task)
35 return ResultWrapper(task(*args, **kwargs)) 45 return ResultWrapper(task(*args, **kwargs))
36 46
37 47
38 def locked_task(func): 48 def locked_task(func):
39 def __wrapper(func, *fargs, **fkwargs): 49 def __wrapper(func, *fargs, **fkwargs):
40 params = list(fargs) 50 params = list(fargs)
41 params.extend(['%s-%s' % ar for ar in fkwargs.items()]) 51 params.extend(['%s-%s' % ar for ar in fkwargs.items()])
42 52
43 lockkey = 'task_%s' % \ 53 lockkey = 'task_%s' % \
44 md5(str(func.__name__) + '-' + \ 54 md5(str(func.__name__) + '-' + \
45 '-'.join(map(str, params))).hexdigest() 55 '-'.join(map(str, params))).hexdigest()
46 log.info('running task with lockkey %s', lockkey) 56 log.info('running task with lockkey %s', lockkey)
47 try: 57 try:
49 ret = func(*fargs, **fkwargs) 59 ret = func(*fargs, **fkwargs)
50 l.release() 60 l.release()
51 return ret 61 return ret
52 except LockHeld: 62 except LockHeld:
53 log.info('LockHeld') 63 log.info('LockHeld')
54 return 'Task with key %s already running' % lockkey 64 return 'Task with key %s already running' % lockkey
55 65
56 return decorator(__wrapper, func) 66 return decorator(__wrapper, func)
57
58 67
59 68
60 69
61 70
62 71
63 72
64 73
74