changeset 497:fb0c3af6031b celery

Implemented locking for task, to prevent for running the same tasks, moved out pidlock library. Added dirsize display
author Marcin Kuzminski <marcin@python-works.com>
date Thu, 23 Sep 2010 01:08:33 +0200
parents 47f4c7ff245b
children 6aa7db1c083a
files pylons_app/lib/celerylib/__init__.py pylons_app/lib/celerylib/tasks.py pylons_app/lib/indexers/__init__.py pylons_app/lib/indexers/daemon.py pylons_app/lib/indexers/pidlock.py pylons_app/lib/pidlock.py pylons_app/lib/utils.py pylons_app/templates/files/files_browser.html
diffstat 8 files changed, 190 insertions(+), 150 deletions(-) [+]
line wrap: on
line diff
--- a/pylons_app/lib/celerylib/__init__.py	Wed Sep 22 16:26:49 2010 +0200
+++ b/pylons_app/lib/celerylib/__init__.py	Thu Sep 23 01:08:33 2010 +0200
@@ -1,9 +1,11 @@
+from pylons_app.lib.pidlock import DaemonLock, LockHeld
 from vcs.utils.lazy import LazyProperty
+from decorator import decorator
 import logging
 import os
 import sys
 import traceback
-
+from hashlib import md5
 log = logging.getLogger(__name__)
 
 class ResultWrapper(object):
@@ -20,10 +22,45 @@
         log.info('running task %s', t.task_id)
         return t
     except Exception, e:
+        print e
         if e.errno == 111:
             log.debug('Unnable to connect. Sync execution')
         else:
             log.error(traceback.format_exc())
         #pure sync version
         return ResultWrapper(task(*args, **kwargs))
+
+
+class LockTask(object):
+    """LockTask decorator"""
     
+    def __init__(self, func):
+        self.func = func
+        
+    def __call__(self, func):
+        return decorator(self.__wrapper, func)
+    
+    def __wrapper(self, func, *fargs, **fkwargs):
+        params = []
+        params.extend(fargs)
+        params.extend(fkwargs.values())
+        lockkey = 'task_%s' % \
+           md5(str(self.func) + '-' + '-'.join(map(str, params))).hexdigest()
+        log.info('running task with lockkey %s', lockkey)
+        try:
+            l = DaemonLock(lockkey)
+            return func(*fargs, **fkwargs)
+            l.release()
+        except LockHeld:
+            log.info('LockHeld')
+            return 'Task with key %s already running' % lockkey   
+
+            
+            
+
+        
+        
+    
+    
+    
+  
--- a/pylons_app/lib/celerylib/tasks.py	Wed Sep 22 16:26:49 2010 +0200
+++ b/pylons_app/lib/celerylib/tasks.py	Thu Sep 23 01:08:33 2010 +0200
@@ -2,7 +2,7 @@
 from celery.task.sets import subtask
 from celeryconfig import PYLONS_CONFIG as config
 from pylons.i18n.translation import _
-from pylons_app.lib.celerylib import run_task
+from pylons_app.lib.celerylib import run_task, LockTask
 from pylons_app.lib.helpers import person
 from pylons_app.lib.smtp_mailer import SmtpMailer
 from pylons_app.lib.utils import OrderedDict
@@ -68,7 +68,7 @@
 @task
 def whoosh_index(repo_location, full_index):
     log = whoosh_index.get_logger()
-    from pylons_app.lib.indexers import DaemonLock
+    from pylons_app.lib.pidlock import DaemonLock
     from pylons_app.lib.indexers.daemon import WhooshIndexingDaemon, LockHeld
     try:
         l = DaemonLock()
@@ -80,7 +80,9 @@
         log.info('LockHeld')
         return 'LockHeld'    
 
+
 @task
+@LockTask('get_commits_stats')
 def get_commits_stats(repo_name, ts_min_y, ts_max_y):
     author_key_cleaner = lambda k: person(k).replace('"', "") #for js data compatibilty
         
@@ -92,7 +94,7 @@
     repo = MercurialRepository(repos_path + repo_name)
 
     skip_date_limit = True
-    parse_limit = 500 #limit for single task changeset parsing
+    parse_limit = 350 #limit for single task changeset parsing
     last_rev = 0
     last_cs = None
     timegetter = itemgetter('time')
@@ -205,7 +207,9 @@
         log.error(traceback.format_exc())
         sa.rollback()
         return False
-                        
+    
+    run_task(get_commits_stats, repo_name, ts_min_y, ts_max_y)
+                            
     return True
 
 @task
--- a/pylons_app/lib/indexers/__init__.py	Wed Sep 22 16:26:49 2010 +0200
+++ b/pylons_app/lib/indexers/__init__.py	Thu Sep 23 01:08:33 2010 +0200
@@ -1,5 +1,4 @@
 from os.path import dirname as dn, join as jn
-from pidlock import LockHeld, DaemonLock
 from pylons_app.config.environment import load_environment
 from pylons_app.model.hg_model import HgModel
 from shutil import rmtree
--- a/pylons_app/lib/indexers/daemon.py	Wed Sep 22 16:26:49 2010 +0200
+++ b/pylons_app/lib/indexers/daemon.py	Thu Sep 23 01:08:33 2010 +0200
@@ -32,7 +32,7 @@
 project_path = dn(dn(dn(dn(os.path.realpath(__file__)))))
 sys.path.append(project_path)
 
-from pidlock import LockHeld, DaemonLock
+from pylons_app.lib.pidlock import LockHeld, DaemonLock
 from pylons_app.model.hg_model import HgModel
 from pylons_app.lib.helpers import safe_unicode
 from whoosh.index import create_in, open_dir
--- a/pylons_app/lib/indexers/pidlock.py	Wed Sep 22 16:26:49 2010 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,127 +0,0 @@
-import os, time
-import sys
-from warnings import warn
-
-class LockHeld(Exception):pass
-
-
-class DaemonLock(object):
-    '''daemon locking
-    USAGE:
-    try:
-        l = lock()
-        main()
-        l.release()
-    except LockHeld:
-        sys.exit(1)
-    '''
-
-    def __init__(self, file=None, callbackfn=None,
-                 desc='daemon lock', debug=False):
-
-        self.pidfile = file if file else os.path.join(os.path.dirname(__file__),
-                                                      'running.lock')
-        self.callbackfn = callbackfn
-        self.desc = desc
-        self.debug = debug
-        self.held = False
-        #run the lock automatically !
-        self.lock()
-
-    def __del__(self):
-        if self.held:
-
-#            warn("use lock.release instead of del lock",
-#                    category = DeprecationWarning,
-#                    stacklevel = 2)
-
-            # ensure the lock will be removed
-            self.release()
-
-
-    def lock(self):
-        '''
-        locking function, if lock is present it will raise LockHeld exception
-        '''
-        lockname = '%s' % (os.getpid())
-
-        self.trylock()
-        self.makelock(lockname, self.pidfile)
-        return True
-
-    def trylock(self):
-        running_pid = False
-        try:
-            pidfile = open(self.pidfile, "r")
-            pidfile.seek(0)
-            running_pid = pidfile.readline()
-            if self.debug:
-                print 'lock file present running_pid: %s, checking for execution'\
-                % running_pid
-            # Now we check the PID from lock file matches to the current
-            # process PID
-            if running_pid:
-                if os.path.exists("/proc/%s" % running_pid):
-                        print "You already have an instance of the program running"
-                        print "It is running as process %s" % running_pid
-                        raise LockHeld
-                else:
-                        print "Lock File is there but the program is not running"
-                        print "Removing lock file for the: %s" % running_pid
-                        self.release()
-        except IOError, e:
-            if e.errno != 2:
-                raise
-
-
-    def release(self):
-        '''
-        releases the pid by removing the pidfile
-        '''
-        if self.callbackfn:
-            #execute callback function on release
-            if self.debug:
-                print 'executing callback function %s' % self.callbackfn
-            self.callbackfn()
-        try:
-            if self.debug:
-                print 'removing pidfile %s' % self.pidfile
-            os.remove(self.pidfile)
-            self.held = False
-        except OSError, e:
-            if self.debug:
-                print 'removing pidfile failed %s' % e
-            pass
-
-    def makelock(self, lockname, pidfile):
-        '''
-        this function will make an actual lock
-        @param lockname: acctual pid of file
-        @param pidfile: the file to write the pid in
-        '''
-        if self.debug:
-            print 'creating a file %s and pid: %s' % (pidfile, lockname)
-        pidfile = open(self.pidfile, "wb")
-        pidfile.write(lockname)
-        pidfile.close
-        self.held = True
-
-
-def main():
-    print 'func is running'
-    cnt = 20
-    while 1:
-        print cnt
-        if cnt == 0:
-            break
-        time.sleep(1)
-        cnt -= 1
-
-
-if __name__ == "__main__":
-    try:
-        l = DaemonLock(desc='test lock')
-        main()
-        l.release()
-    except LockHeld:
-        sys.exit(1)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pylons_app/lib/pidlock.py	Thu Sep 23 01:08:33 2010 +0200
@@ -0,0 +1,127 @@
+import os, time
+import sys
+from warnings import warn
+
+class LockHeld(Exception):pass
+
+
+class DaemonLock(object):
+    """daemon locking
+    USAGE:
+    try:
+        l = lock()
+        main()
+        l.release()
+    except LockHeld:
+        sys.exit(1)
+    """
+
+    def __init__(self, file=None, callbackfn=None,
+                 desc='daemon lock', debug=False):
+
+        self.pidfile = file if file else os.path.join(os.path.dirname(__file__),
+                                                      'running.lock')
+        self.callbackfn = callbackfn
+        self.desc = desc
+        self.debug = debug
+        self.held = False
+        #run the lock automatically !
+        self.lock()
+
+    def __del__(self):
+        if self.held:
+
+#            warn("use lock.release instead of del lock",
+#                    category = DeprecationWarning,
+#                    stacklevel = 2)
+
+            # ensure the lock will be removed
+            self.release()
+
+
+    def lock(self):
+        """
+        locking function, if lock is present it will raise LockHeld exception
+        """
+        lockname = '%s' % (os.getpid())
+
+        self.trylock()
+        self.makelock(lockname, self.pidfile)
+        return True
+
+    def trylock(self):
+        running_pid = False
+        try:
+            pidfile = open(self.pidfile, "r")
+            pidfile.seek(0)
+            running_pid = pidfile.readline()
+            if self.debug:
+                print 'lock file present running_pid: %s, checking for execution'\
+                % running_pid
+            # Now we check the PID from lock file matches to the current
+            # process PID
+            if running_pid:
+                if os.path.exists("/proc/%s" % running_pid):
+                        print "You already have an instance of the program running"
+                        print "It is running as process %s" % running_pid
+                        raise LockHeld
+                else:
+                        print "Lock File is there but the program is not running"
+                        print "Removing lock file for the: %s" % running_pid
+                        self.release()
+        except IOError, e:
+            if e.errno != 2:
+                raise
+
+
+    def release(self):
+        """
+        releases the pid by removing the pidfile
+        """
+        if self.callbackfn:
+            #execute callback function on release
+            if self.debug:
+                print 'executing callback function %s' % self.callbackfn
+            self.callbackfn()
+        try:
+            if self.debug:
+                print 'removing pidfile %s' % self.pidfile
+            os.remove(self.pidfile)
+            self.held = False
+        except OSError, e:
+            if self.debug:
+                print 'removing pidfile failed %s' % e
+            pass
+
+    def makelock(self, lockname, pidfile):
+        """
+        this function will make an actual lock
+        @param lockname: acctual pid of file
+        @param pidfile: the file to write the pid in
+        """
+        if self.debug:
+            print 'creating a file %s and pid: %s' % (pidfile, lockname)
+        pidfile = open(self.pidfile, "wb")
+        pidfile.write(lockname)
+        pidfile.close
+        self.held = True
+
+
+def main():
+    print 'func is running'
+    cnt = 20
+    while 1:
+        print cnt
+        if cnt == 0:
+            break
+        time.sleep(1)
+        cnt -= 1
+
+
+if __name__ == "__main__":
+    try:
+        l = DaemonLock(desc='test lock')
+        main()
+        l.release()
+    except LockHeld:
+        sys.exit(1)
--- a/pylons_app/lib/utils.py	Wed Sep 22 16:26:49 2010 +0200
+++ b/pylons_app/lib/utils.py	Thu Sep 23 01:08:33 2010 +0200
@@ -374,9 +374,8 @@
     @param repo_location:
     @param full_index:
     """
-    from pylons_app.lib.indexers import daemon
     from pylons_app.lib.indexers.daemon import WhooshIndexingDaemon
-    from pylons_app.lib.indexers.pidlock import DaemonLock, LockHeld
+    from pylons_app.lib.pidlock import DaemonLock, LockHeld
     from pylons_app.lib.indexers import IDX_LOCATION
     import shutil
     
--- a/pylons_app/templates/files/files_browser.html	Wed Sep 22 16:26:49 2010 +0200
+++ b/pylons_app/templates/files/files_browser.html	Thu Sep 23 01:08:33 2010 +0200
@@ -29,26 +29,27 @@
 		                 <th>${_('Last commiter')}</th>
 		             </tr>
 		         </thead>
-		         	<tr class="parity0">
-		          		<td>
-		          		% if c.files_list.parent:
-		          			${h.link_to('..',h.url('files_home',repo_name=c.repo_name,revision=c.cur_rev,f_path=c.files_list.parent.path),class_="browser-dir")}
-		          		%endif
-		          		</td>
-		          		<td></td>
-		          		<td></td>
-		          		<td></td>
-		          		<td></td>
-		         	</tr>
+
+          		% if c.files_list.parent:
+         		<tr class="parity0">
+	          		<td>		          		
+	          			${h.link_to('..',h.url('files_home',repo_name=c.repo_name,revision=c.cur_rev,f_path=c.files_list.parent.path),class_="browser-dir")}
+	          		</td>
+	          		<td></td>
+	          		<td></td>
+	          		<td></td>
+	          		<td></td>
+	          		<td></td>
+				</tr>	          		
+          		%endif
+		         	
 		    %for cnt,node in enumerate(c.files_list,1):
 				<tr class="parity${cnt%2}">
 		             <td>
 						${h.link_to(node.name,h.url('files_home',repo_name=c.repo_name,revision=c.cur_rev,f_path=node.path),class_=file_class(node))}
 		             </td>
 		             <td>
-		                %if node.is_file():
-		             		${h.format_byte_size(node.size,binary=True)}
-		             	%endif
+		             	${h.format_byte_size(node.size,binary=True)}
 		             </td>
 		             <td>
 		              %if node.is_file():