Mercurial > kallithea
view kallithea/tests/other/manual_test_vcs_operations.py @ 5967:5c549b26231a
tests: set LANG and LANGUAGE - vcs tests expect messages to be in English
Some systems work fine without them, but Windows in particular seems to need them.
author | domruf <dominikruf@gmail.com> |
---|---|
date | Mon, 02 May 2016 22:54:09 +0200 |
parents | 04c82eb326fb |
children | 7db1bcf1d95b |
line wrap: on
line source
# -*- coding: utf-8 -*-
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""
kallithea.tests.other.manual_test_vcs_operations
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Test suite for making push/pull operations.

Run it in two terminals::
 paster serve kallithea/tests/test.ini
 KALLITHEA_WHOOSH_TEST_DISABLE=1 KALLITHEA_NO_TMP_PATH=1 nosetests kallithea/tests/other/manual_test_vcs_operations.py

You must have git > 1.8.1 for tests to work fine

This file was forked by the Kallithea project in July 2014.
Original author and date, and relevant copyright and licensing information is below:
:created_on: Dec 30, 2010
:author: marcink
:copyright: (c) 2013 RhodeCode GmbH, and others.
:license: GPLv3, see LICENSE.md for more details.

"""

import os
import re
import tempfile
import time
from os.path import join as jn

from tempfile import _RandomNameSequence
from subprocess import Popen, PIPE

from kallithea.tests import *
from kallithea.model.db import User, Repository, UserIpMap, CacheInvalidation
from kallithea.model.meta import Session
from kallithea.model.repo import RepoModel
from kallithea.model.user import UserModel

DEBUG = True
HOST = '127.0.0.1:4999'  # test host


class Command(object):
    """Run shell command lines from a fixed working directory."""

    def __init__(self, cwd):
        # Directory every command executed through this object runs in.
        self.cwd = cwd

    def execute(self, cmd, *args):
        """
        Runs command on the system with given ``args``.

        :param cmd: leading part of the command line, e.g. ``'hg clone'``
        :param args: further words; joined with single spaces and appended
            to ``cmd``
        :returns: ``(stdout, stderr)`` tuple as captured from the process
        """
        command = cmd + ' ' + ' '.join(args)
        if DEBUG:
            print('*** CMD %s ***' % command)

        # The tests match on English hg/git messages, so pin the locale of
        # the child process regardless of the caller's environment.
        testenv = dict(os.environ)
        testenv['LANG'] = 'en_US.UTF-8'
        testenv['LANGUAGE'] = 'en_US:en'

        # shell=True is intentional: callers pass complete shell command
        # lines that use redirection (``>>``) and ``VAR=value`` prefixes.
        p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE,
                  cwd=self.cwd, env=testenv)
        stdout, stderr = p.communicate()
        if DEBUG:
            print('stdout: %r' % (stdout,))
            print('stderr: %r' % (stderr,))
        return stdout, stderr


def _get_tmp_dir():
    """Create a fresh temporary directory and return its path."""
    return tempfile.mkdtemp(prefix='rc_integration_test')


def _construct_url(repo, dest=None, **kwargs):
    """
    Build the ``<url> <dest>`` argument string for a clone or push command.

    :param repo: repository name, used as the path part of the URL
    :param dest: destination appended after the URL; when ``None`` a new
        temporary directory is created and used
    :param kwargs: overrides for the ``user``, ``passwd``, ``host``,
        ``cloned_repo`` and ``dest`` values; by default the admin test
        credentials and ``HOST`` are used
    :returns: ``'http://user:passwd@host/repo dest'``, or the same without
        the credentials part when ``user`` or ``passwd`` is empty
    """
    if dest is None:
        # make temp clone
        dest = _get_tmp_dir()
    params = {
        'user': TEST_USER_ADMIN_LOGIN,
        'passwd': TEST_USER_ADMIN_PASS,
        'host': HOST,
        'cloned_repo': repo,
        'dest': dest,
    }
    params.update(**kwargs)
    if params['user'] and params['passwd']:
        return 'http://%(user)s:%(passwd)s@%(host)s/%(cloned_repo)s %(dest)s' % params
    # Anonymous URL. The '%' in front of '(host)s' used to be missing, which
    # left the literal text '(host)s' in the URL instead of the host name.
    return 'http://%(host)s/%(cloned_repo)s %(dest)s' % params


def _add_files_and_push(vcs, DEST, **kwargs):
    """
    Generate some files, add it to DEST repo and push back
    vcs is git or hg and defines what VCS we want to make those files for

    :param vcs: ``'hg'`` or ``'git'``
    :param DEST: path of the working copy to commit in and push from
    :param kwargs: ``files_no`` is the number of commits to create
        (default 3); ``clone_url`` is used verbatim as the push target when
        given; all keyword arguments are also passed on to
        ``_construct_url`` when the push URL has to be built
    :returns: ``(stdout, stderr)`` of the push command
    """
    # commit some stuff into this repo
    cwd = DEST
    # NOTE: a non-ASCII file name ('%ssetupążźć.py') was tried here once and
    # is currently disabled.
    added_file = jn(cwd, '%ssetup.py' % next(_RandomNameSequence()))

    Command(cwd).execute('touch %s' % added_file)
    Command(cwd).execute('%s add %s' % (vcs, added_file))

    if os.name == 'nt':
        author_str = 'User <me@example.com>'
    else:
        author_str = 'User ǝɯɐᴎ <me@example.com>'

    for i in range(kwargs.get('files_no', 3)):
        # Append a line so that every iteration has something to commit.
        cmd = """echo "added_line%s" >> %s""" % (i, added_file)
        Command(cwd).execute(cmd)

        if vcs == 'hg':
            cmd = """hg commit -m "committed new %s" -u "%s" "%s" """ % (
                i, author_str, added_file
            )
        elif vcs == 'git':
            cmd = """EMAIL="me@example.com" git commit -m "committed new %s" --author "%s" "%s" """ % (
                i, author_str, added_file
            )
        Command(cwd).execute(cmd)

    # PUSH it back
    if 'clone_url' in kwargs:
        clone_url = kwargs['clone_url']
    else:
        _REPO = None
        if vcs == 'hg':
            _REPO = HG_REPO
        elif vcs == 'git':
            _REPO = GIT_REPO
        # An empty (not None) dest keeps _construct_url from creating a
        # temporary directory; only the URL part is wanted for a push.
        kwargs['dest'] = ''
        clone_url = _construct_url(_REPO, **kwargs)

    stdout = stderr = None
    if vcs == 'hg':
        stdout, stderr = Command(cwd).execute('hg push --verbose', clone_url)
    elif vcs == 'git':
        stdout, stderr = Command(cwd).execute('git push --verbose', clone_url + " master")

    return stdout, stderr


def set_anonymous_access(enable=True):
    """
    Set the ``active`` flag of the default user and commit the change.

    :param enable: new value for the flag
    :raises Exception: when the value read back afterwards differs
    """
    user = User.get_by_username(User.DEFAULT_USER)
    user.active = enable
    Session().add(user)
    Session().commit()
    print('\tanonymous access is now: %s' % (enable,))
    if enable != User.get_by_username(User.DEFAULT_USER).active:
        raise Exception('Cannot set anonymous access')


#==============================================================================
# TESTS
#==============================================================================


def _check_proper_git_push(stdout, stderr):
    """Assert that the captured ``git push --verbose`` output shows success."""
    # WTF Git stderr is output ?!
    assert 'fatal' not in stderr
    assert 'rejected' not in stderr
    assert 'Pushing to' in stderr
    assert 'master -> master' in stderr


class TestVCSOperations(TestController):
    """Clone/push tests run with the hg and git command line clients against HOST."""

    @classmethod
    def setup_class(cls):
        # DISABLE ANONYMOUS ACCESS
        set_anonymous_access(False)

    def setup_method(self, method):
        # Start every test with both repositories unlocked and with locking
        # switched off.
        for repo_name in (GIT_REPO, HG_REPO):
            r = Repository.get_by_repo_name(repo_name)
            Repository.unlock(r)
            r.enable_locking = False
            Session().add(r)
            Session().commit()

    def test_clone_hg_repo_by_admin(self):
        clone_url = _construct_url(HG_REPO)
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)

        assert 'requesting all changes' in stdout
        assert 'adding changesets' in stdout
        assert 'adding manifests' in stdout
        assert 'adding file changes' in stdout

        assert stderr == ''

    def test_clone_git_repo_by_admin(self):
        clone_url = _construct_url(GIT_REPO)
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)

        assert 'Cloning into' in stdout + stderr
        assert stderr == '' or stdout == ''

    def test_clone_wrong_credentials_hg(self):
        clone_url = _construct_url(HG_REPO, passwd='bad!')
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
        assert 'abort: authorization failed' in stderr

    def test_clone_wrong_credentials_git(self):
        clone_url = _construct_url(GIT_REPO, passwd='bad!')
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)
        assert 'fatal: Authentication failed' in stderr

    def test_clone_git_dir_as_hg(self):
        clone_url = _construct_url(GIT_REPO)
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
        assert 'HTTP Error 404: Not Found' in stderr

    def test_clone_hg_repo_as_git(self):
        clone_url = _construct_url(HG_REPO)
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)
        assert 'not found' in stderr

    def test_clone_non_existing_path_hg(self):
        clone_url = _construct_url('trololo')
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
        assert 'HTTP Error 404: Not Found' in stderr

    def test_clone_non_existing_path_git(self):
        clone_url = _construct_url('trololo')
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)
        assert 'not found' in stderr

    def test_push_new_file_hg(self):
        DEST = _get_tmp_dir()
        clone_url = _construct_url(HG_REPO, dest=DEST)
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)

        stdout, stderr = _add_files_and_push('hg', DEST)

        assert 'pushing to' in stdout
        assert 'Repository size' in stdout
        assert 'Last revision is now' in stdout

    def test_push_new_file_git(self):
        DEST = _get_tmp_dir()
        clone_url = _construct_url(GIT_REPO, dest=DEST)
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)

        # commit some stuff into this repo
        stdout, stderr = _add_files_and_push('git', DEST)

        print([(x.repo_full_path, x.repo_path) for x in Repository.get_all()])
        _check_proper_git_push(stdout, stderr)

    def test_push_invalidates_cache_hg(self):
        key = CacheInvalidation.query().filter(
            CacheInvalidation.cache_key == HG_REPO).scalar()
        if not key:
            key = CacheInvalidation(HG_REPO, HG_REPO)

        key.cache_active = True
        Session().add(key)
        Session().commit()

        DEST = _get_tmp_dir()
        clone_url = _construct_url(HG_REPO, dest=DEST)
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)

        stdout, stderr = _add_files_and_push('hg', DEST, files_no=1)

        # After the push no cache key for the repository may be left.
        key = CacheInvalidation.query().filter(
            CacheInvalidation.cache_key == HG_REPO).all()
        assert key == []

    def test_push_invalidates_cache_git(self):
        key = CacheInvalidation.query().filter(
            CacheInvalidation.cache_key == GIT_REPO).scalar()
        if not key:
            key = CacheInvalidation(GIT_REPO, GIT_REPO)

        key.cache_active = True
        Session().add(key)
        Session().commit()

        DEST = _get_tmp_dir()
        clone_url = _construct_url(GIT_REPO, dest=DEST)
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)

        # commit some stuff into this repo
        stdout, stderr = _add_files_and_push('git', DEST, files_no=1)
        _check_proper_git_push(stdout, stderr)

        # After the push no cache key for the repository may be left.
        key = CacheInvalidation.query().filter(
            CacheInvalidation.cache_key == GIT_REPO).all()
        assert key == []

    def test_push_wrong_credentials_hg(self):
        DEST = _get_tmp_dir()
        clone_url = _construct_url(HG_REPO, dest=DEST)
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)

        stdout, stderr = _add_files_and_push('hg', DEST, user='bad',
                                             passwd='name')

        assert 'abort: authorization failed' in stderr

    def test_push_wrong_credentials_git(self):
        DEST = _get_tmp_dir()
        clone_url = _construct_url(GIT_REPO, dest=DEST)
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)

        stdout, stderr = _add_files_and_push('git', DEST, user='bad',
                                             passwd='name')

        assert 'fatal: Authentication failed' in stderr

    def test_push_back_to_wrong_url_hg(self):
        DEST = _get_tmp_dir()
        clone_url = _construct_url(HG_REPO, dest=DEST)
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)

        stdout, stderr = _add_files_and_push('hg', DEST,
                                             clone_url='http://%s/tmp' % HOST)

        assert 'HTTP Error 404: Not Found' in stderr

    def test_push_back_to_wrong_url_git(self):
        DEST = _get_tmp_dir()
        clone_url = _construct_url(GIT_REPO, dest=DEST)
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)

        stdout, stderr = _add_files_and_push('git', DEST,
                                             clone_url='http://%s/tmp' % HOST)

        assert 'not found' in stderr

    def test_clone_and_create_lock_hg(self):
        # enable locking
        r = Repository.get_by_repo_name(HG_REPO)
        r.enable_locking = True
        Session().add(r)
        Session().commit()
        # clone
        clone_url = _construct_url(HG_REPO)
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)

        # check if lock was made
        r = Repository.get_by_repo_name(HG_REPO)
        assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id

    def test_clone_and_create_lock_git(self):
        # enable locking
        r = Repository.get_by_repo_name(GIT_REPO)
        r.enable_locking = True
        Session().add(r)
        Session().commit()
        # clone
        clone_url = _construct_url(GIT_REPO)
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)

        # check if lock was made
        r = Repository.get_by_repo_name(GIT_REPO)
        assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id

    def test_clone_after_repo_was_locked_hg(self):
        # lock repo
        r = Repository.get_by_repo_name(HG_REPO)
        Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
        # pull fails since repo is locked
        clone_url = _construct_url(HG_REPO)
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
        msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
               % (HG_REPO, TEST_USER_ADMIN_LOGIN))
        assert msg in stderr

    def test_clone_after_repo_was_locked_git(self):
        # lock repo
        r = Repository.get_by_repo_name(GIT_REPO)
        Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
        # pull fails since repo is locked
        clone_url = _construct_url(GIT_REPO)
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)
        msg = ("""The requested URL returned error: 423""")
        assert msg in stderr

    def test_push_on_locked_repo_by_other_user_hg(self):
        # clone some temp
        DEST = _get_tmp_dir()
        clone_url = _construct_url(HG_REPO, dest=DEST)
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)

        # lock repo
        r = Repository.get_by_repo_name(HG_REPO)
        # let this user actually push !
        RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN,
                                          perm='repository.write')
        Session().commit()
        Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)

        # push fails repo is locked by other user !
        stdout, stderr = _add_files_and_push('hg', DEST,
                                             user=TEST_USER_REGULAR_LOGIN,
                                             passwd=TEST_USER_REGULAR_PASS)
        msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
               % (HG_REPO, TEST_USER_ADMIN_LOGIN))
        assert msg in stderr

    def test_push_on_locked_repo_by_other_user_git(self):
        # clone some temp
        DEST = _get_tmp_dir()
        clone_url = _construct_url(GIT_REPO, dest=DEST)
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)

        # lock repo
        r = Repository.get_by_repo_name(GIT_REPO)
        # let this user actually push !
        RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN,
                                          perm='repository.write')
        Session().commit()
        Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)

        # push fails repo is locked by other user !
        stdout, stderr = _add_files_and_push('git', DEST,
                                             user=TEST_USER_REGULAR_LOGIN,
                                             passwd=TEST_USER_REGULAR_PASS)
        err = 'Repository `%s` locked by user `%s`' % (GIT_REPO, TEST_USER_ADMIN_LOGIN)
        assert err in stderr

        # TODO: fix this somehow later on Git, Git is stupid and even if we
        # throw back 423 to it, it makes ANOTHER request and we fail there
        # with 405 :/  Hence only the lock message itself is checked above,
        # not the full "abort: HTTP Error 423: ..." line.

    def test_push_unlocks_repository_hg(self):
        # enable locking
        r = Repository.get_by_repo_name(HG_REPO)
        r.enable_locking = True
        Session().add(r)
        Session().commit()
        # clone some temp
        DEST = _get_tmp_dir()
        clone_url = _construct_url(HG_REPO, dest=DEST)
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)

        # check for lock repo after clone
        r = Repository.get_by_repo_name(HG_REPO)
        uid = User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
        assert r.locked[0] == uid

        # push is ok and repo is now unlocked
        stdout, stderr = _add_files_and_push('hg', DEST)
        assert ('remote: Released lock on repo `%s`' % HG_REPO) in stdout
        # we need to cleanup the Session Here !
        Session.remove()
        r = Repository.get_by_repo_name(HG_REPO)
        assert r.locked == [None, None]

    # TODO: fix me ! somehow during tests hooks don't get called on Git
    def test_push_unlocks_repository_git(self):
        # enable locking
        r = Repository.get_by_repo_name(GIT_REPO)
        r.enable_locking = True
        Session().add(r)
        Session().commit()
        # clone some temp
        DEST = _get_tmp_dir()
        clone_url = _construct_url(GIT_REPO, dest=DEST)
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)

        # check for lock repo after clone
        r = Repository.get_by_repo_name(GIT_REPO)
        assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id

        # push is ok and repo is now unlocked
        stdout, stderr = _add_files_and_push('git', DEST)
        _check_proper_git_push(stdout, stderr)

        # The 'remote: Released lock on repo' message is not checked for Git
        # (see the TODO above this test).
        # we need to cleanup the Session Here !
        Session.remove()
        r = Repository.get_by_repo_name(GIT_REPO)
        assert r.locked == [None, None]

    def test_ip_restriction_hg(self):
        user_model = UserModel()
        try:
            user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32')
            Session().commit()
            clone_url = _construct_url(HG_REPO)
            stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
            assert 'abort: HTTP Error 403: Forbidden' in stderr
        finally:
            # release IP restrictions
            for ip in UserIpMap.getAll():
                UserIpMap.delete(ip.ip_id)
            Session().commit()

        # NOTE(review): presumably gives the server time to notice that the
        # restriction is gone - confirm.
        time.sleep(2)
        clone_url = _construct_url(HG_REPO)
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)

        assert 'requesting all changes' in stdout
        assert 'adding changesets' in stdout
        assert 'adding manifests' in stdout
        assert 'adding file changes' in stdout

        assert stderr == ''

    def test_ip_restriction_git(self):
        user_model = UserModel()
        try:
            user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32')
            Session().commit()
            clone_url = _construct_url(GIT_REPO)
            stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)
            # The message apparently changed in Git 1.8.3, so match it loosely.
            assert re.search(r'\b403\b', stderr)
        finally:
            # release IP restrictions
            for ip in UserIpMap.getAll():
                UserIpMap.delete(ip.ip_id)
            Session().commit()

        # NOTE(review): presumably gives the server time to notice that the
        # restriction is gone - confirm.
        time.sleep(2)
        clone_url = _construct_url(GIT_REPO)
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)

        assert 'Cloning into' in stdout + stderr
        assert stderr == '' or stdout == ''