Mercurial > kallithea
view kallithea/tests/other/test_vcs_operations.py @ 8988:511b20a6475d stable
tests: skip reading Git system and global configuration in test_vcs_operations
The parent changeset reduced the dependency on global configuration and made it
possible to run tests without any global git configuration. But it is still
unfortunate to even look at the global configuration when running tests.
Global configuration is already disabled for Mercurial by setting HGRCPATH.
Now do something similar for Git. According to the git man page,
GIT_CONFIG_GLOBAL and GIT_CONFIG_SYSTEM set to /dev/null will make Git skip
reading the configuration files on all platforms.
Note that the GIT_CONFIG variables were introduced in Git 2.32.0, so this will
not work with all the Git versions supported by Kallithea.
author | Manuel Jacob <me@manueljacob.de> |
---|---|
date | Thu, 30 Mar 2023 03:22:35 +0200 |
parents | d6d3cb5991e2 |
children |
line wrap: on
line source
# -*- coding: utf-8 -*-
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Test suite for vcs push/pull operations.

The tests need Git > 1.8.1.

This file was forked by the Kallithea project in July 2014.
Original author and date, and relevant copyright and licensing information is below:
:created_on: Dec 30, 2010
:author: marcink
:copyright: (c) 2013 RhodeCode GmbH, and others.
:license: GPLv3, see LICENSE.md for more details.

"""

import json
import os
import re
import tempfile
import time
import urllib.request
from subprocess import PIPE, Popen
from tempfile import _RandomNameSequence

import pytest

import kallithea
from kallithea.lib.utils2 import ascii_bytes, safe_str
from kallithea.model import db, meta
from kallithea.model.ssh_key import SshKeyModel
from kallithea.model.user import UserModel
from kallithea.tests import base
from kallithea.tests.fixture import Fixture


DEBUG = True
HOST = '127.0.0.1:4999'  # test host

fixture = Fixture()


# Parameterize different kinds of VCS testing - both the kind of VCS and the
# access method (HTTP/SSH)

# Mixin for using HTTP and SSH URLs
class HttpVcsTest(object):
    """Access-method mixin: repository URLs come straight from the web server."""

    @staticmethod
    def repo_url_param(webserver, repo_name, **kwargs):
        """Return ``webserver.repo_url(repo_name, **kwargs)`` unchanged."""
        return webserver.repo_url(repo_name, **kwargs)


class SshVcsTest(object):
    """Access-method mixin: repositories are reached through an ssh URL.

    Subclasses provide ``_ssh_param``, which turns the looked-up user and key
    into the command line fragment for the specific VCS.
    """

    # Public keys registered on demand, indexed by user login
    public_keys = {
        base.TEST_USER_REGULAR_LOGIN: 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQC6Ycnc2oUZHQnQwuqgZqTTdMDZD7ataf3JM7oG2Fw8JR6cdmz4QZLe5mfDwaFwG2pWHLRpVqzfrD/Pn3rIO++bgCJH5ydczrl1WScfryV1hYMJ/4EzLGM657J1/q5EI+b9SntKjf4ax+KP322L0TNQGbZUHLbfG2MwHMrYBQpHUQ== kallithea@localhost',
        base.TEST_USER_ADMIN_LOGIN: 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQC6Ycnc2oUZHQnQwuqgZqTTdMDZD7ataf3JM7oG2Fw8JR6cdmz4QZLe5mfDwaFwG2pWHLRpVqzfrD/Pn3rIO++bgCJH5ydczrl1WScfryV1hYMJ/4EzLGM657J1/q5EI+b9SntKjf4ax+KP322L0TNQGbZUHLbfG2MwHMrYBQpHUq== kallithea@localhost',
    }

    @classmethod
    def repo_url_param(cls, webserver, repo_name, username=base.TEST_USER_ADMIN_LOGIN, password=base.TEST_USER_ADMIN_PASS, client_ip=base.IP_ADDR):
        """Build the ssh parameter for ``repo_name`` as seen by ``username``.

        The user's first stored ssh key is used; when the user has none, the
        matching entry of ``public_keys`` is created and committed first.
        ``webserver`` and ``password`` are accepted only so the signature is
        interchangeable with ``HttpVcsTest.repo_url_param``; they are not used.
        """
        db_user = db.User.get_by_username(username)
        if not db_user.ssh_keys:
            key = SshKeyModel().create(db_user, 'test key', cls.public_keys[db_user.username])
            meta.Session().commit()
        else:
            key = db_user.ssh_keys[0]
        return cls._ssh_param(repo_name, db_user, key, client_ip)


# Mixins for using Mercurial and Git
class HgVcsTest(object):
    """VCS mixin selecting Mercurial and its test repository."""
    repo_type = 'hg'
    repo_name = base.HG_REPO


class GitVcsTest(object):
    """VCS mixin selecting Git and its test repository."""
    repo_type = 'git'
    repo_name = base.GIT_REPO


# Combine
# mixins to give the combinations we want to parameterize tests with
class HgHttpVcsTest(HgVcsTest, HttpVcsTest):
    """Mercurial over HTTP."""
    pass


class GitHttpVcsTest(GitVcsTest, HttpVcsTest):
    """Git over HTTP."""
    pass


class HgSshVcsTest(HgVcsTest, SshVcsTest):
    """Mercurial over a simulated ssh connection."""

    @staticmethod
    def _ssh_param(repo_name, user, ssh_key, client_ip):
        """Return the ``--config ui.ssh=...`` option followed by the ssh URL."""
        # Specify a custom ssh command on the command line
        return r"""--config ui.ssh="bash -c 'SSH_ORIGINAL_COMMAND=\"\$2\" SSH_CONNECTION=\"%s 1024 127.0.0.1 22\" kallithea-cli ssh-serve -c %s %s %s' --" ssh://someuser@somehost/%s""" % (
            client_ip,
            kallithea.CONFIG['__file__'],
            user.user_id,
            ssh_key.user_ssh_key_id,
            repo_name)


class GitSshVcsTest(GitVcsTest, SshVcsTest):
    """Git over a simulated ssh connection."""

    @staticmethod
    def _ssh_param(repo_name, user, ssh_key, client_ip):
        """Set ``GIT_SSH_COMMAND`` in ``os.environ`` and return the ssh URL.

        NOTE: the environment variable is set process-wide and is not restored.
        """
        # Set a custom ssh command in the global environment
        os.environ['GIT_SSH_COMMAND'] = r"""bash -c 'SSH_ORIGINAL_COMMAND="$2" SSH_CONNECTION="%s 1024 127.0.0.1 22" kallithea-cli ssh-serve -c %s %s %s' --""" % (
            client_ip,
            kallithea.CONFIG['__file__'],
            user.user_id,
            ssh_key.user_ssh_key_id)
        return "ssh://someuser@somehost/%s" % repo_name


parametrize_vcs_test = base.parametrize('vt', [
    HgHttpVcsTest,
    GitHttpVcsTest,
    HgSshVcsTest,
    GitSshVcsTest,
])
parametrize_vcs_test_hg = base.parametrize('vt', [
    HgHttpVcsTest,
    HgSshVcsTest,
])
parametrize_vcs_test_http = base.parametrize('vt', [
    HgHttpVcsTest,
    GitHttpVcsTest,
])


class Command(object):
    """Run shell commands in a fixed working directory with a controlled environment."""

    def __init__(self, cwd):
        self.cwd = cwd

    def execute(self, *args, **environ):
        """
        Runs command on the system with given ``args`` using simple space
        join without safe quoting.

        Keyword arguments are added to the environment of the child process,
        except ``ignoreReturnCode`` (default False): unless it is true, a
        non-zero exit status fails with an AssertionError.

        Returns the tuple ``(stdout, stderr)`` converted with ``safe_str``.
        """
        command = ' '.join(args)
        ignoreReturnCode = environ.pop('ignoreReturnCode', False)
        if DEBUG:
            print('*** CMD %s ***' % command)
        testenv = dict(os.environ)
        testenv['LANG'] = 'en_US.UTF-8'
        testenv['LANGUAGE'] = 'en_US:en'
        testenv['HGPLAIN'] = ''
        # Empty HGRCPATH disables the global Mercurial configuration
        testenv['HGRCPATH'] = ''
        # Make Git skip the system and global configuration files. These
        # variables were introduced in Git 2.32.0; older versions ignore them.
        testenv['GIT_CONFIG_SYSTEM'] = '/dev/null'
        testenv['GIT_CONFIG_GLOBAL'] = '/dev/null'
        testenv['GIT_COMMITTER_NAME'] = base.TEST_USER_ADMIN_LOGIN
        testenv['GIT_COMMITTER_EMAIL'] = base.TEST_USER_ADMIN_EMAIL
        testenv['GIT_AUTHOR_NAME'] = base.TEST_USER_REGULAR_LOGIN
        testenv['GIT_AUTHOR_EMAIL'] = base.TEST_USER_REGULAR_EMAIL
        # Caller supplied variables win over the defaults above
        testenv.update(environ)
        p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd, env=testenv)
        stdout, stderr = p.communicate()
        if DEBUG:
            if stdout:
                print('stdout:', stdout)
            if stderr:
                print('stderr:', stderr)
        if not ignoreReturnCode:
            # Include the command and its stderr so a failure is diagnosable
            # even when output capturing hides the DEBUG prints
            assert p.returncode == 0, 'command %r exited with status %s: %s' % (
                command, p.returncode, safe_str(stderr))
        return safe_str(stdout), safe_str(stderr)


def _get_tmp_dir(prefix='vcs_operations-', suffix=''):
    """Create and return a fresh directory below ``base.TESTS_TMP_PATH``."""
    return tempfile.mkdtemp(dir=base.TESTS_TMP_PATH, prefix=prefix, suffix=suffix)


def _add_files(vcs, dest_dir, files_no=3):
    """
    Create one randomly named file in the working copy ``dest_dir``, add it,
    and make ``files_no`` commits that each append a line to it.

    :param vcs: 'git' or 'hg' - the command used for ``add`` and ``commit``
    :param dest_dir: working directory of the local clone
    :param files_no: number of commits to create
    """
    added_file = '%ssetup.py' % next(_RandomNameSequence())
    # Create the (empty) file; the context manager guarantees it gets closed
    with open(os.path.join(dest_dir, added_file), 'a'):
        pass
    Command(dest_dir).execute(vcs, 'add', added_file)

    email = 'me@example.com'
    if os.name == 'nt':
        author_str = 'User <%s>' % email
    else:
        author_str = 'User ǝɯɐᴎ <%s>' % email
    for i in range(files_no):
        cmd = """echo "added_line%s" >> %s""" % (i, added_file)
        Command(dest_dir).execute(cmd)
        if vcs == 'hg':
            cmd = """hg commit -m "committed new %s" -u "%s" "%s" """ % (
                i, author_str, added_file
            )
        elif vcs == 'git':
            cmd = """git commit -m "committed new %s" --author "%s" "%s" """ % (
                i, author_str, added_file
            )
        Command(dest_dir).execute(cmd)


def _add_files_and_push(webserver, vt, dest_dir, clone_url, ignoreReturnCode=False, files_no=3):
    """
    Make ``files_no`` commits in ``dest_dir`` and force-push them to
    ``clone_url`` (for Git: the ``master`` branch).

    Returns ``(stdout, stderr)`` of the push command, or ``(None, None)`` if
    ``vt.repo_type`` is neither 'hg' nor 'git'.
    """
    _add_files(vt.repo_type, dest_dir, files_no=files_no)
    # PUSH it back
    stdout = stderr = None
    if vt.repo_type == 'hg':
        stdout, stderr = Command(dest_dir).execute('hg push -f --verbose', clone_url, ignoreReturnCode=ignoreReturnCode)
    elif vt.repo_type == 'git':
        stdout, stderr = Command(dest_dir).execute('git push -f --verbose', clone_url, "master", ignoreReturnCode=ignoreReturnCode)

    return stdout, stderr


def _check_outgoing(vcs, cwd, clone_url):
    """
    Return ``(stdout, stderr)`` of a command listing what the working copy
    ``cwd`` has not pushed yet; returns None for an unknown ``vcs``.
    """
    if vcs == 'hg':
        # hg removes the password from default URLs, so we have to provide it here via the clone_url
        return Command(cwd).execute('hg -q outgoing', clone_url, ignoreReturnCode=True)
    elif vcs == 'git':
        Command(cwd).execute('git remote update')
        return Command(cwd).execute('git log origin/master..master')


def set_anonymous_access(enable=True):
    """Activate or deactivate the default user; raise if the change did not stick."""
    user = db.User.get_default_user()
    user.active = enable
    meta.Session().commit()
    if enable != db.User.get_default_user().active:
        raise Exception('Cannot set anonymous access')


#==============================================================================
# TESTS
#==============================================================================


def _check_proper_git_push(stdout, stderr):
    """Assert that the ``git push --verbose`` output describes a successful push of master."""
    assert 'fatal' not in stderr
    assert 'rejected' not in stderr
    assert 'Pushing to' in stderr
    assert 'master -> master' in stderr


@pytest.mark.usefixtures("test_context_fixture")
class TestVCSOperations(base.TestController):
    """Clone/pull/push tests run with real hg and git command line clients."""

    @classmethod
    def setup_class(cls):
        # DISABLE ANONYMOUS ACCESS
        set_anonymous_access(False)

    @pytest.fixture()
    def testhook_cleanup(self):
        """After the test, delete any ``*.testhook`` hook entries it created."""
        yield
        # remove hook
        for hook in ['prechangegroup', 'pretxnchangegroup', 'preoutgoing', 'changegroup', 'outgoing', 'incoming']:
            entry = db.Ui.get_by_key('hooks', '%s.testhook' % hook)
            if entry:
                meta.Session().delete(entry)
        meta.Session().commit()

    @pytest.fixture(scope="module")
    def testfork(self):
        """Create one fork per VCS and return their names keyed by 'git' / 'hg'."""
        # create fork so the repo stays untouched
        git_fork_name = '%s_fork%s' % (base.GIT_REPO, next(_RandomNameSequence()))
        fixture.create_fork(base.GIT_REPO, git_fork_name)
        hg_fork_name = '%s_fork%s' % (base.HG_REPO, next(_RandomNameSequence()))
        fixture.create_fork(base.HG_REPO, hg_fork_name)
        return {'git': git_fork_name, 'hg': hg_fork_name}

    @parametrize_vcs_test
    def test_clone_repo_by_admin(self, webserver, vt):
        """Cloning with the default (admin) credentials succeeds."""
        clone_url = vt.repo_url_param(webserver, vt.repo_name)
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, _get_tmp_dir())

        if vt.repo_type == 'git':
            assert 'Cloning into' in stdout + stderr
            assert stderr == '' or stdout == ''
        elif vt.repo_type == 'hg':
            assert 'requesting all changes' in stdout
            assert 'adding changesets' in stdout
            assert 'adding manifests' in stdout
            assert 'adding file changes' in stdout
            assert stderr == ''

    @parametrize_vcs_test_http
    def test_clone_wrong_credentials(self, webserver, vt):
        """Cloning over HTTP with a wrong password is reported as an authentication failure."""
        clone_url = vt.repo_url_param(webserver, vt.repo_name, password='bad!')
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
        if vt.repo_type == 'git':
            assert 'fatal: Authentication failed' in stderr
        elif vt.repo_type == 'hg':
            assert 'abort: authorization failed' in stderr

    def test_clone_git_dir_as_hg(self, webserver):
        """``hg clone`` of the Git repository fails."""
        clone_url = HgHttpVcsTest.repo_url_param(webserver, base.GIT_REPO)
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute('hg clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
        assert 'HTTP Error 404: Not Found' in stderr or ("not a valid repository" in stdout and 'abort:' in stderr)

    def test_clone_hg_repo_as_git(self, webserver):
        """``git clone`` of the Mercurial repository fails with 'not found'."""
        clone_url = GitHttpVcsTest.repo_url_param(webserver, base.HG_REPO)
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute('git clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
        assert 'not found' in stderr

    @parametrize_vcs_test
    def test_clone_non_existing_path(self, webserver, vt):
        """Cloning a repository name that does not exist fails."""
        clone_url = vt.repo_url_param(webserver, 'trololo')
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
        if vt.repo_type == 'git':
            assert 'not found' in stderr or 'abort: Access to %r denied' % 'trololo' in stderr
        elif vt.repo_type == 'hg':
            assert 'HTTP Error 404: Not Found' in stderr or (
                'abort: no suitable response from remote hg' in stderr
                and 'remote: abort: Access to %r denied' % 'trololo' in stdout + stderr)

    @parametrize_vcs_test
    def test_push_new_repo(self, webserver, vt):
        """Pushing 3 commits to a repository freshly created through the API is logged correctly."""
        # Clear the log so we know what is added
        db.UserLog.query().delete()
        meta.Session().commit()

        # Create an empty server repo using the API
        repo_name = 'new_%s_%s' % (vt.repo_type, next(_RandomNameSequence()))
        usr = db.User.get_by_username(base.TEST_USER_ADMIN_LOGIN)
        params = {
            "id": 7,
            "api_key": usr.api_key,
            "method": 'create_repo',
            "args": dict(repo_name=repo_name,
                         owner=base.TEST_USER_ADMIN_LOGIN,
                         repo_type=vt.repo_type),
        }
        req = urllib.request.Request(
            'http://%s:%s/_admin/api' % webserver.server_address,
            data=ascii_bytes(json.dumps(params)),
            headers={'content-type': 'application/json'})
        # Close the HTTP response deterministically instead of leaking it
        with urllib.request.urlopen(req) as response:
            result = json.loads(response.read())
        # Expect something like:
        # {u'result': {u'msg': u'Created new repository `new_XXX`', u'task': None, u'success': True}, u'id': 7, u'error': None}
        assert result['result']['success']

        # Create local clone of the empty server repo
        local_clone_dir = _get_tmp_dir()
        clone_url = vt.repo_url_param(webserver, repo_name)
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, local_clone_dir)

        # Make 3 commits and push to the empty server repo.
        # The server repo doesn't have any other heads than the
        # refs/heads/master we are pushing, but the `git log` in the push hook
        # should still list the 3 commits.
        stdout, stderr = _add_files_and_push(webserver, vt, local_clone_dir, clone_url=clone_url)
        if vt.repo_type == 'git':
            _check_proper_git_push(stdout, stderr)
        elif vt.repo_type == 'hg':
            assert 'pushing to ' in stdout
            assert 'remote: added ' in stdout

        # Verify that we got the right events in UserLog. Expect something like:
        # <UserLog('id:new_git_XXX:started_following_repo')>
        # <UserLog('id:new_git_XXX:user_created_repo')>
        # <UserLog('id:new_git_XXX:pull')>
        # <UserLog('id:new_git_XXX:push:aed9d4c1732a1927da3be42c47eb9afdc200d427,d38b083a07af10a9f44193486959a96a23db78da,4841ff9a2b385bec995f4679ef649adb3f437622')>
        meta.Session.close()  # make sure SA fetches all new log entries (apparently only needed for MariaDB/MySQL ...)
        action_parts = [ul.action.split(':', 1) for ul in db.UserLog.query().order_by(db.UserLog.user_log_id)]
        assert [(t[0], (t[1].count(',') + 1) if len(t) == 2 else 0) for t in action_parts] == ([
            ('started_following_repo', 0),
            ('user_created_repo', 0),
            ('pull', 0),
            ('push', 3)]
            if vt.repo_type == 'git' else [
            ('started_following_repo', 0),
            ('user_created_repo', 0),
            # (u'pull', 0), # Mercurial outgoing hook is not called for empty clones
            ('push', 3)])

    @parametrize_vcs_test
    def test_push_new_file(self, webserver, testfork, vt):
        """Pushing 3 commits to the fork logs one pull and one push with 3 changesets."""
        db.UserLog.query().delete()
        meta.Session().commit()

        dest_dir = _get_tmp_dir()
        clone_url = vt.repo_url_param(webserver, vt.repo_name)
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, dest_dir)

        clone_url = vt.repo_url_param(webserver, testfork[vt.repo_type])
        stdout, stderr = _add_files_and_push(webserver, vt, dest_dir, clone_url=clone_url)

        if vt.repo_type == 'git':
            _check_proper_git_push(stdout, stderr)
        elif vt.repo_type == 'hg':
            assert 'pushing to' in stdout
            assert 'Repository size' in stdout
            assert 'Last revision is now' in stdout

        meta.Session.close()  # make sure SA fetches all new log entries (apparently only needed for MariaDB/MySQL ...)
        action_parts = [ul.action.split(':', 1) for ul in db.UserLog.query().order_by(db.UserLog.user_log_id)]
        assert [(t[0], (t[1].count(',') + 1) if len(t) == 2 else 0) for t in action_parts] == \
            [('pull', 0), ('push', 3)]

    @parametrize_vcs_test
    def test_pull(self, webserver, testfork, vt):
        """Pulling into an empty repository is logged; URLs with extra '/' are handled."""
        db.UserLog.query().delete()
        meta.Session().commit()

        dest_dir = _get_tmp_dir()
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'init', dest_dir)

        clone_url = vt.repo_url_param(webserver, vt.repo_name)
        stdout, stderr = Command(dest_dir).execute(vt.repo_type, 'pull', clone_url)
        meta.Session.close()  # make sure SA fetches all new log entries (apparently only needed for MariaDB/MySQL ...)

        if vt.repo_type == 'git':
            assert 'FETCH_HEAD' in stderr
        elif vt.repo_type == 'hg':
            assert 'new changesets' in stdout

        action_parts = [ul.action for ul in db.UserLog.query().order_by(db.UserLog.user_log_id)]
        assert action_parts == ['pull']

        # Test handling of URLs with extra '/' around repo_name
        stdout, stderr = Command(dest_dir).execute(vt.repo_type, 'pull', clone_url.replace('/' + vt.repo_name, '/./%s/' % vt.repo_name), ignoreReturnCode=True)
        if issubclass(vt, HttpVcsTest):
            if vt.repo_type == 'git':
                # NOTE: when pulling from http://hostname/./vcs_test_git/ , the git client will normalize that and issue an HTTP request to /vcs_test_git/info/refs
                assert 'Already up to date.' in stdout
            else:
                assert vt.repo_type == 'hg'
                assert "abort: HTTP Error 404: Not Found" in stderr
        else:
            assert issubclass(vt, SshVcsTest)
            if vt.repo_type == 'git':
                assert "abort: Access to './%s' denied" % vt.repo_name in stderr
            else:
                assert "abort: Access to './%s' denied" % vt.repo_name in stdout + stderr

        stdout, stderr = Command(dest_dir).execute(vt.repo_type, 'pull', clone_url.replace('/' + vt.repo_name, '/%s/' % vt.repo_name), ignoreReturnCode=True)
        if vt.repo_type == 'git':
            assert 'Already up to date.' in stdout
        else:
            assert vt.repo_type == 'hg'
            assert "no changes found" in stdout
        assert "denied" not in stderr
        assert "denied" not in stdout
        assert "404" not in stdout

    @parametrize_vcs_test
    def test_push_invalidates_cache(self, webserver, testfork, vt):
        """After a push, the repository's reported last changeset differs from before."""
        pre_cached_tip = [repo.get_api_data()['last_changeset']['short_id'] for repo in db.Repository.query().filter(db.Repository.repo_name == testfork[vt.repo_type])]

        dest_dir = _get_tmp_dir()
        clone_url = vt.repo_url_param(webserver, testfork[vt.repo_type])
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, dest_dir)

        stdout, stderr = _add_files_and_push(webserver, vt, dest_dir, files_no=1, clone_url=clone_url)

        if vt.repo_type == 'git':
            _check_proper_git_push(stdout, stderr)

        meta.Session.close()  # expire session to make sure SA fetches new Repository instances after last_changeset has been updated by server side hook in another process
        post_cached_tip = [repo.get_api_data()['last_changeset']['short_id'] for repo in db.Repository.query().filter(db.Repository.repo_name == testfork[vt.repo_type])]
        assert pre_cached_tip != post_cached_tip

    @parametrize_vcs_test_http
    def test_push_wrong_credentials(self, webserver, vt):
        """Pushing over HTTP with unknown credentials is reported as an authentication failure."""
        dest_dir = _get_tmp_dir()
        clone_url = vt.repo_url_param(webserver, vt.repo_name)
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, dest_dir)

        clone_url = webserver.repo_url(vt.repo_name, username='bad', password='name')
        stdout, stderr = _add_files_and_push(webserver, vt, dest_dir,
                                             clone_url=clone_url, ignoreReturnCode=True)

        if vt.repo_type == 'git':
            assert 'fatal: Authentication failed' in stderr
        elif vt.repo_type == 'hg':
            assert 'abort: authorization failed' in stderr

    @parametrize_vcs_test
    def test_push_with_readonly_credentials(self, webserver, vt):
        """The regular test user can clone but the push is denied and only the pull is logged."""
        db.UserLog.query().delete()
        meta.Session().commit()

        dest_dir = _get_tmp_dir()
        clone_url = vt.repo_url_param(webserver, vt.repo_name, username=base.TEST_USER_REGULAR_LOGIN, password=base.TEST_USER_REGULAR_PASS)
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, dest_dir)

        stdout, stderr = _add_files_and_push(webserver, vt, dest_dir, ignoreReturnCode=True, clone_url=clone_url)

        if vt.repo_type == 'git':
            assert 'The requested URL returned error: 403' in stderr or 'abort: Push access to %r denied' % str(vt.repo_name) in stderr
        elif vt.repo_type == 'hg':
            assert 'abort: HTTP Error 403: Forbidden' in stderr or (
                'abort: push failed on remote' in stderr
                and 'remote: Push access to %r denied' % str(vt.repo_name) in stdout)

        meta.Session.close()  # make sure SA fetches all new log entries (apparently only needed for MariaDB/MySQL ...)
        action_parts = [ul.action.split(':', 1) for ul in db.UserLog.query().order_by(db.UserLog.user_log_id)]
        assert [(t[0], (t[1].count(',') + 1) if len(t) == 2 else 0) for t in action_parts] == \
            [('pull', 0)]

    @parametrize_vcs_test
    def test_push_back_to_wrong_url(self, webserver, vt):
        """Pushing to an HTTP URL that is not a repository fails with 'not found'."""
        dest_dir = _get_tmp_dir()
        clone_url = vt.repo_url_param(webserver, vt.repo_name)
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, dest_dir)

        stdout, stderr = _add_files_and_push(
            webserver, vt, dest_dir, clone_url='http://%s:%s/tmp' % (
                webserver.server_address[0], webserver.server_address[1]),
            ignoreReturnCode=True)

        if vt.repo_type == 'git':
            assert 'not found' in stderr
        elif vt.repo_type == 'hg':
            assert 'HTTP Error 404: Not Found' in stderr

    @parametrize_vcs_test
    def test_ip_restriction(self, webserver, vt):
        """A clone is rejected while an excluding IP rule exists and works again once it is removed."""
        user_model = UserModel()
        try:
            # Add IP constraint that excludes the test context:
            user_model.add_extra_ip(base.TEST_USER_ADMIN_LOGIN, '10.10.10.10/32')
            meta.Session().commit()
            # IP permissions are cached, need to wait for the cache in the server process to expire
            time.sleep(1.5)
            clone_url = vt.repo_url_param(webserver, vt.repo_name)
            stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
            if vt.repo_type == 'git':
                # The message apparently changed in Git 1.8.3, so match it loosely.
                assert re.search(r'\b403\b', stderr) or 'abort: User test_admin from 127.0.0.127 cannot be authorized' in stderr
            elif vt.repo_type == 'hg':
                assert 'abort: HTTP Error 403: Forbidden' in stderr or 'remote: abort: User test_admin from 127.0.0.127 cannot be authorized' in stdout + stderr
        finally:
            # release IP restrictions
            for ip in db.UserIpMap.query():
                db.UserIpMap.delete(ip.ip_id)
            meta.Session().commit()
            # IP permissions are cached, need to wait for the cache in the server process to expire
            time.sleep(1.5)

        clone_url = vt.repo_url_param(webserver, vt.repo_name)
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, _get_tmp_dir())

        if vt.repo_type == 'git':
            assert 'Cloning into' in stdout + stderr
            assert stderr == '' or stdout == ''
        elif vt.repo_type == 'hg':
            assert 'requesting all changes' in stdout
            assert 'adding changesets' in stdout
            assert 'adding manifests' in stdout
            assert 'adding file changes' in stdout

            assert stderr == ''

    @parametrize_vcs_test_hg  # git hooks doesn't work like hg hooks
    def test_custom_hooks_preoutgoing(self, testhook_cleanup, webserver, testfork, vt):
        """A failing ``preoutgoing`` hook makes the clone fail."""
        # set preoutgoing to failing hook (returns True)
        db.Ui.create_or_update_hook('preoutgoing.testhook', 'python:kallithea.tests.fixture.failing_test_hook')
        meta.Session().commit()
        # clone repo
        clone_url = vt.repo_url_param(webserver, testfork[vt.repo_type], username=base.TEST_USER_ADMIN_LOGIN, password=base.TEST_USER_ADMIN_PASS)
        dest_dir = _get_tmp_dir()
        stdout, stderr = Command(base.TESTS_TMP_PATH) \
            .execute(vt.repo_type, 'clone', clone_url, dest_dir, ignoreReturnCode=True)
        if vt.repo_type == 'hg':
            assert 'preoutgoing.testhook hook failed' in stdout + stderr
        elif vt.repo_type == 'git':
            assert 'error: 406' in stderr

    @parametrize_vcs_test_hg  # git hooks doesn't work like hg hooks
    def test_custom_hooks_prechangegroup(self, testhook_cleanup, webserver, testfork, vt):
        """A ``prechangegroup`` hook blocks the push when it fails or raises, and lets it through when it passes."""
        # set prechangegroup to failing hook (returns exit code 1)
        db.Ui.create_or_update_hook('prechangegroup.testhook', 'python:kallithea.tests.fixture.failing_test_hook')
        meta.Session().commit()
        # clone repo
        clone_url = vt.repo_url_param(webserver, testfork[vt.repo_type], username=base.TEST_USER_ADMIN_LOGIN, password=base.TEST_USER_ADMIN_PASS)
        dest_dir = _get_tmp_dir()
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, dest_dir)

        stdout, stderr = _add_files_and_push(webserver, vt, dest_dir, clone_url,
                                             ignoreReturnCode=True)
        assert 'failing_test_hook failed' in stdout + stderr
        assert 'Traceback' not in stdout + stderr
        assert 'prechangegroup.testhook hook failed' in stdout + stderr
        # there are still outgoing changesets
        stdout, stderr = _check_outgoing(vt.repo_type, dest_dir, clone_url)
        assert stdout != ''

        # set prechangegroup hook to exception throwing method
        db.Ui.create_or_update_hook('prechangegroup.testhook', 'python:kallithea.tests.fixture.exception_test_hook')
        meta.Session().commit()
        # re-try to push
        stdout, stderr = Command(dest_dir).execute('%s push' % vt.repo_type, clone_url, ignoreReturnCode=True)
        if vt is HgHttpVcsTest:
            # like with 'hg serve...' 'HTTP Error 500: INTERNAL SERVER ERROR' should be returned
            assert 'HTTP Error 500: INTERNAL SERVER ERROR' in stderr
        elif vt is HgSshVcsTest:
            assert 'remote: Exception: exception_test_hook threw an exception' in stdout
        else:
            assert False
        # there are still outgoing changesets
        stdout, stderr = _check_outgoing(vt.repo_type, dest_dir, clone_url)
        assert stdout != ''

        # set prechangegroup hook to method that returns False
        db.Ui.create_or_update_hook('prechangegroup.testhook', 'python:kallithea.tests.fixture.passing_test_hook')
        meta.Session().commit()
        # re-try to push
        stdout, stderr = Command(dest_dir).execute('%s push' % vt.repo_type, clone_url, ignoreReturnCode=True)
        assert 'passing_test_hook succeeded' in stdout + stderr
        assert 'Traceback' not in stdout + stderr
        assert 'prechangegroup.testhook hook failed' not in stdout + stderr
        # no more outgoing changesets
        stdout, stderr = _check_outgoing(vt.repo_type, dest_dir, clone_url)
        assert stdout == ''
        assert stderr == ''

    def test_add_submodule_git(self, webserver, testfork):
        """A pushed Git submodule is shown in the files page and its link redirects to the submodule URL."""
        dest_dir = _get_tmp_dir()
        clone_url = GitHttpVcsTest.repo_url_param(webserver, base.GIT_REPO)

        fork_url = GitHttpVcsTest.repo_url_param(webserver, testfork['git'])

        # add submodule
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute('git clone', fork_url, dest_dir)
        stdout, stderr = Command(dest_dir).execute('git submodule add', clone_url, 'testsubmodule')
        stdout, stderr = Command(dest_dir).execute('git commit -am "added testsubmodule pointing to', clone_url, '"')
        stdout, stderr = Command(dest_dir).execute('git push', fork_url, 'master')

        # check for testsubmodule link in files page
        self.log_user()
        response = self.app.get(base.url(controller='files', action='index',
                                         repo_name=testfork['git'],
                                         revision='tip',
                                         f_path='/'))
        # check _repo_files_url that will be used to reload as AJAX
        response.mustcontain('var _repo_files_url = ("/%s/files/");' % testfork['git'])

        response.mustcontain('<a class="submodule-dir" href="%s" target="_blank"><i class="icon-file-submodule"></i><span>testsubmodule @ ' % clone_url)

        # check that following a submodule link actually works - and redirects
        response = self.app.get(base.url(controller='files', action='index',
                                         repo_name=testfork['git'],
                                         revision='tip',
                                         f_path='/testsubmodule'),
                                status=302)
        assert response.location == clone_url