comparison pylons_app/lib/celerylib/__init__.py @ 497:fb0c3af6031b celery

Implemented locking for task, to prevent for running the same tasks, moved out pidlock library. Added dirsize display
author Marcin Kuzminski <marcin@python-works.com>
date Thu, 23 Sep 2010 01:08:33 +0200
parents b12ea84fb906
children ac32a026c306
comparison
equal deleted inserted replaced
496:47f4c7ff245b 497:fb0c3af6031b
1 from pylons_app.lib.pidlock import DaemonLock, LockHeld
1 from vcs.utils.lazy import LazyProperty 2 from vcs.utils.lazy import LazyProperty
3 from decorator import decorator
2 import logging 4 import logging
3 import os 5 import os
4 import sys 6 import sys
5 import traceback 7 import traceback
6 8 from hashlib import md5
7 log = logging.getLogger(__name__) 9 log = logging.getLogger(__name__)
8 10
9 class ResultWrapper(object): 11 class ResultWrapper(object):
10 def __init__(self, task): 12 def __init__(self, task):
11 self.task = task 13 self.task = task
18 try: 20 try:
19 t = task.delay(*args, **kwargs) 21 t = task.delay(*args, **kwargs)
20 log.info('running task %s', t.task_id) 22 log.info('running task %s', t.task_id)
21 return t 23 return t
22 except Exception, e: 24 except Exception, e:
25 print e
23 if e.errno == 111: 26 if e.errno == 111:
24 log.debug('Unnable to connect. Sync execution') 27 log.debug('Unnable to connect. Sync execution')
25 else: 28 else:
26 log.error(traceback.format_exc()) 29 log.error(traceback.format_exc())
27 #pure sync version 30 #pure sync version
28 return ResultWrapper(task(*args, **kwargs)) 31 return ResultWrapper(task(*args, **kwargs))
32
33
34 class LockTask(object):
35 """LockTask decorator"""
29 36
37 def __init__(self, func):
38 self.func = func
39
40 def __call__(self, func):
41 return decorator(self.__wrapper, func)
42
43 def __wrapper(self, func, *fargs, **fkwargs):
44 params = []
45 params.extend(fargs)
46 params.extend(fkwargs.values())
47 lockkey = 'task_%s' % \
48 md5(str(self.func) + '-' + '-'.join(map(str, params))).hexdigest()
49 log.info('running task with lockkey %s', lockkey)
50 try:
51 l = DaemonLock(lockkey)
52 return func(*fargs, **fkwargs)
53 l.release()
54 except LockHeld:
55 log.info('LockHeld')
56 return 'Task with key %s already running' % lockkey
57
58
59
60
61
62
63
64
65
66