Mercurial > kallithea
comparison pylons_app/lib/celerylib/__init__.py @ 497:fb0c3af6031b celery
Implemented locking for tasks, to prevent starting a task that is already running;
moved out the pidlock library. Added dirsize display.
author | Marcin Kuzminski <marcin@python-works.com> |
---|---|
date | Thu, 23 Sep 2010 01:08:33 +0200 |
parents | b12ea84fb906 |
children | ac32a026c306 |
comparison
equal
deleted
inserted
replaced
496:47f4c7ff245b | 497:fb0c3af6031b |
---|---|
1 from pylons_app.lib.pidlock import DaemonLock, LockHeld | |
1 from vcs.utils.lazy import LazyProperty | 2 from vcs.utils.lazy import LazyProperty |
3 from decorator import decorator | |
2 import logging | 4 import logging |
3 import os | 5 import os |
4 import sys | 6 import sys |
5 import traceback | 7 import traceback |
6 | 8 from hashlib import md5 |
7 log = logging.getLogger(__name__) | 9 log = logging.getLogger(__name__) |
8 | 10 |
9 class ResultWrapper(object): | 11 class ResultWrapper(object): |
10 def __init__(self, task): | 12 def __init__(self, task): |
11 self.task = task | 13 self.task = task |
18 try: | 20 try: |
19 t = task.delay(*args, **kwargs) | 21 t = task.delay(*args, **kwargs) |
20 log.info('running task %s', t.task_id) | 22 log.info('running task %s', t.task_id) |
21 return t | 23 return t |
22 except Exception, e: | 24 except Exception, e: |
25 print e | |
23 if e.errno == 111: | 26 if e.errno == 111: |
24 log.debug('Unnable to connect. Sync execution') | 27 log.debug('Unnable to connect. Sync execution') |
25 else: | 28 else: |
26 log.error(traceback.format_exc()) | 29 log.error(traceback.format_exc()) |
27 #pure sync version | 30 #pure sync version |
28 return ResultWrapper(task(*args, **kwargs)) | 31 return ResultWrapper(task(*args, **kwargs)) |
32 | |
33 | |
34 class LockTask(object): | |
35 """LockTask decorator""" | |
29 | 36 |
37 def __init__(self, func): | |
38 self.func = func | |
39 | |
40 def __call__(self, func): | |
41 return decorator(self.__wrapper, func) | |
42 | |
43 def __wrapper(self, func, *fargs, **fkwargs): | |
44 params = [] | |
45 params.extend(fargs) | |
46 params.extend(fkwargs.values()) | |
47 lockkey = 'task_%s' % \ | |
48 md5(str(self.func) + '-' + '-'.join(map(str, params))).hexdigest() | |
49 log.info('running task with lockkey %s', lockkey) | |
50 try: | |
51 l = DaemonLock(lockkey) | |
52 return func(*fargs, **fkwargs) | |
53 l.release() | |
54 except LockHeld: | |
55 log.info('LockHeld') | |
56 return 'Task with key %s already running' % lockkey | |
57 | |
58 | |
59 | |
60 | |
61 | |
62 | |
63 | |
64 | |
65 | |
66 |