changeset 2527:95624ce4465f beta

orginized test module - separated test models into separate files - added scripts for script tests - few fixes and renames for readability
author Marcin Kuzminski <marcin@python-works.com>
date Sun, 01 Jul 2012 16:24:15 +0200
parents 473794943022
children 5c8b1eaafe77
files rhodecode/tests/_test_concurency.py rhodecode/tests/mem_watch rhodecode/tests/models/__init__.py rhodecode/tests/models/test_notifications.py rhodecode/tests/models/test_permissions.py rhodecode/tests/models/test_repos_groups.py rhodecode/tests/models/test_users.py rhodecode/tests/rhodecode_crawler.py rhodecode/tests/scripts/mem_watch rhodecode/tests/scripts/test_concurency.py rhodecode/tests/scripts/test_crawler.py rhodecode/tests/scripts/test_scm_operations.py rhodecode/tests/test_hg_operations.py rhodecode/tests/test_models.py rhodecode/tests/vcs/__init__.py
diffstat 14 files changed, 1608 insertions(+), 1559 deletions(-) [+]
line wrap: on
line diff
--- a/rhodecode/tests/_test_concurency.py	Sun Jul 01 16:22:38 2012 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,211 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
-    rhodecode.tests.test_hg_operations
-    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-    Test suite for making push/pull operations
-
-    :created_on: Dec 30, 2010
-    :author: marcink
-    :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
-    :license: GPLv3, see COPYING for more details.
-"""
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program.  If not, see <http://www.gnu.org/licenses/>.
-
-import os
-import sys
-import shutil
-import logging
-from os.path import join as jn
-from os.path import dirname as dn
-
-from tempfile import _RandomNameSequence
-from subprocess import Popen, PIPE
-
-from paste.deploy import appconfig
-from pylons import config
-from sqlalchemy import engine_from_config
-
-from rhodecode.lib.utils import add_cache
-from rhodecode.model import init_model
-from rhodecode.model import meta
-from rhodecode.model.db import User, Repository
-from rhodecode.lib.auth import get_crypt_password
-
-from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO
-from rhodecode.config.environment import load_environment
-
-rel_path = dn(dn(dn(os.path.abspath(__file__))))
-conf = appconfig('config:development.ini', relative_to=rel_path)
-load_environment(conf.global_conf, conf.local_conf)
-
-add_cache(conf)
-
-USER = 'test_admin'
-PASS = 'test12'
-HOST = 'hg.local'
-METHOD = 'pull'
-DEBUG = True
-log = logging.getLogger(__name__)
-
-
-class Command(object):
-
-    def __init__(self, cwd):
-        self.cwd = cwd
-
-    def execute(self, cmd, *args):
-        """Runs command on the system with given ``args``.
-        """
-
-        command = cmd + ' ' + ' '.join(args)
-        log.debug('Executing %s' % command)
-        if DEBUG:
-            print command
-        p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd)
-        stdout, stderr = p.communicate()
-        if DEBUG:
-            print stdout, stderr
-        return stdout, stderr
-
-def get_session():
-    engine = engine_from_config(conf, 'sqlalchemy.db1.')
-    init_model(engine)
-    sa = meta.Session
-    return sa
-
-
-def create_test_user(force=True):
-    print 'creating test user'
-    sa = get_session()
-
-    user = sa.query(User).filter(User.username == USER).scalar()
-
-    if force and user is not None:
-        print 'removing current user'
-        for repo in sa.query(Repository).filter(Repository.user == user).all():
-            sa.delete(repo)
-        sa.delete(user)
-        sa.commit()
-
-    if user is None or force:
-        print 'creating new one'
-        new_usr = User()
-        new_usr.username = USER
-        new_usr.password = get_crypt_password(PASS)
-        new_usr.email = 'mail@mail.com'
-        new_usr.name = 'test'
-        new_usr.lastname = 'lasttestname'
-        new_usr.active = True
-        new_usr.admin = True
-        sa.add(new_usr)
-        sa.commit()
-
-    print 'done'
-
-
-def create_test_repo(force=True):
-    print 'creating test repo'
-    from rhodecode.model.repo import RepoModel
-    sa = get_session()
-
-    user = sa.query(User).filter(User.username == USER).scalar()
-    if user is None:
-        raise Exception('user not found')
-
-
-    repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar()
-
-    if repo is None:
-        print 'repo not found creating'
-
-        form_data = {'repo_name':HG_REPO,
-                     'repo_type':'hg',
-                     'private':False,
-                     'clone_uri':'' }
-        rm = RepoModel(sa)
-        rm.base_path = '/home/hg'
-        rm.create(form_data, user)
-
-    print 'done'
-
-def set_anonymous_access(enable=True):
-    sa = get_session()
-    user = sa.query(User).filter(User.username == 'default').one()
-    user.active = enable
-    sa.add(user)
-    sa.commit()
-
-def get_anonymous_access():
-    sa = get_session()
-    return sa.query(User).filter(User.username == 'default').one().active
-
-
-#==============================================================================
-# TESTS
-#==============================================================================
-def test_clone_with_credentials(no_errors=False, repo=HG_REPO, method=METHOD,
-                                seq=None):
-    cwd = path = jn(TESTS_TMP_PATH, repo)
-
-    if seq == None:
-        seq = _RandomNameSequence().next()
-
-    try:
-        shutil.rmtree(path, ignore_errors=True)
-        os.makedirs(path)
-        #print 'made dirs %s' % jn(path)
-    except OSError:
-        raise
-
-    clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
-                  {'user':USER,
-                   'pass':PASS,
-                   'host':HOST,
-                   'cloned_repo':repo, }
-
-    dest = path + seq
-    if method == 'pull':
-        stdout, stderr = Command(cwd).execute('hg', method, '--cwd', dest, clone_url)
-    else:
-        stdout, stderr = Command(cwd).execute('hg', method, clone_url, dest)
-
-        if no_errors is False:
-            assert """adding file changes""" in stdout, 'no messages about cloning'
-            assert """abort""" not in stderr , 'got error from clone'
-
-if __name__ == '__main__':
-    try:
-        create_test_user(force=False)
-        seq = None
-        import time
-
-        try:
-            METHOD = sys.argv[3]
-        except:
-            pass
-
-        if METHOD == 'pull':
-            seq = _RandomNameSequence().next()
-            test_clone_with_credentials(repo=sys.argv[1], method='clone',
-                                        seq=seq)
-        s = time.time()
-        for i in range(1, int(sys.argv[2]) + 1):
-            print 'take', i
-            test_clone_with_credentials(repo=sys.argv[1], method=METHOD,
-                                        seq=seq)
-        print 'time taken %.3f' % (time.time() - s)
-    except Exception, e:
-        raise
-        sys.exit('stop on %s' % e)
--- a/rhodecode/tests/mem_watch	Sun Jul 01 16:22:38 2012 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,1 +0,0 @@
-ps -eo size,pid,user,command --sort -size | awk '{ hr=$1/1024 ; printf("%13.2f Mb ",hr) } { for ( x=4 ; x<=NF ; x++ ) { printf("%s ",$x) } print "" }'|grep [p]aster
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rhodecode/tests/models/test_notifications.py	Sun Jul 01 16:24:15 2012 +0200
@@ -0,0 +1,189 @@
+import os
+import unittest
+from rhodecode.tests import *
+
+from rhodecode.model.db import User, Notification, UserNotification
+from rhodecode.model.user import UserModel
+
+from rhodecode.model.meta import Session
+from rhodecode.model.notification import NotificationModel
+
+
+class TestNotifications(unittest.TestCase):
+
+    def __init__(self, methodName='runTest'):
+        Session.remove()
+        self.u1 = UserModel().create_or_update(username=u'u1',
+                                        password=u'qweqwe',
+                                        email=u'u1@rhodecode.org',
+                                        firstname=u'u1', lastname=u'u1')
+        Session().commit()
+        self.u1 = self.u1.user_id
+
+        self.u2 = UserModel().create_or_update(username=u'u2',
+                                        password=u'qweqwe',
+                                        email=u'u2@rhodecode.org',
+                                        firstname=u'u2', lastname=u'u3')
+        Session().commit()
+        self.u2 = self.u2.user_id
+
+        self.u3 = UserModel().create_or_update(username=u'u3',
+                                        password=u'qweqwe',
+                                        email=u'u3@rhodecode.org',
+                                        firstname=u'u3', lastname=u'u3')
+        Session().commit()
+        self.u3 = self.u3.user_id
+
+        super(TestNotifications, self).__init__(methodName=methodName)
+
+    def _clean_notifications(self):
+        for n in Notification.query().all():
+            Session().delete(n)
+
+        Session().commit()
+        self.assertEqual(Notification.query().all(), [])
+
+    def tearDown(self):
+        self._clean_notifications()
+
+    def test_create_notification(self):
+        self.assertEqual([], Notification.query().all())
+        self.assertEqual([], UserNotification.query().all())
+
+        usrs = [self.u1, self.u2]
+        notification = NotificationModel().create(created_by=self.u1,
+                                           subject=u'subj', body=u'hi there',
+                                           recipients=usrs)
+        Session().commit()
+        u1 = User.get(self.u1)
+        u2 = User.get(self.u2)
+        u3 = User.get(self.u3)
+        notifications = Notification.query().all()
+        self.assertEqual(len(notifications), 1)
+
+        unotification = UserNotification.query()\
+            .filter(UserNotification.notification == notification).all()
+
+        self.assertEqual(notifications[0].recipients, [u1, u2])
+        self.assertEqual(notification.notification_id,
+                         notifications[0].notification_id)
+        self.assertEqual(len(unotification), len(usrs))
+        self.assertEqual([x.user.user_id for x in unotification], usrs)
+
+    def test_user_notifications(self):
+        self.assertEqual([], Notification.query().all())
+        self.assertEqual([], UserNotification.query().all())
+
+        notification1 = NotificationModel().create(created_by=self.u1,
+                                            subject=u'subj', body=u'hi there1',
+                                            recipients=[self.u3])
+        Session().commit()
+        notification2 = NotificationModel().create(created_by=self.u1,
+                                            subject=u'subj', body=u'hi there2',
+                                            recipients=[self.u3])
+        Session().commit()
+        u3 = Session().query(User).get(self.u3)
+
+        self.assertEqual(sorted([x.notification for x in u3.notifications]),
+                         sorted([notification2, notification1]))
+
+    def test_delete_notifications(self):
+        self.assertEqual([], Notification.query().all())
+        self.assertEqual([], UserNotification.query().all())
+
+        notification = NotificationModel().create(created_by=self.u1,
+                                           subject=u'title', body=u'hi there3',
+                                    recipients=[self.u3, self.u1, self.u2])
+        Session().commit()
+        notifications = Notification.query().all()
+        self.assertTrue(notification in notifications)
+
+        Notification.delete(notification.notification_id)
+        Session().commit()
+
+        notifications = Notification.query().all()
+        self.assertFalse(notification in notifications)
+
+        un = UserNotification.query().filter(UserNotification.notification
+                                             == notification).all()
+        self.assertEqual(un, [])
+
+    def test_delete_association(self):
+
+        self.assertEqual([], Notification.query().all())
+        self.assertEqual([], UserNotification.query().all())
+
+        notification = NotificationModel().create(created_by=self.u1,
+                                           subject=u'title', body=u'hi there3',
+                                    recipients=[self.u3, self.u1, self.u2])
+        Session().commit()
+
+        unotification = UserNotification.query()\
+                            .filter(UserNotification.notification ==
+                                    notification)\
+                            .filter(UserNotification.user_id == self.u3)\
+                            .scalar()
+
+        self.assertEqual(unotification.user_id, self.u3)
+
+        NotificationModel().delete(self.u3,
+                                   notification.notification_id)
+        Session().commit()
+
+        u3notification = UserNotification.query()\
+                            .filter(UserNotification.notification ==
+                                    notification)\
+                            .filter(UserNotification.user_id == self.u3)\
+                            .scalar()
+
+        self.assertEqual(u3notification, None)
+
+        # notification object is still there
+        self.assertEqual(Notification.query().all(), [notification])
+
+        #u1 and u2 still have assignments
+        u1notification = UserNotification.query()\
+                            .filter(UserNotification.notification ==
+                                    notification)\
+                            .filter(UserNotification.user_id == self.u1)\
+                            .scalar()
+        self.assertNotEqual(u1notification, None)
+        u2notification = UserNotification.query()\
+                            .filter(UserNotification.notification ==
+                                    notification)\
+                            .filter(UserNotification.user_id == self.u2)\
+                            .scalar()
+        self.assertNotEqual(u2notification, None)
+
+    def test_notification_counter(self):
+        self._clean_notifications()
+        self.assertEqual([], Notification.query().all())
+        self.assertEqual([], UserNotification.query().all())
+
+        NotificationModel().create(created_by=self.u1,
+                            subject=u'title', body=u'hi there_delete',
+                            recipients=[self.u3, self.u1])
+        Session().commit()
+
+        self.assertEqual(NotificationModel()
+                         .get_unread_cnt_for_user(self.u1), 1)
+        self.assertEqual(NotificationModel()
+                         .get_unread_cnt_for_user(self.u2), 0)
+        self.assertEqual(NotificationModel()
+                         .get_unread_cnt_for_user(self.u3), 1)
+
+        notification = NotificationModel().create(created_by=self.u1,
+                                           subject=u'title', body=u'hi there3',
+                                    recipients=[self.u3, self.u1, self.u2])
+        Session().commit()
+
+        self.assertEqual(NotificationModel()
+                         .get_unread_cnt_for_user(self.u1), 2)
+        self.assertEqual(NotificationModel()
+                         .get_unread_cnt_for_user(self.u2), 1)
+        self.assertEqual(NotificationModel()
+                         .get_unread_cnt_for_user(self.u3), 2)
+
+
+
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rhodecode/tests/models/test_permissions.py	Sun Jul 01 16:24:15 2012 +0200
@@ -0,0 +1,316 @@
+import os
+import unittest
+from rhodecode.tests import *
+
+from rhodecode.model.repos_group import ReposGroupModel
+from rhodecode.model.repo import RepoModel
+from rhodecode.model.db import RepoGroup, User, UsersGroupRepoGroupToPerm
+from rhodecode.model.user import UserModel
+
+from rhodecode.model.meta import Session
+from rhodecode.model.users_group import UsersGroupModel
+from rhodecode.lib.auth import AuthUser
+
+
+def _make_group(path, desc='desc', parent_id=None,
+                 skip_if_exists=False):
+
+    gr = RepoGroup.get_by_group_name(path)
+    if gr and skip_if_exists:
+        return gr
+
+    gr = ReposGroupModel().create(path, desc, parent_id)
+    return gr
+
+
+class TestPermissions(unittest.TestCase):
+    def __init__(self, methodName='runTest'):
+        super(TestPermissions, self).__init__(methodName=methodName)
+
+    def setUp(self):
+        self.u1 = UserModel().create_or_update(
+            username=u'u1', password=u'qweqwe',
+            email=u'u1@rhodecode.org', firstname=u'u1', lastname=u'u1'
+        )
+        self.u2 = UserModel().create_or_update(
+            username=u'u2', password=u'qweqwe',
+            email=u'u2@rhodecode.org', firstname=u'u2', lastname=u'u2'
+        )
+        self.anon = User.get_by_username('default')
+        self.a1 = UserModel().create_or_update(
+            username=u'a1', password=u'qweqwe',
+            email=u'a1@rhodecode.org', firstname=u'a1', lastname=u'a1', admin=True
+        )
+        Session().commit()
+
+    def tearDown(self):
+        if hasattr(self, 'test_repo'):
+            RepoModel().delete(repo=self.test_repo)
+        UserModel().delete(self.u1)
+        UserModel().delete(self.u2)
+        UserModel().delete(self.a1)
+        if hasattr(self, 'g1'):
+            ReposGroupModel().delete(self.g1.group_id)
+        if hasattr(self, 'g2'):
+            ReposGroupModel().delete(self.g2.group_id)
+
+        if hasattr(self, 'ug1'):
+            UsersGroupModel().delete(self.ug1, force=True)
+
+        Session().commit()
+
+    def test_default_perms_set(self):
+        u1_auth = AuthUser(user_id=self.u1.user_id)
+        perms = {
+            'repositories_groups': {},
+            'global': set([u'hg.create.repository', u'repository.read',
+                           u'hg.register.manual_activate']),
+            'repositories': {u'vcs_test_hg': u'repository.read'}
+        }
+        self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
+                         perms['repositories'][HG_REPO])
+        new_perm = 'repository.write'
+        RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1,
+                                          perm=new_perm)
+        Session().commit()
+
+        u1_auth = AuthUser(user_id=self.u1.user_id)
+        self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
+                         new_perm)
+
+    def test_default_admin_perms_set(self):
+        a1_auth = AuthUser(user_id=self.a1.user_id)
+        perms = {
+            'repositories_groups': {},
+            'global': set([u'hg.admin']),
+            'repositories': {u'vcs_test_hg': u'repository.admin'}
+        }
+        self.assertEqual(a1_auth.permissions['repositories'][HG_REPO],
+                         perms['repositories'][HG_REPO])
+        new_perm = 'repository.write'
+        RepoModel().grant_user_permission(repo=HG_REPO, user=self.a1,
+                                          perm=new_perm)
+        Session().commit()
+        # cannot really downgrade admins permissions !? they still get's set as
+        # admin !
+        u1_auth = AuthUser(user_id=self.a1.user_id)
+        self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
+                         perms['repositories'][HG_REPO])
+
+    def test_default_group_perms(self):
+        self.g1 = _make_group('test1', skip_if_exists=True)
+        self.g2 = _make_group('test2', skip_if_exists=True)
+        u1_auth = AuthUser(user_id=self.u1.user_id)
+        perms = {
+            'repositories_groups': {u'test1': 'group.read', u'test2': 'group.read'},
+            'global': set([u'hg.create.repository', u'repository.read', u'hg.register.manual_activate']),
+            'repositories': {u'vcs_test_hg': u'repository.read'}
+        }
+        self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
+                         perms['repositories'][HG_REPO])
+        self.assertEqual(u1_auth.permissions['repositories_groups'],
+                         perms['repositories_groups'])
+
+    def test_default_admin_group_perms(self):
+        self.g1 = _make_group('test1', skip_if_exists=True)
+        self.g2 = _make_group('test2', skip_if_exists=True)
+        a1_auth = AuthUser(user_id=self.a1.user_id)
+        perms = {
+            'repositories_groups': {u'test1': 'group.admin', u'test2': 'group.admin'},
+            'global': set(['hg.admin']),
+            'repositories': {u'vcs_test_hg': 'repository.admin'}
+        }
+
+        self.assertEqual(a1_auth.permissions['repositories'][HG_REPO],
+                         perms['repositories'][HG_REPO])
+        self.assertEqual(a1_auth.permissions['repositories_groups'],
+                         perms['repositories_groups'])
+
+    def test_propagated_permission_from_users_group(self):
+        # make group
+        self.ug1 = UsersGroupModel().create('G1')
+        # add user to group
+        UsersGroupModel().add_user_to_group(self.ug1, self.u1)
+
+        # set permission to lower
+        new_perm = 'repository.none'
+        RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1, perm=new_perm)
+        Session().commit()
+        u1_auth = AuthUser(user_id=self.u1.user_id)
+        self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
+                         new_perm)
+
+        # grant perm for group this should override permission from user
+        new_perm = 'repository.write'
+        RepoModel().grant_users_group_permission(repo=HG_REPO,
+                                                 group_name=self.ug1,
+                                                 perm=new_perm)
+        # check perms
+        u1_auth = AuthUser(user_id=self.u1.user_id)
+        perms = {
+            'repositories_groups': {},
+            'global': set([u'hg.create.repository', u'repository.read',
+                           u'hg.register.manual_activate']),
+            'repositories': {u'vcs_test_hg': u'repository.read'}
+        }
+        self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
+                         new_perm)
+        self.assertEqual(u1_auth.permissions['repositories_groups'],
+                         perms['repositories_groups'])
+
+    def test_propagated_permission_from_users_group_lower_weight(self):
+        # make group
+        self.ug1 = UsersGroupModel().create('G1')
+        # add user to group
+        UsersGroupModel().add_user_to_group(self.ug1, self.u1)
+
+        # set permission to lower
+        new_perm_h = 'repository.write'
+        RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1,
+                                          perm=new_perm_h)
+        Session().commit()
+        u1_auth = AuthUser(user_id=self.u1.user_id)
+        self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
+                         new_perm_h)
+
+        # grant perm for group this should NOT override permission from user
+        # since it's lower than granted
+        new_perm_l = 'repository.read'
+        RepoModel().grant_users_group_permission(repo=HG_REPO,
+                                                 group_name=self.ug1,
+                                                 perm=new_perm_l)
+        # check perms
+        u1_auth = AuthUser(user_id=self.u1.user_id)
+        perms = {
+            'repositories_groups': {},
+            'global': set([u'hg.create.repository', u'repository.read',
+                           u'hg.register.manual_activate']),
+            'repositories': {u'vcs_test_hg': u'repository.write'}
+        }
+        self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
+                         new_perm_h)
+        self.assertEqual(u1_auth.permissions['repositories_groups'],
+                         perms['repositories_groups'])
+
+    def test_repo_in_group_permissions(self):
+        self.g1 = _make_group('group1', skip_if_exists=True)
+        self.g2 = _make_group('group2', skip_if_exists=True)
+        Session().commit()
+        # both perms should be read !
+        u1_auth = AuthUser(user_id=self.u1.user_id)
+        self.assertEqual(u1_auth.permissions['repositories_groups'],
+                         {u'group1': u'group.read', u'group2': u'group.read'})
+
+        a1_auth = AuthUser(user_id=self.anon.user_id)
+        self.assertEqual(a1_auth.permissions['repositories_groups'],
+                 {u'group1': u'group.read', u'group2': u'group.read'})
+
+        #Change perms to none for both groups
+        ReposGroupModel().grant_user_permission(repos_group=self.g1,
+                                                user=self.anon,
+                                                perm='group.none')
+        ReposGroupModel().grant_user_permission(repos_group=self.g2,
+                                                user=self.anon,
+                                                perm='group.none')
+
+        u1_auth = AuthUser(user_id=self.u1.user_id)
+        self.assertEqual(u1_auth.permissions['repositories_groups'],
+                 {u'group1': u'group.none', u'group2': u'group.none'})
+
+        a1_auth = AuthUser(user_id=self.anon.user_id)
+        self.assertEqual(a1_auth.permissions['repositories_groups'],
+                 {u'group1': u'group.none', u'group2': u'group.none'})
+
+        # add repo to group
+        name = RepoGroup.url_sep().join([self.g1.group_name, 'test_perm'])
+        self.test_repo = RepoModel().create_repo(
+            repo_name=name,
+            repo_type='hg',
+            description='',
+            owner=self.u1,
+        )
+        Session().commit()
+
+        u1_auth = AuthUser(user_id=self.u1.user_id)
+        self.assertEqual(u1_auth.permissions['repositories_groups'],
+                 {u'group1': u'group.none', u'group2': u'group.none'})
+
+        a1_auth = AuthUser(user_id=self.anon.user_id)
+        self.assertEqual(a1_auth.permissions['repositories_groups'],
+                 {u'group1': u'group.none', u'group2': u'group.none'})
+
+        #grant permission for u2 !
+        ReposGroupModel().grant_user_permission(repos_group=self.g1,
+                                                user=self.u2,
+                                                perm='group.read')
+        ReposGroupModel().grant_user_permission(repos_group=self.g2,
+                                                user=self.u2,
+                                                perm='group.read')
+        Session().commit()
+        self.assertNotEqual(self.u1, self.u2)
+        #u1 and anon should have not change perms while u2 should !
+        u1_auth = AuthUser(user_id=self.u1.user_id)
+        self.assertEqual(u1_auth.permissions['repositories_groups'],
+                 {u'group1': u'group.none', u'group2': u'group.none'})
+
+        u2_auth = AuthUser(user_id=self.u2.user_id)
+        self.assertEqual(u2_auth.permissions['repositories_groups'],
+                 {u'group1': u'group.read', u'group2': u'group.read'})
+
+        a1_auth = AuthUser(user_id=self.anon.user_id)
+        self.assertEqual(a1_auth.permissions['repositories_groups'],
+                 {u'group1': u'group.none', u'group2': u'group.none'})
+
+    def test_repo_group_user_as_user_group_member(self):
+        # create Group1
+        self.g1 = _make_group('group1', skip_if_exists=True)
+        Session().commit()
+        a1_auth = AuthUser(user_id=self.anon.user_id)
+
+        self.assertEqual(a1_auth.permissions['repositories_groups'],
+                         {u'group1': u'group.read'})
+
+        # set default permission to none
+        ReposGroupModel().grant_user_permission(repos_group=self.g1,
+                                                user=self.anon,
+                                                perm='group.none')
+        # make group
+        self.ug1 = UsersGroupModel().create('G1')
+        # add user to group
+        UsersGroupModel().add_user_to_group(self.ug1, self.u1)
+        Session().commit()
+
+        # check if user is in the group
+        membrs = [x.user_id for x in UsersGroupModel().get(self.ug1.users_group_id).members]
+        self.assertEqual(membrs, [self.u1.user_id])
+        # add some user to that group
+
+        # check his permissions
+        a1_auth = AuthUser(user_id=self.anon.user_id)
+        self.assertEqual(a1_auth.permissions['repositories_groups'],
+                         {u'group1': u'group.none'})
+
+        u1_auth = AuthUser(user_id=self.u1.user_id)
+        self.assertEqual(u1_auth.permissions['repositories_groups'],
+                         {u'group1': u'group.none'})
+
+        # grant ug1 read permissions for
+        ReposGroupModel().grant_users_group_permission(repos_group=self.g1,
+                                                       group_name=self.ug1,
+                                                       perm='group.read')
+        Session().commit()
+        # check if the
+        obj = Session().query(UsersGroupRepoGroupToPerm)\
+            .filter(UsersGroupRepoGroupToPerm.group == self.g1)\
+            .filter(UsersGroupRepoGroupToPerm.users_group == self.ug1)\
+            .scalar()
+        self.assertEqual(obj.permission.permission_name, 'group.read')
+
+        a1_auth = AuthUser(user_id=self.anon.user_id)
+
+        self.assertEqual(a1_auth.permissions['repositories_groups'],
+                         {u'group1': u'group.none'})
+
+        u1_auth = AuthUser(user_id=self.u1.user_id)
+        self.assertEqual(u1_auth.permissions['repositories_groups'],
+                         {u'group1': u'group.read'})
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rhodecode/tests/models/test_repos_groups.py	Sun Jul 01 16:24:15 2012 +0200
@@ -0,0 +1,170 @@
+import os
+import unittest
+from rhodecode.tests import *
+
+from rhodecode.model.repos_group import ReposGroupModel
+from rhodecode.model.repo import RepoModel
+from rhodecode.model.db import RepoGroup, User
+from rhodecode.model.meta import Session
+from sqlalchemy.exc import IntegrityError
+
+
+def _make_group(path, desc='desc', parent_id=None,
+                 skip_if_exists=False):
+
+    gr = RepoGroup.get_by_group_name(path)
+    if gr and skip_if_exists:
+        return gr
+
+    gr = ReposGroupModel().create(path, desc, parent_id)
+    return gr
+
+
+class TestReposGroups(unittest.TestCase):
+
+    def setUp(self):
+        self.g1 = _make_group('test1', skip_if_exists=True)
+        Session().commit()
+        self.g2 = _make_group('test2', skip_if_exists=True)
+        Session().commit()
+        self.g3 = _make_group('test3', skip_if_exists=True)
+        Session().commit()
+
+    def tearDown(self):
+        print 'out'
+
+    def __check_path(self, *path):
+        """
+        Checks the path for existance !
+        """
+        path = [TESTS_TMP_PATH] + list(path)
+        path = os.path.join(*path)
+        return os.path.isdir(path)
+
+    def _check_folders(self):
+        print os.listdir(TESTS_TMP_PATH)
+
+    def __delete_group(self, id_):
+        ReposGroupModel().delete(id_)
+
+    def __update_group(self, id_, path, desc='desc', parent_id=None):
+        form_data = dict(
+            group_name=path,
+            group_description=desc,
+            group_parent_id=parent_id,
+            perms_updates=[],
+            perms_new=[]
+        )
+        gr = ReposGroupModel().update(id_, form_data)
+        return gr
+
+    def test_create_group(self):
+        g = _make_group('newGroup')
+        self.assertEqual(g.full_path, 'newGroup')
+
+        self.assertTrue(self.__check_path('newGroup'))
+
+    def test_create_same_name_group(self):
+        self.assertRaises(IntegrityError, lambda: _make_group('newGroup'))
+        Session().rollback()
+
+    def test_same_subgroup(self):
+        sg1 = _make_group('sub1', parent_id=self.g1.group_id)
+        self.assertEqual(sg1.parent_group, self.g1)
+        self.assertEqual(sg1.full_path, 'test1/sub1')
+        self.assertTrue(self.__check_path('test1', 'sub1'))
+
+        ssg1 = _make_group('subsub1', parent_id=sg1.group_id)
+        self.assertEqual(ssg1.parent_group, sg1)
+        self.assertEqual(ssg1.full_path, 'test1/sub1/subsub1')
+        self.assertTrue(self.__check_path('test1', 'sub1', 'subsub1'))
+
+    def test_remove_group(self):
+        sg1 = _make_group('deleteme')
+        self.__delete_group(sg1.group_id)
+
+        self.assertEqual(RepoGroup.get(sg1.group_id), None)
+        self.assertFalse(self.__check_path('deteteme'))
+
+        sg1 = _make_group('deleteme', parent_id=self.g1.group_id)
+        self.__delete_group(sg1.group_id)
+
+        self.assertEqual(RepoGroup.get(sg1.group_id), None)
+        self.assertFalse(self.__check_path('test1', 'deteteme'))
+
+    def test_rename_single_group(self):
+        sg1 = _make_group('initial')
+
+        new_sg1 = self.__update_group(sg1.group_id, 'after')
+        self.assertTrue(self.__check_path('after'))
+        self.assertEqual(RepoGroup.get_by_group_name('initial'), None)
+
+    def test_update_group_parent(self):
+
+        sg1 = _make_group('initial', parent_id=self.g1.group_id)
+
+        new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g1.group_id)
+        self.assertTrue(self.__check_path('test1', 'after'))
+        self.assertEqual(RepoGroup.get_by_group_name('test1/initial'), None)
+
+        new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g3.group_id)
+        self.assertTrue(self.__check_path('test3', 'after'))
+        self.assertEqual(RepoGroup.get_by_group_name('test3/initial'), None)
+
+        new_sg1 = self.__update_group(sg1.group_id, 'hello')
+        self.assertTrue(self.__check_path('hello'))
+
+        self.assertEqual(RepoGroup.get_by_group_name('hello'), new_sg1)
+
+    def test_subgrouping_with_repo(self):
+
+        g1 = _make_group('g1')
+        g2 = _make_group('g2')
+
+        # create new repo
+        form_data = dict(repo_name='john',
+                         repo_name_full='john',
+                         fork_name=None,
+                         description=None,
+                         repo_group=None,
+                         private=False,
+                         repo_type='hg',
+                         clone_uri=None,
+                         landing_rev='tip')
+        cur_user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
+        r = RepoModel().create(form_data, cur_user)
+
+        self.assertEqual(r.repo_name, 'john')
+
+        # put repo into group
+        form_data = form_data
+        form_data['repo_group'] = g1.group_id
+        form_data['perms_new'] = []
+        form_data['perms_updates'] = []
+        RepoModel().update(r.repo_name, form_data)
+        self.assertEqual(r.repo_name, 'g1/john')
+
+        self.__update_group(g1.group_id, 'g1', parent_id=g2.group_id)
+        self.assertTrue(self.__check_path('g2', 'g1'))
+
+        # test repo
+        self.assertEqual(r.repo_name, RepoGroup.url_sep().join(['g2', 'g1',
+                                                                r.just_name]))
+
+    def test_move_to_root(self):
+        g1 = _make_group('t11')
+        Session().commit()
+        g2 = _make_group('t22', parent_id=g1.group_id)
+        Session().commit()
+
+        self.assertEqual(g2.full_path, 't11/t22')
+        self.assertTrue(self.__check_path('t11', 't22'))
+
+        g2 = self.__update_group(g2.group_id, 'g22', parent_id=None)
+        Session().commit()
+
+        self.assertEqual(g2.group_name, 'g22')
+        # we moved out group from t1 to '' so it's full path should be 'g2'
+        self.assertEqual(g2.full_path, 'g22')
+        self.assertFalse(self.__check_path('t11', 't22'))
+        self.assertTrue(self.__check_path('g22'))
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rhodecode/tests/models/test_users.py	Sun Jul 01 16:24:15 2012 +0200
@@ -0,0 +1,124 @@
+import unittest
+from rhodecode.tests import *
+
+from rhodecode.model.db import User, UsersGroup, UsersGroupMember, UserEmailMap,\
+    Permission
+from rhodecode.model.user import UserModel
+
+from rhodecode.model.meta import Session
+from rhodecode.model.users_group import UsersGroupModel
+
+
+class TestUser(unittest.TestCase):
+    def __init__(self, methodName='runTest'):
+        Session.remove()
+        super(TestUser, self).__init__(methodName=methodName)
+
+    def test_create_and_remove(self):
+        usr = UserModel().create_or_update(username=u'test_user',
+                                           password=u'qweqwe',
+                                     email=u'u232@rhodecode.org',
+                                     firstname=u'u1', lastname=u'u1')
+        Session().commit()
+        self.assertEqual(User.get_by_username(u'test_user'), usr)
+
+        # make users group
+        users_group = UsersGroupModel().create('some_example_group')
+        Session().commit()
+
+        UsersGroupModel().add_user_to_group(users_group, usr)
+        Session().commit()
+
+        self.assertEqual(UsersGroup.get(users_group.users_group_id), users_group)
+        self.assertEqual(UsersGroupMember.query().count(), 1)
+        UserModel().delete(usr.user_id)
+        Session().commit()
+
+        self.assertEqual(UsersGroupMember.query().all(), [])
+
+    def test_additonal_email_as_main(self):
+        usr = UserModel().create_or_update(username=u'test_user',
+                                           password=u'qweqwe',
+                                     email=u'main_email@rhodecode.org',
+                                     firstname=u'u1', lastname=u'u1')
+        Session().commit()
+
+        def do():
+            m = UserEmailMap()
+            m.email = u'main_email@rhodecode.org'
+            m.user = usr
+            Session().add(m)
+            Session().commit()
+        self.assertRaises(AttributeError, do)
+
+        UserModel().delete(usr.user_id)
+        Session().commit()
+
+    def test_extra_email_map(self):
+        usr = UserModel().create_or_update(username=u'test_user',
+                                           password=u'qweqwe',
+                                     email=u'main_email@rhodecode.org',
+                                     firstname=u'u1', lastname=u'u1')
+        Session().commit()
+
+        m = UserEmailMap()
+        m.email = u'main_email2@rhodecode.org'
+        m.user = usr
+        Session().add(m)
+        Session().commit()
+
+        u = User.get_by_email(email='main_email@rhodecode.org')
+        self.assertEqual(usr.user_id, u.user_id)
+        self.assertEqual(usr.username, u.username)
+
+        u = User.get_by_email(email='main_email2@rhodecode.org')
+        self.assertEqual(usr.user_id, u.user_id)
+        self.assertEqual(usr.username, u.username)
+        u = User.get_by_email(email='main_email3@rhodecode.org')
+        self.assertEqual(None, u)
+
+        UserModel().delete(usr.user_id)
+        Session().commit()
+
+
+class TestUsers(unittest.TestCase):
+
+    def __init__(self, methodName='runTest'):
+        super(TestUsers, self).__init__(methodName=methodName)
+
+    def setUp(self):
+        self.u1 = UserModel().create_or_update(username=u'u1',
+                                        password=u'qweqwe',
+                                        email=u'u1@rhodecode.org',
+                                        firstname=u'u1', lastname=u'u1')
+
+    def tearDown(self):
+        perm = Permission.query().all()
+        for p in perm:
+            UserModel().revoke_perm(self.u1, p)
+
+        UserModel().delete(self.u1)
+        Session().commit()
+
+    def test_add_perm(self):
+        perm = Permission.query().all()[0]
+        UserModel().grant_perm(self.u1, perm)
+        Session().commit()
+        self.assertEqual(UserModel().has_perm(self.u1, perm), True)
+
+    def test_has_perm(self):
+        perm = Permission.query().all()
+        for p in perm:
+            has_p = UserModel().has_perm(self.u1, p)
+            self.assertEqual(False, has_p)
+
+    def test_revoke_perm(self):
+        perm = Permission.query().all()[0]
+        UserModel().grant_perm(self.u1, perm)
+        Session().commit()
+        self.assertEqual(UserModel().has_perm(self.u1, perm), True)
+
+        #revoke
+        UserModel().revoke_perm(self.u1, perm)
+        Session().commit()
+        self.assertEqual(UserModel().has_perm(self.u1, perm), False)
--- a/rhodecode/tests/rhodecode_crawler.py	Sun Jul 01 16:22:38 2012 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,184 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
-    rhodecode.tests.test_crawer
-    ~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-    Test for crawling a project for memory usage
-    This should be runned just as regular script together
-    with a watch script that will show memory usage.
-
-    watch -n1 ./rhodecode/tests/mem_watch
-
-    :created_on: Apr 21, 2010
-    :author: marcink
-    :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
-    :license: GPLv3, see COPYING for more details.
-"""
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program.  If not, see <http://www.gnu.org/licenses/>.
-
-
-import cookielib
-import urllib
-import urllib2
-import time
-import os
-import sys
-from os.path import join as jn
-from os.path import dirname as dn
-
-__here__ = os.path.abspath(__file__)
-__root__ = dn(dn(dn(__here__)))
-sys.path.append(__root__)
-
-from rhodecode.lib import vcs
-from rhodecode.lib.compat import OrderedSet
-from rhodecode.lib.vcs.exceptions import RepositoryError
-
-PASES = 3
-HOST = 'http://127.0.0.1'
-PORT = 5000
-BASE_URI = '%s:%s/' % (HOST, PORT)
-
-if len(sys.argv) == 2:
-    BASE_URI = sys.argv[1]
-
-if not BASE_URI.endswith('/'):
-    BASE_URI += '/'
-
-print 'Crawling @ %s' % BASE_URI
-BASE_URI += '%s'
-PROJECT_PATH = jn('/', 'home', 'marcink', 'hg_repos')
-PROJECTS = [
-    'linux-magx-pbranch',
-    'CPython',
-    'rhodecode_tip',
-]
-
-
-cj = cookielib.FileCookieJar('/tmp/rc_test_cookie.txt')
-o = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))
-o.addheaders = [
-    ('User-agent', 'rhodecode-crawler'),
-    ('Accept-Language', 'en - us, en;q = 0.5')
-]
-
-urllib2.install_opener(o)
-
-
-def _get_repo(proj):
-    if isinstance(proj, basestring):
-        repo = vcs.get_repo(jn(PROJECT_PATH, proj))
-        proj = proj
-    else:
-        repo = proj
-        proj = repo.name
-
-    return repo, proj
-
-
-def test_changelog_walk(proj, pages=100):
-    repo, proj = _get_repo(proj)
-
-    total_time = 0
-    for i in range(1, pages):
-
-        page = '/'.join((proj, 'changelog',))
-
-        full_uri = (BASE_URI % page) + '?' + urllib.urlencode({'page':i})
-        s = time.time()
-        f = o.open(full_uri)
-        size = len(f.read())
-        e = time.time() - s
-        total_time += e
-        print 'visited %s size:%s req:%s ms' % (full_uri, size, e)
-
-    print 'total_time', total_time
-    print 'average on req', total_time / float(pages)
-
-
-def test_changeset_walk(proj, limit=None):
-    repo, proj = _get_repo(proj)
-
-    print 'processing', jn(PROJECT_PATH, proj)
-    total_time = 0
-
-    cnt = 0
-    for i in repo:
-        cnt += 1
-        raw_cs = '/'.join((proj, 'changeset', i.raw_id))
-        if limit and limit == cnt:
-            break
-
-        full_uri = (BASE_URI % raw_cs)
-        print '%s visiting %s\%s' % (cnt, full_uri, i)
-        s = time.time()
-        f = o.open(full_uri)
-        size = len(f.read())
-        e = time.time() - s
-        total_time += e
-        print '%s visited %s\%s size:%s req:%s ms' % (cnt, full_uri, i, size, e)
-
-    print 'total_time', total_time
-    print 'average on req', total_time / float(cnt)
-
-
-def test_files_walk(proj, limit=100):
-    repo, proj = _get_repo(proj)
-
-    print 'processing', jn(PROJECT_PATH, proj)
-    total_time = 0
-
-    paths_ = OrderedSet([''])
-    try:
-        tip = repo.get_changeset('tip')
-        for topnode, dirs, files in tip.walk('/'):
-
-            for dir in dirs:
-                paths_.add(dir.path)
-                for f in dir:
-                    paths_.add(f.path)
-
-            for f in files:
-                paths_.add(f.path)
-
-    except RepositoryError, e:
-        pass
-
-    cnt = 0
-    for f in paths_:
-        cnt += 1
-        if limit and limit == cnt:
-            break
-
-        file_path = '/'.join((proj, 'files', 'tip', f))
-        full_uri = (BASE_URI % file_path)
-        print '%s visiting %s' % (cnt, full_uri)
-        s = time.time()
-        f = o.open(full_uri)
-        size = len(f.read())
-        e = time.time() - s
-        total_time += e
-        print '%s visited OK size:%s req:%s ms' % (cnt, size, e)
-
-    print 'total_time', total_time
-    print 'average on req', total_time / float(cnt)
-
-if __name__ == '__main__':
-    for path in PROJECTS:
-        repo = vcs.get_repo(jn(PROJECT_PATH, path))
-        for i in range(PASES):
-            print 'PASS %s/%s' % (i, PASES)
-            test_changelog_walk(repo, pages=80)
-            test_changeset_walk(repo, limit=100)
-            test_files_walk(repo, limit=100)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rhodecode/tests/scripts/mem_watch	Sun Jul 01 16:24:15 2012 +0200
@@ -0,0 +1,1 @@
+ps -eo size,pid,user,command --sort -size | awk '{ hr=$1/1024 ; printf("%13.2f Mb ",hr) } { for ( x=4 ; x<=NF ; x++ ) { printf("%s ",$x) } print "" }'|grep [p]aster
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rhodecode/tests/scripts/test_concurency.py	Sun Jul 01 16:24:15 2012 +0200
@@ -0,0 +1,213 @@
+# -*- coding: utf-8 -*-
+"""
+    rhodecode.tests.test_hg_operations
+    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+    Test suite for making push/pull operations
+
+    :created_on: Dec 30, 2010
+    :author: marcink
+    :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
+    :license: GPLv3, see COPYING for more details.
+"""
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+import os
+import sys
+import shutil
+import logging
+from os.path import join as jn
+from os.path import dirname as dn
+
+from tempfile import _RandomNameSequence
+from subprocess import Popen, PIPE
+
+from paste.deploy import appconfig
+from pylons import config
+from sqlalchemy import engine_from_config
+
+from rhodecode.lib.utils import add_cache
+from rhodecode.model import init_model
+from rhodecode.model import meta
+from rhodecode.model.db import User, Repository
+from rhodecode.lib.auth import get_crypt_password
+
+from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO
+from rhodecode.config.environment import load_environment
+
+rel_path = dn(dn(dn(os.path.abspath(__file__))))
+conf = appconfig('config:development.ini', relative_to=rel_path)
+load_environment(conf.global_conf, conf.local_conf)
+
+add_cache(conf)
+
+USER = 'test_admin'
+PASS = 'test12'
+HOST = 'hg.local'
+METHOD = 'pull'
+DEBUG = True
+log = logging.getLogger(__name__)
+
+
+class Command(object):
+
+    def __init__(self, cwd):
+        self.cwd = cwd
+
+    def execute(self, cmd, *args):
+        """Runs command on the system with given ``args``.
+        """
+
+        command = cmd + ' ' + ' '.join(args)
+        log.debug('Executing %s' % command)
+        if DEBUG:
+            print command
+        p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd)
+        stdout, stderr = p.communicate()
+        if DEBUG:
+            print stdout, stderr
+        return stdout, stderr
+
+
+def get_session():
+    engine = engine_from_config(conf, 'sqlalchemy.db1.')
+    init_model(engine)
+    sa = meta.Session
+    return sa
+
+
+def create_test_user(force=True):
+    print 'creating test user'
+    sa = get_session()
+
+    user = sa.query(User).filter(User.username == USER).scalar()
+
+    if force and user is not None:
+        print 'removing current user'
+        for repo in sa.query(Repository).filter(Repository.user == user).all():
+            sa.delete(repo)
+        sa.delete(user)
+        sa.commit()
+
+    if user is None or force:
+        print 'creating new one'
+        new_usr = User()
+        new_usr.username = USER
+        new_usr.password = get_crypt_password(PASS)
+        new_usr.email = 'mail@mail.com'
+        new_usr.name = 'test'
+        new_usr.lastname = 'lasttestname'
+        new_usr.active = True
+        new_usr.admin = True
+        sa.add(new_usr)
+        sa.commit()
+
+    print 'done'
+
+
+def create_test_repo(force=True):
+    print 'creating test repo'
+    from rhodecode.model.repo import RepoModel
+    sa = get_session()
+
+    user = sa.query(User).filter(User.username == USER).scalar()
+    if user is None:
+        raise Exception('user not found')
+
+    repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar()
+
+    if repo is None:
+        print 'repo not found creating'
+
+        form_data = {'repo_name':HG_REPO,
+                     'repo_type':'hg',
+                     'private':False,
+                     'clone_uri':'' }
+        rm = RepoModel(sa)
+        rm.base_path = '/home/hg'
+        rm.create(form_data, user)
+
+    print 'done'
+
+
+def set_anonymous_access(enable=True):
+    sa = get_session()
+    user = sa.query(User).filter(User.username == 'default').one()
+    user.active = enable
+    sa.add(user)
+    sa.commit()
+
+
+def get_anonymous_access():
+    sa = get_session()
+    return sa.query(User).filter(User.username == 'default').one().active
+
+
+#==============================================================================
+# TESTS
+#==============================================================================
+def test_clone_with_credentials(no_errors=False, repo=HG_REPO, method=METHOD,
+                                seq=None):
+    cwd = path = jn(TESTS_TMP_PATH, repo)
+
+    if seq == None:
+        seq = _RandomNameSequence().next()
+
+    try:
+        shutil.rmtree(path, ignore_errors=True)
+        os.makedirs(path)
+        #print 'made dirs %s' % jn(path)
+    except OSError:
+        raise
+
+    clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
+                  {'user':USER,
+                   'pass':PASS,
+                   'host':HOST,
+                   'cloned_repo':repo, }
+
+    dest = path + seq
+    if method == 'pull':
+        stdout, stderr = Command(cwd).execute('hg', method, '--cwd', dest, clone_url)
+    else:
+        stdout, stderr = Command(cwd).execute('hg', method, clone_url, dest)
+
+        if no_errors is False:
+            assert """adding file changes""" in stdout, 'no messages about cloning'
+            assert """abort""" not in stderr , 'got error from clone'
+
+if __name__ == '__main__':
+    try:
+        create_test_user(force=False)
+        seq = None
+        import time
+
+        try:
+            METHOD = sys.argv[3]
+        except:
+            pass
+
+        if METHOD == 'pull':
+            seq = _RandomNameSequence().next()
+            test_clone_with_credentials(repo=sys.argv[1], method='clone',
+                                        seq=seq)
+        s = time.time()
+        for i in range(1, int(sys.argv[2]) + 1):
+            print 'take', i
+            test_clone_with_credentials(repo=sys.argv[1], method=METHOD,
+                                        seq=seq)
+        print 'time taken %.3f' % (time.time() - s)
+    except Exception, e:
+        raise
+        sys.exit('stop on %s' % e)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rhodecode/tests/scripts/test_crawler.py	Sun Jul 01 16:24:15 2012 +0200
@@ -0,0 +1,184 @@
+# -*- coding: utf-8 -*-
+"""
+    rhodecode.tests.test_crawer
+    ~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+    Test for crawling a project for memory usage
+    This should be runned just as regular script together
+    with a watch script that will show memory usage.
+
+    watch -n1 ./rhodecode/tests/mem_watch
+
+    :created_on: Apr 21, 2010
+    :author: marcink
+    :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
+    :license: GPLv3, see COPYING for more details.
+"""
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+
+import cookielib
+import urllib
+import urllib2
+import time
+import os
+import sys
+from os.path import join as jn
+from os.path import dirname as dn
+
+__here__ = os.path.abspath(__file__)
+__root__ = dn(dn(dn(__here__)))
+sys.path.append(__root__)
+
+from rhodecode.lib import vcs
+from rhodecode.lib.compat import OrderedSet
+from rhodecode.lib.vcs.exceptions import RepositoryError
+
+PASES = 3
+HOST = 'http://127.0.0.1'
+PORT = 5000
+BASE_URI = '%s:%s/' % (HOST, PORT)
+
+if len(sys.argv) == 2:
+    BASE_URI = sys.argv[1]
+
+if not BASE_URI.endswith('/'):
+    BASE_URI += '/'
+
+print 'Crawling @ %s' % BASE_URI
+BASE_URI += '%s'
+PROJECT_PATH = jn('/', 'home', 'marcink', 'hg_repos')
+PROJECTS = [
+    'linux-magx-pbranch',
+    'CPython',
+    'rhodecode_tip',
+]
+
+
+cj = cookielib.FileCookieJar('/tmp/rc_test_cookie.txt')
+o = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))
+o.addheaders = [
+    ('User-agent', 'rhodecode-crawler'),
+    ('Accept-Language', 'en - us, en;q = 0.5')
+]
+
+urllib2.install_opener(o)
+
+
+def _get_repo(proj):
+    if isinstance(proj, basestring):
+        repo = vcs.get_repo(jn(PROJECT_PATH, proj))
+        proj = proj
+    else:
+        repo = proj
+        proj = repo.name
+
+    return repo, proj
+
+
+def test_changelog_walk(proj, pages=100):
+    repo, proj = _get_repo(proj)
+
+    total_time = 0
+    for i in range(1, pages):
+
+        page = '/'.join((proj, 'changelog',))
+
+        full_uri = (BASE_URI % page) + '?' + urllib.urlencode({'page':i})
+        s = time.time()
+        f = o.open(full_uri)
+        size = len(f.read())
+        e = time.time() - s
+        total_time += e
+        print 'visited %s size:%s req:%s ms' % (full_uri, size, e)
+
+    print 'total_time', total_time
+    print 'average on req', total_time / float(pages)
+
+
+def test_changeset_walk(proj, limit=None):
+    repo, proj = _get_repo(proj)
+
+    print 'processing', jn(PROJECT_PATH, proj)
+    total_time = 0
+
+    cnt = 0
+    for i in repo:
+        cnt += 1
+        raw_cs = '/'.join((proj, 'changeset', i.raw_id))
+        if limit and limit == cnt:
+            break
+
+        full_uri = (BASE_URI % raw_cs)
+        print '%s visiting %s\%s' % (cnt, full_uri, i)
+        s = time.time()
+        f = o.open(full_uri)
+        size = len(f.read())
+        e = time.time() - s
+        total_time += e
+        print '%s visited %s\%s size:%s req:%s ms' % (cnt, full_uri, i, size, e)
+
+    print 'total_time', total_time
+    print 'average on req', total_time / float(cnt)
+
+
+def test_files_walk(proj, limit=100):
+    repo, proj = _get_repo(proj)
+
+    print 'processing', jn(PROJECT_PATH, proj)
+    total_time = 0
+
+    paths_ = OrderedSet([''])
+    try:
+        tip = repo.get_changeset('tip')
+        for topnode, dirs, files in tip.walk('/'):
+
+            for dir in dirs:
+                paths_.add(dir.path)
+                for f in dir:
+                    paths_.add(f.path)
+
+            for f in files:
+                paths_.add(f.path)
+
+    except RepositoryError, e:
+        pass
+
+    cnt = 0
+    for f in paths_:
+        cnt += 1
+        if limit and limit == cnt:
+            break
+
+        file_path = '/'.join((proj, 'files', 'tip', f))
+        full_uri = (BASE_URI % file_path)
+        print '%s visiting %s' % (cnt, full_uri)
+        s = time.time()
+        f = o.open(full_uri)
+        size = len(f.read())
+        e = time.time() - s
+        total_time += e
+        print '%s visited OK size:%s req:%s ms' % (cnt, size, e)
+
+    print 'total_time', total_time
+    print 'average on req', total_time / float(cnt)
+
+if __name__ == '__main__':
+    for path in PROJECTS:
+        repo = vcs.get_repo(jn(PROJECT_PATH, path))
+        for i in range(PASES):
+            print 'PASS %s/%s' % (i, PASES)
+            test_changelog_walk(repo, pages=80)
+            test_changeset_walk(repo, limit=100)
+            test_files_walk(repo, limit=100)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rhodecode/tests/scripts/test_scm_operations.py	Sun Jul 01 16:24:15 2012 +0200
@@ -0,0 +1,410 @@
+# -*- coding: utf-8 -*-
+"""
+    rhodecode.tests.test_hg_operations
+    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+    Test suite for making push/pull operations
+
+    :created_on: Dec 30, 2010
+    :author: marcink
+    :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
+    :license: GPLv3, see COPYING for more details.
+"""
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+import os
+import time
+import sys
+import shutil
+import logging
+
+from os.path import join as jn
+from os.path import dirname as dn
+
+from tempfile import _RandomNameSequence
+from subprocess import Popen, PIPE
+
+from paste.deploy import appconfig
+from pylons import config
+from sqlalchemy import engine_from_config
+
+from rhodecode.lib.utils import add_cache
+from rhodecode.model import init_model
+from rhodecode.model import meta
+from rhodecode.model.db import User, Repository, UserLog
+from rhodecode.lib.auth import get_crypt_password
+
+from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO
+from rhodecode.config.environment import load_environment
+
+rel_path = dn(dn(dn(os.path.abspath(__file__))))
+
+conf = appconfig('config:%s' % sys.argv[1], relative_to=rel_path)
+load_environment(conf.global_conf, conf.local_conf)
+
+add_cache(conf)
+
+USER = 'test_admin'
+PASS = 'test12'
+HOST = '127.0.0.1:5000'
+DEBUG = False
+print 'DEBUG:', DEBUG
+log = logging.getLogger(__name__)
+
+engine = engine_from_config(conf, 'sqlalchemy.db1.')
+init_model(engine)
+sa = meta.Session()
+
+
+class Command(object):
+
+    def __init__(self, cwd):
+        self.cwd = cwd
+
+    def execute(self, cmd, *args):
+        """Runs command on the system with given ``args``.
+        """
+
+        command = cmd + ' ' + ' '.join(args)
+        log.debug('Executing %s' % command)
+        if DEBUG:
+            print command
+        p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd)
+        stdout, stderr = p.communicate()
+        if DEBUG:
+            print stdout, stderr
+        return stdout, stderr
+
+
+def test_wrapp(func):
+
+    def __wrapp(*args, **kwargs):
+        print '>>>%s' % func.__name__
+        try:
+            res = func(*args, **kwargs)
+        except Exception, e:
+            print ('###############\n-'
+                   '--%s failed %s--\n'
+                   '###############\n' % (func.__name__, e))
+            sys.exit()
+        print '++OK++'
+        return res
+    return __wrapp
+
+
+def create_test_user(force=True):
+    print '\tcreating test user'
+
+    user = User.get_by_username(USER)
+
+    if force and user is not None:
+        print '\tremoving current user'
+        for repo in Repository.query().filter(Repository.user == user).all():
+            sa.delete(repo)
+        sa.delete(user)
+        sa.commit()
+
+    if user is None or force:
+        print '\tcreating new one'
+        new_usr = User()
+        new_usr.username = USER
+        new_usr.password = get_crypt_password(PASS)
+        new_usr.email = 'mail@mail.com'
+        new_usr.name = 'test'
+        new_usr.lastname = 'lasttestname'
+        new_usr.active = True
+        new_usr.admin = True
+        sa.add(new_usr)
+        sa.commit()
+
+    print '\tdone'
+
+
+def create_test_repo(force=True):
+    from rhodecode.model.repo import RepoModel
+
+    user = User.get_by_username(USER)
+    if user is None:
+        raise Exception('user not found')
+
+    repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar()
+
+    if repo is None:
+        print '\trepo not found creating'
+
+        form_data = {'repo_name':HG_REPO,
+                     'repo_type':'hg',
+                     'private':False,
+                     'clone_uri':'' }
+        rm = RepoModel(sa)
+        rm.base_path = '/home/hg'
+        rm.create(form_data, user)
+
+
+def set_anonymous_access(enable=True):
+    user = User.get_by_username('default')
+    user.active = enable
+    sa.add(user)
+    sa.commit()
+    print '\tanonymous access is now:', enable
+    if enable != User.get_by_username('default').active:
+        raise Exception('Cannot set anonymous access')
+
+
+def get_anonymous_access():
+    user = User.get_by_username('default')
+    return user.active
+
+
+#==============================================================================
+# TESTS
+#==============================================================================
+@test_wrapp
+def test_clone_with_credentials(no_errors=False):
+    cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
+
+    try:
+        shutil.rmtree(path, ignore_errors=True)
+        os.makedirs(path)
+        #print 'made dirs %s' % jn(path)
+    except OSError:
+        raise
+
+    print '\tchecking if anonymous access is enabled'
+    anonymous_access = get_anonymous_access()
+    if anonymous_access:
+        print '\tenabled, disabling it '
+        set_anonymous_access(enable=False)
+
+    clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \
+                  {'user':USER,
+                   'pass':PASS,
+                   'host':HOST,
+                   'cloned_repo':HG_REPO,
+                   'dest':path}
+
+    stdout, stderr = Command(cwd).execute('hg clone', clone_url)
+
+    if no_errors is False:
+        assert """adding file changes""" in stdout, 'no messages about cloning'
+        assert """abort""" not in stderr , 'got error from clone'
+
+
+@test_wrapp
+def test_clone_anonymous():
+    cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
+
+    try:
+        shutil.rmtree(path, ignore_errors=True)
+        os.makedirs(path)
+        #print 'made dirs %s' % jn(path)
+    except OSError:
+        raise
+
+
+    print '\tchecking if anonymous access is enabled'
+    anonymous_access = get_anonymous_access()
+    if not anonymous_access:
+        print '\tnot enabled, enabling it '
+        set_anonymous_access(enable=True)
+
+    clone_url = 'http://%(host)s/%(cloned_repo)s %(dest)s' % \
+                  {'user':USER,
+                   'pass':PASS,
+                   'host':HOST,
+                   'cloned_repo':HG_REPO,
+                   'dest':path}
+
+    stdout, stderr = Command(cwd).execute('hg clone', clone_url)
+
+    assert """adding file changes""" in stdout, 'no messages about cloning'
+    assert """abort""" not in stderr , 'got error from clone'
+
+    #disable if it was enabled
+    if not anonymous_access:
+        print '\tdisabling anonymous access'
+        set_anonymous_access(enable=False)
+
+
+@test_wrapp
+def test_clone_wrong_credentials():
+    cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
+
+    try:
+        shutil.rmtree(path, ignore_errors=True)
+        os.makedirs(path)
+        #print 'made dirs %s' % jn(path)
+    except OSError:
+        raise
+
+    print '\tchecking if anonymous access is enabled'
+    anonymous_access = get_anonymous_access()
+    if anonymous_access:
+        print '\tenabled, disabling it '
+        set_anonymous_access(enable=False)
+
+    clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \
+                  {'user':USER + 'error',
+                   'pass':PASS,
+                   'host':HOST,
+                   'cloned_repo':HG_REPO,
+                   'dest':path}
+
+    stdout, stderr = Command(cwd).execute('hg clone', clone_url)
+
+    if not """abort: authorization failed"""  in stderr:
+        raise Exception('Failure')
+
+
+@test_wrapp
+def test_pull():
+    pass
+
+
+@test_wrapp
+def test_push_modify_file(f_name='setup.py'):
+    cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
+    modified_file = jn(TESTS_TMP_PATH, HG_REPO, f_name)
+    for i in xrange(5):
+        cmd = """echo 'added_line%s' >> %s""" % (i, modified_file)
+        Command(cwd).execute(cmd)
+
+        cmd = """hg ci -m 'changed file %s' %s """ % (i, modified_file)
+        Command(cwd).execute(cmd)
+
+    Command(cwd).execute('hg push %s' % jn(TESTS_TMP_PATH, HG_REPO))
+
+
+@test_wrapp
+def test_push_new_file(commits=15, with_clone=True):
+
+    if with_clone:
+        test_clone_with_credentials(no_errors=True)
+
+    cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
+    added_file = jn(path, '%ssetupążźć.py' % _RandomNameSequence().next())
+
+    Command(cwd).execute('touch %s' % added_file)
+
+    Command(cwd).execute('hg add %s' % added_file)
+
+    for i in xrange(commits):
+        cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
+        Command(cwd).execute(cmd)
+
+        cmd = """hg ci -m 'commited new %s' -u '%s' %s """ % (i,
+                                'Marcin Kuźminski <marcin@python-blog.com>',
+                                added_file)
+        Command(cwd).execute(cmd)
+
+    push_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
+                  {'user':USER,
+                   'pass':PASS,
+                   'host':HOST,
+                   'cloned_repo':HG_REPO,
+                   'dest':jn(TESTS_TMP_PATH, HG_REPO)}
+
+    Command(cwd).execute('hg push --verbose --debug %s' % push_url)
+
+
+@test_wrapp
+def test_push_wrong_credentials():
+    cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
+    clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
+                  {'user':USER + 'xxx',
+                   'pass':PASS,
+                   'host':HOST,
+                   'cloned_repo':HG_REPO,
+                   'dest':jn(TESTS_TMP_PATH, HG_REPO)}
+
+    modified_file = jn(TESTS_TMP_PATH, HG_REPO, 'setup.py')
+    for i in xrange(5):
+        cmd = """echo 'added_line%s' >> %s""" % (i, modified_file)
+        Command(cwd).execute(cmd)
+
+        cmd = """hg ci -m 'commited %s' %s """ % (i, modified_file)
+        Command(cwd).execute(cmd)
+
+    Command(cwd).execute('hg push %s' % clone_url)
+
+
+@test_wrapp
+def test_push_wrong_path():
+    cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
+    added_file = jn(path, 'somefile.py')
+
+    try:
+        shutil.rmtree(path, ignore_errors=True)
+        os.makedirs(path)
+        print '\tmade dirs %s' % jn(path)
+    except OSError:
+        raise
+
+    Command(cwd).execute("""echo '' > %s""" % added_file)
+    Command(cwd).execute("""hg init %s""" % path)
+    Command(cwd).execute("""hg add %s""" % added_file)
+
+    for i in xrange(2):
+        cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
+        Command(cwd).execute(cmd)
+
+        cmd = """hg ci -m 'commited new %s' %s """ % (i, added_file)
+        Command(cwd).execute(cmd)
+
+    clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
+                  {'user':USER,
+                   'pass':PASS,
+                   'host':HOST,
+                   'cloned_repo':HG_REPO + '_error',
+                   'dest':jn(TESTS_TMP_PATH, HG_REPO)}
+
+    stdout, stderr = Command(cwd).execute('hg push %s' % clone_url)
+    if not """abort: HTTP Error 403: Forbidden"""  in stderr:
+        raise Exception('Failure')
+
+
+@test_wrapp
+def get_logs():
+    return UserLog.query().all()
+
+
+@test_wrapp
+def test_logs(initial):
+    logs = UserLog.query().all()
+    operations = 4
+    if len(initial) + operations != len(logs):
+        raise Exception("missing number of logs initial:%s vs current:%s" % \
+                            (len(initial), len(logs)))
+
+
+if __name__ == '__main__':
+    create_test_user(force=False)
+    create_test_repo()
+
+    initial_logs = get_logs()
+    print 'initial activity logs: %s' % len(initial_logs)
+    s = time.time()
+    #test_push_modify_file()
+    test_clone_with_credentials()
+    test_clone_wrong_credentials()
+
+    test_push_new_file(commits=2, with_clone=True)
+
+    test_clone_anonymous()
+    test_push_wrong_path()
+
+    test_push_wrong_credentials()
+
+    test_logs(initial_logs)
+    print 'finished ok in %.3f' % (time.time() - s)
--- a/rhodecode/tests/test_hg_operations.py	Sun Jul 01 16:22:38 2012 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,401 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
-    rhodecode.tests.test_hg_operations
-    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-    Test suite for making push/pull operations
-
-    :created_on: Dec 30, 2010
-    :author: marcink
-    :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
-    :license: GPLv3, see COPYING for more details.
-"""
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program.  If not, see <http://www.gnu.org/licenses/>.
-
-import os
-import time
-import sys
-import shutil
-import logging
-
-from os.path import join as jn
-from os.path import dirname as dn
-
-from tempfile import _RandomNameSequence
-from subprocess import Popen, PIPE
-
-from paste.deploy import appconfig
-from pylons import config
-from sqlalchemy import engine_from_config
-
-from rhodecode.lib.utils import add_cache
-from rhodecode.model import init_model
-from rhodecode.model import meta
-from rhodecode.model.db import User, Repository, UserLog
-from rhodecode.lib.auth import get_crypt_password
-
-from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO
-from rhodecode.config.environment import load_environment
-
-rel_path = dn(dn(dn(os.path.abspath(__file__))))
-
-conf = appconfig('config:%s' % sys.argv[1], relative_to=rel_path)
-load_environment(conf.global_conf, conf.local_conf)
-
-add_cache(conf)
-
-USER = 'test_admin'
-PASS = 'test12'
-HOST = '127.0.0.1:5000'
-DEBUG = False
-print 'DEBUG:', DEBUG
-log = logging.getLogger(__name__)
-
-engine = engine_from_config(conf, 'sqlalchemy.db1.')
-init_model(engine)
-sa = meta.Session
-
-class Command(object):
-
-    def __init__(self, cwd):
-        self.cwd = cwd
-
-    def execute(self, cmd, *args):
-        """Runs command on the system with given ``args``.
-        """
-
-        command = cmd + ' ' + ' '.join(args)
-        log.debug('Executing %s' % command)
-        if DEBUG:
-            print command
-        p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd)
-        stdout, stderr = p.communicate()
-        if DEBUG:
-            print stdout, stderr
-        return stdout, stderr
-
-
-def test_wrapp(func):
-
-    def __wrapp(*args, **kwargs):
-        print '>>>%s' % func.__name__
-        try:
-            res = func(*args, **kwargs)
-        except Exception, e:
-            print ('###############\n-'
-                   '--%s failed %s--\n'
-                   '###############\n' % (func.__name__, e))
-            sys.exit()
-        print '++OK++'
-        return res
-    return __wrapp
-
-
-def create_test_user(force=True):
-    print '\tcreating test user'
-
-    user = User.get_by_username(USER)
-
-    if force and user is not None:
-        print '\tremoving current user'
-        for repo in Repository.query().filter(Repository.user == user).all():
-            sa.delete(repo)
-        sa.delete(user)
-        sa.commit()
-
-    if user is None or force:
-        print '\tcreating new one'
-        new_usr = User()
-        new_usr.username = USER
-        new_usr.password = get_crypt_password(PASS)
-        new_usr.email = 'mail@mail.com'
-        new_usr.name = 'test'
-        new_usr.lastname = 'lasttestname'
-        new_usr.active = True
-        new_usr.admin = True
-        sa.add(new_usr)
-        sa.commit()
-
-    print '\tdone'
-
-
-def create_test_repo(force=True):
-    from rhodecode.model.repo import RepoModel
-
-    user = User.get_by_username(USER)
-    if user is None:
-        raise Exception('user not found')
-
-
-    repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar()
-
-    if repo is None:
-        print '\trepo not found creating'
-
-        form_data = {'repo_name':HG_REPO,
-                     'repo_type':'hg',
-                     'private':False,
-                     'clone_uri':'' }
-        rm = RepoModel(sa)
-        rm.base_path = '/home/hg'
-        rm.create(form_data, user)
-
-
-def set_anonymous_access(enable=True):
-    user = User.get_by_username('default')
-    user.active = enable
-    sa.add(user)
-    sa.commit()
-    print '\tanonymous access is now:', enable
-    if enable != User.get_by_username('default').active:
-        raise Exception('Cannot set anonymous access')
-
-def get_anonymous_access():
-    user = User.get_by_username('default')
-    return user.active
-
-
-#==============================================================================
-# TESTS
-#==============================================================================
-@test_wrapp
-def test_clone_with_credentials(no_errors=False):
-    cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
-
-    try:
-        shutil.rmtree(path, ignore_errors=True)
-        os.makedirs(path)
-        #print 'made dirs %s' % jn(path)
-    except OSError:
-        raise
-
-    print '\tchecking if anonymous access is enabled'
-    anonymous_access = get_anonymous_access()
-    if anonymous_access:
-        print '\tenabled, disabling it '
-        set_anonymous_access(enable=False)
-
-    clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \
-                  {'user':USER,
-                   'pass':PASS,
-                   'host':HOST,
-                   'cloned_repo':HG_REPO,
-                   'dest':path}
-
-    stdout, stderr = Command(cwd).execute('hg clone', clone_url)
-
-    if no_errors is False:
-        assert """adding file changes""" in stdout, 'no messages about cloning'
-        assert """abort""" not in stderr , 'got error from clone'
-
-
-@test_wrapp
-def test_clone_anonymous():
-    cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
-
-    try:
-        shutil.rmtree(path, ignore_errors=True)
-        os.makedirs(path)
-        #print 'made dirs %s' % jn(path)
-    except OSError:
-        raise
-
-
-    print '\tchecking if anonymous access is enabled'
-    anonymous_access = get_anonymous_access()
-    if not anonymous_access:
-        print '\tnot enabled, enabling it '
-        set_anonymous_access(enable=True)
-
-    clone_url = 'http://%(host)s/%(cloned_repo)s %(dest)s' % \
-                  {'user':USER,
-                   'pass':PASS,
-                   'host':HOST,
-                   'cloned_repo':HG_REPO,
-                   'dest':path}
-
-    stdout, stderr = Command(cwd).execute('hg clone', clone_url)
-
-    assert """adding file changes""" in stdout, 'no messages about cloning'
-    assert """abort""" not in stderr , 'got error from clone'
-
-    #disable if it was enabled
-    if not anonymous_access:
-        print '\tdisabling anonymous access'
-        set_anonymous_access(enable=False)
-
-@test_wrapp
-def test_clone_wrong_credentials():
-    cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
-
-    try:
-        shutil.rmtree(path, ignore_errors=True)
-        os.makedirs(path)
-        #print 'made dirs %s' % jn(path)
-    except OSError:
-        raise
-
-    print '\tchecking if anonymous access is enabled'
-    anonymous_access = get_anonymous_access()
-    if anonymous_access:
-        print '\tenabled, disabling it '
-        set_anonymous_access(enable=False)
-
-    clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \
-                  {'user':USER + 'error',
-                   'pass':PASS,
-                   'host':HOST,
-                   'cloned_repo':HG_REPO,
-                   'dest':path}
-
-    stdout, stderr = Command(cwd).execute('hg clone', clone_url)
-
-    if not """abort: authorization failed"""  in stderr:
-        raise Exception('Failure')
-
-@test_wrapp
-def test_pull():
-    pass
-
-@test_wrapp
-def test_push_modify_file(f_name='setup.py'):
-    cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
-    modified_file = jn(TESTS_TMP_PATH, HG_REPO, f_name)
-    for i in xrange(5):
-        cmd = """echo 'added_line%s' >> %s""" % (i, modified_file)
-        Command(cwd).execute(cmd)
-
-        cmd = """hg ci -m 'changed file %s' %s """ % (i, modified_file)
-        Command(cwd).execute(cmd)
-
-    Command(cwd).execute('hg push %s' % jn(TESTS_TMP_PATH, HG_REPO))
-
-@test_wrapp
-def test_push_new_file(commits=15, with_clone=True):
-
-    if with_clone:
-        test_clone_with_credentials(no_errors=True)
-
-    cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
-    added_file = jn(path, '%ssetupążźć.py' % _RandomNameSequence().next())
-
-    Command(cwd).execute('touch %s' % added_file)
-
-    Command(cwd).execute('hg add %s' % added_file)
-
-    for i in xrange(commits):
-        cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
-        Command(cwd).execute(cmd)
-
-        cmd = """hg ci -m 'commited new %s' -u '%s' %s """ % (i,
-                                'Marcin Kuźminski <marcin@python-blog.com>',
-                                added_file)
-        Command(cwd).execute(cmd)
-
-    push_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
-                  {'user':USER,
-                   'pass':PASS,
-                   'host':HOST,
-                   'cloned_repo':HG_REPO,
-                   'dest':jn(TESTS_TMP_PATH, HG_REPO)}
-
-    Command(cwd).execute('hg push --verbose --debug %s' % push_url)
-
-@test_wrapp
-def test_push_wrong_credentials():
-    cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
-    clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
-                  {'user':USER + 'xxx',
-                   'pass':PASS,
-                   'host':HOST,
-                   'cloned_repo':HG_REPO,
-                   'dest':jn(TESTS_TMP_PATH, HG_REPO)}
-
-    modified_file = jn(TESTS_TMP_PATH, HG_REPO, 'setup.py')
-    for i in xrange(5):
-        cmd = """echo 'added_line%s' >> %s""" % (i, modified_file)
-        Command(cwd).execute(cmd)
-
-        cmd = """hg ci -m 'commited %s' %s """ % (i, modified_file)
-        Command(cwd).execute(cmd)
-
-    Command(cwd).execute('hg push %s' % clone_url)
-
-@test_wrapp
-def test_push_wrong_path():
-    cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
-    added_file = jn(path, 'somefile.py')
-
-    try:
-        shutil.rmtree(path, ignore_errors=True)
-        os.makedirs(path)
-        print '\tmade dirs %s' % jn(path)
-    except OSError:
-        raise
-
-    Command(cwd).execute("""echo '' > %s""" % added_file)
-    Command(cwd).execute("""hg init %s""" % path)
-    Command(cwd).execute("""hg add %s""" % added_file)
-
-    for i in xrange(2):
-        cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
-        Command(cwd).execute(cmd)
-
-        cmd = """hg ci -m 'commited new %s' %s """ % (i, added_file)
-        Command(cwd).execute(cmd)
-
-    clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
-                  {'user':USER,
-                   'pass':PASS,
-                   'host':HOST,
-                   'cloned_repo':HG_REPO + '_error',
-                   'dest':jn(TESTS_TMP_PATH, HG_REPO)}
-
-    stdout, stderr = Command(cwd).execute('hg push %s' % clone_url)
-    if not """abort: HTTP Error 403: Forbidden"""  in stderr:
-        raise Exception('Failure')
-
-@test_wrapp
-def get_logs():
-    return UserLog.query().all()
-
-@test_wrapp
-def test_logs(initial):
-    logs = UserLog.query().all()
-    operations = 4
-    if len(initial) + operations != len(logs):
-        raise Exception("missing number of logs initial:%s vs current:%s" % \
-                            (len(initial), len(logs)))
-
-
-if __name__ == '__main__':
-    create_test_user(force=False)
-    create_test_repo()
-
-    initial_logs = get_logs()
-    print 'initial activity logs: %s' % len(initial_logs)
-    s = time.time()
-    #test_push_modify_file()
-    test_clone_with_credentials()
-    test_clone_wrong_credentials()
-
-    test_push_new_file(commits=2, with_clone=True)
-
-    test_clone_anonymous()
-    test_push_wrong_path()
-
-    test_push_wrong_credentials()
-
-    test_logs(initial_logs)
-    print 'finished ok in %.3f' % (time.time() - s)
--- a/rhodecode/tests/test_models.py	Sun Jul 01 16:22:38 2012 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,761 +0,0 @@
-import os
-import unittest
-from rhodecode.tests import *
-
-from rhodecode.model.repos_group import ReposGroupModel
-from rhodecode.model.repo import RepoModel
-from rhodecode.model.db import RepoGroup, User, Notification, UserNotification, \
-    UsersGroup, UsersGroupMember, Permission, UsersGroupRepoGroupToPerm,\
-    Repository, UserEmailMap
-from sqlalchemy.exc import IntegrityError, DatabaseError
-from rhodecode.model.user import UserModel
-
-from rhodecode.model.meta import Session
-from rhodecode.model.notification import NotificationModel
-from rhodecode.model.users_group import UsersGroupModel
-from rhodecode.lib.auth import AuthUser
-
-
-def _make_group(path, desc='desc', parent_id=None,
-                 skip_if_exists=False):
-
-    gr = RepoGroup.get_by_group_name(path)
-    if gr and skip_if_exists:
-        return gr
-
-    gr = ReposGroupModel().create(path, desc, parent_id)
-    return gr
-
-
-class TestReposGroups(unittest.TestCase):
-
-    def setUp(self):
-        self.g1 = _make_group('test1', skip_if_exists=True)
-        Session.commit()
-        self.g2 = _make_group('test2', skip_if_exists=True)
-        Session.commit()
-        self.g3 = _make_group('test3', skip_if_exists=True)
-        Session.commit()
-
-    def tearDown(self):
-        print 'out'
-
-    def __check_path(self, *path):
-        """
-        Checks the path for existance !
-        """
-        path = [TESTS_TMP_PATH] + list(path)
-        path = os.path.join(*path)
-        return os.path.isdir(path)
-
-    def _check_folders(self):
-        print os.listdir(TESTS_TMP_PATH)
-
-    def __delete_group(self, id_):
-        ReposGroupModel().delete(id_)
-
-    def __update_group(self, id_, path, desc='desc', parent_id=None):
-        form_data = dict(
-            group_name=path,
-            group_description=desc,
-            group_parent_id=parent_id,
-            perms_updates=[],
-            perms_new=[]
-        )
-        gr = ReposGroupModel().update(id_, form_data)
-        return gr
-
-    def test_create_group(self):
-        g = _make_group('newGroup')
-        self.assertEqual(g.full_path, 'newGroup')
-
-        self.assertTrue(self.__check_path('newGroup'))
-
-    def test_create_same_name_group(self):
-        self.assertRaises(IntegrityError, lambda:_make_group('newGroup'))
-        Session.rollback()
-
-    def test_same_subgroup(self):
-        sg1 = _make_group('sub1', parent_id=self.g1.group_id)
-        self.assertEqual(sg1.parent_group, self.g1)
-        self.assertEqual(sg1.full_path, 'test1/sub1')
-        self.assertTrue(self.__check_path('test1', 'sub1'))
-
-        ssg1 = _make_group('subsub1', parent_id=sg1.group_id)
-        self.assertEqual(ssg1.parent_group, sg1)
-        self.assertEqual(ssg1.full_path, 'test1/sub1/subsub1')
-        self.assertTrue(self.__check_path('test1', 'sub1', 'subsub1'))
-
-    def test_remove_group(self):
-        sg1 = _make_group('deleteme')
-        self.__delete_group(sg1.group_id)
-
-        self.assertEqual(RepoGroup.get(sg1.group_id), None)
-        self.assertFalse(self.__check_path('deteteme'))
-
-        sg1 = _make_group('deleteme', parent_id=self.g1.group_id)
-        self.__delete_group(sg1.group_id)
-
-        self.assertEqual(RepoGroup.get(sg1.group_id), None)
-        self.assertFalse(self.__check_path('test1', 'deteteme'))
-
-    def test_rename_single_group(self):
-        sg1 = _make_group('initial')
-
-        new_sg1 = self.__update_group(sg1.group_id, 'after')
-        self.assertTrue(self.__check_path('after'))
-        self.assertEqual(RepoGroup.get_by_group_name('initial'), None)
-
-    def test_update_group_parent(self):
-
-        sg1 = _make_group('initial', parent_id=self.g1.group_id)
-
-        new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g1.group_id)
-        self.assertTrue(self.__check_path('test1', 'after'))
-        self.assertEqual(RepoGroup.get_by_group_name('test1/initial'), None)
-
-        new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g3.group_id)
-        self.assertTrue(self.__check_path('test3', 'after'))
-        self.assertEqual(RepoGroup.get_by_group_name('test3/initial'), None)
-
-        new_sg1 = self.__update_group(sg1.group_id, 'hello')
-        self.assertTrue(self.__check_path('hello'))
-
-        self.assertEqual(RepoGroup.get_by_group_name('hello'), new_sg1)
-
-    def test_subgrouping_with_repo(self):
-
-        g1 = _make_group('g1')
-        g2 = _make_group('g2')
-
-        # create new repo
-        form_data = dict(repo_name='john',
-                         repo_name_full='john',
-                         fork_name=None,
-                         description=None,
-                         repo_group=None,
-                         private=False,
-                         repo_type='hg',
-                         clone_uri=None,
-                         landing_rev='tip')
-        cur_user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
-        r = RepoModel().create(form_data, cur_user)
-
-        self.assertEqual(r.repo_name, 'john')
-
-        # put repo into group
-        form_data = form_data
-        form_data['repo_group'] = g1.group_id
-        form_data['perms_new'] = []
-        form_data['perms_updates'] = []
-        RepoModel().update(r.repo_name, form_data)
-        self.assertEqual(r.repo_name, 'g1/john')
-
-        self.__update_group(g1.group_id, 'g1', parent_id=g2.group_id)
-        self.assertTrue(self.__check_path('g2', 'g1'))
-
-        # test repo
-        self.assertEqual(r.repo_name, RepoGroup.url_sep().join(['g2', 'g1', r.just_name]))
-
-    def test_move_to_root(self):
-        g1 = _make_group('t11')
-        Session.commit()
-        g2 = _make_group('t22', parent_id=g1.group_id)
-        Session.commit()
-
-        self.assertEqual(g2.full_path, 't11/t22')
-        self.assertTrue(self.__check_path('t11', 't22'))
-
-        g2 = self.__update_group(g2.group_id, 'g22', parent_id=None)
-        Session.commit()
-
-        self.assertEqual(g2.group_name, 'g22')
-        # we moved out group from t1 to '' so it's full path should be 'g2'
-        self.assertEqual(g2.full_path, 'g22')
-        self.assertFalse(self.__check_path('t11', 't22'))
-        self.assertTrue(self.__check_path('g22'))
-
-
-class TestUser(unittest.TestCase):
-    def __init__(self, methodName='runTest'):
-        Session.remove()
-        super(TestUser, self).__init__(methodName=methodName)
-
-    def test_create_and_remove(self):
-        usr = UserModel().create_or_update(username=u'test_user',
-                                           password=u'qweqwe',
-                                     email=u'u232@rhodecode.org',
-                                     firstname=u'u1', lastname=u'u1')
-        Session.commit()
-        self.assertEqual(User.get_by_username(u'test_user'), usr)
-
-        # make users group
-        users_group = UsersGroupModel().create('some_example_group')
-        Session.commit()
-
-        UsersGroupModel().add_user_to_group(users_group, usr)
-        Session.commit()
-
-        self.assertEqual(UsersGroup.get(users_group.users_group_id), users_group)
-        self.assertEqual(UsersGroupMember.query().count(), 1)
-        UserModel().delete(usr.user_id)
-        Session.commit()
-
-        self.assertEqual(UsersGroupMember.query().all(), [])
-
-    def test_additonal_email_as_main(self):
-        usr = UserModel().create_or_update(username=u'test_user',
-                                           password=u'qweqwe',
-                                     email=u'main_email@rhodecode.org',
-                                     firstname=u'u1', lastname=u'u1')
-        Session.commit()
-
-        def do():
-            m = UserEmailMap()
-            m.email = u'main_email@rhodecode.org'
-            m.user = usr
-            Session.add(m)
-            Session.commit()
-        self.assertRaises(AttributeError, do)
-
-        UserModel().delete(usr.user_id)
-        Session.commit()
-
-    def test_extra_email_map(self):
-        usr = UserModel().create_or_update(username=u'test_user',
-                                           password=u'qweqwe',
-                                     email=u'main_email@rhodecode.org',
-                                     firstname=u'u1', lastname=u'u1')
-        Session.commit()
-
-        m = UserEmailMap()
-        m.email = u'main_email2@rhodecode.org'
-        m.user = usr
-        Session.add(m)
-        Session.commit()
-
-        u = User.get_by_email(email='main_email@rhodecode.org')
-        self.assertEqual(usr.user_id, u.user_id)
-        self.assertEqual(usr.username, u.username)
-
-        u = User.get_by_email(email='main_email2@rhodecode.org')
-        self.assertEqual(usr.user_id, u.user_id)
-        self.assertEqual(usr.username, u.username)
-        u = User.get_by_email(email='main_email3@rhodecode.org')
-        self.assertEqual(None, u)
-
-        UserModel().delete(usr.user_id)
-        Session.commit()
-
-
-class TestNotifications(unittest.TestCase):
-
-    def __init__(self, methodName='runTest'):
-        Session.remove()
-        self.u1 = UserModel().create_or_update(username=u'u1',
-                                        password=u'qweqwe',
-                                        email=u'u1@rhodecode.org',
-                                        firstname=u'u1', lastname=u'u1')
-        Session.commit()
-        self.u1 = self.u1.user_id
-
-        self.u2 = UserModel().create_or_update(username=u'u2',
-                                        password=u'qweqwe',
-                                        email=u'u2@rhodecode.org',
-                                        firstname=u'u2', lastname=u'u3')
-        Session.commit()
-        self.u2 = self.u2.user_id
-
-        self.u3 = UserModel().create_or_update(username=u'u3',
-                                        password=u'qweqwe',
-                                        email=u'u3@rhodecode.org',
-                                        firstname=u'u3', lastname=u'u3')
-        Session.commit()
-        self.u3 = self.u3.user_id
-
-        super(TestNotifications, self).__init__(methodName=methodName)
-
-    def _clean_notifications(self):
-        for n in Notification.query().all():
-            Session.delete(n)
-
-        Session.commit()
-        self.assertEqual(Notification.query().all(), [])
-
-    def tearDown(self):
-        self._clean_notifications()
-
-    def test_create_notification(self):
-        self.assertEqual([], Notification.query().all())
-        self.assertEqual([], UserNotification.query().all())
-
-        usrs = [self.u1, self.u2]
-        notification = NotificationModel().create(created_by=self.u1,
-                                           subject=u'subj', body=u'hi there',
-                                           recipients=usrs)
-        Session.commit()
-        u1 = User.get(self.u1)
-        u2 = User.get(self.u2)
-        u3 = User.get(self.u3)
-        notifications = Notification.query().all()
-        self.assertEqual(len(notifications), 1)
-
-        unotification = UserNotification.query()\
-            .filter(UserNotification.notification == notification).all()
-
-        self.assertEqual(notifications[0].recipients, [u1, u2])
-        self.assertEqual(notification.notification_id,
-                         notifications[0].notification_id)
-        self.assertEqual(len(unotification), len(usrs))
-        self.assertEqual([x.user.user_id for x in unotification], usrs)
-
-    def test_user_notifications(self):
-        self.assertEqual([], Notification.query().all())
-        self.assertEqual([], UserNotification.query().all())
-
-        notification1 = NotificationModel().create(created_by=self.u1,
-                                            subject=u'subj', body=u'hi there1',
-                                            recipients=[self.u3])
-        Session.commit()
-        notification2 = NotificationModel().create(created_by=self.u1,
-                                            subject=u'subj', body=u'hi there2',
-                                            recipients=[self.u3])
-        Session.commit()
-        u3 = Session.query(User).get(self.u3)
-
-        self.assertEqual(sorted([x.notification for x in u3.notifications]),
-                         sorted([notification2, notification1]))
-
-    def test_delete_notifications(self):
-        self.assertEqual([], Notification.query().all())
-        self.assertEqual([], UserNotification.query().all())
-
-        notification = NotificationModel().create(created_by=self.u1,
-                                           subject=u'title', body=u'hi there3',
-                                    recipients=[self.u3, self.u1, self.u2])
-        Session.commit()
-        notifications = Notification.query().all()
-        self.assertTrue(notification in notifications)
-
-        Notification.delete(notification.notification_id)
-        Session.commit()
-
-        notifications = Notification.query().all()
-        self.assertFalse(notification in notifications)
-
-        un = UserNotification.query().filter(UserNotification.notification
-                                             == notification).all()
-        self.assertEqual(un, [])
-
-    def test_delete_association(self):
-
-        self.assertEqual([], Notification.query().all())
-        self.assertEqual([], UserNotification.query().all())
-
-        notification = NotificationModel().create(created_by=self.u1,
-                                           subject=u'title', body=u'hi there3',
-                                    recipients=[self.u3, self.u1, self.u2])
-        Session.commit()
-
-        unotification = UserNotification.query()\
-                            .filter(UserNotification.notification ==
-                                    notification)\
-                            .filter(UserNotification.user_id == self.u3)\
-                            .scalar()
-
-        self.assertEqual(unotification.user_id, self.u3)
-
-        NotificationModel().delete(self.u3,
-                                   notification.notification_id)
-        Session.commit()
-
-        u3notification = UserNotification.query()\
-                            .filter(UserNotification.notification ==
-                                    notification)\
-                            .filter(UserNotification.user_id == self.u3)\
-                            .scalar()
-
-        self.assertEqual(u3notification, None)
-
-        # notification object is still there
-        self.assertEqual(Notification.query().all(), [notification])
-
-        #u1 and u2 still have assignments
-        u1notification = UserNotification.query()\
-                            .filter(UserNotification.notification ==
-                                    notification)\
-                            .filter(UserNotification.user_id == self.u1)\
-                            .scalar()
-        self.assertNotEqual(u1notification, None)
-        u2notification = UserNotification.query()\
-                            .filter(UserNotification.notification ==
-                                    notification)\
-                            .filter(UserNotification.user_id == self.u2)\
-                            .scalar()
-        self.assertNotEqual(u2notification, None)
-
-    def test_notification_counter(self):
-        self._clean_notifications()
-        self.assertEqual([], Notification.query().all())
-        self.assertEqual([], UserNotification.query().all())
-
-        NotificationModel().create(created_by=self.u1,
-                            subject=u'title', body=u'hi there_delete',
-                            recipients=[self.u3, self.u1])
-        Session.commit()
-
-        self.assertEqual(NotificationModel()
-                         .get_unread_cnt_for_user(self.u1), 1)
-        self.assertEqual(NotificationModel()
-                         .get_unread_cnt_for_user(self.u2), 0)
-        self.assertEqual(NotificationModel()
-                         .get_unread_cnt_for_user(self.u3), 1)
-
-        notification = NotificationModel().create(created_by=self.u1,
-                                           subject=u'title', body=u'hi there3',
-                                    recipients=[self.u3, self.u1, self.u2])
-        Session.commit()
-
-        self.assertEqual(NotificationModel()
-                         .get_unread_cnt_for_user(self.u1), 2)
-        self.assertEqual(NotificationModel()
-                         .get_unread_cnt_for_user(self.u2), 1)
-        self.assertEqual(NotificationModel()
-                         .get_unread_cnt_for_user(self.u3), 2)
-
-
-class TestUsers(unittest.TestCase):
-
-    def __init__(self, methodName='runTest'):
-        super(TestUsers, self).__init__(methodName=methodName)
-
-    def setUp(self):
-        self.u1 = UserModel().create_or_update(username=u'u1',
-                                        password=u'qweqwe',
-                                        email=u'u1@rhodecode.org',
-                                        firstname=u'u1', lastname=u'u1')
-
-    def tearDown(self):
-        perm = Permission.query().all()
-        for p in perm:
-            UserModel().revoke_perm(self.u1, p)
-
-        UserModel().delete(self.u1)
-        Session.commit()
-
-    def test_add_perm(self):
-        perm = Permission.query().all()[0]
-        UserModel().grant_perm(self.u1, perm)
-        Session.commit()
-        self.assertEqual(UserModel().has_perm(self.u1, perm), True)
-
-    def test_has_perm(self):
-        perm = Permission.query().all()
-        for p in perm:
-            has_p = UserModel().has_perm(self.u1, p)
-            self.assertEqual(False, has_p)
-
-    def test_revoke_perm(self):
-        perm = Permission.query().all()[0]
-        UserModel().grant_perm(self.u1, perm)
-        Session.commit()
-        self.assertEqual(UserModel().has_perm(self.u1, perm), True)
-
-        #revoke
-        UserModel().revoke_perm(self.u1, perm)
-        Session.commit()
-        self.assertEqual(UserModel().has_perm(self.u1, perm), False)
-
-
-class TestPermissions(unittest.TestCase):
-    def __init__(self, methodName='runTest'):
-        super(TestPermissions, self).__init__(methodName=methodName)
-
-    def setUp(self):
-        self.u1 = UserModel().create_or_update(
-            username=u'u1', password=u'qweqwe',
-            email=u'u1@rhodecode.org', firstname=u'u1', lastname=u'u1'
-        )
-        self.u2 = UserModel().create_or_update(
-            username=u'u2', password=u'qweqwe',
-            email=u'u2@rhodecode.org', firstname=u'u2', lastname=u'u2'
-        )
-        self.anon = User.get_by_username('default')
-        self.a1 = UserModel().create_or_update(
-            username=u'a1', password=u'qweqwe',
-            email=u'a1@rhodecode.org', firstname=u'a1', lastname=u'a1', admin=True
-        )
-        Session.commit()
-
-    def tearDown(self):
-        if hasattr(self, 'test_repo'):
-            RepoModel().delete(repo=self.test_repo)
-        UserModel().delete(self.u1)
-        UserModel().delete(self.u2)
-        UserModel().delete(self.a1)
-        if hasattr(self, 'g1'):
-            ReposGroupModel().delete(self.g1.group_id)
-        if hasattr(self, 'g2'):
-            ReposGroupModel().delete(self.g2.group_id)
-
-        if hasattr(self, 'ug1'):
-            UsersGroupModel().delete(self.ug1, force=True)
-
-        Session.commit()
-
-    def test_default_perms_set(self):
-        u1_auth = AuthUser(user_id=self.u1.user_id)
-        perms = {
-            'repositories_groups': {},
-            'global': set([u'hg.create.repository', u'repository.read',
-                           u'hg.register.manual_activate']),
-            'repositories': {u'vcs_test_hg': u'repository.read'}
-        }
-        self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
-                         perms['repositories'][HG_REPO])
-        new_perm = 'repository.write'
-        RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1, perm=new_perm)
-        Session.commit()
-
-        u1_auth = AuthUser(user_id=self.u1.user_id)
-        self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], new_perm)
-
-    def test_default_admin_perms_set(self):
-        a1_auth = AuthUser(user_id=self.a1.user_id)
-        perms = {
-            'repositories_groups': {},
-            'global': set([u'hg.admin']),
-            'repositories': {u'vcs_test_hg': u'repository.admin'}
-        }
-        self.assertEqual(a1_auth.permissions['repositories'][HG_REPO],
-                         perms['repositories'][HG_REPO])
-        new_perm = 'repository.write'
-        RepoModel().grant_user_permission(repo=HG_REPO, user=self.a1, perm=new_perm)
-        Session.commit()
-        # cannot really downgrade admins permissions !? they still get's set as
-        # admin !
-        u1_auth = AuthUser(user_id=self.a1.user_id)
-        self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
-                         perms['repositories'][HG_REPO])
-
-    def test_default_group_perms(self):
-        self.g1 = _make_group('test1', skip_if_exists=True)
-        self.g2 = _make_group('test2', skip_if_exists=True)
-        u1_auth = AuthUser(user_id=self.u1.user_id)
-        perms = {
-            'repositories_groups': {u'test1': 'group.read', u'test2': 'group.read'},
-            'global': set([u'hg.create.repository', u'repository.read', u'hg.register.manual_activate']),
-            'repositories': {u'vcs_test_hg': u'repository.read'}
-        }
-        self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
-                         perms['repositories'][HG_REPO])
-        self.assertEqual(u1_auth.permissions['repositories_groups'],
-                         perms['repositories_groups'])
-
-    def test_default_admin_group_perms(self):
-        self.g1 = _make_group('test1', skip_if_exists=True)
-        self.g2 = _make_group('test2', skip_if_exists=True)
-        a1_auth = AuthUser(user_id=self.a1.user_id)
-        perms = {
-            'repositories_groups': {u'test1': 'group.admin', u'test2': 'group.admin'},
-            'global': set(['hg.admin']),
-            'repositories': {u'vcs_test_hg': 'repository.admin'}
-        }
-
-        self.assertEqual(a1_auth.permissions['repositories'][HG_REPO],
-                         perms['repositories'][HG_REPO])
-        self.assertEqual(a1_auth.permissions['repositories_groups'],
-                         perms['repositories_groups'])
-
-    def test_propagated_permission_from_users_group(self):
-        # make group
-        self.ug1 = UsersGroupModel().create('G1')
-        # add user to group
-        UsersGroupModel().add_user_to_group(self.ug1, self.u1)
-
-        # set permission to lower
-        new_perm = 'repository.none'
-        RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1, perm=new_perm)
-        Session.commit()
-        u1_auth = AuthUser(user_id=self.u1.user_id)
-        self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
-                         new_perm)
-
-        # grant perm for group this should override permission from user
-        new_perm = 'repository.write'
-        RepoModel().grant_users_group_permission(repo=HG_REPO,
-                                                 group_name=self.ug1,
-                                                 perm=new_perm)
-        # check perms
-        u1_auth = AuthUser(user_id=self.u1.user_id)
-        perms = {
-            'repositories_groups': {},
-            'global': set([u'hg.create.repository', u'repository.read',
-                           u'hg.register.manual_activate']),
-            'repositories': {u'vcs_test_hg': u'repository.read'}
-        }
-        self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
-                         new_perm)
-        self.assertEqual(u1_auth.permissions['repositories_groups'],
-                         perms['repositories_groups'])
-
-    def test_propagated_permission_from_users_group_lower_weight(self):
-        # make group
-        self.ug1 = UsersGroupModel().create('G1')
-        # add user to group
-        UsersGroupModel().add_user_to_group(self.ug1, self.u1)
-
-        # set permission to lower
-        new_perm_h = 'repository.write'
-        RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1,
-                                          perm=new_perm_h)
-        Session.commit()
-        u1_auth = AuthUser(user_id=self.u1.user_id)
-        self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
-                         new_perm_h)
-
-        # grant perm for group this should NOT override permission from user
-        # since it's lower than granted
-        new_perm_l = 'repository.read'
-        RepoModel().grant_users_group_permission(repo=HG_REPO,
-                                                 group_name=self.ug1,
-                                                 perm=new_perm_l)
-        # check perms
-        u1_auth = AuthUser(user_id=self.u1.user_id)
-        perms = {
-            'repositories_groups': {},
-            'global': set([u'hg.create.repository', u'repository.read',
-                           u'hg.register.manual_activate']),
-            'repositories': {u'vcs_test_hg': u'repository.write'}
-        }
-        self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
-                         new_perm_h)
-        self.assertEqual(u1_auth.permissions['repositories_groups'],
-                         perms['repositories_groups'])
-
-    def test_repo_in_group_permissions(self):
-        self.g1 = _make_group('group1', skip_if_exists=True)
-        self.g2 = _make_group('group2', skip_if_exists=True)
-        Session.commit()
-        # both perms should be read !
-        u1_auth = AuthUser(user_id=self.u1.user_id)
-        self.assertEqual(u1_auth.permissions['repositories_groups'],
-                         {u'group1': u'group.read', u'group2': u'group.read'})
-
-        a1_auth = AuthUser(user_id=self.anon.user_id)
-        self.assertEqual(a1_auth.permissions['repositories_groups'],
-                 {u'group1': u'group.read', u'group2': u'group.read'})
-
-        #Change perms to none for both groups
-        ReposGroupModel().grant_user_permission(repos_group=self.g1,
-                                                user=self.anon,
-                                                perm='group.none')
-        ReposGroupModel().grant_user_permission(repos_group=self.g2,
-                                                user=self.anon,
-                                                perm='group.none')
-
-        u1_auth = AuthUser(user_id=self.u1.user_id)
-        self.assertEqual(u1_auth.permissions['repositories_groups'],
-                 {u'group1': u'group.none', u'group2': u'group.none'})
-
-        a1_auth = AuthUser(user_id=self.anon.user_id)
-        self.assertEqual(a1_auth.permissions['repositories_groups'],
-                 {u'group1': u'group.none', u'group2': u'group.none'})
-
-        # add repo to group
-        form_data = {
-            'repo_name': HG_REPO,
-            'repo_name_full': RepoGroup.url_sep().join([self.g1.group_name,HG_REPO]),
-            'repo_type': 'hg',
-            'clone_uri': '',
-            'repo_group': self.g1.group_id,
-            'description': 'desc',
-            'private': False,
-            'landing_rev': 'tip'
-        }
-        self.test_repo = RepoModel().create(form_data, cur_user=self.u1)
-        Session.commit()
-
-        u1_auth = AuthUser(user_id=self.u1.user_id)
-        self.assertEqual(u1_auth.permissions['repositories_groups'],
-                 {u'group1': u'group.none', u'group2': u'group.none'})
-
-        a1_auth = AuthUser(user_id=self.anon.user_id)
-        self.assertEqual(a1_auth.permissions['repositories_groups'],
-                 {u'group1': u'group.none', u'group2': u'group.none'})
-
-        #grant permission for u2 !
-        ReposGroupModel().grant_user_permission(repos_group=self.g1,
-                                                user=self.u2,
-                                                perm='group.read')
-        ReposGroupModel().grant_user_permission(repos_group=self.g2,
-                                                user=self.u2,
-                                                perm='group.read')
-        Session.commit()
-        self.assertNotEqual(self.u1, self.u2)
-        #u1 and anon should have not change perms while u2 should !
-        u1_auth = AuthUser(user_id=self.u1.user_id)
-        self.assertEqual(u1_auth.permissions['repositories_groups'],
-                 {u'group1': u'group.none', u'group2': u'group.none'})
-
-        u2_auth = AuthUser(user_id=self.u2.user_id)
-        self.assertEqual(u2_auth.permissions['repositories_groups'],
-                 {u'group1': u'group.read', u'group2': u'group.read'})
-
-        a1_auth = AuthUser(user_id=self.anon.user_id)
-        self.assertEqual(a1_auth.permissions['repositories_groups'],
-                 {u'group1': u'group.none', u'group2': u'group.none'})
-
-    def test_repo_group_user_as_user_group_member(self):
-        # create Group1
-        self.g1 = _make_group('group1', skip_if_exists=True)
-        Session.commit()
-        a1_auth = AuthUser(user_id=self.anon.user_id)
-
-        self.assertEqual(a1_auth.permissions['repositories_groups'],
-                         {u'group1': u'group.read'})
-
-        # set default permission to none
-        ReposGroupModel().grant_user_permission(repos_group=self.g1,
-                                                user=self.anon,
-                                                perm='group.none')
-        # make group
-        self.ug1 = UsersGroupModel().create('G1')
-        # add user to group
-        UsersGroupModel().add_user_to_group(self.ug1, self.u1)
-        Session.commit()
-
-        # check if user is in the group
-        membrs = [x.user_id for x in UsersGroupModel().get(self.ug1.users_group_id).members]
-        self.assertEqual(membrs, [self.u1.user_id])
-        # add some user to that group
-
-        # check his permissions
-        a1_auth = AuthUser(user_id=self.anon.user_id)
-        self.assertEqual(a1_auth.permissions['repositories_groups'],
-                         {u'group1': u'group.none'})
-
-        u1_auth = AuthUser(user_id=self.u1.user_id)
-        self.assertEqual(u1_auth.permissions['repositories_groups'],
-                         {u'group1': u'group.none'})
-
-        # grant ug1 read permissions for
-        ReposGroupModel().grant_users_group_permission(repos_group=self.g1,
-                                                       group_name=self.ug1,
-                                                       perm='group.read')
-        Session.commit()
-        # check if the
-        obj = Session.query(UsersGroupRepoGroupToPerm)\
-            .filter(UsersGroupRepoGroupToPerm.group == self.g1)\
-            .filter(UsersGroupRepoGroupToPerm.users_group == self.ug1)\
-            .scalar()
-        self.assertEqual(obj.permission.permission_name, 'group.read')
-
-        a1_auth = AuthUser(user_id=self.anon.user_id)
-
-        self.assertEqual(a1_auth.permissions['repositories_groups'],
-                         {u'group1': u'group.none'})
-
-        u1_auth = AuthUser(user_id=self.u1.user_id)
-        self.assertEqual(u1_auth.permissions['repositories_groups'],
-                         {u'group1': u'group.read'})
--- a/rhodecode/tests/vcs/__init__.py	Sun Jul 01 16:22:38 2012 +0200
+++ b/rhodecode/tests/vcs/__init__.py	Sun Jul 01 16:24:15 2012 +0200
@@ -22,8 +22,8 @@
 import os
 from rhodecode.lib import vcs
 from rhodecode.lib.vcs.utils.compat import unittest
+from utils import VCSTestError, SCMFetcher
 from rhodecode.tests import *
-from utils import VCSTestError, SCMFetcher
 
 
 def setup_package():