Mercurial > kallithea
view rhodecode/tests/functional/test_admin_repos.py @ 4116:ffd45b185016 rhodecode-2.2.5-gpl
Imported some of the GPLv3'd changes from RhodeCode v2.2.5.
This imports changes between changesets 21af6c4eab3d and 6177597791c2 in
RhodeCode's original repository, including only changes to Python files and HTML.
RhodeCode clearly licensed its changes to these files under GPLv3
in their /LICENSE file, which states the following:
The Python code and integrated HTML are licensed under the GPLv3 license.
(See:
https://code.rhodecode.com/rhodecode/files/v2.2.5/LICENSE
or
http://web.archive.org/web/20140512193334/https://code.rhodecode.com/rhodecode/files/f3b123159901f15426d18e3dc395e8369f70ebe0/LICENSE
for an online copy of that LICENSE file)
Conservancy reviewed these changes and confirmed that they can be licensed as
a whole to the Kallithea project under GPLv3-only.
While some of the contents committed herein are clearly licensed
GPLv3-or-later, on the whole we must assume they are GPLv3-only, since the
statement above from RhodeCode indicates that they intend GPLv3-only as their
license, per GPLv3§14 and other relevant sections of GPLv3.
author | Bradley M. Kuhn <bkuhn@sfconservancy.org> |
---|---|
date | Wed, 02 Jul 2014 19:03:13 -0400 |
parents | d7488551578e |
children | 7e5f8c12a3fc |
line wrap: on
line source
# -*- coding: utf-8 -*-

import os
import mock
import urllib

from rhodecode.lib import vcs
from rhodecode.model.db import Repository, RepoGroup, UserRepoToPerm, User,\
    Permission
from rhodecode.model.user import UserModel
from rhodecode.tests import *
from rhodecode.model.repo_group import RepoGroupModel
from rhodecode.model.repo import RepoModel
from rhodecode.model.scm import ScmModel
from rhodecode.model.meta import Session
from rhodecode.tests.fixture import Fixture, error_function

# shared helper used to build the form parameters and create/destroy repos
fixture = Fixture()


def _get_permission_for_user(user, repo):
    """
    Return the list of ``UserRepoToPerm`` rows that bind ``user`` to ``repo``.

    :param user: username, resolved with ``User.get_by_username``
    :param repo: repository name, resolved with
        ``Repository.get_by_repo_name``
    """
    perm = UserRepoToPerm.query()\
        .filter(UserRepoToPerm.repository ==
                Repository.get_by_repo_name(repo))\
        .filter(UserRepoToPerm.user == User.get_by_username(user))\
        .all()
    return perm


class _BaseTest(TestController):
    """
    Backend independent tests of the admin repositories controller.

    Write all tests here; the concrete subclasses at the bottom of the
    module only fill in the class attributes below for each repository type.
    """
    REPO = None             # name of an existing repository of REPO_TYPE
    REPO_TYPE = None        # backend alias posted in the creation form
    NEW_REPO = None         # name used for repositories created by the tests
    OTHER_TYPE_REPO = None  # name of an existing repository of OTHER_TYPE
    OTHER_TYPE = None       # alias of the other backend

    @classmethod
    def setup_class(cls):
        pass

    @classmethod
    def teardown_class(cls):
        pass

    def test_index(self):
        # smoke test: the listing is only requested, nothing is asserted
        self.log_user()
        response = self.app.get(url('repos'))

    def test_create(self):
        # create a repo and check flash message, database, summary page and
        # filesystem, then delete it again
        self.log_user()
        repo_name = self.NEW_REPO
        description = 'description for newly created repo'
        response = self.app.post(url('repos'),
                        fixture._get_repo_create_params(repo_private=False,
                                                repo_name=repo_name,
                                                repo_type=self.REPO_TYPE,
                                                repo_description=description))
        ## run the check page that triggers the flash message
        response = self.app.get(url('repo_check_home', repo_name=repo_name))
        self.assertEqual(response.json, {u'result': True})
        self.checkSessionFlash(response,
                               'Created repository <a href="/%s">%s</a>'
                               % (repo_name, repo_name))

        # test if the repo was created in the database
        new_repo = Session().query(Repository)\
            .filter(Repository.repo_name == repo_name).one()

        self.assertEqual(new_repo.repo_name, repo_name)
        self.assertEqual(new_repo.description, description)

        # test if the repository is visible in the list ?
        response = self.app.get(url('summary_home', repo_name=repo_name))
        response.mustcontain(repo_name)
        response.mustcontain(self.REPO_TYPE)

        # test if the repository was created on filesystem
        try:
            vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
        except Exception:
            self.fail('no repo %s in filesystem' % repo_name)

        RepoModel().delete(repo_name)
        Session().commit()

    def test_create_non_ascii(self):
        # same checks as test_create, with non-ascii characters in both the
        # repository name and the description
        self.log_user()
        non_ascii = "ąęł"
        repo_name = "%s%s" % (self.NEW_REPO, non_ascii)
        repo_name_unicode = repo_name.decode('utf8')
        description = 'description for newly created repo' + non_ascii
        description_unicode = description.decode('utf8')
        response = self.app.post(url('repos'),
                        fixture._get_repo_create_params(repo_private=False,
                                                repo_name=repo_name,
                                                repo_type=self.REPO_TYPE,
                                                repo_description=description))
        ## run the check page that triggers the flash message
        response = self.app.get(url('repo_check_home', repo_name=repo_name))
        self.assertEqual(response.json, {u'result': True})
        # the link target is url-quoted while the link text is plain unicode
        self.checkSessionFlash(response,
                               u'Created repository <a href="/%s">%s</a>'
                               % (urllib.quote(repo_name), repo_name_unicode))

        # test if the repo was created in the database
        new_repo = Session().query(Repository)\
            .filter(Repository.repo_name == repo_name_unicode).one()

        self.assertEqual(new_repo.repo_name, repo_name_unicode)
        self.assertEqual(new_repo.description, description_unicode)

        # test if the repository is visible in the list ?
        response = self.app.get(url('summary_home', repo_name=repo_name))
        response.mustcontain(repo_name)
        response.mustcontain(self.REPO_TYPE)

        # test if the repository was created on filesystem
        try:
            vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
        except Exception:
            self.fail('no repo %s in filesystem' % repo_name)
        # NOTE(review): unlike test_create the repository is not deleted
        # here, although test_delete_non_ascii creates one with the same
        # name -- confirm whether the missing cleanup is intentional

    def test_create_in_group(self):
        # create a repo inside a freshly created repository group
        self.log_user()

        ## create GROUP
        group_name = 'sometest_%s' % self.REPO_TYPE
        gr = RepoGroupModel().create(group_name=group_name,
                                     group_description='test',
                                     owner=TEST_USER_ADMIN_LOGIN)
        Session().commit()

        repo_name = 'ingroup'
        repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
        description = 'description for newly created repo'
        response = self.app.post(url('repos'),
                        fixture._get_repo_create_params(repo_private=False,
                                                repo_name=repo_name,
                                                repo_type=self.REPO_TYPE,
                                                repo_description=description,
                                                repo_group=gr.group_id,))
        ## run the check page that triggers the flash message
        response = self.app.get(url('repo_check_home',
                                    repo_name=repo_name_full))
        self.assertEqual(response.json, {u'result': True})
        self.checkSessionFlash(response,
                               'Created repository <a href="/%s">%s</a>'
                               % (repo_name_full, repo_name_full))

        # test if the repo was created in the database
        new_repo = Session().query(Repository)\
            .filter(Repository.repo_name == repo_name_full).one()
        new_repo_id = new_repo.repo_id

        self.assertEqual(new_repo.repo_name, repo_name_full)
        self.assertEqual(new_repo.description, description)

        # test if the repository is visible in the list ?
        response = self.app.get(url('summary_home', repo_name=repo_name_full))
        response.mustcontain(repo_name_full)
        response.mustcontain(self.REPO_TYPE)

        # exactly one permission row is expected for the new repository
        inherited_perms = UserRepoToPerm.query()\
            .filter(UserRepoToPerm.repository_id == new_repo_id).all()
        self.assertEqual(len(inherited_perms), 1)

        # test if the repository was created on filesystem
        try:
            vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name_full))
        except Exception:
            RepoGroupModel().delete(group_name)
            Session().commit()
            # report the path that was actually looked up
            self.fail('no repo %s in filesystem' % repo_name_full)

        RepoModel().delete(repo_name_full)
        RepoGroupModel().delete(group_name)
        Session().commit()

    def test_create_in_group_without_needed_permissions(self):
        # a regular user without global create permission must be rejected
        # in a group owned by the admin, but may create in a group it owns
        usr = self.log_user(TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
        # revoke
        user_model = UserModel()
        # disable fork and create on default user
        user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository')
        user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none')
        user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository')
        user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none')

        # disable on regular user
        user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository')
        user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none')
        user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository')
        user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none')
        Session().commit()

        ## create GROUP
        group_name = 'reg_sometest_%s' % self.REPO_TYPE
        gr = RepoGroupModel().create(group_name=group_name,
                                     group_description='test',
                                     owner=TEST_USER_ADMIN_LOGIN)
        Session().commit()

        group_name_allowed = 'reg_sometest_allowed_%s' % self.REPO_TYPE
        gr_allowed = RepoGroupModel().create(group_name=group_name_allowed,
                                             group_description='test',
                                             owner=TEST_USER_REGULAR_LOGIN)
        Session().commit()

        repo_name = 'ingroup'
        repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
        description = 'description for newly created repo'
        response = self.app.post(url('repos'),
                        fixture._get_repo_create_params(repo_private=False,
                                                repo_name=repo_name,
                                                repo_type=self.REPO_TYPE,
                                                repo_description=description,
                                                repo_group=gr.group_id,))

        response.mustcontain('Invalid value')

        # user is allowed to create in this group
        repo_name = 'ingroup'
        repo_name_full = RepoGroup.url_sep().join([group_name_allowed,
                                                   repo_name])
        description = 'description for newly created repo'
        response = self.app.post(url('repos'),
                        fixture._get_repo_create_params(repo_private=False,
                                                repo_name=repo_name,
                                                repo_type=self.REPO_TYPE,
                                                repo_description=description,
                                                repo_group=gr_allowed.group_id,))

        ## run the check page that triggers the flash message
        response = self.app.get(url('repo_check_home',
                                    repo_name=repo_name_full))
        self.assertEqual(response.json, {u'result': True})
        self.checkSessionFlash(response,
                               'Created repository <a href="/%s">%s</a>'
                               % (repo_name_full, repo_name_full))

        # test if the repo was created in the database
        new_repo = Session().query(Repository)\
            .filter(Repository.repo_name == repo_name_full).one()
        new_repo_id = new_repo.repo_id

        self.assertEqual(new_repo.repo_name, repo_name_full)
        self.assertEqual(new_repo.description, description)

        # test if the repository is visible in the list ?
        response = self.app.get(url('summary_home', repo_name=repo_name_full))
        response.mustcontain(repo_name_full)
        response.mustcontain(self.REPO_TYPE)

        inherited_perms = UserRepoToPerm.query()\
            .filter(UserRepoToPerm.repository_id == new_repo_id).all()
        self.assertEqual(len(inherited_perms), 1)

        # test if the repository was created on filesystem
        try:
            vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name_full))
        except Exception:
            RepoGroupModel().delete(group_name)
            Session().commit()
            # report the path that was actually looked up
            self.fail('no repo %s in filesystem' % repo_name_full)

        RepoModel().delete(repo_name_full)
        RepoGroupModel().delete(group_name)
        RepoGroupModel().delete(group_name_allowed)
        Session().commit()

    def test_create_in_group_inherit_permissions(self):
        # with repo_copy_permissions=True the write permission granted on
        # the group must show up on the repository created inside it
        self.log_user()

        ## create GROUP
        group_name = 'sometest_%s' % self.REPO_TYPE
        gr = RepoGroupModel().create(group_name=group_name,
                                     group_description='test',
                                     owner=TEST_USER_ADMIN_LOGIN)
        perm = Permission.get_by_key('repository.write')
        RepoGroupModel().grant_user_permission(gr, TEST_USER_REGULAR_LOGIN,
                                               perm)

        ## add repo permissions
        Session().commit()

        repo_name = 'ingroup_inherited_%s' % self.REPO_TYPE
        repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
        description = 'description for newly created repo'
        response = self.app.post(url('repos'),
                        fixture._get_repo_create_params(repo_private=False,
                                                repo_name=repo_name,
                                                repo_type=self.REPO_TYPE,
                                                repo_description=description,
                                                repo_group=gr.group_id,
                                                repo_copy_permissions=True))

        ## run the check page that triggers the flash message
        response = self.app.get(url('repo_check_home',
                                    repo_name=repo_name_full))
        self.checkSessionFlash(response,
                               'Created repository <a href="/%s">%s</a>'
                               % (repo_name_full, repo_name_full))

        # test if the repo was created in the database
        new_repo = Session().query(Repository)\
            .filter(Repository.repo_name == repo_name_full).one()
        new_repo_id = new_repo.repo_id

        self.assertEqual(new_repo.repo_name, repo_name_full)
        self.assertEqual(new_repo.description, description)

        # test if the repository is visible in the list ?
        response = self.app.get(url('summary_home', repo_name=repo_name_full))
        response.mustcontain(repo_name_full)
        response.mustcontain(self.REPO_TYPE)

        # test if the repository was created on filesystem
        try:
            vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name_full))
        except Exception:
            RepoGroupModel().delete(group_name)
            Session().commit()
            # report the path that was actually looked up
            self.fail('no repo %s in filesystem' % repo_name_full)

        # check if inherited permissions are applied
        inherited_perms = UserRepoToPerm.query()\
            .filter(UserRepoToPerm.repository_id == new_repo_id).all()
        self.assertEqual(len(inherited_perms), 2)

        self.assertTrue(TEST_USER_REGULAR_LOGIN in [x.user.username
                                                    for x in inherited_perms])
        self.assertTrue('repository.write' in [x.permission.permission_name
                                               for x in inherited_perms])

        RepoModel().delete(repo_name_full)
        RepoGroupModel().delete(group_name)
        Session().commit()

    def test_create_remote_repo_wrong_clone_uri(self):
        # the form must answer with 'invalid clone url' for this clone_uri
        self.log_user()
        repo_name = self.NEW_REPO
        description = 'description for newly created repo'
        response = self.app.post(url('repos'),
                        fixture._get_repo_create_params(repo_private=False,
                                                repo_name=repo_name,
                                                repo_type=self.REPO_TYPE,
                                                repo_description=description,
                                                clone_uri='http://127.0.0.1/repo'))
        response.mustcontain('invalid clone url')

    def test_create_remote_repo_wrong_clone_uri_hg_svn(self):
        # same as above with a svn+http clone_uri
        self.log_user()
        repo_name = self.NEW_REPO
        description = 'description for newly created repo'
        response = self.app.post(url('repos'),
                        fixture._get_repo_create_params(repo_private=False,
                                                repo_name=repo_name,
                                                repo_type=self.REPO_TYPE,
                                                repo_description=description,
                                                clone_uri='svn+http://127.0.0.1/repo'))
        response.mustcontain('invalid clone url')

    def test_delete(self):
        # create a repo, delete it through the controller and verify that
        # it is gone from both the database and the filesystem
        self.log_user()
        repo_name = 'vcs_test_new_to_delete_%s' % self.REPO_TYPE
        description = 'description for newly created repo'
        response = self.app.post(url('repos'),
                        fixture._get_repo_create_params(repo_private=False,
                                                repo_type=self.REPO_TYPE,
                                                repo_name=repo_name,
                                                repo_description=description))
        ## run the check page that triggers the flash message
        response = self.app.get(url('repo_check_home', repo_name=repo_name))
        self.checkSessionFlash(response,
                               'Created repository <a href="/%s">%s</a>'
                               % (repo_name, repo_name))
        # test if the repo was created in the database
        new_repo = Session().query(Repository)\
            .filter(Repository.repo_name == repo_name).one()

        self.assertEqual(new_repo.repo_name, repo_name)
        self.assertEqual(new_repo.description, description)

        # test if the repository is visible in the list ?
        response = self.app.get(url('summary_home', repo_name=repo_name))
        response.mustcontain(repo_name)
        response.mustcontain(self.REPO_TYPE)

        # test if the repository was created on filesystem
        try:
            vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
        except Exception:
            self.fail('no repo %s in filesystem' % repo_name)

        response = self.app.delete(url('repo', repo_name=repo_name))

        self.checkSessionFlash(response, 'Deleted repository %s' % (repo_name))

        response.follow()

        # check if repo was deleted from db
        deleted_repo = Session().query(Repository)\
            .filter(Repository.repo_name == repo_name).scalar()

        self.assertEqual(deleted_repo, None)

        self.assertEqual(os.path.isdir(os.path.join(TESTS_TMP_PATH, repo_name)),
                         False)

    def test_delete_non_ascii(self):
        # same as test_delete, with non-ascii characters in the repository
        # name and the description
        self.log_user()
        non_ascii = "ąęł"
        repo_name = "%s%s" % (self.NEW_REPO, non_ascii)
        repo_name_unicode = repo_name.decode('utf8')
        description = 'description for newly created repo' + non_ascii
        description_unicode = description.decode('utf8')
        response = self.app.post(url('repos'),
                        fixture._get_repo_create_params(repo_private=False,
                                                repo_name=repo_name,
                                                repo_type=self.REPO_TYPE,
                                                repo_description=description))
        ## run the check page that triggers the flash message
        response = self.app.get(url('repo_check_home', repo_name=repo_name))
        self.assertEqual(response.json, {u'result': True})
        self.checkSessionFlash(response,
                               u'Created repository <a href="/%s">%s</a>'
                               % (urllib.quote(repo_name), repo_name_unicode))
        # test if the repo was created in the database
        new_repo = Session().query(Repository)\
            .filter(Repository.repo_name == repo_name_unicode).one()

        self.assertEqual(new_repo.repo_name, repo_name_unicode)
        self.assertEqual(new_repo.description, description_unicode)

        # test if the repository is visible in the list ?
        response = self.app.get(url('summary_home', repo_name=repo_name))
        response.mustcontain(repo_name)
        response.mustcontain(self.REPO_TYPE)

        # test if the repository was created on filesystem
        try:
            vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
        except Exception:
            self.fail('no repo %s in filesystem' % repo_name)

        response = self.app.delete(url('repo', repo_name=repo_name))
        self.checkSessionFlash(response,
                               'Deleted repository %s' % (repo_name_unicode))
        response.follow()

        # check if repo was deleted from db
        deleted_repo = Session().query(Repository)\
            .filter(Repository.repo_name == repo_name_unicode).scalar()

        self.assertEqual(deleted_repo, None)

        self.assertEqual(os.path.isdir(os.path.join(TESTS_TMP_PATH, repo_name)),
                         False)

    def test_delete_repo_with_group(self):
        #TODO:
        pass

    def test_delete_browser_fakeout(self):
        # POST carrying a `_method=delete` parameter, sent without logging
        # in first; no assertions are made on the response
        response = self.app.post(url('repo', repo_name=self.REPO),
                                 params=dict(_method='delete'))

    def test_show(self):
        # smoke test: the page is only requested, nothing is asserted
        self.log_user()
        response = self.app.get(url('repo', repo_name=self.REPO))

    def test_edit(self):
        # requested without logging in first; nothing is asserted
        response = self.app.get(url('edit_repo', repo_name=self.REPO))

    def test_set_private_flag_sets_default_to_none(self):
        # switching a repo to private must turn the default user permission
        # into repository.none, and switching back must keep it that way
        self.log_user()
        # initially repository perm should be read
        perm = _get_permission_for_user(user='default', repo=self.REPO)
        # assertEqual, not assertTrue: assertTrue(len(perm), 1) would treat
        # the 1 as the failure message and accept any non-empty list
        self.assertEqual(len(perm), 1)
        self.assertEqual(perm[0].permission.permission_name, 'repository.read')
        self.assertEqual(Repository.get_by_repo_name(self.REPO).private, False)

        response = self.app.put(url('repo', repo_name=self.REPO),
                        fixture._get_repo_create_params(repo_private=1,
                                                repo_name=self.REPO,
                                                repo_type=self.REPO_TYPE,
                                                user=TEST_USER_ADMIN_LOGIN))
        self.checkSessionFlash(response,
                               msg='Repository %s updated successfully'
                               % (self.REPO))
        self.assertEqual(Repository.get_by_repo_name(self.REPO).private, True)

        # now the repo default permission should be None
        perm = _get_permission_for_user(user='default', repo=self.REPO)
        self.assertEqual(len(perm), 1)
        self.assertEqual(perm[0].permission.permission_name, 'repository.none')

        response = self.app.put(url('repo', repo_name=self.REPO),
                        fixture._get_repo_create_params(repo_private=False,
                                                repo_name=self.REPO,
                                                repo_type=self.REPO_TYPE,
                                                user=TEST_USER_ADMIN_LOGIN))
        self.checkSessionFlash(response,
                               msg='Repository %s updated successfully'
                               % (self.REPO))
        self.assertEqual(Repository.get_by_repo_name(self.REPO).private, False)

        # we turn off private now the repo default permission should stay None
        perm = _get_permission_for_user(user='default', repo=self.REPO)
        self.assertEqual(len(perm), 1)
        self.assertEqual(perm[0].permission.permission_name, 'repository.none')

        # update this permission back
        perm[0].permission = Permission.get_by_key('repository.read')
        Session().add(perm[0])
        Session().commit()

    def test_set_repo_fork_has_no_self_id(self):
        # the repository must not be offered as a fork source for itself
        self.log_user()
        repo = Repository.get_by_repo_name(self.REPO)
        response = self.app.get(url('edit_repo_advanced', repo_name=self.REPO))
        # use the repository's own name as the option label; a hard-coded
        # 'vcs_test_git' label can never match for the other backend
        opt = """<option value="%s">%s</option>""" % (repo.repo_id,
                                                      repo.repo_name)
        response.mustcontain(no=[opt])

    def test_set_fork_of_other_repo(self):
        # mark REPO as fork of a freshly created repo of the same type
        self.log_user()
        other_repo = 'other_%s' % self.REPO_TYPE
        fixture.create_repo(other_repo, repo_type=self.REPO_TYPE)
        repo = Repository.get_by_repo_name(self.REPO)
        repo2 = Repository.get_by_repo_name(other_repo)
        response = self.app.put(url('edit_repo_advanced_fork',
                                    repo_name=self.REPO),
                                params=dict(id_fork_of=repo2.repo_id))
        # both objects are looked up again after the request
        repo = Repository.get_by_repo_name(self.REPO)
        repo2 = Repository.get_by_repo_name(other_repo)
        self.checkSessionFlash(response,
                               'Marked repo %s as fork of %s'
                               % (repo.repo_name, repo2.repo_name))

        assert repo.fork == repo2
        response = response.follow()
        # check if given repo is selected

        opt = """<option value="%s" selected="selected">%s</option>""" % (
            repo2.repo_id, repo2.repo_name)
        response.mustcontain(opt)

        fixture.destroy_repo(other_repo, forks='detach')

    def test_set_fork_of_other_type_repo(self):
        # a repo of the other backend must be refused as fork source
        self.log_user()
        repo = Repository.get_by_repo_name(self.REPO)
        repo2 = Repository.get_by_repo_name(self.OTHER_TYPE_REPO)
        response = self.app.put(url('edit_repo_advanced_fork',
                                    repo_name=self.REPO),
                                params=dict(id_fork_of=repo2.repo_id))
        repo = Repository.get_by_repo_name(self.REPO)
        repo2 = Repository.get_by_repo_name(self.OTHER_TYPE_REPO)
        self.checkSessionFlash(response,
            'Cannot set repository as fork of repository with other type')

    def test_set_fork_of_none(self):
        # clearing the fork relation is reported as fork of "Nothing"
        self.log_user()
        ## mark it as None
        response = self.app.put(url('edit_repo_advanced_fork',
                                    repo_name=self.REPO),
                                params=dict(id_fork_of=None))
        repo = Repository.get_by_repo_name(self.REPO)
        repo2 = Repository.get_by_repo_name(self.OTHER_TYPE_REPO)
        self.checkSessionFlash(response,
                               'Marked repo %s as fork of %s'
                               % (repo.repo_name, "Nothing"))
        assert repo.fork is None

    def test_set_fork_of_same_repo(self):
        # a repo cannot be marked as a fork of itself
        self.log_user()
        repo = Repository.get_by_repo_name(self.REPO)
        response = self.app.put(url('edit_repo_advanced_fork',
                                    repo_name=self.REPO),
                                params=dict(id_fork_of=repo.repo_id))
        self.checkSessionFlash(response,
                               'An error occurred during this operation')

    def test_create_on_top_level_without_permissions(self):
        # a regular user without create permission must not be able to
        # create a repository in the root location
        usr = self.log_user(TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
        # revoke
        user_model = UserModel()
        # disable fork and create on default user
        user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository')
        user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none')
        user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository')
        user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none')

        # disable on regular user
        user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository')
        user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none')
        user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository')
        user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none')
        Session().commit()

        user = User.get(usr['user_id'])

        repo_name = self.NEW_REPO+'no_perms'
        description = 'description for newly created repo'
        response = self.app.post(url('repos'),
                        fixture._get_repo_create_params(repo_private=False,
                                                repo_name=repo_name,
                                                repo_type=self.REPO_TYPE,
                                                repo_description=description))

        response.mustcontain('no permission to create repository in root location')

        RepoModel().delete(repo_name)
        Session().commit()

    @mock.patch.object(RepoModel, '_create_filesystem_repo', error_function)
    def test_create_repo_when_filesystem_op_fails(self):
        # RepoModel._create_filesystem_repo is replaced by error_function for
        # this test; nothing may be left behind in the db or on disk
        self.log_user()
        repo_name = self.NEW_REPO
        description = 'description for newly created repo'

        response = self.app.post(url('repos'),
                        fixture._get_repo_create_params(repo_private=False,
                                                repo_name=repo_name,
                                                repo_type=self.REPO_TYPE,
                                                repo_description=description))

        self.checkSessionFlash(response,
                               'Error creating repository %s' % repo_name)
        # repo must not be in db
        repo = Repository.get_by_repo_name(repo_name)
        self.assertEqual(repo, None)

        # repo must not be in filesystem !
        self.assertFalse(os.path.isdir(os.path.join(TESTS_TMP_PATH, repo_name)))


class TestAdminReposControllerGIT(_BaseTest):
    """Run the shared tests of ``_BaseTest`` against the git backend."""
    REPO = GIT_REPO
    REPO_TYPE = 'git'
    NEW_REPO = NEW_GIT_REPO
    OTHER_TYPE_REPO = HG_REPO
    OTHER_TYPE = 'hg'


class TestAdminReposControllerHG(_BaseTest):
    """Run the shared tests of ``_BaseTest`` against the hg backend."""
    REPO = HG_REPO
    REPO_TYPE = 'hg'
    NEW_REPO = NEW_HG_REPO
    OTHER_TYPE_REPO = GIT_REPO
    OTHER_TYPE = 'git'