view rhodecode/tests/functional/test_admin_repos.py @ 4116:ffd45b185016 rhodecode-2.2.5-gpl

Imported some of the GPLv3'd changes from RhodeCode v2.2.5. This imports changes between changesets 21af6c4eab3d and 6177597791c2 in RhodeCode's original repository, including only changes to Python files and HTML. RhodeCode clearly licensed its changes to these files under GPLv3 in their /LICENSE file, which states the following: The Python code and integrated HTML are licensed under the GPLv3 license. (See: https://code.rhodecode.com/rhodecode/files/v2.2.5/LICENSE or http://web.archive.org/web/20140512193334/https://code.rhodecode.com/rhodecode/files/f3b123159901f15426d18e3dc395e8369f70ebe0/LICENSE for an online copy of that LICENSE file) Conservancy reviewed these changes and confirmed that they can be licensed as a whole to the Kallithea project under GPLv3-only. While some of the contents committed herein are clearly licensed GPLv3-or-later, on the whole we must assume they are GPLv3-only, since the statement above from RhodeCode indicates that they intend GPLv3-only as their license, per GPLv3§14 and other relevant sections of GPLv3.
author Bradley M. Kuhn <bkuhn@sfconservancy.org>
date Wed, 02 Jul 2014 19:03:13 -0400
parents d7488551578e
children 7e5f8c12a3fc
line wrap: on
line source

# -*- coding: utf-8 -*-

import os
import mock
import urllib

from rhodecode.lib import vcs
from rhodecode.model.db import Repository, RepoGroup, UserRepoToPerm, User,\
    Permission
from rhodecode.model.user import UserModel
from rhodecode.tests import *
from rhodecode.model.repo_group import RepoGroupModel
from rhodecode.model.repo import RepoModel
from rhodecode.model.scm import ScmModel
from rhodecode.model.meta import Session
from rhodecode.tests.fixture import Fixture, error_function

fixture = Fixture()


def _get_permission_for_user(user, repo):
    """
    Return the UserRepoToPerm rows that link `user` to `repo`.

    :param user: username, resolved with ``User.get_by_username``
    :param repo: repository name, resolved with
        ``Repository.get_by_repo_name``
    :returns: the result of ``.all()`` on the filtered query
    """
    query = UserRepoToPerm.query()
    target_repo = Repository.get_by_repo_name(repo)
    query = query.filter(UserRepoToPerm.repository == target_repo)
    target_user = User.get_by_username(user)
    query = query.filter(UserRepoToPerm.user == target_user)
    return query.all()


class _BaseTest(TestController):
    """
    Write all tests here
    """
    REPO = None
    REPO_TYPE = None
    NEW_REPO = None
    OTHER_TYPE_REPO = None
    OTHER_TYPE = None

    @classmethod
    def setup_class(cls):
        """Class-level setup hook; currently a no-op."""

    @classmethod
    def teardown_class(cls):
        """Class-level teardown hook; currently a no-op."""

    def test_index(self):
        """Log in with the ``log_user`` defaults and request the 'repos' route."""
        self.log_user()
        # no content assertions: the request itself is the whole check
        self.app.get(url('repos'))

    def test_create(self):
        """
        Create a repository through the 'repos' route and verify it via the
        flash message, the database, the summary page and the filesystem.
        The repository is deleted again at the end of the test.
        """
        self.log_user()
        name = self.NEW_REPO
        desc = 'description for newly created repo'
        self.app.post(url('repos'),
                      fixture._get_repo_create_params(
                          repo_private=False,
                          repo_name=name,
                          repo_type=self.REPO_TYPE,
                          repo_description=desc))

        # the check page is what triggers the flash message
        check_page = self.app.get(url('repo_check_home', repo_name=name))
        self.assertEqual(check_page.json, {u'result': True})
        created_msg = 'Created repository <a href="/%s">%s</a>' % (name, name)
        self.checkSessionFlash(check_page, created_msg)

        # exactly one matching row must exist in the database
        db_repo = Session().query(Repository)\
            .filter(Repository.repo_name == name).one()
        self.assertEqual(db_repo.repo_name, name)
        self.assertEqual(db_repo.description, desc)

        # the summary page must mention the name and the repository type
        summary = self.app.get(url('summary_home', repo_name=name))
        summary.mustcontain(name)
        summary.mustcontain(self.REPO_TYPE)

        # the repository must be loadable from the filesystem
        try:
            vcs.get_repo(os.path.join(TESTS_TMP_PATH, name))
        except Exception:
            self.fail('no repo %s in filesystem' % name)

        # clean up
        RepoModel().delete(name)
        Session().commit()

    def test_create_non_ascii(self):
        """
        Create a repository whose name and description contain non-ASCII
        characters and verify it via the flash message, the database, the
        summary page and the filesystem.

        The repository is deleted at the end, as ``test_create`` does, so it
        does not linger for later tests; ``test_delete_non_ascii`` builds the
        very same repository name.
        """
        self.log_user()
        non_ascii = "ąęł"
        repo_name = "%s%s" % (self.NEW_REPO, non_ascii)
        repo_name_unicode = repo_name.decode('utf8')
        description = 'description for newly created repo' + non_ascii
        description_unicode = description.decode('utf8')
        response = self.app.post(url('repos'),
                        fixture._get_repo_create_params(repo_private=False,
                                                repo_name=repo_name,
                                                repo_type=self.REPO_TYPE,
                                                repo_description=description))
        ## run the check page that triggers the flash message
        response = self.app.get(url('repo_check_home', repo_name=repo_name))
        self.assertEqual(response.json, {u'result': True})
        # the link target is the URL-quoted byte name, the label is unicode
        self.checkSessionFlash(response,
                               u'Created repository <a href="/%s">%s</a>'
                               % (urllib.quote(repo_name), repo_name_unicode))
        # test if the repo was created in the database
        new_repo = Session().query(Repository)\
            .filter(Repository.repo_name == repo_name_unicode).one()

        self.assertEqual(new_repo.repo_name, repo_name_unicode)
        self.assertEqual(new_repo.description, description_unicode)

        # test if the repository is visible on its summary page
        response = self.app.get(url('summary_home', repo_name=repo_name))
        response.mustcontain(repo_name)
        response.mustcontain(self.REPO_TYPE)

        # test if the repository was created on filesystem
        try:
            vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
        except Exception:
            self.fail('no repo %s in filesystem' % repo_name)

        # clean up, mirroring test_create
        RepoModel().delete(repo_name_unicode)
        Session().commit()

    def test_create_in_group(self):
        """
        Create a repository inside a freshly created repository group and
        verify it via the flash message, the database, the summary page, its
        permission rows and the filesystem. Repository and group are deleted
        at the end of the test.
        """
        self.log_user()

        ## create GROUP
        group_name = 'sometest_%s' % self.REPO_TYPE
        gr = RepoGroupModel().create(group_name=group_name,
                                     group_description='test',
                                     owner=TEST_USER_ADMIN_LOGIN)
        Session().commit()

        repo_name = 'ingroup'
        # full name is "<group><separator><repo>"
        repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
        description = 'description for newly created repo'
        response = self.app.post(url('repos'),
                        fixture._get_repo_create_params(repo_private=False,
                                                repo_name=repo_name,
                                                repo_type=self.REPO_TYPE,
                                                repo_description=description,
                                                repo_group=gr.group_id,))
        ## run the check page that triggers the flash message
        response = self.app.get(url('repo_check_home', repo_name=repo_name_full))
        self.assertEqual(response.json, {u'result': True})
        self.checkSessionFlash(response,
                               'Created repository <a href="/%s">%s</a>'
                               % (repo_name_full, repo_name_full))
        # test if the repo was created in the database
        new_repo = Session().query(Repository)\
            .filter(Repository.repo_name == repo_name_full).one()
        new_repo_id = new_repo.repo_id

        self.assertEqual(new_repo.repo_name, repo_name_full)
        self.assertEqual(new_repo.description, description)

        # test if the repository is visible on its summary page
        response = self.app.get(url('summary_home', repo_name=repo_name_full))
        response.mustcontain(repo_name_full)
        response.mustcontain(self.REPO_TYPE)

        # exactly one permission row is expected for the new repository
        inherited_perms = UserRepoToPerm.query()\
            .filter(UserRepoToPerm.repository_id == new_repo_id).all()
        self.assertEqual(len(inherited_perms), 1)

        # test if the repository was created on filesystem
        try:
            vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name_full))
        except Exception:
            RepoGroupModel().delete(group_name)
            Session().commit()
            # report the path that was actually checked, not the short name
            self.fail('no repo %s in filesystem' % repo_name_full)

        RepoModel().delete(repo_name_full)
        RepoGroupModel().delete(group_name)
        Session().commit()

    def test_create_in_group_without_needed_permissions(self):
        """
        As the regular test user with global create/fork permissions
        revoked, creating a repository must be refused in a group owned by
        the admin and must succeed in a group owned by the regular user.

        NOTE(review): the global permission changes made here are not
        reverted by this test.
        """
        self.log_user(TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
        # revoke
        user_model = UserModel()
        # disable fork and create on default user
        user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository')
        user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none')
        user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository')
        user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none')

        # disable on regular user
        user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository')
        user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none')
        user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository')
        user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none')
        Session().commit()

        ## create GROUP owned by the admin user
        group_name = 'reg_sometest_%s' % self.REPO_TYPE
        gr = RepoGroupModel().create(group_name=group_name,
                                     group_description='test',
                                     owner=TEST_USER_ADMIN_LOGIN)
        Session().commit()

        ## create GROUP owned by the regular user
        group_name_allowed = 'reg_sometest_allowed_%s' % self.REPO_TYPE
        gr_allowed = RepoGroupModel().create(group_name=group_name_allowed,
                                     group_description='test',
                                     owner=TEST_USER_REGULAR_LOGIN)
        Session().commit()

        repo_name = 'ingroup'
        description = 'description for newly created repo'

        # creation inside the admin-owned group must be rejected by the form
        response = self.app.post(url('repos'),
                        fixture._get_repo_create_params(repo_private=False,
                                                repo_name=repo_name,
                                                repo_type=self.REPO_TYPE,
                                                repo_description=description,
                                                repo_group=gr.group_id,))

        response.mustcontain('Invalid value')

        # user is allowed to create in this group
        repo_name_full = RepoGroup.url_sep().join([group_name_allowed, repo_name])
        response = self.app.post(url('repos'),
                        fixture._get_repo_create_params(repo_private=False,
                                                repo_name=repo_name,
                                                repo_type=self.REPO_TYPE,
                                                repo_description=description,
                                                repo_group=gr_allowed.group_id,))

        ## run the check page that triggers the flash message
        response = self.app.get(url('repo_check_home', repo_name=repo_name_full))
        self.assertEqual(response.json, {u'result': True})
        self.checkSessionFlash(response,
                               'Created repository <a href="/%s">%s</a>'
                               % (repo_name_full, repo_name_full))
        # test if the repo was created in the database
        new_repo = Session().query(Repository)\
            .filter(Repository.repo_name == repo_name_full).one()
        new_repo_id = new_repo.repo_id

        self.assertEqual(new_repo.repo_name, repo_name_full)
        self.assertEqual(new_repo.description, description)

        # test if the repository is visible on its summary page
        response = self.app.get(url('summary_home', repo_name=repo_name_full))
        response.mustcontain(repo_name_full)
        response.mustcontain(self.REPO_TYPE)

        # exactly one permission row is expected for the new repository
        inherited_perms = UserRepoToPerm.query()\
            .filter(UserRepoToPerm.repository_id == new_repo_id).all()
        self.assertEqual(len(inherited_perms), 1)

        # test if the repository was created on filesystem
        try:
            vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name_full))
        except Exception:
            RepoGroupModel().delete(group_name)
            Session().commit()
            # report the path that was actually checked, not the short name
            self.fail('no repo %s in filesystem' % repo_name_full)

        RepoModel().delete(repo_name_full)
        RepoGroupModel().delete(group_name)
        RepoGroupModel().delete(group_name_allowed)
        Session().commit()

    def test_create_in_group_inherit_permissions(self):
        """
        Create a repository inside a group with ``repo_copy_permissions``
        enabled and verify that the group's 'repository.write' grant for the
        regular test user shows up among the new repository's permissions.
        Repository and group are deleted at the end of the test.
        """
        self.log_user()

        ## create GROUP
        group_name = 'sometest_%s' % self.REPO_TYPE
        gr = RepoGroupModel().create(group_name=group_name,
                                     group_description='test',
                                     owner=TEST_USER_ADMIN_LOGIN)
        # give the regular user a write grant on the group
        perm = Permission.get_by_key('repository.write')
        RepoGroupModel().grant_user_permission(gr, TEST_USER_REGULAR_LOGIN, perm)

        ## add repo permissions
        Session().commit()

        repo_name = 'ingroup_inherited_%s' % self.REPO_TYPE
        repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
        description = 'description for newly created repo'
        response = self.app.post(url('repos'),
                        fixture._get_repo_create_params(repo_private=False,
                                                repo_name=repo_name,
                                                repo_type=self.REPO_TYPE,
                                                repo_description=description,
                                                repo_group=gr.group_id,
                                                repo_copy_permissions=True))

        ## run the check page that triggers the flash message
        response = self.app.get(url('repo_check_home', repo_name=repo_name_full))
        # same JSON check the sibling creation tests perform
        self.assertEqual(response.json, {u'result': True})
        self.checkSessionFlash(response,
                               'Created repository <a href="/%s">%s</a>'
                               % (repo_name_full, repo_name_full))
        # test if the repo was created in the database
        new_repo = Session().query(Repository)\
            .filter(Repository.repo_name == repo_name_full).one()
        new_repo_id = new_repo.repo_id

        self.assertEqual(new_repo.repo_name, repo_name_full)
        self.assertEqual(new_repo.description, description)

        # test if the repository is visible on its summary page
        response = self.app.get(url('summary_home', repo_name=repo_name_full))
        response.mustcontain(repo_name_full)
        response.mustcontain(self.REPO_TYPE)

        # test if the repository was created on filesystem
        try:
            vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name_full))
        except Exception:
            RepoGroupModel().delete(group_name)
            Session().commit()
            # report the path that was actually checked, not the short name
            self.fail('no repo %s in filesystem' % repo_name_full)

        # check if inherited permissions are applied
        inherited_perms = UserRepoToPerm.query()\
            .filter(UserRepoToPerm.repository_id == new_repo_id).all()
        self.assertEqual(len(inherited_perms), 2)

        self.assertTrue(TEST_USER_REGULAR_LOGIN in [x.user.username
                                                    for x in inherited_perms])
        self.assertTrue('repository.write' in [x.permission.permission_name
                                               for x in inherited_perms])

        RepoModel().delete(repo_name_full)
        RepoGroupModel().delete(group_name)
        Session().commit()

    def test_create_remote_repo_wrong_clone_uri(self):
        """
        Posting a new repository with clone_uri 'http://127.0.0.1/repo'
        must produce a page containing 'invalid clone url'.
        """
        self.log_user()
        form_data = fixture._get_repo_create_params(
            repo_private=False,
            repo_name=self.NEW_REPO,
            repo_type=self.REPO_TYPE,
            repo_description='description for newly created repo',
            clone_uri='http://127.0.0.1/repo')
        page = self.app.post(url('repos'), form_data)
        page.mustcontain('invalid clone url')


    def test_create_remote_repo_wrong_clone_uri_hg_svn(self):
        """
        Posting a new repository with clone_uri 'svn+http://127.0.0.1/repo'
        must produce a page containing 'invalid clone url'.
        """
        self.log_user()
        form_data = fixture._get_repo_create_params(
            repo_private=False,
            repo_name=self.NEW_REPO,
            repo_type=self.REPO_TYPE,
            repo_description='description for newly created repo',
            clone_uri='svn+http://127.0.0.1/repo')
        page = self.app.post(url('repos'), form_data)
        page.mustcontain('invalid clone url')


    def test_delete(self):
        """
        Create a throw-away repository, confirm it exists (flash message,
        database, summary page, filesystem), then delete it through the
        'repo' route and confirm it is gone from database and filesystem.
        """
        self.log_user()
        name = 'vcs_test_new_to_delete_%s' % self.REPO_TYPE
        desc = 'description for newly created repo'
        self.app.post(url('repos'),
                      fixture._get_repo_create_params(
                          repo_private=False,
                          repo_type=self.REPO_TYPE,
                          repo_name=name,
                          repo_description=desc))

        # the check page is what triggers the flash message
        check_page = self.app.get(url('repo_check_home', repo_name=name))
        created_msg = 'Created repository <a href="/%s">%s</a>' % (name, name)
        self.checkSessionFlash(check_page, created_msg)

        # database row
        db_repo = Session().query(Repository)\
            .filter(Repository.repo_name == name).one()
        self.assertEqual(db_repo.repo_name, name)
        self.assertEqual(db_repo.description, desc)

        # summary page
        summary = self.app.get(url('summary_home', repo_name=name))
        summary.mustcontain(name)
        summary.mustcontain(self.REPO_TYPE)

        # filesystem
        try:
            vcs.get_repo(os.path.join(TESTS_TMP_PATH, name))
        except Exception:
            self.fail('no repo %s in filesystem' % name)

        # now delete it through the web interface
        delete_response = self.app.delete(url('repo', repo_name=name))
        self.checkSessionFlash(delete_response,
                               'Deleted repository %s' % (name))
        delete_response.follow()

        # the database row must be gone ...
        remaining = Session().query(Repository)\
            .filter(Repository.repo_name == name).scalar()
        self.assertEqual(remaining, None)

        # ... and so must the directory
        repo_dir = os.path.join(TESTS_TMP_PATH, name)
        self.assertEqual(os.path.isdir(repo_dir), False)

    def test_delete_non_ascii(self):
        """
        Same flow as a plain create-then-delete, but with non-ASCII
        characters in the repository name and description.
        """
        self.log_user()
        suffix = "ąęł"
        name = "%s%s" % (self.NEW_REPO, suffix)
        name_u = name.decode('utf8')
        desc = 'description for newly created repo' + suffix
        desc_u = desc.decode('utf8')
        self.app.post(url('repos'),
                      fixture._get_repo_create_params(
                          repo_private=False,
                          repo_name=name,
                          repo_type=self.REPO_TYPE,
                          repo_description=desc))

        # the check page is what triggers the flash message
        check_page = self.app.get(url('repo_check_home', repo_name=name))
        self.assertEqual(check_page.json, {u'result': True})
        # link target is the URL-quoted byte name, label is the unicode name
        created_msg = (u'Created repository <a href="/%s">%s</a>'
                       % (urllib.quote(name), name_u))
        self.checkSessionFlash(check_page, created_msg)

        # database row
        db_repo = Session().query(Repository)\
            .filter(Repository.repo_name == name_u).one()
        self.assertEqual(db_repo.repo_name, name_u)
        self.assertEqual(db_repo.description, desc_u)

        # summary page
        summary = self.app.get(url('summary_home', repo_name=name))
        summary.mustcontain(name)
        summary.mustcontain(self.REPO_TYPE)

        # filesystem
        try:
            vcs.get_repo(os.path.join(TESTS_TMP_PATH, name))
        except Exception:
            self.fail('no repo %s in filesystem' % name)

        # now delete it through the web interface
        delete_response = self.app.delete(url('repo', repo_name=name))
        self.checkSessionFlash(delete_response,
                               'Deleted repository %s' % (name_u))
        delete_response.follow()

        # the database row must be gone ...
        remaining = Session().query(Repository)\
            .filter(Repository.repo_name == name_u).scalar()
        self.assertEqual(remaining, None)

        # ... and so must the directory
        repo_dir = os.path.join(TESTS_TMP_PATH, name)
        self.assertEqual(os.path.isdir(repo_dir), False)

    def test_delete_repo_with_group(self):
        """Placeholder for deleting a repository that lives in a group."""
        # TODO: not implemented yet

    def test_delete_browser_fakeout(self):
        """
        POST to the 'repo' route with ``_method=delete`` in the form data.
        No login is performed and nothing about the response is asserted.
        """
        target = url('repo', repo_name=self.REPO)
        self.app.post(target, params={'_method': 'delete'})

    def test_show(self):
        """Log in and GET the 'repo' route for the class's base repository."""
        self.log_user()
        # no content assertions: the request itself is the whole check
        self.app.get(url('repo', repo_name=self.REPO))

    def test_edit(self):
        """
        GET the 'edit_repo' route for the class's base repository.
        No login is performed and nothing about the response is asserted.
        """
        edit_url = url('edit_repo', repo_name=self.REPO)
        self.app.get(edit_url)

    def test_set_private_flag_sets_default_to_none(self):
        """
        Setting a repository private must switch the 'default' user's
        permission on it from 'repository.read' to 'repository.none', and
        making it public again must leave that permission at
        'repository.none'. The permission is reset to 'repository.read' at
        the end of the test.
        """
        self.log_user()
        #initially repository perm should be read
        perm = _get_permission_for_user(user='default', repo=self.REPO)
        # assertEqual, not assertTrue: assertTrue(len(perm), 1) would treat
        # the 1 as the failure message and never check the count
        self.assertEqual(len(perm), 1)
        self.assertEqual(perm[0].permission.permission_name, 'repository.read')
        self.assertEqual(Repository.get_by_repo_name(self.REPO).private, False)

        # switch the repository to private
        response = self.app.put(url('repo', repo_name=self.REPO),
                        fixture._get_repo_create_params(repo_private=1,
                                                repo_name=self.REPO,
                                                repo_type=self.REPO_TYPE,
                                                user=TEST_USER_ADMIN_LOGIN))
        self.checkSessionFlash(response,
                               msg='Repository %s updated successfully' % (self.REPO))
        self.assertEqual(Repository.get_by_repo_name(self.REPO).private, True)

        #now the repo default permission should be None
        perm = _get_permission_for_user(user='default', repo=self.REPO)
        self.assertEqual(len(perm), 1)
        self.assertEqual(perm[0].permission.permission_name, 'repository.none')

        # switch the repository back to public
        response = self.app.put(url('repo', repo_name=self.REPO),
                        fixture._get_repo_create_params(repo_private=False,
                                                repo_name=self.REPO,
                                                repo_type=self.REPO_TYPE,
                                                user=TEST_USER_ADMIN_LOGIN))
        self.checkSessionFlash(response,
                               msg='Repository %s updated successfully' % (self.REPO))
        self.assertEqual(Repository.get_by_repo_name(self.REPO).private, False)

        #we turn off private now the repo default permission should stay None
        perm = _get_permission_for_user(user='default', repo=self.REPO)
        self.assertEqual(len(perm), 1)
        self.assertEqual(perm[0].permission.permission_name, 'repository.none')

        #update this permission back
        perm[0].permission = Permission.get_by_key('repository.read')
        Session().add(perm[0])
        Session().commit()

    def test_set_repo_fork_has_no_self_id(self):
        """
        The advanced edit page must not contain an ``<option>`` whose value
        and label are the edited repository's own id and name.
        """
        self.log_user()
        repo = Repository.get_by_repo_name(self.REPO)
        response = self.app.get(url('edit_repo_advanced', repo_name=self.REPO))
        # build the option from the repository under test; a hard-coded git
        # repository name would make this check vacuous for other subclasses
        opt = """<option value="%s">%s</option>""" % (repo.repo_id, self.REPO)
        response.mustcontain(no=[opt])

    def test_set_fork_of_other_repo(self):
        """
        Mark the base repository as a fork of a freshly created repository
        of the same type, then verify the flash message, the ``fork``
        relation and the selected ``<option>`` on the follow-up page.
        """
        self.log_user()
        other_repo = 'other_%s' % self.REPO_TYPE
        fixture.create_repo(other_repo, repo_type=self.REPO_TYPE)
        try:
            repo2 = Repository.get_by_repo_name(other_repo)
            response = self.app.put(
                url('edit_repo_advanced_fork', repo_name=self.REPO),
                params=dict(id_fork_of=repo2.repo_id))
            # both rows are fetched again after the PUT
            repo = Repository.get_by_repo_name(self.REPO)
            repo2 = Repository.get_by_repo_name(other_repo)
            self.checkSessionFlash(response,
                'Marked repo %s as fork of %s' % (repo.repo_name, repo2.repo_name))

            self.assertEqual(repo.fork, repo2)
            response = response.follow()
            # check if given repo is selected

            opt = """<option value="%s" selected="selected">%s</option>""" % (
                        repo2.repo_id, repo2.repo_name)
            response.mustcontain(opt)
        finally:
            # always remove the helper repository, even when an assertion
            # above fails, so it does not leak into later tests
            fixture.destroy_repo(other_repo, forks='detach')

    def test_set_fork_of_other_type_repo(self):
        """
        Marking the base repository as a fork of a repository of the other
        VCS type must flash the 'other type' error message.
        """
        self.log_user()
        # only the id of the other-type repository is needed for the request
        repo2 = Repository.get_by_repo_name(self.OTHER_TYPE_REPO)
        response = self.app.put(url('edit_repo_advanced_fork', repo_name=self.REPO),
                                params=dict(id_fork_of=repo2.repo_id))
        self.checkSessionFlash(response,
            'Cannot set repository as fork of repository with other type')

    def test_set_fork_of_none(self):
        """
        Submitting ``id_fork_of=None`` must flash
        'Marked repo <name> as fork of Nothing' and leave the repository's
        ``fork`` relation empty.
        """
        self.log_user()
        ## mark it as None
        response = self.app.put(url('edit_repo_advanced_fork', repo_name=self.REPO),
                                params=dict(id_fork_of=None))
        repo = Repository.get_by_repo_name(self.REPO)
        self.checkSessionFlash(response,
                               'Marked repo %s as fork of %s'
                               % (repo.repo_name, "Nothing"))
        # unittest assertion, so the check also runs under ``python -O``
        self.assertEqual(repo.fork, None)

    def test_set_fork_of_same_repo(self):
        """
        Marking a repository as a fork of itself must flash
        'An error occurred during this operation'.
        """
        self.log_user()
        own_id = Repository.get_by_repo_name(self.REPO).repo_id
        fork_url = url('edit_repo_advanced_fork', repo_name=self.REPO)
        result = self.app.put(fork_url, params={'id_fork_of': own_id})
        self.checkSessionFlash(result,
                               'An error occurred during this operation')

    def test_create_on_top_level_without_permissions(self):
        """
        As the regular test user with global create/fork permissions
        revoked, creating a repository at the top level must produce a page
        containing the 'no permission to create repository in root location'
        message.
        """
        session_user = self.log_user(TEST_USER_REGULAR_LOGIN,
                                     TEST_USER_REGULAR_PASS)
        user_model = UserModel()
        # disable create and fork, first on the default user and then on the
        # regular user
        for account in (User.DEFAULT_USER, TEST_USER_REGULAR_LOGIN):
            user_model.revoke_perm(account, 'hg.create.repository')
            user_model.grant_perm(account, 'hg.create.none')
            user_model.revoke_perm(account, 'hg.fork.repository')
            user_model.grant_perm(account, 'hg.fork.none')
        Session().commit()

        # the looked-up user object is not used any further
        User.get(session_user['user_id'])

        name = self.NEW_REPO+'no_perms'
        form_data = fixture._get_repo_create_params(
            repo_private=False,
            repo_name=name,
            repo_type=self.REPO_TYPE,
            repo_description='description for newly created repo')
        page = self.app.post(url('repos'), form_data)

        page.mustcontain('no permission to create repository in root location')

        RepoModel().delete(name)
        Session().commit()

    @mock.patch.object(RepoModel, '_create_filesystem_repo', error_function)
    def test_create_repo_when_filesystem_op_fails(self):
        """
        With ``RepoModel._create_filesystem_repo`` patched to
        ``error_function`` (presumably a helper that raises; confirm in the
        fixture module), creating a repository must flash an error and leave
        neither a database row nor a directory behind.
        """
        self.log_user()
        name = self.NEW_REPO
        form_data = fixture._get_repo_create_params(
            repo_private=False,
            repo_name=name,
            repo_type=self.REPO_TYPE,
            repo_description='description for newly created repo')
        result = self.app.post(url('repos'), form_data)

        self.checkSessionFlash(result, 'Error creating repository %s' % name)

        # repo must not be in db
        self.assertEqual(Repository.get_by_repo_name(name), None)

        # repo must not be in filesystem !
        leftover = os.path.join(TESTS_TMP_PATH, name)
        self.assertFalse(os.path.isdir(leftover))

class TestAdminReposControllerGIT(_BaseTest):
    """Run the tests inherited from _BaseTest with git as the repository type."""
    # type under test and its repositories
    REPO_TYPE = 'git'
    REPO = GIT_REPO
    NEW_REPO = NEW_GIT_REPO
    # the other type, used by the cross-type fork test
    OTHER_TYPE = 'hg'
    OTHER_TYPE_REPO = HG_REPO


class TestAdminReposControllerHG(_BaseTest):
    """Run the tests inherited from _BaseTest with hg as the repository type."""
    # type under test and its repositories
    REPO_TYPE = 'hg'
    REPO = HG_REPO
    NEW_REPO = NEW_HG_REPO
    # the other type, used by the cross-type fork test
    OTHER_TYPE = 'git'
    OTHER_TYPE_REPO = GIT_REPO