Mercurial > kallithea
changeset 5983:a69bcf8b28b3
tests: use forks when modifying repositories in vcs tests
Otherwise other tests (primarily checking the size of the repo) would fail
because they expect the repos to be unmodified.
author | domruf <dominikruf@gmail.com> |
---|---|
date | Wed, 15 Jun 2016 18:33:27 +0200 |
parents | f55891e42e0e |
children | 77bd10c410ed |
files | kallithea/tests/other/manual_test_vcs_operations.py |
diffstat | 1 files changed, 38 insertions(+), 20 deletions(-) [+] |
line wrap: on
line diff
--- a/kallithea/tests/other/manual_test_vcs_operations.py Tue Jun 14 19:33:42 2016 +0200 +++ b/kallithea/tests/other/manual_test_vcs_operations.py Wed Jun 15 18:33:27 2016 +0200 @@ -42,6 +42,7 @@ from subprocess import Popen, PIPE from kallithea.tests import * +from kallithea.tests.fixture import Fixture from kallithea.model.db import User, Repository, UserIpMap, CacheInvalidation from kallithea.model.meta import Session from kallithea.model.repo import RepoModel @@ -50,6 +51,8 @@ DEBUG = True HOST = '127.0.0.1:4999' # test host +fixture = Fixture() + class Command(object): @@ -251,7 +254,10 @@ clone_url = _construct_url(HG_REPO, dest=DEST) stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url) - stdout, stderr = _add_files_and_push('hg', DEST) + fork_name = '%s_fork%s' % (HG_REPO, _RandomNameSequence().next()) + fixture.create_fork(HG_REPO, fork_name) + clone_url = _construct_url(fork_name).split()[0] + stdout, stderr = _add_files_and_push('hg', DEST, clone_url=clone_url) assert 'pushing to' in stdout assert 'Repository size' in stdout @@ -263,9 +269,11 @@ stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url) # commit some stuff into this repo - stdout, stderr = _add_files_and_push('git', DEST) - - print [(x.repo_full_path,x.repo_path) for x in Repository.get_all()] + fork_name = '%s_fork%s' % (GIT_REPO, _RandomNameSequence().next()) + fixture.create_fork(GIT_REPO, fork_name) + clone_url = _construct_url(fork_name).split()[0] + stdout, stderr = _add_files_and_push('git', DEST, clone_url=clone_url) + print [(x.repo_full_path,x.repo_path) for x in Repository.get_all()] # TODO: what is this for _check_proper_git_push(stdout, stderr) def test_push_invalidates_cache_hg(self): @@ -282,10 +290,13 @@ clone_url = _construct_url(HG_REPO, dest=DEST) stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url) - stdout, stderr = _add_files_and_push('hg', DEST, files_no=1) + fork_name = '%s_fork%s' % (HG_REPO, _RandomNameSequence().next()) + fixture.create_fork(HG_REPO, fork_name) + clone_url = _construct_url(fork_name).split()[0] + stdout, stderr = _add_files_and_push('hg', DEST, files_no=1, clone_url=clone_url) key = CacheInvalidation.query().filter(CacheInvalidation.cache_key - ==HG_REPO).all() + ==fork_name).all() assert key == [] def test_push_invalidates_cache_git(self): @@ -303,11 +314,14 @@ stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url) # commit some stuff into this repo - stdout, stderr = _add_files_and_push('git', DEST, files_no=1) + fork_name = '%s_fork%s' % (GIT_REPO, _RandomNameSequence().next()) + fixture.create_fork(GIT_REPO, fork_name) + clone_url = _construct_url(fork_name).split()[0] + stdout, stderr = _add_files_and_push('git', DEST, files_no=1, clone_url=clone_url) _check_proper_git_push(stdout, stderr) key = CacheInvalidation.query().filter(CacheInvalidation.cache_key - ==GIT_REPO).all() + ==fork_name).all() assert key == [] def test_push_wrong_credentials_hg(self): @@ -452,52 +466,56 @@ def test_push_unlocks_repository_hg(self): # enable locking - r = Repository.get_by_repo_name(HG_REPO) + fork_name = '%s_fork%s' % (HG_REPO, _RandomNameSequence().next()) + fixture.create_fork(HG_REPO, fork_name) + r = Repository.get_by_repo_name(fork_name) r.enable_locking = True Session().add(r) Session().commit() #clone some temp DEST = _get_tmp_dir() - clone_url = _construct_url(HG_REPO, dest=DEST) + clone_url = _construct_url(fork_name, dest=DEST) stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url) #check for lock repo after clone - r = Repository.get_by_repo_name(HG_REPO) + r = Repository.get_by_repo_name(fork_name) uid = User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id assert r.locked[0] == uid #push is ok and repo is now unlocked - stdout, stderr = _add_files_and_push('hg', DEST) - assert ('remote: Released lock on repo `%s`' % HG_REPO) in stdout + stdout, stderr = _add_files_and_push('hg', DEST, clone_url=clone_url.split()[0]) + assert ('remote: Released lock on repo `%s`' % fork_name) in stdout #we need to cleanup the Session Here ! Session.remove() - r = Repository.get_by_repo_name(HG_REPO) + r = Repository.get_by_repo_name(fork_name) assert r.locked == [None, None] #TODO: fix me ! somehow during tests hooks don't get called on Git def test_push_unlocks_repository_git(self): # enable locking - r = Repository.get_by_repo_name(GIT_REPO) + fork_name = '%s_fork%s' % (GIT_REPO, _RandomNameSequence().next()) + fixture.create_fork(GIT_REPO, fork_name) + r = Repository.get_by_repo_name(fork_name) r.enable_locking = True Session().add(r) Session().commit() #clone some temp DEST = _get_tmp_dir() - clone_url = _construct_url(GIT_REPO, dest=DEST) + clone_url = _construct_url(fork_name, dest=DEST) stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url) #check for lock repo after clone - r = Repository.get_by_repo_name(GIT_REPO) + r = Repository.get_by_repo_name(fork_name) assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id #push is ok and repo is now unlocked - stdout, stderr = _add_files_and_push('git', DEST) + stdout, stderr = _add_files_and_push('git', DEST, clone_url=clone_url.split()[0]) _check_proper_git_push(stdout, stderr) - assert ('remote: Released lock on repo `%s`' % GIT_REPO) in stderr + assert ('remote: Released lock on repo `%s`' % fork_name) in stderr #we need to cleanup the Session Here ! Session.remove() - r = Repository.get_by_repo_name(GIT_REPO) + r = Repository.get_by_repo_name(fork_name) assert r.locked == [None, None] def test_ip_restriction_hg(self):