changeset 467:3fc3ce53659b celery

starting celery branch
author Marcin Kuzminski <marcin@python-works.com>
date Sat, 11 Sep 2010 01:55:46 +0200
parents 183cee110578
children 935dddee7422
files celeryconfig.py pylons_app/controllers/admin/settings.py pylons_app/controllers/summary.py pylons_app/lib/celerylib/__init__.py pylons_app/lib/celerylib/tasks.py pylons_app/templates/admin/settings/settings.html
diffstat 6 files changed, 189 insertions(+), 78 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/celeryconfig.py	Sat Sep 11 01:55:46 2010 +0200
@@ -0,0 +1,32 @@
+# List of modules to import when celery starts.
+import sys
+import os
+sys.path.append(os.getcwd())
+CELERY_IMPORTS = ("pylons_app.lib.celerylib.tasks", )
+
+## Result store settings.
+CELERY_RESULT_BACKEND = "database"
+CELERY_RESULT_DBURI = "sqlite:///hg_app.db"
+
+
+## Broker settings.
+BROKER_HOST = "localhost"
+BROKER_PORT = 5672
+BROKER_VHOST = "rabbitmqhost"
+BROKER_USER = "rabbitmq"
+BROKER_PASSWORD = "qweqwe"
+
+## Worker settings
+## If you're doing mostly I/O you can have more processes,
+## but if mostly spending CPU, try to keep it close to the
+## number of CPUs on your machine. If not set, the number of CPUs/cores
+## available will be used.
+CELERYD_CONCURRENCY = 2
+# CELERYD_LOG_FILE = "celeryd.log"
+CELERYD_LOG_LEVEL = "DEBUG"
+CELERYD_MAX_TASKS_PER_CHILD = 1
+
+#CELERY_ALWAYS_EAGER = True
+#rabbitmqctl add_user rabbitmq qweqwe
+#rabbitmqctl add_vhost rabbitmqhost
+#rabbitmqctl set_permissions -p rabbitmqhost rabbitmq ".*" ".*" ".*"
\ No newline at end of file
--- a/pylons_app/controllers/admin/settings.py	Sat Sep 11 01:52:16 2010 +0200
+++ b/pylons_app/controllers/admin/settings.py	Sat Sep 11 01:55:46 2010 +0200
@@ -38,6 +38,7 @@
     ApplicationUiSettingsForm
 from pylons_app.model.hg_model import HgModel
 from pylons_app.model.user_model import UserModel
+from pylons_app.lib.celerylib import tasks,run_task
 import formencode
 import logging
 import traceback
@@ -102,6 +103,12 @@
             invalidate_cache('cached_repo_list')
             h.flash(_('Repositories sucessfully rescanned'), category='success')            
         
+        if setting_id == 'whoosh':
+            repo_location = get_hg_ui_settings()['paths_root_path']
+            full_index = request.POST.get('full_index',False)
+            task = run_task(tasks.whoosh_index,True,repo_location,full_index)
+            
+            h.flash(_('Whoosh reindex task scheduled'), category='success')
         if setting_id == 'global':
             
             application_form = ApplicationSettingsForm()()
--- a/pylons_app/controllers/summary.py	Sat Sep 11 01:52:16 2010 +0200
+++ b/pylons_app/controllers/summary.py	Sat Sep 11 01:55:46 2010 +0200
@@ -22,16 +22,14 @@
 summary controller for pylons
 @author: marcink
 """
-from datetime import datetime, timedelta
-from pylons import tmpl_context as c, request
+from pylons import tmpl_context as c, request,url
 from pylons_app.lib.auth import LoginRequired, HasRepoPermissionAnyDecorator
 from pylons_app.lib.base import BaseController, render
-from pylons_app.lib.helpers import person
 from pylons_app.lib.utils import OrderedDict
 from pylons_app.model.hg_model import HgModel
-from time import mktime
 from webhelpers.paginate import Page
-import calendar
+from pylons_app.lib.celerylib import run_task
+from pylons_app.lib.celerylib.tasks import get_commits_stats
 import logging
 
 log = logging.getLogger(__name__)
@@ -62,78 +60,11 @@
         c.repo_branches = OrderedDict()
         for name, hash in c.repo_info.branches.items()[:10]:
             c.repo_branches[name] = c.repo_info.get_changeset(hash)
-
-        c.commit_data = self.__get_commit_stats(c.repo_info)
+        
+        task = run_task(get_commits_stats,False,c.repo_info.name)
+        c.ts_min = task.result[0]
+        c.ts_max = task.result[1]
+        c.commit_data = task.result[2]
         
         return render('summary/summary.html')
 
-
-
-    def __get_commit_stats(self, repo):
-        aggregate = OrderedDict()
-        
-        #graph range
-        td = datetime.today() + timedelta(days=1) 
-        y, m, d = td.year, td.month, td.day
-        c.ts_min = mktime((y, (td - timedelta(days=calendar.mdays[m])).month,
-                            d, 0, 0, 0, 0, 0, 0,))
-        c.ts_max = mktime((y, m, d, 0, 0, 0, 0, 0, 0,))
-        
-        def author_key_cleaner(k):
-            k = person(k)
-            k = k.replace('"', "'") #for js data compatibilty
-            return k
-                
-        for cs in repo[:200]:#added limit 200 until fix #29 is made
-            k = '%s-%s-%s' % (cs.date.timetuple()[0], cs.date.timetuple()[1],
-                              cs.date.timetuple()[2])
-            timetupple = [int(x) for x in k.split('-')]
-            timetupple.extend([0 for _ in xrange(6)])
-            k = mktime(timetupple)
-            if aggregate.has_key(author_key_cleaner(cs.author)):
-                if aggregate[author_key_cleaner(cs.author)].has_key(k):
-                    aggregate[author_key_cleaner(cs.author)][k]["commits"] += 1
-                    aggregate[author_key_cleaner(cs.author)][k]["added"] += len(cs.added)
-                    aggregate[author_key_cleaner(cs.author)][k]["changed"] += len(cs.changed)
-                    aggregate[author_key_cleaner(cs.author)][k]["removed"] += len(cs.removed)
-                    
-                else:
-                    #aggregate[author_key_cleaner(cs.author)].update(dates_range)
-                    if k >= c.ts_min and k <= c.ts_max:
-                        aggregate[author_key_cleaner(cs.author)][k] = {}
-                        aggregate[author_key_cleaner(cs.author)][k]["commits"] = 1
-                        aggregate[author_key_cleaner(cs.author)][k]["added"] = len(cs.added)
-                        aggregate[author_key_cleaner(cs.author)][k]["changed"] = len(cs.changed)
-                        aggregate[author_key_cleaner(cs.author)][k]["removed"] = len(cs.removed) 
-                                            
-            else:
-                if k >= c.ts_min and k <= c.ts_max:
-                    aggregate[author_key_cleaner(cs.author)] = OrderedDict()
-                    #aggregate[author_key_cleaner(cs.author)].update(dates_range)
-                    aggregate[author_key_cleaner(cs.author)][k] = {}
-                    aggregate[author_key_cleaner(cs.author)][k]["commits"] = 1
-                    aggregate[author_key_cleaner(cs.author)][k]["added"] = len(cs.added)
-                    aggregate[author_key_cleaner(cs.author)][k]["changed"] = len(cs.changed)
-                    aggregate[author_key_cleaner(cs.author)][k]["removed"] = len(cs.removed)                 
-        
-        d = ''
-        tmpl0 = u""""%s":%s"""
-        tmpl1 = u"""{label:"%s",data:%s,schema:["commits"]},"""
-        for author in aggregate:
-            
-            d += tmpl0 % (author,
-                          tmpl1 \
-                          % (author,
-                        [{"time":x,
-                          "commits":aggregate[author][x]['commits'],
-                          "added":aggregate[author][x]['added'],
-                          "changed":aggregate[author][x]['changed'],
-                          "removed":aggregate[author][x]['removed'],
-                          } for x in aggregate[author]]))
-        if d == '':
-            d = '"%s":{label:"%s",data:[[0,1],]}' \
-                % (author_key_cleaner(repo.contact),
-                   author_key_cleaner(repo.contact))
-        return d
-
-
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pylons_app/lib/celerylib/__init__.py	Sat Sep 11 01:55:46 2010 +0200
@@ -0,0 +1,24 @@
+from vcs.utils.lazy import LazyProperty
+import logging
+
+log = logging.getLogger(__name__)
+
+class ResultWrapper(object):
+    def __init__(self, task):
+        self.task = task
+        
+    @LazyProperty
+    def result(self):
+        return self.task
+
+def run_task(task,async,*args,**kwargs):
+    try:
+        t = task.delay(*args,**kwargs)
+        log.info('running task %s',t.task_id)
+        if not async:
+            t.wait()
+        return t
+    except:
+        #pure sync version
+        return ResultWrapper(task(*args,**kwargs))
+    
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pylons_app/lib/celerylib/tasks.py	Sat Sep 11 01:55:46 2010 +0200
@@ -0,0 +1,92 @@
+from celery.decorators import task
+from datetime import datetime, timedelta
+from pylons_app.lib.helpers import person
+from pylons_app.lib.utils import OrderedDict
+from time import mktime
+import calendar
+import logging
+from vcs.backends.hg import MercurialRepository
+
+log = logging.getLogger(__name__)
+
+@task()
+def whoosh_index(repo_location,full_index):
+    from pylons_app.lib.indexers import DaemonLock
+    from pylons_app.lib.indexers.daemon import WhooshIndexingDaemon,LockHeld
+    try:
+        l = DaemonLock()
+        WhooshIndexingDaemon(repo_location=repo_location)\
+            .run(full_index=full_index)
+        l.release()
+        return 'Done'
+    except LockHeld:
+        log.info('LockHeld')
+        return 'LockHeld'    
+
+@task()
+def get_commits_stats(repo):
+    aggregate = OrderedDict()
+    repo = MercurialRepository('/home/marcink/hg_repos/'+repo)
+    #graph range
+    td = datetime.today() + timedelta(days=1) 
+    y, m, d = td.year, td.month, td.day
+    ts_min = mktime((y, (td - timedelta(days=calendar.mdays[m])).month,
+                        d, 0, 0, 0, 0, 0, 0,))
+    ts_max = mktime((y, m, d, 0, 0, 0, 0, 0, 0,))
+    
+    def author_key_cleaner(k):
+        k = person(k)
+        k = k.replace('"', "'") #for js data compatibility
+        return k
+            
+    for cs in repo[:200]:#added limit 200 until fix #29 is made
+        k = '%s-%s-%s' % (cs.date.timetuple()[0], cs.date.timetuple()[1],
+                          cs.date.timetuple()[2])
+        timetupple = [int(x) for x in k.split('-')]
+        timetupple.extend([0 for _ in xrange(6)])
+        k = mktime(timetupple)
+        if aggregate.has_key(author_key_cleaner(cs.author)):
+            if aggregate[author_key_cleaner(cs.author)].has_key(k):
+                aggregate[author_key_cleaner(cs.author)][k]["commits"] += 1
+                aggregate[author_key_cleaner(cs.author)][k]["added"] += len(cs.added)
+                aggregate[author_key_cleaner(cs.author)][k]["changed"] += len(cs.changed)
+                aggregate[author_key_cleaner(cs.author)][k]["removed"] += len(cs.removed)
+                
+            else:
+                #aggregate[author_key_cleaner(cs.author)].update(dates_range)
+                if k >= ts_min and k <= ts_max:
+                    aggregate[author_key_cleaner(cs.author)][k] = {}
+                    aggregate[author_key_cleaner(cs.author)][k]["commits"] = 1
+                    aggregate[author_key_cleaner(cs.author)][k]["added"] = len(cs.added)
+                    aggregate[author_key_cleaner(cs.author)][k]["changed"] = len(cs.changed)
+                    aggregate[author_key_cleaner(cs.author)][k]["removed"] = len(cs.removed) 
+                                        
+        else:
+            if k >= ts_min and k <= ts_max:
+                aggregate[author_key_cleaner(cs.author)] = OrderedDict()
+                #aggregate[author_key_cleaner(cs.author)].update(dates_range)
+                aggregate[author_key_cleaner(cs.author)][k] = {}
+                aggregate[author_key_cleaner(cs.author)][k]["commits"] = 1
+                aggregate[author_key_cleaner(cs.author)][k]["added"] = len(cs.added)
+                aggregate[author_key_cleaner(cs.author)][k]["changed"] = len(cs.changed)
+                aggregate[author_key_cleaner(cs.author)][k]["removed"] = len(cs.removed)                 
+    
+    d = ''
+    tmpl0 = u""""%s":%s"""
+    tmpl1 = u"""{label:"%s",data:%s,schema:["commits"]},"""
+    for author in aggregate:
+        
+        d += tmpl0 % (author,
+                      tmpl1 \
+                      % (author,
+                    [{"time":x,
+                      "commits":aggregate[author][x]['commits'],
+                      "added":aggregate[author][x]['added'],
+                      "changed":aggregate[author][x]['changed'],
+                      "removed":aggregate[author][x]['removed'],
+                      } for x in aggregate[author]]))
+    if d == '':
+        d = '"%s":{label:"%s",data:[[0,1],]}' \
+            % (author_key_cleaner(repo.contact),
+               author_key_cleaner(repo.contact))
+    return (ts_min, ts_max, d)    
--- a/pylons_app/templates/admin/settings/settings.html	Sat Sep 11 01:52:16 2010 +0200
+++ b/pylons_app/templates/admin/settings/settings.html	Sat Sep 11 01:55:46 2010 +0200
@@ -47,7 +47,32 @@
         </div>
     </div>  
     ${h.end_form()}
-     
+    
+    <h3>${_('Whoosh indexing')}</h3>
+    ${h.form(url('admin_setting', setting_id='whoosh'),method='put')}
+    <div class="form">
+        <!-- fields -->
+        
+        <div class="fields">
+            <div class="field">
+                <div class="label label-checkbox">
+                    <label for="destroy">${_('index build option')}:</label>
+                </div>
+                <div class="checkboxes">
+                    <div class="checkbox">
+                        ${h.checkbox('full_index',True)}
+                        <label for="checkbox-1">${_('build from scratch')}</label>
+                    </div>
+                </div>
+            </div>
+                            
+            <div class="buttons">
+            ${h.submit('reindex','reindex',class_="ui-button ui-widget ui-state-default ui-corner-all")}
+            </div>                                                          
+        </div>
+    </div>  
+    ${h.end_form()}
+         
     <h3>${_('Global application settings')}</h3> 
     ${h.form(url('admin_setting', setting_id='global'),method='put')}
     <div class="form">