diff rhodecode/lib/celerylib/tasks.py @ 547:1e757ac98988

renamed project to rhodecode
author Marcin Kuzminski <marcin@python-works.com>
date Wed, 06 Oct 2010 03:18:16 +0200
parents pylons_app/lib/celerylib/tasks.py@72778dda34cf
children b75b77ef649d
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rhodecode/lib/celerylib/tasks.py	Wed Oct 06 03:18:16 2010 +0200
@@ -0,0 +1,316 @@
+from celery.decorators import task
+from celery.task.sets import subtask
+from celeryconfig import PYLONS_CONFIG as config
+from operator import itemgetter
+from pylons.i18n.translation import _
+from rhodecode.lib.celerylib import run_task, locked_task
+from rhodecode.lib.helpers import person
+from rhodecode.lib.smtp_mailer import SmtpMailer
+from rhodecode.lib.utils import OrderedDict
+from time import mktime
+from vcs.backends.hg import MercurialRepository
+import json
+import traceback
+
+__all__ = ['whoosh_index', 'get_commits_stats',
+           'reset_user_password', 'send_email']
+
+def get_session():
+    from sqlalchemy import engine_from_config
+    from sqlalchemy.orm import sessionmaker, scoped_session
+    engine = engine_from_config(dict(config.items('app:main')), 'sqlalchemy.db1.')
+    sa = scoped_session(sessionmaker(bind=engine))
+    return sa
+
+def get_hg_settings():
+    from rhodecode.model.db import HgAppSettings
+    try:
+        sa = get_session()
+        ret = sa.query(HgAppSettings).all()
+    finally:
+        sa.remove()
+        
+    if not ret:
+        raise Exception('Could not get application settings !')
+    settings = {}
+    for each in ret:
+        settings['hg_app_' + each.app_settings_name] = each.app_settings_value    
+    
+    return settings
+
+def get_hg_ui_settings():
+    from rhodecode.model.db import HgAppUi
+    try:
+        sa = get_session()
+        ret = sa.query(HgAppUi).all()
+    finally:
+        sa.remove()
+        
+    if not ret:
+        raise Exception('Could not get application ui settings !')
+    settings = {}
+    for each in ret:
+        k = each.ui_key
+        v = each.ui_value
+        if k == '/':
+            k = 'root_path'
+        
+        if k.find('.') != -1:
+            k = k.replace('.', '_')
+        
+        if each.ui_section == 'hooks':
+            v = each.ui_active
+        
+        settings[each.ui_section + '_' + k] = v  
+    
+    return settings   
+
+@task
+@locked_task
+def whoosh_index(repo_location, full_index):
+    log = whoosh_index.get_logger()
+    from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon
+    WhooshIndexingDaemon(repo_location=repo_location).run(full_index=full_index)
+
+@task
+@locked_task
+def get_commits_stats(repo_name, ts_min_y, ts_max_y):
+    from rhodecode.model.db import Statistics, Repository
+    log = get_commits_stats.get_logger()
+    author_key_cleaner = lambda k: person(k).replace('"', "") #for js data compatibilty
+    
+    commits_by_day_author_aggregate = {}
+    commits_by_day_aggregate = {}
+    repos_path = get_hg_ui_settings()['paths_root_path'].replace('*', '')
+    repo = MercurialRepository(repos_path + repo_name)
+
+    skip_date_limit = True
+    parse_limit = 350 #limit for single task changeset parsing optimal for
+    last_rev = 0
+    last_cs = None
+    timegetter = itemgetter('time')
+    
+    sa = get_session()
+    
+    dbrepo = sa.query(Repository)\
+        .filter(Repository.repo_name == repo_name).scalar()
+    cur_stats = sa.query(Statistics)\
+        .filter(Statistics.repository == dbrepo).scalar()
+    if cur_stats:
+        last_rev = cur_stats.stat_on_revision
+    if not repo.revisions:
+        return True
+    
+    if last_rev == repo.revisions[-1] and len(repo.revisions) > 1:
+        #pass silently without any work if we're not on first revision or current
+        #state of parsing revision(from db marker) is the last revision
+        return True
+    
+    if cur_stats:
+        commits_by_day_aggregate = OrderedDict(
+                                       json.loads(
+                                        cur_stats.commit_activity_combined))
+        commits_by_day_author_aggregate = json.loads(cur_stats.commit_activity)
+    
+    log.debug('starting parsing %s', parse_limit)
+    for cnt, rev in enumerate(repo.revisions[last_rev:]):
+        last_cs = cs = repo.get_changeset(rev)
+        k = '%s-%s-%s' % (cs.date.timetuple()[0], cs.date.timetuple()[1],
+                          cs.date.timetuple()[2])
+        timetupple = [int(x) for x in k.split('-')]
+        timetupple.extend([0 for _ in xrange(6)])
+        k = mktime(timetupple)
+        if commits_by_day_author_aggregate.has_key(author_key_cleaner(cs.author)):
+            try:
+                l = [timegetter(x) for x in commits_by_day_author_aggregate\
+                        [author_key_cleaner(cs.author)]['data']]
+                time_pos = l.index(k)
+            except ValueError:
+                time_pos = False
+                
+            if time_pos >= 0 and time_pos is not False:
+                
+                datadict = commits_by_day_author_aggregate\
+                    [author_key_cleaner(cs.author)]['data'][time_pos]
+                
+                datadict["commits"] += 1
+                datadict["added"] += len(cs.added)
+                datadict["changed"] += len(cs.changed)
+                datadict["removed"] += len(cs.removed)
+                #print datadict
+                
+            else:
+                #print 'ELSE !!!!'
+                if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
+                    
+                    datadict = {"time":k,
+                                "commits":1,
+                                "added":len(cs.added),
+                                "changed":len(cs.changed),
+                                "removed":len(cs.removed),
+                               }
+                    commits_by_day_author_aggregate\
+                        [author_key_cleaner(cs.author)]['data'].append(datadict)
+                                        
+        else:
+            #print k, 'nokey ADDING'
+            if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
+                commits_by_day_author_aggregate[author_key_cleaner(cs.author)] = {
+                                    "label":author_key_cleaner(cs.author),
+                                    "data":[{"time":k,
+                                             "commits":1,
+                                             "added":len(cs.added),
+                                             "changed":len(cs.changed),
+                                             "removed":len(cs.removed),
+                                             }],
+                                    "schema":["commits"],
+                                    }               
+    
+#        #gather all data by day
+        if commits_by_day_aggregate.has_key(k):
+            commits_by_day_aggregate[k] += 1
+        else:
+            commits_by_day_aggregate[k] = 1
+        
+        if cnt >= parse_limit:
+            #don't fetch to much data since we can freeze application
+            break
+
+    overview_data = []
+    for k, v in commits_by_day_aggregate.items():
+        overview_data.append([k, v])
+    overview_data = sorted(overview_data, key=itemgetter(0))
+        
+    if not commits_by_day_author_aggregate:
+        commits_by_day_author_aggregate[author_key_cleaner(repo.contact)] = {
+            "label":author_key_cleaner(repo.contact),
+            "data":[0, 1],
+            "schema":["commits"],
+        }
+
+    stats = cur_stats if cur_stats else Statistics()
+    stats.commit_activity = json.dumps(commits_by_day_author_aggregate)
+    stats.commit_activity_combined = json.dumps(overview_data)
+
+    log.debug('last revison %s', last_rev)
+    leftovers = len(repo.revisions[last_rev:])
+    log.debug('revisions to parse %s', leftovers)
+    
+    if last_rev == 0 or leftovers < parse_limit:    
+        stats.languages = json.dumps(__get_codes_stats(repo_name))
+        
+    stats.repository = dbrepo
+    stats.stat_on_revision = last_cs.revision
+    
+    try:
+        sa.add(stats)
+        sa.commit()    
+    except:
+        log.error(traceback.format_exc())
+        sa.rollback()
+        return False
+    if len(repo.revisions) > 1:
+        run_task(get_commits_stats, repo_name, ts_min_y, ts_max_y)
+                            
+    return True
+
+@task
+def reset_user_password(user_email):
+    log = reset_user_password.get_logger()
+    from rhodecode.lib import auth
+    from rhodecode.model.db import User
+    
+    try:
+        try:
+            sa = get_session()
+            user = sa.query(User).filter(User.email == user_email).scalar()
+            new_passwd = auth.PasswordGenerator().gen_password(8,
+                             auth.PasswordGenerator.ALPHABETS_BIG_SMALL)
+            if user:
+                user.password = auth.get_crypt_password(new_passwd)
+                sa.add(user)
+                sa.commit()
+                log.info('change password for %s', user_email)
+            if new_passwd is None:
+                raise Exception('unable to generate new password')
+            
+        except:
+            log.error(traceback.format_exc())
+            sa.rollback()
+        
+        run_task(send_email, user_email,
+                 "Your new hg-app password",
+                 'Your new hg-app password:%s' % (new_passwd))
+        log.info('send new password mail to %s', user_email)
+        
+        
+    except:
+        log.error('Failed to update user password')
+        log.error(traceback.format_exc())
+    return True
+
+@task    
+def send_email(recipients, subject, body):
+    log = send_email.get_logger()
+    email_config = dict(config.items('DEFAULT')) 
+    mail_from = email_config.get('app_email_from')
+    user = email_config.get('smtp_username')
+    passwd = email_config.get('smtp_password')
+    mail_server = email_config.get('smtp_server')
+    mail_port = email_config.get('smtp_port')
+    tls = email_config.get('smtp_use_tls')
+    ssl = False
+    
+    try:
+        m = SmtpMailer(mail_from, user, passwd, mail_server,
+                       mail_port, ssl, tls)
+        m.send(recipients, subject, body)  
+    except:
+        log.error('Mail sending failed')
+        log.error(traceback.format_exc())
+        return False
+    return True
+
+@task
+def create_repo_fork(form_data, cur_user):
+    import os
+    from rhodecode.model.repo_model import RepoModel
+    sa = get_session()
+    rm = RepoModel(sa)
+    
+    rm.create(form_data, cur_user, just_db=True, fork=True)
+    
+    repos_path = get_hg_ui_settings()['paths_root_path'].replace('*', '')
+    repo_path = os.path.join(repos_path, form_data['repo_name'])
+    repo_fork_path = os.path.join(repos_path, form_data['fork_name'])
+    
+    MercurialRepository(str(repo_fork_path), True, clone_url=str(repo_path))
+
+    
+def __get_codes_stats(repo_name):
+    LANGUAGES_EXTENSIONS = ['action', 'adp', 'ashx', 'asmx', 'aspx', 'asx', 'axd', 'c',
+                    'cfg', 'cfm', 'cpp', 'cs', 'diff', 'do', 'el', 'erl',
+                    'h', 'java', 'js', 'jsp', 'jspx', 'lisp',
+                    'lua', 'm', 'mako', 'ml', 'pas', 'patch', 'php', 'php3',
+                    'php4', 'phtml', 'pm', 'py', 'rb', 'rst', 's', 'sh',
+                    'tpl', 'txt', 'vim', 'wss', 'xhtml', 'xml', 'xsl', 'xslt',
+                    'yaws']
+    repos_path = get_hg_ui_settings()['paths_root_path'].replace('*', '')
+    repo = MercurialRepository(repos_path + repo_name)
+
+    code_stats = {}
+    for topnode, dirs, files in repo.walk('/', 'tip'):
+        for f in files:
+            k = f.mimetype
+            if f.extension in LANGUAGES_EXTENSIONS:
+                if code_stats.has_key(k):
+                    code_stats[k] += 1
+                else:
+                    code_stats[k] = 1
+                    
+    return code_stats or {}
+
+
+            
+
+