view kallithea/tests/functional/test_admin_repos.py @ 8109:e527cc2ce8dc

cleanup: get rid of most "import *" Apply script generated with the following hack: ( hg loc '*.py'|xargs pyflakes-2 | sed -rn "s/([^:]*):.*'(.*)' may be undefined, or defined from star imports.*/sed -ri 's,\\\\<\2\\\\>([^=]|$),XXXX.\2\\\\1,g' \1/gp" | sort -u hg loc '*.py'|xargs pyflakes-2 | sed -rn "s/([^:]*):.* undefined name '(.*)'$/sed -ri 's,\\\\<\2\\\\>([^=]|$),XXXX.\2\\\\1,g' \1/gp" | sort -u hg loc '*.py'|xargs pyflakes-2 | sed -rn "s/([^:]*):.*'(from .*)\.([^.]*) import \*' used.*/sed -ri 's,\\\\<XXXX\\\\.,\3.,g' \1/gp" | sort -u hg loc '*.py'|xargs pyflakes-2 | sed -rn "s/([^:]*):.*'(from .*)\.([^.]*) import \*' used.*/sed -ri 's,\2\\\\.\3 .*,\2 import \3,g' \1/gp" | sort -u ) | grep -v kallithea/bin/kallithea_cli_ishell.py > fix2.sh
author Mads Kiilerich <mads@kiilerich.com>
date Thu, 02 Jan 2020 00:44:56 +0100
parents fe4086096758
children 22b40db44a41
line wrap: on
line source

# -*- coding: utf-8 -*-

import os
import urllib

import mock
import pytest

from kallithea.lib import vcs
from kallithea.lib.utils2 import safe_str, safe_unicode
from kallithea.model.db import Permission, RepoGroup, Repository, Ui, User, UserRepoToPerm
from kallithea.model.meta import Session
from kallithea.model.repo import RepoModel
from kallithea.model.repo_group import RepoGroupModel
from kallithea.model.user import UserModel
from kallithea.tests import base
from kallithea.tests.fixture import Fixture, error_function


fixture = Fixture()


def _get_permission_for_user(user, repo):
    perm = UserRepoToPerm.query() \
                .filter(UserRepoToPerm.repository ==
                        Repository.get_by_repo_name(repo)) \
                .filter(UserRepoToPerm.user == User.get_by_username(user)) \
                .all()
    return perm


class _BaseTestCase(base.TestController):
    """
    Write all tests here
    """
    REPO = None
    REPO_TYPE = None
    NEW_REPO = None
    OTHER_TYPE_REPO = None
    OTHER_TYPE = None

    def test_index(self):
        self.log_user()
        response = self.app.get(base.url('repos'))

    def test_create(self):
        self.log_user()
        repo_name = self.NEW_REPO
        description = u'description for newly created repo'
        response = self.app.post(base.url('repos'),
                        fixture._get_repo_create_params(repo_private=False,
                                                repo_name=repo_name,
                                                repo_type=self.REPO_TYPE,
                                                repo_description=description,
                                                _session_csrf_secret_token=self.session_csrf_secret_token()))
        ## run the check page that triggers the flash message
        response = self.app.get(base.url('repo_check_home', repo_name=repo_name))
        assert response.json == {u'result': True}
        self.checkSessionFlash(response,
                               'Created repository <a href="/%s">%s</a>'
                               % (repo_name, repo_name))

        # test if the repo was created in the database
        new_repo = Session().query(Repository) \
            .filter(Repository.repo_name == repo_name).one()

        assert new_repo.repo_name == repo_name
        assert new_repo.description == description

        # test if the repository is visible in the list ?
        response = self.app.get(base.url('summary_home', repo_name=repo_name))
        response.mustcontain(repo_name)
        response.mustcontain(self.REPO_TYPE)

        # test if the repository was created on filesystem
        try:
            vcs.get_repo(safe_str(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name)))
        except vcs.exceptions.VCSError:
            pytest.fail('no repo %s in filesystem' % repo_name)

        RepoModel().delete(repo_name)
        Session().commit()

    def test_case_insensitivity(self):
        self.log_user()
        repo_name = self.NEW_REPO
        description = u'description for newly created repo'
        response = self.app.post(base.url('repos'),
                                 fixture._get_repo_create_params(repo_private=False,
                                                                 repo_name=repo_name,
                                                                 repo_type=self.REPO_TYPE,
                                                                 repo_description=description,
                                                                 _session_csrf_secret_token=self.session_csrf_secret_token()))
        # try to create repo with swapped case
        swapped_repo_name = repo_name.swapcase()
        response = self.app.post(base.url('repos'),
                                 fixture._get_repo_create_params(repo_private=False,
                                                                 repo_name=swapped_repo_name,
                                                                 repo_type=self.REPO_TYPE,
                                                                 repo_description=description,
                                                                 _session_csrf_secret_token=self.session_csrf_secret_token()))
        response.mustcontain('already exists')

        RepoModel().delete(repo_name)
        Session().commit()

    def test_create_in_group(self):
        self.log_user()

        ## create GROUP
        group_name = u'sometest_%s' % self.REPO_TYPE
        gr = RepoGroupModel().create(group_name=group_name,
                                     group_description=u'test',
                                     owner=base.TEST_USER_ADMIN_LOGIN)
        Session().commit()

        repo_name = u'ingroup'
        repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
        description = u'description for newly created repo'
        response = self.app.post(base.url('repos'),
                        fixture._get_repo_create_params(repo_private=False,
                                                repo_name=repo_name,
                                                repo_type=self.REPO_TYPE,
                                                repo_description=description,
                                                repo_group=gr.group_id,
                                                _session_csrf_secret_token=self.session_csrf_secret_token()))
        ## run the check page that triggers the flash message
        response = self.app.get(base.url('repo_check_home', repo_name=repo_name_full))
        assert response.json == {u'result': True}
        self.checkSessionFlash(response,
                               'Created repository <a href="/%s">%s</a>'
                               % (repo_name_full, repo_name_full))
        # test if the repo was created in the database
        new_repo = Session().query(Repository) \
            .filter(Repository.repo_name == repo_name_full).one()
        new_repo_id = new_repo.repo_id

        assert new_repo.repo_name == repo_name_full
        assert new_repo.description == description

        # test if the repository is visible in the list ?
        response = self.app.get(base.url('summary_home', repo_name=repo_name_full))
        response.mustcontain(repo_name_full)
        response.mustcontain(self.REPO_TYPE)

        inherited_perms = UserRepoToPerm.query() \
            .filter(UserRepoToPerm.repository_id == new_repo_id).all()
        assert len(inherited_perms) == 1

        # test if the repository was created on filesystem
        try:
            vcs.get_repo(safe_str(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name_full)))
        except vcs.exceptions.VCSError:
            RepoGroupModel().delete(group_name)
            Session().commit()
            pytest.fail('no repo %s in filesystem' % repo_name)

        RepoModel().delete(repo_name_full)
        RepoGroupModel().delete(group_name)
        Session().commit()

    def test_create_in_group_without_needed_permissions(self):
        usr = self.log_user(base.TEST_USER_REGULAR_LOGIN, base.TEST_USER_REGULAR_PASS)
        # avoid spurious RepoGroup DetachedInstanceError ...
        session_csrf_secret_token = self.session_csrf_secret_token()
        # revoke
        user_model = UserModel()
        # disable fork and create on default user
        user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository')
        user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none')
        user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository')
        user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none')

        # disable on regular user
        user_model.revoke_perm(base.TEST_USER_REGULAR_LOGIN, 'hg.create.repository')
        user_model.grant_perm(base.TEST_USER_REGULAR_LOGIN, 'hg.create.none')
        user_model.revoke_perm(base.TEST_USER_REGULAR_LOGIN, 'hg.fork.repository')
        user_model.grant_perm(base.TEST_USER_REGULAR_LOGIN, 'hg.fork.none')
        Session().commit()

        ## create GROUP
        group_name = u'reg_sometest_%s' % self.REPO_TYPE
        gr = RepoGroupModel().create(group_name=group_name,
                                     group_description=u'test',
                                     owner=base.TEST_USER_ADMIN_LOGIN)
        Session().commit()

        group_name_allowed = u'reg_sometest_allowed_%s' % self.REPO_TYPE
        gr_allowed = RepoGroupModel().create(group_name=group_name_allowed,
                                     group_description=u'test',
                                     owner=base.TEST_USER_REGULAR_LOGIN)
        Session().commit()

        repo_name = u'ingroup'
        repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
        description = u'description for newly created repo'
        response = self.app.post(base.url('repos'),
                        fixture._get_repo_create_params(repo_private=False,
                                                repo_name=repo_name,
                                                repo_type=self.REPO_TYPE,
                                                repo_description=description,
                                                repo_group=gr.group_id,
                                                _session_csrf_secret_token=session_csrf_secret_token))

        response.mustcontain('Invalid value')

        # user is allowed to create in this group
        repo_name = u'ingroup'
        repo_name_full = RepoGroup.url_sep().join([group_name_allowed, repo_name])
        description = u'description for newly created repo'
        response = self.app.post(base.url('repos'),
                        fixture._get_repo_create_params(repo_private=False,
                                                repo_name=repo_name,
                                                repo_type=self.REPO_TYPE,
                                                repo_description=description,
                                                repo_group=gr_allowed.group_id,
                                                _session_csrf_secret_token=session_csrf_secret_token))

        ## run the check page that triggers the flash message
        response = self.app.get(base.url('repo_check_home', repo_name=repo_name_full))
        assert response.json == {u'result': True}
        self.checkSessionFlash(response,
                               'Created repository <a href="/%s">%s</a>'
                               % (repo_name_full, repo_name_full))
        # test if the repo was created in the database
        new_repo = Session().query(Repository) \
            .filter(Repository.repo_name == repo_name_full).one()
        new_repo_id = new_repo.repo_id

        assert new_repo.repo_name == repo_name_full
        assert new_repo.description == description

        # test if the repository is visible in the list ?
        response = self.app.get(base.url('summary_home', repo_name=repo_name_full))
        response.mustcontain(repo_name_full)
        response.mustcontain(self.REPO_TYPE)

        inherited_perms = UserRepoToPerm.query() \
            .filter(UserRepoToPerm.repository_id == new_repo_id).all()
        assert len(inherited_perms) == 1

        # test if the repository was created on filesystem
        try:
            vcs.get_repo(safe_str(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name_full)))
        except vcs.exceptions.VCSError:
            RepoGroupModel().delete(group_name)
            Session().commit()
            pytest.fail('no repo %s in filesystem' % repo_name)

        RepoModel().delete(repo_name_full)
        RepoGroupModel().delete(group_name)
        RepoGroupModel().delete(group_name_allowed)
        Session().commit()

    def test_create_in_group_inherit_permissions(self):
        self.log_user()

        ## create GROUP
        group_name = u'sometest_%s' % self.REPO_TYPE
        gr = RepoGroupModel().create(group_name=group_name,
                                     group_description=u'test',
                                     owner=base.TEST_USER_ADMIN_LOGIN)
        perm = Permission.get_by_key('repository.write')
        RepoGroupModel().grant_user_permission(gr, base.TEST_USER_REGULAR_LOGIN, perm)

        ## add repo permissions
        Session().commit()

        repo_name = u'ingroup_inherited_%s' % self.REPO_TYPE
        repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
        description = u'description for newly created repo'
        response = self.app.post(base.url('repos'),
                        fixture._get_repo_create_params(repo_private=False,
                                                repo_name=repo_name,
                                                repo_type=self.REPO_TYPE,
                                                repo_description=description,
                                                repo_group=gr.group_id,
                                                repo_copy_permissions=True,
                                                _session_csrf_secret_token=self.session_csrf_secret_token()))

        ## run the check page that triggers the flash message
        response = self.app.get(base.url('repo_check_home', repo_name=repo_name_full))
        self.checkSessionFlash(response,
                               'Created repository <a href="/%s">%s</a>'
                               % (repo_name_full, repo_name_full))
        # test if the repo was created in the database
        new_repo = Session().query(Repository) \
            .filter(Repository.repo_name == repo_name_full).one()
        new_repo_id = new_repo.repo_id

        assert new_repo.repo_name == repo_name_full
        assert new_repo.description == description

        # test if the repository is visible in the list ?
        response = self.app.get(base.url('summary_home', repo_name=repo_name_full))
        response.mustcontain(repo_name_full)
        response.mustcontain(self.REPO_TYPE)

        # test if the repository was created on filesystem
        try:
            vcs.get_repo(safe_str(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name_full)))
        except vcs.exceptions.VCSError:
            RepoGroupModel().delete(group_name)
            Session().commit()
            pytest.fail('no repo %s in filesystem' % repo_name)

        # check if inherited permissiona are applied
        inherited_perms = UserRepoToPerm.query() \
            .filter(UserRepoToPerm.repository_id == new_repo_id).all()
        assert len(inherited_perms) == 2

        assert base.TEST_USER_REGULAR_LOGIN in [x.user.username
                                                    for x in inherited_perms]
        assert 'repository.write' in [x.permission.permission_name
                                               for x in inherited_perms]

        RepoModel().delete(repo_name_full)
        RepoGroupModel().delete(group_name)
        Session().commit()

    def test_create_remote_repo_wrong_clone_uri(self):
        self.log_user()
        repo_name = self.NEW_REPO
        description = u'description for newly created repo'
        response = self.app.post(base.url('repos'),
                        fixture._get_repo_create_params(repo_private=False,
                                                repo_name=repo_name,
                                                repo_type=self.REPO_TYPE,
                                                repo_description=description,
                                                clone_uri='http://127.0.0.1/repo',
                                                _session_csrf_secret_token=self.session_csrf_secret_token()))
        response.mustcontain('Invalid repository URL')

    def test_create_remote_repo_wrong_clone_uri_hg_svn(self):
        self.log_user()
        repo_name = self.NEW_REPO
        description = u'description for newly created repo'
        response = self.app.post(base.url('repos'),
                        fixture._get_repo_create_params(repo_private=False,
                                                repo_name=repo_name,
                                                repo_type=self.REPO_TYPE,
                                                repo_description=description,
                                                clone_uri='svn+http://127.0.0.1/repo',
                                                _session_csrf_secret_token=self.session_csrf_secret_token()))
        response.mustcontain('Invalid repository URL')

    def test_delete(self):
        self.log_user()
        repo_name = u'vcs_test_new_to_delete_%s' % self.REPO_TYPE
        description = u'description for newly created repo'
        response = self.app.post(base.url('repos'),
                        fixture._get_repo_create_params(repo_private=False,
                                                repo_type=self.REPO_TYPE,
                                                repo_name=repo_name,
                                                repo_description=description,
                                                _session_csrf_secret_token=self.session_csrf_secret_token()))
        ## run the check page that triggers the flash message
        response = self.app.get(base.url('repo_check_home', repo_name=repo_name))
        self.checkSessionFlash(response,
                               'Created repository <a href="/%s">%s</a>'
                               % (repo_name, repo_name))
        # test if the repo was created in the database
        new_repo = Session().query(Repository) \
            .filter(Repository.repo_name == repo_name).one()

        assert new_repo.repo_name == repo_name
        assert new_repo.description == description

        # test if the repository is visible in the list ?
        response = self.app.get(base.url('summary_home', repo_name=repo_name))
        response.mustcontain(repo_name)
        response.mustcontain(self.REPO_TYPE)

        # test if the repository was created on filesystem
        try:
            vcs.get_repo(safe_str(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name)))
        except vcs.exceptions.VCSError:
            pytest.fail('no repo %s in filesystem' % repo_name)

        response = self.app.post(base.url('delete_repo', repo_name=repo_name),
            params={'_session_csrf_secret_token': self.session_csrf_secret_token()})

        self.checkSessionFlash(response, 'Deleted repository %s' % (repo_name))

        response.follow()

        # check if repo was deleted from db
        deleted_repo = Session().query(Repository) \
            .filter(Repository.repo_name == repo_name).scalar()

        assert deleted_repo is None

        assert os.path.isdir(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name)) == False

    def test_delete_non_ascii(self):
        self.log_user()
        non_ascii = "ąęł"
        repo_name = "%s%s" % (safe_str(self.NEW_REPO), non_ascii)
        repo_name_unicode = safe_unicode(repo_name)
        description = 'description for newly created repo' + non_ascii
        description_unicode = safe_unicode(description)
        response = self.app.post(base.url('repos'),
                        fixture._get_repo_create_params(repo_private=False,
                                                repo_name=repo_name,
                                                repo_type=self.REPO_TYPE,
                                                repo_description=description,
                                                _session_csrf_secret_token=self.session_csrf_secret_token()))
        ## run the check page that triggers the flash message
        response = self.app.get(base.url('repo_check_home', repo_name=repo_name))
        assert response.json == {u'result': True}
        self.checkSessionFlash(response,
                               u'Created repository <a href="/%s">%s</a>'
                               % (urllib.quote(repo_name), repo_name_unicode))
        # test if the repo was created in the database
        new_repo = Session().query(Repository) \
            .filter(Repository.repo_name == repo_name_unicode).one()

        assert new_repo.repo_name == repo_name_unicode
        assert new_repo.description == description_unicode

        # test if the repository is visible in the list ?
        response = self.app.get(base.url('summary_home', repo_name=repo_name))
        response.mustcontain(repo_name)
        response.mustcontain(self.REPO_TYPE)

        # test if the repository was created on filesystem
        try:
            vcs.get_repo(safe_str(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name_unicode)))
        except vcs.exceptions.VCSError:
            pytest.fail('no repo %s in filesystem' % repo_name)

        response = self.app.post(base.url('delete_repo', repo_name=repo_name),
            params={'_session_csrf_secret_token': self.session_csrf_secret_token()})
        self.checkSessionFlash(response, 'Deleted repository %s' % (repo_name_unicode))
        response.follow()

        # check if repo was deleted from db
        deleted_repo = Session().query(Repository) \
            .filter(Repository.repo_name == repo_name_unicode).scalar()

        assert deleted_repo is None

        assert os.path.isdir(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name_unicode)) == False

    def test_delete_repo_with_group(self):
        # TODO:
        pass

    def test_delete_browser_fakeout(self):
        response = self.app.post(base.url('delete_repo', repo_name=self.REPO),
                                 params=dict(_session_csrf_secret_token=self.session_csrf_secret_token()))

    def test_show(self):
        self.log_user()
        response = self.app.get(base.url('summary_home', repo_name=self.REPO))

    def test_edit(self):
        response = self.app.get(base.url('edit_repo', repo_name=self.REPO))

    def test_set_private_flag_sets_default_to_none(self):
        self.log_user()
        # initially repository perm should be read
        perm = _get_permission_for_user(user='default', repo=self.REPO)
        assert len(perm), 1
        assert perm[0].permission.permission_name == 'repository.read'
        assert Repository.get_by_repo_name(self.REPO).private == False

        response = self.app.post(base.url('update_repo', repo_name=self.REPO),
                        fixture._get_repo_create_params(repo_private=1,
                                                repo_name=self.REPO,
                                                repo_type=self.REPO_TYPE,
                                                owner=base.TEST_USER_ADMIN_LOGIN,
                                                _session_csrf_secret_token=self.session_csrf_secret_token()))
        self.checkSessionFlash(response,
                               msg='Repository %s updated successfully' % (self.REPO))
        assert Repository.get_by_repo_name(self.REPO).private == True

        # now the repo default permission should be None
        perm = _get_permission_for_user(user='default', repo=self.REPO)
        assert len(perm), 1
        assert perm[0].permission.permission_name == 'repository.none'

        response = self.app.post(base.url('update_repo', repo_name=self.REPO),
                        fixture._get_repo_create_params(repo_private=False,
                                                repo_name=self.REPO,
                                                repo_type=self.REPO_TYPE,
                                                owner=base.TEST_USER_ADMIN_LOGIN,
                                                _session_csrf_secret_token=self.session_csrf_secret_token()))
        self.checkSessionFlash(response,
                               msg='Repository %s updated successfully' % (self.REPO))
        assert Repository.get_by_repo_name(self.REPO).private == False

        # we turn off private now the repo default permission should stay None
        perm = _get_permission_for_user(user='default', repo=self.REPO)
        assert len(perm), 1
        assert perm[0].permission.permission_name == 'repository.none'

        # update this permission back
        perm[0].permission = Permission.get_by_key('repository.read')
        Session().commit()

    def test_set_repo_fork_has_no_self_id(self):
        self.log_user()
        repo = Repository.get_by_repo_name(self.REPO)
        response = self.app.get(base.url('edit_repo_advanced', repo_name=self.REPO))
        opt = """<option value="%s">%s</option>""" % (repo.repo_id, self.REPO)
        response.mustcontain(no=[opt])

    def test_set_fork_of_other_repo(self):
        self.log_user()
        other_repo = u'other_%s' % self.REPO_TYPE
        fixture.create_repo(other_repo, repo_type=self.REPO_TYPE)
        repo = Repository.get_by_repo_name(self.REPO)
        repo2 = Repository.get_by_repo_name(other_repo)
        response = self.app.post(base.url('edit_repo_advanced_fork', repo_name=self.REPO),
                                params=dict(id_fork_of=repo2.repo_id, _session_csrf_secret_token=self.session_csrf_secret_token()))
        repo = Repository.get_by_repo_name(self.REPO)
        repo2 = Repository.get_by_repo_name(other_repo)
        self.checkSessionFlash(response,
            'Marked repository %s as fork of %s' % (repo.repo_name, repo2.repo_name))

        assert repo.fork == repo2
        response = response.follow()
        # check if given repo is selected

        opt = """<option value="%s" selected="selected">%s</option>""" % (
                    repo2.repo_id, repo2.repo_name)
        response.mustcontain(opt)

        fixture.destroy_repo(other_repo, forks='detach')

    def test_set_fork_of_other_type_repo(self):
        self.log_user()
        repo = Repository.get_by_repo_name(self.REPO)
        repo2 = Repository.get_by_repo_name(self.OTHER_TYPE_REPO)
        response = self.app.post(base.url('edit_repo_advanced_fork', repo_name=self.REPO),
                                params=dict(id_fork_of=repo2.repo_id, _session_csrf_secret_token=self.session_csrf_secret_token()))
        repo = Repository.get_by_repo_name(self.REPO)
        repo2 = Repository.get_by_repo_name(self.OTHER_TYPE_REPO)
        self.checkSessionFlash(response,
            'Cannot set repository as fork of repository with other type')

    def test_set_fork_of_none(self):
        self.log_user()
        ## mark it as None
        response = self.app.post(base.url('edit_repo_advanced_fork', repo_name=self.REPO),
                                params=dict(id_fork_of=None, _session_csrf_secret_token=self.session_csrf_secret_token()))
        repo = Repository.get_by_repo_name(self.REPO)
        repo2 = Repository.get_by_repo_name(self.OTHER_TYPE_REPO)
        self.checkSessionFlash(response,
                               'Marked repository %s as fork of %s'
                               % (repo.repo_name, "Nothing"))
        assert repo.fork is None

    def test_set_fork_of_same_repo(self):
        self.log_user()
        repo = Repository.get_by_repo_name(self.REPO)
        response = self.app.post(base.url('edit_repo_advanced_fork', repo_name=self.REPO),
                                params=dict(id_fork_of=repo.repo_id, _session_csrf_secret_token=self.session_csrf_secret_token()))
        self.checkSessionFlash(response,
                               'An error occurred during this operation')

    def test_create_on_top_level_without_permissions(self):
        usr = self.log_user(base.TEST_USER_REGULAR_LOGIN, base.TEST_USER_REGULAR_PASS)
        # revoke
        user_model = UserModel()
        # disable fork and create on default user
        user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository')
        user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none')
        user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository')
        user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none')

        # disable on regular user
        user_model.revoke_perm(base.TEST_USER_REGULAR_LOGIN, 'hg.create.repository')
        user_model.grant_perm(base.TEST_USER_REGULAR_LOGIN, 'hg.create.none')
        user_model.revoke_perm(base.TEST_USER_REGULAR_LOGIN, 'hg.fork.repository')
        user_model.grant_perm(base.TEST_USER_REGULAR_LOGIN, 'hg.fork.none')
        Session().commit()


        user = User.get(usr['user_id'])

        repo_name = self.NEW_REPO + u'no_perms'
        description = 'description for newly created repo'
        response = self.app.post(base.url('repos'),
                        fixture._get_repo_create_params(repo_private=False,
                                                repo_name=repo_name,
                                                repo_type=self.REPO_TYPE,
                                                repo_description=description,
                                                _session_csrf_secret_token=self.session_csrf_secret_token()))

        response.mustcontain('<span class="error-message">Invalid value</span>')

        RepoModel().delete(repo_name)
        Session().commit()

    @mock.patch.object(RepoModel, '_create_filesystem_repo', error_function)
    def test_create_repo_when_filesystem_op_fails(self):
        self.log_user()
        repo_name = self.NEW_REPO
        description = 'description for newly created repo'

        response = self.app.post(base.url('repos'),
                        fixture._get_repo_create_params(repo_private=False,
                                                repo_name=repo_name,
                                                repo_type=self.REPO_TYPE,
                                                repo_description=description,
                                                _session_csrf_secret_token=self.session_csrf_secret_token()))

        self.checkSessionFlash(response,
                               'Error creating repository %s' % repo_name)
        # repo must not be in db
        repo = Repository.get_by_repo_name(repo_name)
        assert repo is None

        # repo must not be in filesystem !
        assert not os.path.isdir(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name))


class TestAdminReposControllerGIT(_BaseTestCase):
    REPO = base.GIT_REPO
    REPO_TYPE = 'git'
    NEW_REPO = base.NEW_GIT_REPO
    OTHER_TYPE_REPO = base.HG_REPO
    OTHER_TYPE = 'hg'


class TestAdminReposControllerHG(_BaseTestCase):
    REPO = base.HG_REPO
    REPO_TYPE = 'hg'
    NEW_REPO = base.NEW_HG_REPO
    OTHER_TYPE_REPO = base.GIT_REPO
    OTHER_TYPE = 'git'

    def test_permanent_url_protocol_access(self):
        repo = Repository.get_by_repo_name(self.REPO)
        permanent_name = '_%d' % repo.repo_id

        # 400 Bad Request - Unable to detect pull/push action
        self.app.get(base.url('summary_home', repo_name=permanent_name),
            extra_environ={'HTTP_ACCEPT': 'application/mercurial'},
            status=400,
        )