Mercurial > kallithea
comparison rhodecode/lib/celerylib/__init__.py @ 776:f6c613fba757 beta
Celery is now configured by the .ini files and run from paster.
Removed celeryconfig and added a homebrew celery-pylons integration;
added the paster celeryd command and fixed tasks to use Pylons configs and SQLAlchemy sessions.
author | Marcin Kuzminski <marcin@python-works.com> |
---|---|
date | Sat, 27 Nov 2010 01:27:24 +0100 |
parents | 14559eb34003 |
children | 71113f64b2d8 |
comparison
equal
deleted
inserted
replaced
775:aaf2fc59a39a | 776:f6c613fba757 |
---|---|
1 import os | |
2 import sys | |
3 import socket | |
4 import traceback | |
5 import logging | |
6 | |
1 from rhodecode.lib.pidlock import DaemonLock, LockHeld | 7 from rhodecode.lib.pidlock import DaemonLock, LockHeld |
2 from vcs.utils.lazy import LazyProperty | 8 from vcs.utils.lazy import LazyProperty |
3 from decorator import decorator | 9 from decorator import decorator |
4 import logging | |
5 import os | |
6 import sys | |
7 import traceback | |
8 from hashlib import md5 | 10 from hashlib import md5 |
9 import socket | 11 from pylons import config |
12 | |
10 log = logging.getLogger(__name__) | 13 log = logging.getLogger(__name__) |
14 | |
15 def str2bool(v): | |
16 return v.lower() in ["yes", "true", "t", "1"] if v else None | |
17 | |
18 CELERY_ON = str2bool(config['app_conf'].get('use_celery')) | |
11 | 19 |
12 class ResultWrapper(object): | 20 class ResultWrapper(object): |
13 def __init__(self, task): | 21 def __init__(self, task): |
14 self.task = task | 22 self.task = task |
15 | 23 |
16 @LazyProperty | 24 @LazyProperty |
17 def result(self): | 25 def result(self): |
18 return self.task | 26 return self.task |
19 | 27 |
20 def run_task(task, *args, **kwargs): | 28 def run_task(task, *args, **kwargs): |
21 try: | 29 if CELERY_ON: |
22 t = task.delay(*args, **kwargs) | 30 try: |
23 log.info('running task %s', t.task_id) | 31 t = task.delay(*args, **kwargs) |
24 return t | 32 log.info('running task %s:%s', t.task_id, task) |
25 except socket.error, e: | 33 return t |
26 if e.errno == 111: | 34 except socket.error, e: |
27 log.debug('Unable to connect to celeryd. Sync execution') | 35 if e.errno == 111: |
28 else: | 36 log.debug('Unable to connect to celeryd. Sync execution') |
29 log.error(traceback.format_exc()) | 37 else: |
30 except KeyError, e: | 38 log.error(traceback.format_exc()) |
31 log.debug('Unable to connect to celeryd. Sync execution') | 39 except KeyError, e: |
32 except Exception, e: | 40 log.debug('Unable to connect to celeryd. Sync execution') |
33 log.error(traceback.format_exc()) | 41 except Exception, e: |
34 | 42 log.error(traceback.format_exc()) |
43 | |
44 log.debug('executing task %s in sync mode', task) | |
35 return ResultWrapper(task(*args, **kwargs)) | 45 return ResultWrapper(task(*args, **kwargs)) |
36 | 46 |
37 | 47 |
38 def locked_task(func): | 48 def locked_task(func): |
39 def __wrapper(func, *fargs, **fkwargs): | 49 def __wrapper(func, *fargs, **fkwargs): |
40 params = list(fargs) | 50 params = list(fargs) |
41 params.extend(['%s-%s' % ar for ar in fkwargs.items()]) | 51 params.extend(['%s-%s' % ar for ar in fkwargs.items()]) |
42 | 52 |
43 lockkey = 'task_%s' % \ | 53 lockkey = 'task_%s' % \ |
44 md5(str(func.__name__) + '-' + \ | 54 md5(str(func.__name__) + '-' + \ |
45 '-'.join(map(str, params))).hexdigest() | 55 '-'.join(map(str, params))).hexdigest() |
46 log.info('running task with lockkey %s', lockkey) | 56 log.info('running task with lockkey %s', lockkey) |
47 try: | 57 try: |
48 l = DaemonLock(lockkey) | 58 l = DaemonLock(lockkey) |
49 ret = func(*fargs, **fkwargs) | 59 ret = func(*fargs, **fkwargs) |
50 l.release() | 60 l.release() |
51 return ret | 61 return ret |
52 except LockHeld: | 62 except LockHeld: |
53 log.info('LockHeld') | 63 log.info('LockHeld') |
54 return 'Task with key %s already running' % lockkey | 64 return 'Task with key %s already running' % lockkey |
55 | 65 |
56 return decorator(__wrapper, func) | 66 return decorator(__wrapper, func) |
57 | |
58 | 67 |
59 | 68 |
60 | 69 |
61 | 70 |
62 | 71 |
63 | 72 |
64 | 73 |
74 |