# HG changeset patch # User Mads Kiilerich # Date 1429833830 -7200 # Node ID 55f855431f77816b2e40aa41b48819c4260e294d # Parent 552e47cd0f36ee7039418061a91f9456a25335b7 tests: rename test_vcs_operations.py to avoid automatic pick-up This is a special test that requires special setup and must be run manually. Nose already skips the test because it is executable. diff -r 552e47cd0f36 -r 55f855431f77 kallithea/tests/other/manual_test_vcs_operations.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/kallithea/tests/other/manual_test_vcs_operations.py Fri Apr 24 02:03:50 2015 +0200 @@ -0,0 +1,535 @@ +# -*- coding: utf-8 -*- +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +kallithea.tests.test_scm_operations +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Test suite for making push/pull operations. + +Run it in two terminals:: + paster serve test.ini + KALLITHEA_WHOOSH_TEST_DISABLE=1 KALLITHEA_NO_TMP_PATH=1 nosetests kallithea/tests/other/manual_test_vcs_operations.py + +You must have git > 1.8.1 for tests to work fine + +This file was forked by the Kallithea project in July 2014. +Original author and date, and relevant copyright and licensing information is below: +:created_on: Dec 30, 2010 +:author: marcink +:copyright: (c) 2013 RhodeCode GmbH, and others. +:license: GPLv3, see LICENSE.md for more details. + +""" + +import tempfile +import time +from os.path import join as jn + +from tempfile import _RandomNameSequence +from subprocess import Popen, PIPE + +from kallithea.tests import * +from kallithea.model.db import User, Repository, UserIpMap, CacheInvalidation +from kallithea.model.meta import Session +from kallithea.model.repo import RepoModel +from kallithea.model.user import UserModel + +DEBUG = True +HOST = '127.0.0.1:5000' # test host + + +class Command(object): + + def __init__(self, cwd): + self.cwd = cwd + + def execute(self, cmd, *args): + """ + Runs command on the system with given ``args``. + """ + + command = cmd + ' ' + ' '.join(args) + if DEBUG: + print '*** CMD %s ***' % command + p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd) + stdout, stderr = p.communicate() + if DEBUG: + print stdout, stderr + return stdout, stderr + + +def _get_tmp_dir(): + return tempfile.mkdtemp(prefix='rc_integration_test') + + +def _construct_url(repo, dest=None, **kwargs): + if dest is None: + #make temp clone + dest = _get_tmp_dir() + params = { + 'user': TEST_USER_ADMIN_LOGIN, + 'passwd': TEST_USER_ADMIN_PASS, + 'host': HOST, + 'cloned_repo': repo, + 'dest': dest + } + params.update(**kwargs) + if params['user'] and params['passwd']: + _url = 'http://%(user)s:%(passwd)s@%(host)s/%(cloned_repo)s %(dest)s' % params + else: + _url = 'http://(host)s/%(cloned_repo)s %(dest)s' % params + return _url + + +def _add_files_and_push(vcs, DEST, **kwargs): + """ + Generate some files, add it to DEST repo and push back + vcs is git or hg and defines what VCS we want to make those files for + + :param vcs: + :param DEST: + """ + # commit some stuff into this repo + cwd = path = jn(DEST) + #added_file = jn(path, '%ssetupążźć.py' % _RandomNameSequence().next()) + added_file = jn(path, '%ssetup.py' % _RandomNameSequence().next()) + Command(cwd).execute('touch %s' % added_file) + Command(cwd).execute('%s add %s' % (vcs, added_file)) + + for i in xrange(kwargs.get('files_no', 3)): + cmd = """echo 'added_line%s' >> %s""" % (i, added_file) + Command(cwd).execute(cmd) + author_str = 'Marcin Kuźminski ' + if vcs == 'hg': + cmd = """hg commit -m 'commited new %s' -u '%s' %s """ % ( + i, author_str, added_file + ) + elif vcs == 'git': + cmd = """EMAIL="me@email.com" git commit -m 'commited new %s' --author '%s' %s """ % ( + i, author_str, added_file + ) + Command(cwd).execute(cmd) + + # PUSH it back + _REPO = None + if vcs == 'hg': + _REPO = HG_REPO + elif vcs == 'git': + _REPO = GIT_REPO + + kwargs['dest'] = '' + clone_url = _construct_url(_REPO, **kwargs) + if 'clone_url' in kwargs: + clone_url = kwargs['clone_url'] + stdout = stderr = None + if vcs == 'hg': + stdout, stderr = Command(cwd).execute('hg push --verbose', clone_url) + elif vcs == 'git': + stdout, stderr = Command(cwd).execute('git push --verbose', clone_url + " master") + + return stdout, stderr + + +def set_anonymous_access(enable=True): + user = User.get_by_username(User.DEFAULT_USER) + user.active = enable + Session().add(user) + Session().commit() + print '\tanonymous access is now:', enable + if enable != User.get_by_username(User.DEFAULT_USER).active: + raise Exception('Cannot set anonymous access') + + +#============================================================================== +# TESTS +#============================================================================== + + +def _check_proper_git_push(stdout, stderr): + #WTF Git stderr is output ?! + assert 'fatal' not in stderr + assert 'rejected' not in stderr + assert 'Pushing to' in stderr + assert 'master -> master' in stderr + + +class TestVCSOperations(BaseTestCase): + + @classmethod + def setup_class(cls): + #DISABLE ANONYMOUS ACCESS + set_anonymous_access(False) + + def setUp(self): + r = Repository.get_by_repo_name(GIT_REPO) + Repository.unlock(r) + r.enable_locking = False + Session().add(r) + Session().commit() + + r = Repository.get_by_repo_name(HG_REPO) + Repository.unlock(r) + r.enable_locking = False + Session().add(r) + Session().commit() + + def test_clone_hg_repo_by_admin(self): + clone_url = _construct_url(HG_REPO) + stdout, stderr = Command('/tmp').execute('hg clone', clone_url) + + assert 'requesting all changes' in stdout + assert 'adding changesets' in stdout + assert 'adding manifests' in stdout + assert 'adding file changes' in stdout + + assert stderr == '' + + def test_clone_git_repo_by_admin(self): + clone_url = _construct_url(GIT_REPO) + stdout, stderr = Command('/tmp').execute('git clone', clone_url) + + assert 'Cloning into' in stdout + assert stderr == '' + + def test_clone_wrong_credentials_hg(self): + clone_url = _construct_url(HG_REPO, passwd='bad!') + stdout, stderr = Command('/tmp').execute('hg clone', clone_url) + assert 'abort: authorization failed' in stderr + + def test_clone_wrong_credentials_git(self): + clone_url = _construct_url(GIT_REPO, passwd='bad!') + stdout, stderr = Command('/tmp').execute('git clone', clone_url) + assert 'fatal: Authentication failed' in stderr + + def test_clone_git_dir_as_hg(self): + clone_url = _construct_url(GIT_REPO) + stdout, stderr = Command('/tmp').execute('hg clone', clone_url) + assert 'HTTP Error 404: Not Found' in stderr + + def test_clone_hg_repo_as_git(self): + clone_url = _construct_url(HG_REPO) + stdout, stderr = Command('/tmp').execute('git clone', clone_url) + assert 'not found' in stderr + + def test_clone_non_existing_path_hg(self): + clone_url = _construct_url('trololo') + stdout, stderr = Command('/tmp').execute('hg clone', clone_url) + assert 'HTTP Error 404: Not Found' in stderr + + def test_clone_non_existing_path_git(self): + clone_url = _construct_url('trololo') + stdout, stderr = Command('/tmp').execute('git clone', clone_url) + assert 'not found' in stderr + + def test_push_new_file_hg(self): + DEST = _get_tmp_dir() + clone_url = _construct_url(HG_REPO, dest=DEST) + stdout, stderr = Command('/tmp').execute('hg clone', clone_url) + + stdout, stderr = _add_files_and_push('hg', DEST) + + assert 'pushing to' in stdout + assert 'Repository size' in stdout + assert 'Last revision is now' in stdout + + def test_push_new_file_git(self): + DEST = _get_tmp_dir() + clone_url = _construct_url(GIT_REPO, dest=DEST) + stdout, stderr = Command('/tmp').execute('git clone', clone_url) + + # commit some stuff into this repo + stdout, stderr = _add_files_and_push('git', DEST) + + print [(x.repo_full_path,x.repo_path) for x in Repository.get_all()] + _check_proper_git_push(stdout, stderr) + + def test_push_invalidates_cache_hg(self): + key = CacheInvalidation.query().filter(CacheInvalidation.cache_key + ==HG_REPO).scalar() + if not key: + key = CacheInvalidation(HG_REPO, HG_REPO) + + key.cache_active = True + Session().add(key) + Session().commit() + + DEST = _get_tmp_dir() + clone_url = _construct_url(HG_REPO, dest=DEST) + stdout, stderr = Command('/tmp').execute('hg clone', clone_url) + + stdout, stderr = _add_files_and_push('hg', DEST, files_no=1) + + key = CacheInvalidation.query().filter(CacheInvalidation.cache_key + ==HG_REPO).one() + self.assertEqual(key.cache_active, False) + + def test_push_invalidates_cache_git(self): + key = CacheInvalidation.query().filter(CacheInvalidation.cache_key + ==GIT_REPO).scalar() + if not key: + key = CacheInvalidation(GIT_REPO, GIT_REPO) + + key.cache_active = True + Session().add(key) + Session().commit() + + DEST = _get_tmp_dir() + clone_url = _construct_url(GIT_REPO, dest=DEST) + stdout, stderr = Command('/tmp').execute('git clone', clone_url) + + # commit some stuff into this repo + stdout, stderr = _add_files_and_push('git', DEST, files_no=1) + _check_proper_git_push(stdout, stderr) + + key = CacheInvalidation.query().filter(CacheInvalidation.cache_key + ==GIT_REPO).one() + print CacheInvalidation.get_all() + self.assertEqual(key.cache_active, False) + + def test_push_wrong_credentials_hg(self): + DEST = _get_tmp_dir() + clone_url = _construct_url(HG_REPO, dest=DEST) + stdout, stderr = Command('/tmp').execute('hg clone', clone_url) + + stdout, stderr = _add_files_and_push('hg', DEST, user='bad', + passwd='name') + + assert 'abort: authorization failed' in stderr + + def test_push_wrong_credentials_git(self): + DEST = _get_tmp_dir() + clone_url = _construct_url(GIT_REPO, dest=DEST) + stdout, stderr = Command('/tmp').execute('git clone', clone_url) + + stdout, stderr = _add_files_and_push('git', DEST, user='bad', + passwd='name') + + assert 'fatal: Authentication failed' in stderr + + def test_push_back_to_wrong_url_hg(self): + DEST = _get_tmp_dir() + clone_url = _construct_url(HG_REPO, dest=DEST) + stdout, stderr = Command('/tmp').execute('hg clone', clone_url) + + stdout, stderr = _add_files_and_push('hg', DEST, + clone_url='http://127.0.0.1:5000/tmp',) + + assert 'HTTP Error 404: Not Found' in stderr + + def test_push_back_to_wrong_url_git(self): + DEST = _get_tmp_dir() + clone_url = _construct_url(GIT_REPO, dest=DEST) + stdout, stderr = Command('/tmp').execute('git clone', clone_url) + + stdout, stderr = _add_files_and_push('git', DEST, + clone_url='http://127.0.0.1:5000/tmp',) + + assert 'not found' in stderr + + def test_clone_and_create_lock_hg(self): + # enable locking + r = Repository.get_by_repo_name(HG_REPO) + r.enable_locking = True + Session().add(r) + Session().commit() + # clone + clone_url = _construct_url(HG_REPO) + stdout, stderr = Command('/tmp').execute('hg clone', clone_url) + + #check if lock was made + r = Repository.get_by_repo_name(HG_REPO) + assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id + + def test_clone_and_create_lock_git(self): + # enable locking + r = Repository.get_by_repo_name(GIT_REPO) + r.enable_locking = True + Session().add(r) + Session().commit() + # clone + clone_url = _construct_url(GIT_REPO) + stdout, stderr = Command('/tmp').execute('git clone', clone_url) + + #check if lock was made + r = Repository.get_by_repo_name(GIT_REPO) + assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id + + def test_clone_after_repo_was_locked_hg(self): + #lock repo + r = Repository.get_by_repo_name(HG_REPO) + Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id) + #pull fails since repo is locked + clone_url = _construct_url(HG_REPO) + stdout, stderr = Command('/tmp').execute('hg clone', clone_url) + msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`""" + % (HG_REPO, TEST_USER_ADMIN_LOGIN)) + assert msg in stderr + + def test_clone_after_repo_was_locked_git(self): + #lock repo + r = Repository.get_by_repo_name(GIT_REPO) + Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id) + #pull fails since repo is locked + clone_url = _construct_url(GIT_REPO) + stdout, stderr = Command('/tmp').execute('git clone', clone_url) + msg = ("""The requested URL returned error: 423""") + assert msg in stderr + + def test_push_on_locked_repo_by_other_user_hg(self): + #clone some temp + DEST = _get_tmp_dir() + clone_url = _construct_url(HG_REPO, dest=DEST) + stdout, stderr = Command('/tmp').execute('hg clone', clone_url) + + #lock repo + r = Repository.get_by_repo_name(HG_REPO) + # let this user actually push ! + RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN, + perm='repository.write') + Session().commit() + Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id) + + #push fails repo is locked by other user ! + stdout, stderr = _add_files_and_push('hg', DEST, + user=TEST_USER_REGULAR_LOGIN, + passwd=TEST_USER_REGULAR_PASS) + msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`""" + % (HG_REPO, TEST_USER_ADMIN_LOGIN)) + assert msg in stderr + + def test_push_on_locked_repo_by_other_user_git(self): + #clone some temp + DEST = _get_tmp_dir() + clone_url = _construct_url(GIT_REPO, dest=DEST) + stdout, stderr = Command('/tmp').execute('git clone', clone_url) + + #lock repo + r = Repository.get_by_repo_name(GIT_REPO) + # let this user actually push ! + RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN, + perm='repository.write') + Session().commit() + Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id) + + #push fails repo is locked by other user ! + stdout, stderr = _add_files_and_push('git', DEST, + user=TEST_USER_REGULAR_LOGIN, + passwd=TEST_USER_REGULAR_PASS) + err = 'Repository `%s` locked by user `%s`' % (GIT_REPO, TEST_USER_ADMIN_LOGIN) + assert err in stderr + + #TODO: fix this somehow later on Git, Git is stupid and even if we throw + #back 423 to it, it makes ANOTHER request and we fail there with 405 :/ + + msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`""" + % (GIT_REPO, TEST_USER_ADMIN_LOGIN)) + #msg = "405 Method Not Allowed" + #assert msg in stderr + + def test_push_unlocks_repository_hg(self): + # enable locking + r = Repository.get_by_repo_name(HG_REPO) + r.enable_locking = True + Session().add(r) + Session().commit() + #clone some temp + DEST = _get_tmp_dir() + clone_url = _construct_url(HG_REPO, dest=DEST) + stdout, stderr = Command('/tmp').execute('hg clone', clone_url) + + #check for lock repo after clone + r = Repository.get_by_repo_name(HG_REPO) + uid = User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id + assert r.locked[0] == uid + + #push is ok and repo is now unlocked + stdout, stderr = _add_files_and_push('hg', DEST) + assert ('remote: Released lock on repo `%s`' % HG_REPO) in stdout + #we need to cleanup the Session Here ! + Session.remove() + r = Repository.get_by_repo_name(HG_REPO) + assert r.locked == [None, None] + + #TODO: fix me ! somehow during tests hooks don't get called on Git + def test_push_unlocks_repository_git(self): + # enable locking + r = Repository.get_by_repo_name(GIT_REPO) + r.enable_locking = True + Session().add(r) + Session().commit() + #clone some temp + DEST = _get_tmp_dir() + clone_url = _construct_url(GIT_REPO, dest=DEST) + stdout, stderr = Command('/tmp').execute('git clone', clone_url) + + #check for lock repo after clone + r = Repository.get_by_repo_name(GIT_REPO) + assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id + + #push is ok and repo is now unlocked + stdout, stderr = _add_files_and_push('git', DEST) + _check_proper_git_push(stdout, stderr) + + #assert ('remote: Released lock on repo `%s`' % GIT_REPO) in stdout + #we need to cleanup the Session Here ! + Session.remove() + r = Repository.get_by_repo_name(GIT_REPO) + assert r.locked == [None, None] + + def test_ip_restriction_hg(self): + user_model = UserModel() + try: + user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32') + Session().commit() + clone_url = _construct_url(HG_REPO) + stdout, stderr = Command('/tmp').execute('hg clone', clone_url) + assert 'abort: HTTP Error 403: Forbidden' in stderr + finally: + #release IP restrictions + for ip in UserIpMap.getAll(): + UserIpMap.delete(ip.ip_id) + Session().commit() + + time.sleep(2) + clone_url = _construct_url(HG_REPO) + stdout, stderr = Command('/tmp').execute('hg clone', clone_url) + + assert 'requesting all changes' in stdout + assert 'adding changesets' in stdout + assert 'adding manifests' in stdout + assert 'adding file changes' in stdout + + assert stderr == '' + + def test_ip_restriction_git(self): + user_model = UserModel() + try: + user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32') + Session().commit() + clone_url = _construct_url(GIT_REPO) + stdout, stderr = Command('/tmp').execute('git clone', clone_url) + msg = ("""The requested URL returned error: 403""") + assert msg in stderr + finally: + #release IP restrictions + for ip in UserIpMap.getAll(): + UserIpMap.delete(ip.ip_id) + Session().commit() + + time.sleep(2) + clone_url = _construct_url(GIT_REPO) + stdout, stderr = Command('/tmp').execute('git clone', clone_url) + + assert 'Cloning into' in stdout + assert stderr == '' diff -r 552e47cd0f36 -r 55f855431f77 kallithea/tests/other/test_vcs_operations.py --- a/kallithea/tests/other/test_vcs_operations.py Thu Apr 23 14:14:51 2015 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,533 +0,0 @@ -# -*- coding: utf-8 -*- -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -""" -kallithea.tests.test_scm_operations -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -Test suite for making push/pull operations. -Run using after doing paster serve test.ini:: - KALLITHEA_WHOOSH_TEST_DISABLE=1 KALLITHEA_NO_TMP_PATH=1 nosetests kallithea/tests/other/test_vcs_operations.py - -You must have git > 1.8.1 for tests to work fine - -This file was forked by the Kallithea project in July 2014. -Original author and date, and relevant copyright and licensing information is below: -:created_on: Dec 30, 2010 -:author: marcink -:copyright: (c) 2013 RhodeCode GmbH, and others. -:license: GPLv3, see LICENSE.md for more details. - -""" - -import tempfile -import time -from os.path import join as jn - -from tempfile import _RandomNameSequence -from subprocess import Popen, PIPE - -from kallithea.tests import * -from kallithea.model.db import User, Repository, UserIpMap, CacheInvalidation -from kallithea.model.meta import Session -from kallithea.model.repo import RepoModel -from kallithea.model.user import UserModel - -DEBUG = True -HOST = '127.0.0.1:5000' # test host - - -class Command(object): - - def __init__(self, cwd): - self.cwd = cwd - - def execute(self, cmd, *args): - """ - Runs command on the system with given ``args``. - """ - - command = cmd + ' ' + ' '.join(args) - if DEBUG: - print '*** CMD %s ***' % command - p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd) - stdout, stderr = p.communicate() - if DEBUG: - print stdout, stderr - return stdout, stderr - - -def _get_tmp_dir(): - return tempfile.mkdtemp(prefix='rc_integration_test') - - -def _construct_url(repo, dest=None, **kwargs): - if dest is None: - #make temp clone - dest = _get_tmp_dir() - params = { - 'user': TEST_USER_ADMIN_LOGIN, - 'passwd': TEST_USER_ADMIN_PASS, - 'host': HOST, - 'cloned_repo': repo, - 'dest': dest - } - params.update(**kwargs) - if params['user'] and params['passwd']: - _url = 'http://%(user)s:%(passwd)s@%(host)s/%(cloned_repo)s %(dest)s' % params - else: - _url = 'http://(host)s/%(cloned_repo)s %(dest)s' % params - return _url - - -def _add_files_and_push(vcs, DEST, **kwargs): - """ - Generate some files, add it to DEST repo and push back - vcs is git or hg and defines what VCS we want to make those files for - - :param vcs: - :param DEST: - """ - # commit some stuff into this repo - cwd = path = jn(DEST) - #added_file = jn(path, '%ssetupążźć.py' % _RandomNameSequence().next()) - added_file = jn(path, '%ssetup.py' % _RandomNameSequence().next()) - Command(cwd).execute('touch %s' % added_file) - Command(cwd).execute('%s add %s' % (vcs, added_file)) - - for i in xrange(kwargs.get('files_no', 3)): - cmd = """echo 'added_line%s' >> %s""" % (i, added_file) - Command(cwd).execute(cmd) - author_str = 'Marcin Kuźminski ' - if vcs == 'hg': - cmd = """hg commit -m 'commited new %s' -u '%s' %s """ % ( - i, author_str, added_file - ) - elif vcs == 'git': - cmd = """EMAIL="me@email.com" git commit -m 'commited new %s' --author '%s' %s """ % ( - i, author_str, added_file - ) - Command(cwd).execute(cmd) - - # PUSH it back - _REPO = None - if vcs == 'hg': - _REPO = HG_REPO - elif vcs == 'git': - _REPO = GIT_REPO - - kwargs['dest'] = '' - clone_url = _construct_url(_REPO, **kwargs) - if 'clone_url' in kwargs: - clone_url = kwargs['clone_url'] - stdout = stderr = None - if vcs == 'hg': - stdout, stderr = Command(cwd).execute('hg push --verbose', clone_url) - elif vcs == 'git': - stdout, stderr = Command(cwd).execute('git push --verbose', clone_url + " master") - - return stdout, stderr - - -def set_anonymous_access(enable=True): - user = User.get_by_username(User.DEFAULT_USER) - user.active = enable - Session().add(user) - Session().commit() - print '\tanonymous access is now:', enable - if enable != User.get_by_username(User.DEFAULT_USER).active: - raise Exception('Cannot set anonymous access') - - -#============================================================================== -# TESTS -#============================================================================== - - -def _check_proper_git_push(stdout, stderr): - #WTF Git stderr is output ?! - assert 'fatal' not in stderr - assert 'rejected' not in stderr - assert 'Pushing to' in stderr - assert 'master -> master' in stderr - - -class TestVCSOperations(BaseTestCase): - - @classmethod - def setup_class(cls): - #DISABLE ANONYMOUS ACCESS - set_anonymous_access(False) - - def setUp(self): - r = Repository.get_by_repo_name(GIT_REPO) - Repository.unlock(r) - r.enable_locking = False - Session().add(r) - Session().commit() - - r = Repository.get_by_repo_name(HG_REPO) - Repository.unlock(r) - r.enable_locking = False - Session().add(r) - Session().commit() - - def test_clone_hg_repo_by_admin(self): - clone_url = _construct_url(HG_REPO) - stdout, stderr = Command('/tmp').execute('hg clone', clone_url) - - assert 'requesting all changes' in stdout - assert 'adding changesets' in stdout - assert 'adding manifests' in stdout - assert 'adding file changes' in stdout - - assert stderr == '' - - def test_clone_git_repo_by_admin(self): - clone_url = _construct_url(GIT_REPO) - stdout, stderr = Command('/tmp').execute('git clone', clone_url) - - assert 'Cloning into' in stdout - assert stderr == '' - - def test_clone_wrong_credentials_hg(self): - clone_url = _construct_url(HG_REPO, passwd='bad!') - stdout, stderr = Command('/tmp').execute('hg clone', clone_url) - assert 'abort: authorization failed' in stderr - - def test_clone_wrong_credentials_git(self): - clone_url = _construct_url(GIT_REPO, passwd='bad!') - stdout, stderr = Command('/tmp').execute('git clone', clone_url) - assert 'fatal: Authentication failed' in stderr - - def test_clone_git_dir_as_hg(self): - clone_url = _construct_url(GIT_REPO) - stdout, stderr = Command('/tmp').execute('hg clone', clone_url) - assert 'HTTP Error 404: Not Found' in stderr - - def test_clone_hg_repo_as_git(self): - clone_url = _construct_url(HG_REPO) - stdout, stderr = Command('/tmp').execute('git clone', clone_url) - assert 'not found' in stderr - - def test_clone_non_existing_path_hg(self): - clone_url = _construct_url('trololo') - stdout, stderr = Command('/tmp').execute('hg clone', clone_url) - assert 'HTTP Error 404: Not Found' in stderr - - def test_clone_non_existing_path_git(self): - clone_url = _construct_url('trololo') - stdout, stderr = Command('/tmp').execute('git clone', clone_url) - assert 'not found' in stderr - - def test_push_new_file_hg(self): - DEST = _get_tmp_dir() - clone_url = _construct_url(HG_REPO, dest=DEST) - stdout, stderr = Command('/tmp').execute('hg clone', clone_url) - - stdout, stderr = _add_files_and_push('hg', DEST) - - assert 'pushing to' in stdout - assert 'Repository size' in stdout - assert 'Last revision is now' in stdout - - def test_push_new_file_git(self): - DEST = _get_tmp_dir() - clone_url = _construct_url(GIT_REPO, dest=DEST) - stdout, stderr = Command('/tmp').execute('git clone', clone_url) - - # commit some stuff into this repo - stdout, stderr = _add_files_and_push('git', DEST) - - print [(x.repo_full_path,x.repo_path) for x in Repository.get_all()] - _check_proper_git_push(stdout, stderr) - - def test_push_invalidates_cache_hg(self): - key = CacheInvalidation.query().filter(CacheInvalidation.cache_key - ==HG_REPO).scalar() - if not key: - key = CacheInvalidation(HG_REPO, HG_REPO) - - key.cache_active = True - Session().add(key) - Session().commit() - - DEST = _get_tmp_dir() - clone_url = _construct_url(HG_REPO, dest=DEST) - stdout, stderr = Command('/tmp').execute('hg clone', clone_url) - - stdout, stderr = _add_files_and_push('hg', DEST, files_no=1) - - key = CacheInvalidation.query().filter(CacheInvalidation.cache_key - ==HG_REPO).one() - self.assertEqual(key.cache_active, False) - - def test_push_invalidates_cache_git(self): - key = CacheInvalidation.query().filter(CacheInvalidation.cache_key - ==GIT_REPO).scalar() - if not key: - key = CacheInvalidation(GIT_REPO, GIT_REPO) - - key.cache_active = True - Session().add(key) - Session().commit() - - DEST = _get_tmp_dir() - clone_url = _construct_url(GIT_REPO, dest=DEST) - stdout, stderr = Command('/tmp').execute('git clone', clone_url) - - # commit some stuff into this repo - stdout, stderr = _add_files_and_push('git', DEST, files_no=1) - _check_proper_git_push(stdout, stderr) - - key = CacheInvalidation.query().filter(CacheInvalidation.cache_key - ==GIT_REPO).one() - print CacheInvalidation.get_all() - self.assertEqual(key.cache_active, False) - - def test_push_wrong_credentials_hg(self): - DEST = _get_tmp_dir() - clone_url = _construct_url(HG_REPO, dest=DEST) - stdout, stderr = Command('/tmp').execute('hg clone', clone_url) - - stdout, stderr = _add_files_and_push('hg', DEST, user='bad', - passwd='name') - - assert 'abort: authorization failed' in stderr - - def test_push_wrong_credentials_git(self): - DEST = _get_tmp_dir() - clone_url = _construct_url(GIT_REPO, dest=DEST) - stdout, stderr = Command('/tmp').execute('git clone', clone_url) - - stdout, stderr = _add_files_and_push('git', DEST, user='bad', - passwd='name') - - assert 'fatal: Authentication failed' in stderr - - def test_push_back_to_wrong_url_hg(self): - DEST = _get_tmp_dir() - clone_url = _construct_url(HG_REPO, dest=DEST) - stdout, stderr = Command('/tmp').execute('hg clone', clone_url) - - stdout, stderr = _add_files_and_push('hg', DEST, - clone_url='http://127.0.0.1:5000/tmp',) - - assert 'HTTP Error 404: Not Found' in stderr - - def test_push_back_to_wrong_url_git(self): - DEST = _get_tmp_dir() - clone_url = _construct_url(GIT_REPO, dest=DEST) - stdout, stderr = Command('/tmp').execute('git clone', clone_url) - - stdout, stderr = _add_files_and_push('git', DEST, - clone_url='http://127.0.0.1:5000/tmp',) - - assert 'not found' in stderr - - def test_clone_and_create_lock_hg(self): - # enable locking - r = Repository.get_by_repo_name(HG_REPO) - r.enable_locking = True - Session().add(r) - Session().commit() - # clone - clone_url = _construct_url(HG_REPO) - stdout, stderr = Command('/tmp').execute('hg clone', clone_url) - - #check if lock was made - r = Repository.get_by_repo_name(HG_REPO) - assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id - - def test_clone_and_create_lock_git(self): - # enable locking - r = Repository.get_by_repo_name(GIT_REPO) - r.enable_locking = True - Session().add(r) - Session().commit() - # clone - clone_url = _construct_url(GIT_REPO) - stdout, stderr = Command('/tmp').execute('git clone', clone_url) - - #check if lock was made - r = Repository.get_by_repo_name(GIT_REPO) - assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id - - def test_clone_after_repo_was_locked_hg(self): - #lock repo - r = Repository.get_by_repo_name(HG_REPO) - Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id) - #pull fails since repo is locked - clone_url = _construct_url(HG_REPO) - stdout, stderr = Command('/tmp').execute('hg clone', clone_url) - msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`""" - % (HG_REPO, TEST_USER_ADMIN_LOGIN)) - assert msg in stderr - - def test_clone_after_repo_was_locked_git(self): - #lock repo - r = Repository.get_by_repo_name(GIT_REPO) - Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id) - #pull fails since repo is locked - clone_url = _construct_url(GIT_REPO) - stdout, stderr = Command('/tmp').execute('git clone', clone_url) - msg = ("""The requested URL returned error: 423""") - assert msg in stderr - - def test_push_on_locked_repo_by_other_user_hg(self): - #clone some temp - DEST = _get_tmp_dir() - clone_url = _construct_url(HG_REPO, dest=DEST) - stdout, stderr = Command('/tmp').execute('hg clone', clone_url) - - #lock repo - r = Repository.get_by_repo_name(HG_REPO) - # let this user actually push ! - RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN, - perm='repository.write') - Session().commit() - Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id) - - #push fails repo is locked by other user ! - stdout, stderr = _add_files_and_push('hg', DEST, - user=TEST_USER_REGULAR_LOGIN, - passwd=TEST_USER_REGULAR_PASS) - msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`""" - % (HG_REPO, TEST_USER_ADMIN_LOGIN)) - assert msg in stderr - - def test_push_on_locked_repo_by_other_user_git(self): - #clone some temp - DEST = _get_tmp_dir() - clone_url = _construct_url(GIT_REPO, dest=DEST) - stdout, stderr = Command('/tmp').execute('git clone', clone_url) - - #lock repo - r = Repository.get_by_repo_name(GIT_REPO) - # let this user actually push ! - RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN, - perm='repository.write') - Session().commit() - Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id) - - #push fails repo is locked by other user ! - stdout, stderr = _add_files_and_push('git', DEST, - user=TEST_USER_REGULAR_LOGIN, - passwd=TEST_USER_REGULAR_PASS) - err = 'Repository `%s` locked by user `%s`' % (GIT_REPO, TEST_USER_ADMIN_LOGIN) - assert err in stderr - - #TODO: fix this somehow later on Git, Git is stupid and even if we throw - #back 423 to it, it makes ANOTHER request and we fail there with 405 :/ - - msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`""" - % (GIT_REPO, TEST_USER_ADMIN_LOGIN)) - #msg = "405 Method Not Allowed" - #assert msg in stderr - - def test_push_unlocks_repository_hg(self): - # enable locking - r = Repository.get_by_repo_name(HG_REPO) - r.enable_locking = True - Session().add(r) - Session().commit() - #clone some temp - DEST = _get_tmp_dir() - clone_url = _construct_url(HG_REPO, dest=DEST) - stdout, stderr = Command('/tmp').execute('hg clone', clone_url) - - #check for lock repo after clone - r = Repository.get_by_repo_name(HG_REPO) - uid = User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id - assert r.locked[0] == uid - - #push is ok and repo is now unlocked - stdout, stderr = _add_files_and_push('hg', DEST) - assert ('remote: Released lock on repo `%s`' % HG_REPO) in stdout - #we need to cleanup the Session Here ! - Session.remove() - r = Repository.get_by_repo_name(HG_REPO) - assert r.locked == [None, None] - - #TODO: fix me ! somehow during tests hooks don't get called on Git - def test_push_unlocks_repository_git(self): - # enable locking - r = Repository.get_by_repo_name(GIT_REPO) - r.enable_locking = True - Session().add(r) - Session().commit() - #clone some temp - DEST = _get_tmp_dir() - clone_url = _construct_url(GIT_REPO, dest=DEST) - stdout, stderr = Command('/tmp').execute('git clone', clone_url) - - #check for lock repo after clone - r = Repository.get_by_repo_name(GIT_REPO) - assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id - - #push is ok and repo is now unlocked - stdout, stderr = _add_files_and_push('git', DEST) - _check_proper_git_push(stdout, stderr) - - #assert ('remote: Released lock on repo `%s`' % GIT_REPO) in stdout - #we need to cleanup the Session Here ! - Session.remove() - r = Repository.get_by_repo_name(GIT_REPO) - assert r.locked == [None, None] - - def test_ip_restriction_hg(self): - user_model = UserModel() - try: - user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32') - Session().commit() - clone_url = _construct_url(HG_REPO) - stdout, stderr = Command('/tmp').execute('hg clone', clone_url) - assert 'abort: HTTP Error 403: Forbidden' in stderr - finally: - #release IP restrictions - for ip in UserIpMap.getAll(): - UserIpMap.delete(ip.ip_id) - Session().commit() - - time.sleep(2) - clone_url = _construct_url(HG_REPO) - stdout, stderr = Command('/tmp').execute('hg clone', clone_url) - - assert 'requesting all changes' in stdout - assert 'adding changesets' in stdout - assert 'adding manifests' in stdout - assert 'adding file changes' in stdout - - assert stderr == '' - - def test_ip_restriction_git(self): - user_model = UserModel() - try: - user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32') - Session().commit() - clone_url = _construct_url(GIT_REPO) - stdout, stderr = Command('/tmp').execute('git clone', clone_url) - msg = ("""The requested URL returned error: 403""") - assert msg in stderr - finally: - #release IP restrictions - for ip in UserIpMap.getAll(): - UserIpMap.delete(ip.ip_id) - Session().commit() - - time.sleep(2) - clone_url = _construct_url(GIT_REPO) - stdout, stderr = Command('/tmp').execute('git clone', clone_url) - - assert 'Cloning into' in stdout - assert stderr == ''