diff pylons_app/lib/celerylib/__init__.py @ 497:fb0c3af6031b celery

Implemented locking for task, to prevent for running the same tasks, moved out pidlock library. Added dirsize display
author Marcin Kuzminski <marcin@python-works.com>
date Thu, 23 Sep 2010 01:08:33 +0200
parents b12ea84fb906
children ac32a026c306
line wrap: on
line diff
--- a/pylons_app/lib/celerylib/__init__.py	Wed Sep 22 16:26:49 2010 +0200
+++ b/pylons_app/lib/celerylib/__init__.py	Thu Sep 23 01:08:33 2010 +0200
@@ -1,9 +1,11 @@
+from pylons_app.lib.pidlock import DaemonLock, LockHeld
 from vcs.utils.lazy import LazyProperty
+from decorator import decorator
 import logging
 import os
 import sys
 import traceback
-
+from hashlib import md5
 log = logging.getLogger(__name__)
 
 class ResultWrapper(object):
@@ -20,10 +22,45 @@
         log.info('running task %s', t.task_id)
         return t
     except Exception, e:
+        print e
         if e.errno == 111:
             log.debug('Unnable to connect. Sync execution')
         else:
             log.error(traceback.format_exc())
         #pure sync version
         return ResultWrapper(task(*args, **kwargs))
+
+
+class LockTask(object):
+    """LockTask decorator"""
     
+    def __init__(self, func):
+        self.func = func
+        
+    def __call__(self, func):
+        return decorator(self.__wrapper, func)
+    
+    def __wrapper(self, func, *fargs, **fkwargs):
+        params = []
+        params.extend(fargs)
+        params.extend(fkwargs.values())
+        lockkey = 'task_%s' % \
+           md5(str(self.func) + '-' + '-'.join(map(str, params))).hexdigest()
+        log.info('running task with lockkey %s', lockkey)
+        try:
+            l = DaemonLock(lockkey)
+            return func(*fargs, **fkwargs)
+            l.release()
+        except LockHeld:
+            log.info('LockHeld')
+            return 'Task with key %s already running' % lockkey   
+
+            
+            
+
+        
+        
+    
+    
+    
+