changeset 493:2256c78afe53 celery

implemented basic auto-updating statistics fetched from the database
author Marcin Kuzminski <marcin@python-works.com>
date Wed, 22 Sep 2010 04:30:36 +0200
parents a5a17000e45b
children b4d9680cd164
files celeryconfig.py pylons_app/controllers/summary.py pylons_app/lib/celerylib/tasks.py pylons_app/model/db.py pylons_app/templates/summary/summary.html
diffstat 5 files changed, 183 insertions(+), 80 deletions(-)
--- a/celeryconfig.py	Tue Sep 21 15:36:46 2010 +0200
+++ b/celeryconfig.py	Wed Sep 22 04:30:36 2010 +0200
@@ -16,6 +16,8 @@
 ## Result store settings.
 CELERY_RESULT_BACKEND = "database"
 CELERY_RESULT_DBURI = dict(config.items('app:main'))['sqlalchemy.db1.url']
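+#Serialize task results as JSON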
+CELERY_RESULT_SERIALIZER = 'json'
+
 
 BROKER_CONNECTION_MAX_RETRIES = 30
 
@@ -36,7 +38,37 @@
 CELERYD_LOG_LEVEL = "DEBUG"
 CELERYD_MAX_TASKS_PER_CHILD = 1
 
-#CELERY_ALWAYS_EAGER = True
-#rabbitmqctl add_user rabbitmq qweqwe
-#rabbitmqctl add_vhost rabbitmqhost
-#rabbitmqctl set_permissions -p rabbitmqhost rabbitmq ".*" ".*" ".*"
+#If True, tasks will never be sent to the queue, but executed locally instead.
+CELERY_ALWAYS_EAGER = False
+
+#===============================================================================
+# EMAIL SETTINGS
+#===============================================================================
+pylons_email_config = dict(config.items('DEFAULT'))
+
+CELERY_SEND_TASK_ERROR_EMAILS = True
+
+#List of (name, email_address) tuples for the admins that should receive error e-mails.
+ADMINS = [('Administrator', pylons_email_config.get('email_to'))]
+
+#The e-mail address this worker sends e-mails from. Default is "celery@localhost".
+SERVER_EMAIL = pylons_email_config.get('error_email_from')
+
+#The mail server to use. Default is "localhost".
+MAIL_HOST = pylons_email_config.get('smtp_server')
+
+#Username (if required) to log on to the mail server with.
+MAIL_HOST_USER = pylons_email_config.get('smtp_username')
+
+#Password (if required) to log on to the mail server with.
+MAIL_HOST_PASSWORD = pylons_email_config.get('smtp_password')
+
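+#The mail port. NOTE: ini values are read as strings, so smtp_port may need
+#an int() cast depending on the mail transport (assumption).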
+MAIL_PORT = pylons_email_config.get('smtp_port')
+
+
+#===============================================================================
+# INSTRUCTIONS FOR RABBITMQ
+#===============================================================================
+# rabbitmqctl add_user rabbitmq qweqwe
+# rabbitmqctl add_vhost rabbitmqhost
+# rabbitmqctl set_permissions -p rabbitmqhost rabbitmq ".*" ".*" ".*"
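+#
+#Illustrative broker settings matching the user/vhost created above
+#(an assumption, adjust host and port to your deployment):
+#BROKER_HOST = "localhost"
+#BROKER_PORT = 5672
+#BROKER_VHOST = "rabbitmqhost"
+#BROKER_USER = "rabbitmq"
+#BROKER_PASSWORD = "qweqwe"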
--- a/pylons_app/controllers/summary.py	Tue Sep 21 15:36:46 2010 +0200
+++ b/pylons_app/controllers/summary.py	Wed Sep 22 04:30:36 2010 +0200
@@ -27,9 +27,13 @@
 from pylons_app.lib.base import BaseController, render
 from pylons_app.lib.utils import OrderedDict
 from pylons_app.model.hg_model import HgModel
+from pylons_app.model.db import Statistics
 from webhelpers.paginate import Page
 from pylons_app.lib.celerylib import run_task
 from pylons_app.lib.celerylib.tasks import get_commits_stats
+from datetime import datetime, timedelta
+from time import mktime
+import calendar
+import json
 import logging
 
 log = logging.getLogger(__name__)
@@ -61,11 +65,32 @@
         for name, hash in c.repo_info.branches.items()[:10]:
             c.repo_branches[name] = c.repo_info.get_changeset(hash)
         
-        task = run_task(get_commits_stats, c.repo_info.name)
-        c.ts_min = task.result[0]
-        c.ts_max = task.result[1]
-        c.commit_data = task.result[2]
-        c.overview_data = task.result[3]
+        td = datetime.today() + timedelta(days=1) 
+        y, m, d = td.year, td.month, td.day
+        
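+        #graph range: ts_min_y reaches roughly a year back, ts_min_m roughly
+        #a month back, ts_max_y is tomorrow; all as midnight timestamps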
+        ts_min_y = mktime((y - 1, (td - timedelta(days=calendar.mdays[m])).month,
+                            d, 0, 0, 0, 0, 0, 0,))
+        ts_min_m = mktime((y, (td - timedelta(days=calendar.mdays[m])).month,
+                            d, 0, 0, 0, 0, 0, 0,))
+        
+        ts_max_y = mktime((y, m, d, 0, 0, 0, 0, 0, 0,))
+            
+        run_task(get_commits_stats, c.repo_info.name, ts_min_y, ts_max_y)
+        c.ts_min = ts_min_m
+        c.ts_max = ts_max_y
+
+        stats = self.sa.query(Statistics)\
+            .filter(Statistics.repository == c.repo_info.dbrepo)\
+            .scalar()
+
+        if stats:
+            c.commit_data = stats.commit_activity
+            c.overview_data = stats.commit_activity_combined
+        else:
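+            #no stats stored yet; render empty datasets until the task fills them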
+            c.commit_data = json.dumps({})
+            c.overview_data = json.dumps([[ts_min_y, 0], [ts_max_y, 0]])
         
         return render('summary/summary.html')
 
--- a/pylons_app/lib/celerylib/tasks.py	Tue Sep 21 15:36:46 2010 +0200
+++ b/pylons_app/lib/celerylib/tasks.py	Wed Sep 22 04:30:36 2010 +0200
@@ -1,7 +1,6 @@
 from celery.decorators import task
 from celery.task.sets import subtask
 from celeryconfig import PYLONS_CONFIG as config
-from datetime import datetime, timedelta
 from pylons.i18n.translation import _
 from pylons_app.lib.celerylib import run_task
 from pylons_app.lib.helpers import person
@@ -10,7 +9,6 @@
 from operator import itemgetter
 from vcs.backends.hg import MercurialRepository
 from time import mktime
-import calendar
 import traceback
 import json
 
@@ -83,94 +81,132 @@
         return 'LockHeld'    
 
 @task
-def get_commits_stats(repo):
+def get_commits_stats(repo_name, ts_min_y, ts_max_y):
+    author_key_cleaner = lambda k: person(k).replace('"', "") #for js data compatibility
+
+    from pylons_app.model.db import Statistics, Repository
     log = get_commits_stats.get_logger()
-    aggregate = OrderedDict()
-    overview_aggregate = OrderedDict()
+    commits_by_day_author_aggregate = {}
+    commits_by_day_aggregate = {}
     repos_path = get_hg_ui_settings()['paths_root_path'].replace('*', '')
-    repo = MercurialRepository(repos_path + repo)
-    #graph range
-    td = datetime.today() + timedelta(days=1) 
-    y, m, d = td.year, td.month, td.day
+    repo = MercurialRepository(repos_path + repo_name)
+
+    skip_date_limit = True
+    parse_limit = 500 #limit for single task changeset parsing
+    last_rev = 0
+    last_cs = None
+    timegetter = itemgetter('time')
+    
+    sa = get_session()
     
-    ts_min_y = mktime((y - 1, (td - timedelta(days=calendar.mdays[m])).month,
-                        d, 0, 0, 0, 0, 0, 0,))
-    ts_min_m = mktime((y, (td - timedelta(days=calendar.mdays[m])).month,
-                        d, 0, 0, 0, 0, 0, 0,))
+    dbrepo = sa.query(Repository)\
+        .filter(Repository.repo_name == repo_name).scalar()
+    cur_stats = sa.query(Statistics)\
+        .filter(Statistics.repository == dbrepo).scalar()
+    if cur_stats:
+        last_rev = cur_stats.stat_on_revision
     
-    ts_max_y = mktime((y, m, d, 0, 0, 0, 0, 0, 0,))
-    skip_date_limit = True
+    if last_rev == repo.revisions[-1]:
+        #pass silently without any work
+        return True
     
-    def author_key_cleaner(k):
-        k = person(k)
-        k = k.replace('"', "") #for js data compatibilty
-        return k
-            
-    for cs in repo[:200]:#added limit 200 until fix #29 is made
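+    #load previously stored JSON so the stats can be updated incrementally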
+    if cur_stats:
+        commits_by_day_aggregate = OrderedDict(
+                                       json.loads(
+                                        cur_stats.commit_activity_combined))
+        commits_by_day_author_aggregate = json.loads(cur_stats.commit_activity)
+    
+    for cnt, rev in enumerate(repo.revisions[last_rev:]):
+        last_cs = cs = repo.get_changeset(rev)
         k = '%s-%s-%s' % (cs.date.timetuple()[0], cs.date.timetuple()[1],
                           cs.date.timetuple()[2])
         timetupple = [int(x) for x in k.split('-')]
         timetupple.extend([0 for _ in xrange(6)])
         k = mktime(timetupple)
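+        #k is the midnight timestamp of the commit's day, used as bucket key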
-        if aggregate.has_key(author_key_cleaner(cs.author)):
-            if aggregate[author_key_cleaner(cs.author)].has_key(k):
-                aggregate[author_key_cleaner(cs.author)][k]["commits"] += 1
-                aggregate[author_key_cleaner(cs.author)][k]["added"] += len(cs.added)
-                aggregate[author_key_cleaner(cs.author)][k]["changed"] += len(cs.changed)
-                aggregate[author_key_cleaner(cs.author)][k]["removed"] += len(cs.removed)
+        if commits_by_day_author_aggregate.has_key(author_key_cleaner(cs.author)):
+            try:
+                l = [timegetter(x) for x in commits_by_day_author_aggregate\
+                        [author_key_cleaner(cs.author)]['data']]
+                time_pos = l.index(k)
+            except ValueError:
+                time_pos = None
+
+            if time_pos is not None:
+                
+                datadict = commits_by_day_author_aggregate\
+                    [author_key_cleaner(cs.author)]['data'][time_pos]
+                
+                datadict["commits"] += 1
+                datadict["added"] += len(cs.added)
+                datadict["changed"] += len(cs.changed)
+                datadict["removed"] += len(cs.removed)
                 
             else:
-                #aggregate[author_key_cleaner(cs.author)].update(dates_range)
                 if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
-                    aggregate[author_key_cleaner(cs.author)][k] = {}
-                    aggregate[author_key_cleaner(cs.author)][k]["commits"] = 1
-                    aggregate[author_key_cleaner(cs.author)][k]["added"] = len(cs.added)
-                    aggregate[author_key_cleaner(cs.author)][k]["changed"] = len(cs.changed)
-                    aggregate[author_key_cleaner(cs.author)][k]["removed"] = len(cs.removed) 
+                    
+                    datadict = {"time":k,
+                                "commits":1,
+                                "added":len(cs.added),
+                                "changed":len(cs.changed),
+                                "removed":len(cs.removed),
+                               }
+                    commits_by_day_author_aggregate\
+                        [author_key_cleaner(cs.author)]['data'].append(datadict)
                                         
         else:
             if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
-                aggregate[author_key_cleaner(cs.author)] = OrderedDict()
-                #aggregate[author_key_cleaner(cs.author)].update(dates_range)
-                aggregate[author_key_cleaner(cs.author)][k] = {}
-                aggregate[author_key_cleaner(cs.author)][k]["commits"] = 1
-                aggregate[author_key_cleaner(cs.author)][k]["added"] = len(cs.added)
-                aggregate[author_key_cleaner(cs.author)][k]["changed"] = len(cs.changed)
-                aggregate[author_key_cleaner(cs.author)][k]["removed"] = len(cs.removed)                 
+                commits_by_day_author_aggregate[author_key_cleaner(cs.author)] = {
+                                    "label":author_key_cleaner(cs.author),
+                                    "data":[{"time":k,
+                                             "commits":1,
+                                             "added":len(cs.added),
+                                             "changed":len(cs.changed),
+                                             "removed":len(cs.removed),
+                                             }],
+                                    "schema":["commits"],
+                                    }               
     
-        
-        if overview_aggregate.has_key(k):
-            overview_aggregate[k] += 1
+        #gather all data by day
+        if commits_by_day_aggregate.has_key(k):
+            commits_by_day_aggregate[k] += 1
         else:
-            overview_aggregate[k] = 1
-    
+            commits_by_day_aggregate[k] = 1
+        
+        if cnt >= parse_limit:
+            #don't fetch too much data since it can freeze the application
+            break
+
     overview_data = []
-    for k, v in overview_aggregate.items():
+    for k, v in commits_by_day_aggregate.items():
         overview_data.append([k, v])
     overview_data = sorted(overview_data, key=itemgetter(0))
-    data = {}
-    for author in aggregate:
-        commit_data = sorted([{"time":x,
-                               "commits":aggregate[author][x]['commits'],
-                               "added":aggregate[author][x]['added'],
-                               "changed":aggregate[author][x]['changed'],
-                               "removed":aggregate[author][x]['removed'],
-                              } for x in aggregate[author]],
-                              key=itemgetter('time'))
         
-        data[author] = {"label":author,
-                      "data":commit_data,
-                      "schema":["commits"]
-                      }
-        
-    if not data:
-        data[author_key_cleaner(repo.contact)] = {
+    if not commits_by_day_author_aggregate:
+        commits_by_day_author_aggregate[author_key_cleaner(repo.contact)] = {
             "label":author_key_cleaner(repo.contact),
             "data":[0, 1],
             "schema":["commits"],
         }
-                
-    return (ts_min_m, ts_max_y, json.dumps(data), json.dumps(overview_data))    
+
+    stats = cur_stats if cur_stats else Statistics()
+    stats.commit_activity = json.dumps(commits_by_day_author_aggregate)
+    stats.commit_activity_combined = json.dumps(overview_data)
+    stats.repository = dbrepo
+    stats.stat_on_revision = last_cs.revision
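+    #language stats are not computed by this task yet; store a placeholder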
+    stats.languages = json.dumps({'_TOTAL_':0, '':0})
+    
+    try:
+        sa.add(stats)
+        sa.commit()
+    except Exception:
+        log.error(traceback.format_exc())
+        sa.rollback()
+        return False
+                        
+    return True
 
 @task
 def reset_user_password(user_email):
@@ -184,10 +220,11 @@
             user = sa.query(User).filter(User.email == user_email).scalar()
             new_passwd = auth.PasswordGenerator().gen_password(8,
                              auth.PasswordGenerator.ALPHABETS_BIG_SMALL)
-            user.password = auth.get_crypt_password(new_passwd)
-            sa.add(user)
-            sa.commit()
-            log.info('change password for %s', user_email)
+            if user:
+                user.password = auth.get_crypt_password(new_passwd)
+                sa.add(user)
+                sa.commit()
+                log.info('change password for %s', user_email)
             if new_passwd is None:
                 raise Exception('unable to generate new password')
             
--- a/pylons_app/model/db.py	Tue Sep 21 15:36:46 2010 +0200
+++ b/pylons_app/model/db.py	Wed Sep 22 04:30:36 2010 +0200
@@ -120,6 +120,15 @@
     user = relation('User')
     permission = relation('Permission')
 
-
+class Statistics(Base):
+    __tablename__ = 'statistics'
+    __table_args__ = (UniqueConstraint('repository_id'), {'useexisting':True})
+    stat_id = Column("stat_id", INTEGER(), nullable=False, unique=True, default=None, primary_key=True)
+    repository_id = Column("repository_id", INTEGER(), ForeignKey(u'repositories.repo_id'), nullable=False, unique=True, default=None)
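+    #one statistics row per repository (enforced by the unique constraint)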
+    stat_on_revision = Column("stat_on_revision", INTEGER(), nullable=False)
+    commit_activity = Column("commit_activity", BLOB(), nullable=False)#JSON data
+    commit_activity_combined = Column("commit_activity_combined", BLOB(), nullable=False)#JSON data
+    languages = Column("languages", BLOB(), nullable=False)#JSON data
+    
+    repository = relation('Repository')
 
-
--- a/pylons_app/templates/summary/summary.html	Tue Sep 21 15:36:46 2010 +0200
+++ b/pylons_app/templates/summary/summary.html	Wed Sep 22 04:30:36 2010 +0200
@@ -123,7 +123,7 @@
 <div class="box box-right"  style="min-height:455px">
     <!-- box / title -->
     <div class="title">
-        <h5>${_('Commit activity')}</h5>
+        <h5>${_('Commit activity by day / author')}</h5>
     </div>
     
     <div class="table">