Mercurial > kallithea
view kallithea/tests/other/test_vcs_operations.py @ 8987:d6d3cb5991e2 stable
tests: stabilize Git committer in test_vcs_operations
Git tries to find out name and email in this order:
1. The author can be set e.g. via the `--author` option of `git commit`.
2. If set, the environment variables GIT_AUTHOR_NAME, GIT_AUTHOR_EMAIL,
GIT_COMMITTER_NAME and GIT_COMMITTER_EMAIL are taken.
3. If set, various (global) config files are considered.
4. Unless disabled by the user.useconfigonly config, the names and emails are
inferred from various system sources such as various fields from /etc/passwd,
/etc/mailname and the environment variable EMAIL.
The author can be provided on the command line (1), but that is not possible
for the committer.
It is not an option to modify Git’s configuration files, so the result of (3)
depends on the system the tests run on, which should be avoided. A follow-up
patch will try to instruct Git to not read the system Git configuration files.
(4) is also system-dependent. On some systems, (4) is disabled in the Git
configuration. If enabled, Git will try to infer the committer name from the
gecko field in /etc/passwd, but will fail if it is empty. The previous code
passed the environment variable EMAIL to provide the corresponding email
address.
By passing the names and emails via (2), we can set the author and committer
name and email uniformly and prevent Git from using the system-dependent ways
(3) and (4). This will replace the use of of EMAIL. The environment variables
were introduced in 2005, so there should be no backwards compatibility
problems.
The tests will specify --author explicitly in the cases where the actual name
matters. We just need default values that can be used for committing when we
don't care.
We set it as static defaults to:
Author: test_regular <test_regular@example.com>
Commit: test_admin <test_admin@example.com>
Based on changes and research by Manuel Jacob <me@manueljacob.de>.
author | Mads Kiilerich <mads@kiilerich.com> |
---|---|
date | Thu, 20 Apr 2023 00:30:29 +0200 |
parents | 01e123180339 |
children | 511b20a6475d |
line wrap: on
line source
# -*- coding: utf-8 -*- # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. """ Test suite for vcs push/pull operations. The tests need Git > 1.8.1. This file was forked by the Kallithea project in July 2014. Original author and date, and relevant copyright and licensing information is below: :created_on: Dec 30, 2010 :author: marcink :copyright: (c) 2013 RhodeCode GmbH, and others. :license: GPLv3, see LICENSE.md for more details. """ import json import os import re import tempfile import time import urllib.request from subprocess import PIPE, Popen from tempfile import _RandomNameSequence import pytest import kallithea from kallithea.lib.utils2 import ascii_bytes, safe_str from kallithea.model import db, meta from kallithea.model.ssh_key import SshKeyModel from kallithea.model.user import UserModel from kallithea.tests import base from kallithea.tests.fixture import Fixture DEBUG = True HOST = '127.0.0.1:4999' # test host fixture = Fixture() # Parameterize different kinds of VCS testing - both the kind of VCS and the # access method (HTTP/SSH) # Mixin for using HTTP and SSH URLs class HttpVcsTest(object): @staticmethod def repo_url_param(webserver, repo_name, **kwargs): return webserver.repo_url(repo_name, **kwargs) class SshVcsTest(object): public_keys = { base.TEST_USER_REGULAR_LOGIN: 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQC6Ycnc2oUZHQnQwuqgZqTTdMDZD7ataf3JM7oG2Fw8JR6cdmz4QZLe5mfDwaFwG2pWHLRpVqzfrD/Pn3rIO++bgCJH5ydczrl1WScfryV1hYMJ/4EzLGM657J1/q5EI+b9SntKjf4ax+KP322L0TNQGbZUHLbfG2MwHMrYBQpHUQ== kallithea@localhost', base.TEST_USER_ADMIN_LOGIN: 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQC6Ycnc2oUZHQnQwuqgZqTTdMDZD7ataf3JM7oG2Fw8JR6cdmz4QZLe5mfDwaFwG2pWHLRpVqzfrD/Pn3rIO++bgCJH5ydczrl1WScfryV1hYMJ/4EzLGM657J1/q5EI+b9SntKjf4ax+KP322L0TNQGbZUHLbfG2MwHMrYBQpHUq== kallithea@localhost', } @classmethod def repo_url_param(cls, webserver, repo_name, username=base.TEST_USER_ADMIN_LOGIN, password=base.TEST_USER_ADMIN_PASS, client_ip=base.IP_ADDR): user = db.User.get_by_username(username) if user.ssh_keys: ssh_key = user.ssh_keys[0] else: sshkeymodel = SshKeyModel() ssh_key = sshkeymodel.create(user, 'test key', cls.public_keys[user.username]) meta.Session().commit() return cls._ssh_param(repo_name, user, ssh_key, client_ip) # Mixins for using Mercurial and Git class HgVcsTest(object): repo_type = 'hg' repo_name = base.HG_REPO class GitVcsTest(object): repo_type = 'git' repo_name = base.GIT_REPO # Combine mixins to give the combinations we want to parameterize tests with class HgHttpVcsTest(HgVcsTest, HttpVcsTest): pass class GitHttpVcsTest(GitVcsTest, HttpVcsTest): pass class HgSshVcsTest(HgVcsTest, SshVcsTest): @staticmethod def _ssh_param(repo_name, user, ssh_key, client_ip): # Specify a custom ssh command on the command line return r"""--config ui.ssh="bash -c 'SSH_ORIGINAL_COMMAND=\"\$2\" SSH_CONNECTION=\"%s 1024 127.0.0.1 22\" kallithea-cli ssh-serve -c %s %s %s' --" ssh://someuser@somehost/%s""" % ( client_ip, kallithea.CONFIG['__file__'], user.user_id, ssh_key.user_ssh_key_id, repo_name) class GitSshVcsTest(GitVcsTest, SshVcsTest): @staticmethod def _ssh_param(repo_name, user, ssh_key, client_ip): # Set a custom ssh command in the global environment os.environ['GIT_SSH_COMMAND'] = r"""bash -c 'SSH_ORIGINAL_COMMAND="$2" SSH_CONNECTION="%s 1024 127.0.0.1 22" kallithea-cli ssh-serve -c %s %s %s' --""" % ( client_ip, kallithea.CONFIG['__file__'], user.user_id, ssh_key.user_ssh_key_id) return "ssh://someuser@somehost/%s""" % repo_name parametrize_vcs_test = base.parametrize('vt', [ HgHttpVcsTest, GitHttpVcsTest, HgSshVcsTest, GitSshVcsTest, ]) parametrize_vcs_test_hg = base.parametrize('vt', [ HgHttpVcsTest, HgSshVcsTest, ]) parametrize_vcs_test_http = base.parametrize('vt', [ HgHttpVcsTest, GitHttpVcsTest, ]) class Command(object): def __init__(self, cwd): self.cwd = cwd def execute(self, *args, **environ): """ Runs command on the system with given ``args`` using simple space join without safe quoting. """ command = ' '.join(args) ignoreReturnCode = environ.pop('ignoreReturnCode', False) if DEBUG: print('*** CMD %s ***' % command) testenv = dict(os.environ) testenv['LANG'] = 'en_US.UTF-8' testenv['LANGUAGE'] = 'en_US:en' testenv['HGPLAIN'] = '' testenv['HGRCPATH'] = '' testenv['GIT_COMMITTER_NAME'] = base.TEST_USER_ADMIN_LOGIN testenv['GIT_COMMITTER_EMAIL'] = base.TEST_USER_ADMIN_EMAIL testenv['GIT_AUTHOR_NAME'] = base.TEST_USER_REGULAR_LOGIN testenv['GIT_AUTHOR_EMAIL'] = base.TEST_USER_REGULAR_EMAIL testenv.update(environ) p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd, env=testenv) stdout, stderr = p.communicate() if DEBUG: if stdout: print('stdout:', stdout) if stderr: print('stderr:', stderr) if not ignoreReturnCode: assert p.returncode == 0 return safe_str(stdout), safe_str(stderr) def _get_tmp_dir(prefix='vcs_operations-', suffix=''): return tempfile.mkdtemp(dir=base.TESTS_TMP_PATH, prefix=prefix, suffix=suffix) def _add_files(vcs, dest_dir, files_no=3): """ Generate some files, add it to dest_dir repo and push back vcs is git or hg and defines what VCS we want to make those files for :param vcs: :param dest_dir: """ added_file = '%ssetup.py' % next(_RandomNameSequence()) open(os.path.join(dest_dir, added_file), 'a').close() Command(dest_dir).execute(vcs, 'add', added_file) email = 'me@example.com' if os.name == 'nt': author_str = 'User <%s>' % email else: author_str = 'User ǝɯɐᴎ <%s>' % email for i in range(files_no): cmd = """echo "added_line%s" >> %s""" % (i, added_file) Command(dest_dir).execute(cmd) if vcs == 'hg': cmd = """hg commit -m "committed new %s" -u "%s" "%s" """ % ( i, author_str, added_file ) elif vcs == 'git': cmd = """git commit -m "committed new %s" --author "%s" "%s" """ % ( i, author_str, added_file ) Command(dest_dir).execute(cmd) def _add_files_and_push(webserver, vt, dest_dir, clone_url, ignoreReturnCode=False, files_no=3): _add_files(vt.repo_type, dest_dir, files_no=files_no) # PUSH it back stdout = stderr = None if vt.repo_type == 'hg': stdout, stderr = Command(dest_dir).execute('hg push -f --verbose', clone_url, ignoreReturnCode=ignoreReturnCode) elif vt.repo_type == 'git': stdout, stderr = Command(dest_dir).execute('git push -f --verbose', clone_url, "master", ignoreReturnCode=ignoreReturnCode) return stdout, stderr def _check_outgoing(vcs, cwd, clone_url): if vcs == 'hg': # hg removes the password from default URLs, so we have to provide it here via the clone_url return Command(cwd).execute('hg -q outgoing', clone_url, ignoreReturnCode=True) elif vcs == 'git': Command(cwd).execute('git remote update') return Command(cwd).execute('git log origin/master..master') def set_anonymous_access(enable=True): user = db.User.get_default_user() user.active = enable meta.Session().commit() if enable != db.User.get_default_user().active: raise Exception('Cannot set anonymous access') #============================================================================== # TESTS #============================================================================== def _check_proper_git_push(stdout, stderr): assert 'fatal' not in stderr assert 'rejected' not in stderr assert 'Pushing to' in stderr assert 'master -> master' in stderr @pytest.mark.usefixtures("test_context_fixture") class TestVCSOperations(base.TestController): @classmethod def setup_class(cls): # DISABLE ANONYMOUS ACCESS set_anonymous_access(False) @pytest.fixture() def testhook_cleanup(self): yield # remove hook for hook in ['prechangegroup', 'pretxnchangegroup', 'preoutgoing', 'changegroup', 'outgoing', 'incoming']: entry = db.Ui.get_by_key('hooks', '%s.testhook' % hook) if entry: meta.Session().delete(entry) meta.Session().commit() @pytest.fixture(scope="module") def testfork(self): # create fork so the repo stays untouched git_fork_name = '%s_fork%s' % (base.GIT_REPO, next(_RandomNameSequence())) fixture.create_fork(base.GIT_REPO, git_fork_name) hg_fork_name = '%s_fork%s' % (base.HG_REPO, next(_RandomNameSequence())) fixture.create_fork(base.HG_REPO, hg_fork_name) return {'git': git_fork_name, 'hg': hg_fork_name} @parametrize_vcs_test def test_clone_repo_by_admin(self, webserver, vt): clone_url = vt.repo_url_param(webserver, vt.repo_name) stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, _get_tmp_dir()) if vt.repo_type == 'git': assert 'Cloning into' in stdout + stderr assert stderr == '' or stdout == '' elif vt.repo_type == 'hg': assert 'requesting all changes' in stdout assert 'adding changesets' in stdout assert 'adding manifests' in stdout assert 'adding file changes' in stdout assert stderr == '' @parametrize_vcs_test_http def test_clone_wrong_credentials(self, webserver, vt): clone_url = vt.repo_url_param(webserver, vt.repo_name, password='bad!') stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True) if vt.repo_type == 'git': assert 'fatal: Authentication failed' in stderr elif vt.repo_type == 'hg': assert 'abort: authorization failed' in stderr def test_clone_git_dir_as_hg(self, webserver): clone_url = HgHttpVcsTest.repo_url_param(webserver, base.GIT_REPO) stdout, stderr = Command(base.TESTS_TMP_PATH).execute('hg clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True) assert 'HTTP Error 404: Not Found' in stderr or "not a valid repository" in stdout and 'abort:' in stderr def test_clone_hg_repo_as_git(self, webserver): clone_url = GitHttpVcsTest.repo_url_param(webserver, base.HG_REPO) stdout, stderr = Command(base.TESTS_TMP_PATH).execute('git clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True) assert 'not found' in stderr @parametrize_vcs_test def test_clone_non_existing_path(self, webserver, vt): clone_url = vt.repo_url_param(webserver, 'trololo') stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True) if vt.repo_type == 'git': assert 'not found' in stderr or 'abort: Access to %r denied' % 'trololo' in stderr elif vt.repo_type == 'hg': assert 'HTTP Error 404: Not Found' in stderr or 'abort: no suitable response from remote hg' in stderr and 'remote: abort: Access to %r denied' % 'trololo' in stdout + stderr @parametrize_vcs_test def test_push_new_repo(self, webserver, vt): # Clear the log so we know what is added db.UserLog.query().delete() meta.Session().commit() # Create an empty server repo using the API repo_name = 'new_%s_%s' % (vt.repo_type, next(_RandomNameSequence())) usr = db.User.get_by_username(base.TEST_USER_ADMIN_LOGIN) params = { "id": 7, "api_key": usr.api_key, "method": 'create_repo', "args": dict(repo_name=repo_name, owner=base.TEST_USER_ADMIN_LOGIN, repo_type=vt.repo_type), } req = urllib.request.Request( 'http://%s:%s/_admin/api' % webserver.server_address, data=ascii_bytes(json.dumps(params)), headers={'content-type': 'application/json'}) response = urllib.request.urlopen(req) result = json.loads(response.read()) # Expect something like: # {u'result': {u'msg': u'Created new repository `new_XXX`', u'task': None, u'success': True}, u'id': 7, u'error': None} assert result['result']['success'] # Create local clone of the empty server repo local_clone_dir = _get_tmp_dir() clone_url = vt.repo_url_param(webserver, repo_name) stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, local_clone_dir) # Make 3 commits and push to the empty server repo. # The server repo doesn't have any other heads than the # refs/heads/master we are pushing, but the `git log` in the push hook # should still list the 3 commits. stdout, stderr = _add_files_and_push(webserver, vt, local_clone_dir, clone_url=clone_url) if vt.repo_type == 'git': _check_proper_git_push(stdout, stderr) elif vt.repo_type == 'hg': assert 'pushing to ' in stdout assert 'remote: added ' in stdout # Verify that we got the right events in UserLog. Expect something like: # <UserLog('id:new_git_XXX:started_following_repo')> # <UserLog('id:new_git_XXX:user_created_repo')> # <UserLog('id:new_git_XXX:pull')> # <UserLog('id:new_git_XXX:push:aed9d4c1732a1927da3be42c47eb9afdc200d427,d38b083a07af10a9f44193486959a96a23db78da,4841ff9a2b385bec995f4679ef649adb3f437622')> meta.Session.close() # make sure SA fetches all new log entries (apparently only needed for MariaDB/MySQL ...) action_parts = [ul.action.split(':', 1) for ul in db.UserLog.query().order_by(db.UserLog.user_log_id)] assert [(t[0], (t[1].count(',') + 1) if len(t) == 2 else 0) for t in action_parts] == ([ ('started_following_repo', 0), ('user_created_repo', 0), ('pull', 0), ('push', 3)] if vt.repo_type == 'git' else [ ('started_following_repo', 0), ('user_created_repo', 0), # (u'pull', 0), # Mercurial outgoing hook is not called for empty clones ('push', 3)]) @parametrize_vcs_test def test_push_new_file(self, webserver, testfork, vt): db.UserLog.query().delete() meta.Session().commit() dest_dir = _get_tmp_dir() clone_url = vt.repo_url_param(webserver, vt.repo_name) stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, dest_dir) clone_url = vt.repo_url_param(webserver, testfork[vt.repo_type]) stdout, stderr = _add_files_and_push(webserver, vt, dest_dir, clone_url=clone_url) if vt.repo_type == 'git': _check_proper_git_push(stdout, stderr) elif vt.repo_type == 'hg': assert 'pushing to' in stdout assert 'Repository size' in stdout assert 'Last revision is now' in stdout meta.Session.close() # make sure SA fetches all new log entries (apparently only needed for MariaDB/MySQL ...) action_parts = [ul.action.split(':', 1) for ul in db.UserLog.query().order_by(db.UserLog.user_log_id)] assert [(t[0], (t[1].count(',') + 1) if len(t) == 2 else 0) for t in action_parts] == \ [('pull', 0), ('push', 3)] @parametrize_vcs_test def test_pull(self, webserver, testfork, vt): db.UserLog.query().delete() meta.Session().commit() dest_dir = _get_tmp_dir() stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'init', dest_dir) clone_url = vt.repo_url_param(webserver, vt.repo_name) stdout, stderr = Command(dest_dir).execute(vt.repo_type, 'pull', clone_url) meta.Session.close() # make sure SA fetches all new log entries (apparently only needed for MariaDB/MySQL ...) if vt.repo_type == 'git': assert 'FETCH_HEAD' in stderr elif vt.repo_type == 'hg': assert 'new changesets' in stdout action_parts = [ul.action for ul in db.UserLog.query().order_by(db.UserLog.user_log_id)] assert action_parts == ['pull'] # Test handling of URLs with extra '/' around repo_name stdout, stderr = Command(dest_dir).execute(vt.repo_type, 'pull', clone_url.replace('/' + vt.repo_name, '/./%s/' % vt.repo_name), ignoreReturnCode=True) if issubclass(vt, HttpVcsTest): if vt.repo_type == 'git': # NOTE: when pulling from http://hostname/./vcs_test_git/ , the git client will normalize that and issue an HTTP request to /vcs_test_git/info/refs assert 'Already up to date.' in stdout else: assert vt.repo_type == 'hg' assert "abort: HTTP Error 404: Not Found" in stderr else: assert issubclass(vt, SshVcsTest) if vt.repo_type == 'git': assert "abort: Access to './%s' denied" % vt.repo_name in stderr else: assert "abort: Access to './%s' denied" % vt.repo_name in stdout + stderr stdout, stderr = Command(dest_dir).execute(vt.repo_type, 'pull', clone_url.replace('/' + vt.repo_name, '/%s/' % vt.repo_name), ignoreReturnCode=True) if vt.repo_type == 'git': assert 'Already up to date.' in stdout else: assert vt.repo_type == 'hg' assert "no changes found" in stdout assert "denied" not in stderr assert "denied" not in stdout assert "404" not in stdout @parametrize_vcs_test def test_push_invalidates_cache(self, webserver, testfork, vt): pre_cached_tip = [repo.get_api_data()['last_changeset']['short_id'] for repo in db.Repository.query().filter(db.Repository.repo_name == testfork[vt.repo_type])] dest_dir = _get_tmp_dir() clone_url = vt.repo_url_param(webserver, testfork[vt.repo_type]) stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, dest_dir) stdout, stderr = _add_files_and_push(webserver, vt, dest_dir, files_no=1, clone_url=clone_url) if vt.repo_type == 'git': _check_proper_git_push(stdout, stderr) meta.Session.close() # expire session to make sure SA fetches new Repository instances after last_changeset has been updated by server side hook in another process post_cached_tip = [repo.get_api_data()['last_changeset']['short_id'] for repo in db.Repository.query().filter(db.Repository.repo_name == testfork[vt.repo_type])] assert pre_cached_tip != post_cached_tip @parametrize_vcs_test_http def test_push_wrong_credentials(self, webserver, vt): dest_dir = _get_tmp_dir() clone_url = vt.repo_url_param(webserver, vt.repo_name) stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, dest_dir) clone_url = webserver.repo_url(vt.repo_name, username='bad', password='name') stdout, stderr = _add_files_and_push(webserver, vt, dest_dir, clone_url=clone_url, ignoreReturnCode=True) if vt.repo_type == 'git': assert 'fatal: Authentication failed' in stderr elif vt.repo_type == 'hg': assert 'abort: authorization failed' in stderr @parametrize_vcs_test def test_push_with_readonly_credentials(self, webserver, vt): db.UserLog.query().delete() meta.Session().commit() dest_dir = _get_tmp_dir() clone_url = vt.repo_url_param(webserver, vt.repo_name, username=base.TEST_USER_REGULAR_LOGIN, password=base.TEST_USER_REGULAR_PASS) stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, dest_dir) stdout, stderr = _add_files_and_push(webserver, vt, dest_dir, ignoreReturnCode=True, clone_url=clone_url) if vt.repo_type == 'git': assert 'The requested URL returned error: 403' in stderr or 'abort: Push access to %r denied' % str(vt.repo_name) in stderr elif vt.repo_type == 'hg': assert 'abort: HTTP Error 403: Forbidden' in stderr or 'abort: push failed on remote' in stderr and 'remote: Push access to %r denied' % str(vt.repo_name) in stdout meta.Session.close() # make sure SA fetches all new log entries (apparently only needed for MariaDB/MySQL ...) action_parts = [ul.action.split(':', 1) for ul in db.UserLog.query().order_by(db.UserLog.user_log_id)] assert [(t[0], (t[1].count(',') + 1) if len(t) == 2 else 0) for t in action_parts] == \ [('pull', 0)] @parametrize_vcs_test def test_push_back_to_wrong_url(self, webserver, vt): dest_dir = _get_tmp_dir() clone_url = vt.repo_url_param(webserver, vt.repo_name) stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, dest_dir) stdout, stderr = _add_files_and_push( webserver, vt, dest_dir, clone_url='http://%s:%s/tmp' % ( webserver.server_address[0], webserver.server_address[1]), ignoreReturnCode=True) if vt.repo_type == 'git': assert 'not found' in stderr elif vt.repo_type == 'hg': assert 'HTTP Error 404: Not Found' in stderr @parametrize_vcs_test def test_ip_restriction(self, webserver, vt): user_model = UserModel() try: # Add IP constraint that excludes the test context: user_model.add_extra_ip(base.TEST_USER_ADMIN_LOGIN, '10.10.10.10/32') meta.Session().commit() # IP permissions are cached, need to wait for the cache in the server process to expire time.sleep(1.5) clone_url = vt.repo_url_param(webserver, vt.repo_name) stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True) if vt.repo_type == 'git': # The message apparently changed in Git 1.8.3, so match it loosely. assert re.search(r'\b403\b', stderr) or 'abort: User test_admin from 127.0.0.127 cannot be authorized' in stderr elif vt.repo_type == 'hg': assert 'abort: HTTP Error 403: Forbidden' in stderr or 'remote: abort: User test_admin from 127.0.0.127 cannot be authorized' in stdout + stderr finally: # release IP restrictions for ip in db.UserIpMap.query(): db.UserIpMap.delete(ip.ip_id) meta.Session().commit() # IP permissions are cached, need to wait for the cache in the server process to expire time.sleep(1.5) clone_url = vt.repo_url_param(webserver, vt.repo_name) stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, _get_tmp_dir()) if vt.repo_type == 'git': assert 'Cloning into' in stdout + stderr assert stderr == '' or stdout == '' elif vt.repo_type == 'hg': assert 'requesting all changes' in stdout assert 'adding changesets' in stdout assert 'adding manifests' in stdout assert 'adding file changes' in stdout assert stderr == '' @parametrize_vcs_test_hg # git hooks doesn't work like hg hooks def test_custom_hooks_preoutgoing(self, testhook_cleanup, webserver, testfork, vt): # set prechangegroup to failing hook (returns True) db.Ui.create_or_update_hook('preoutgoing.testhook', 'python:kallithea.tests.fixture.failing_test_hook') meta.Session().commit() # clone repo clone_url = vt.repo_url_param(webserver, testfork[vt.repo_type], username=base.TEST_USER_ADMIN_LOGIN, password=base.TEST_USER_ADMIN_PASS) dest_dir = _get_tmp_dir() stdout, stderr = Command(base.TESTS_TMP_PATH) \ .execute(vt.repo_type, 'clone', clone_url, dest_dir, ignoreReturnCode=True) if vt.repo_type == 'hg': assert 'preoutgoing.testhook hook failed' in stdout + stderr elif vt.repo_type == 'git': assert 'error: 406' in stderr @parametrize_vcs_test_hg # git hooks doesn't work like hg hooks def test_custom_hooks_prechangegroup(self, testhook_cleanup, webserver, testfork, vt): # set prechangegroup to failing hook (returns exit code 1) db.Ui.create_or_update_hook('prechangegroup.testhook', 'python:kallithea.tests.fixture.failing_test_hook') meta.Session().commit() # clone repo clone_url = vt.repo_url_param(webserver, testfork[vt.repo_type], username=base.TEST_USER_ADMIN_LOGIN, password=base.TEST_USER_ADMIN_PASS) dest_dir = _get_tmp_dir() stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, dest_dir) stdout, stderr = _add_files_and_push(webserver, vt, dest_dir, clone_url, ignoreReturnCode=True) assert 'failing_test_hook failed' in stdout + stderr assert 'Traceback' not in stdout + stderr assert 'prechangegroup.testhook hook failed' in stdout + stderr # there are still outgoing changesets stdout, stderr = _check_outgoing(vt.repo_type, dest_dir, clone_url) assert stdout != '' # set prechangegroup hook to exception throwing method db.Ui.create_or_update_hook('prechangegroup.testhook', 'python:kallithea.tests.fixture.exception_test_hook') meta.Session().commit() # re-try to push stdout, stderr = Command(dest_dir).execute('%s push' % vt.repo_type, clone_url, ignoreReturnCode=True) if vt is HgHttpVcsTest: # like with 'hg serve...' 'HTTP Error 500: INTERNAL SERVER ERROR' should be returned assert 'HTTP Error 500: INTERNAL SERVER ERROR' in stderr elif vt is HgSshVcsTest: assert 'remote: Exception: exception_test_hook threw an exception' in stdout else: assert False # there are still outgoing changesets stdout, stderr = _check_outgoing(vt.repo_type, dest_dir, clone_url) assert stdout != '' # set prechangegroup hook to method that returns False db.Ui.create_or_update_hook('prechangegroup.testhook', 'python:kallithea.tests.fixture.passing_test_hook') meta.Session().commit() # re-try to push stdout, stderr = Command(dest_dir).execute('%s push' % vt.repo_type, clone_url, ignoreReturnCode=True) assert 'passing_test_hook succeeded' in stdout + stderr assert 'Traceback' not in stdout + stderr assert 'prechangegroup.testhook hook failed' not in stdout + stderr # no more outgoing changesets stdout, stderr = _check_outgoing(vt.repo_type, dest_dir, clone_url) assert stdout == '' assert stderr == '' def test_add_submodule_git(self, webserver, testfork): dest_dir = _get_tmp_dir() clone_url = GitHttpVcsTest.repo_url_param(webserver, base.GIT_REPO) fork_url = GitHttpVcsTest.repo_url_param(webserver, testfork['git']) # add submodule stdout, stderr = Command(base.TESTS_TMP_PATH).execute('git clone', fork_url, dest_dir) stdout, stderr = Command(dest_dir).execute('git submodule add', clone_url, 'testsubmodule') stdout, stderr = Command(dest_dir).execute('git commit -am "added testsubmodule pointing to', clone_url, '"') stdout, stderr = Command(dest_dir).execute('git push', fork_url, 'master') # check for testsubmodule link in files page self.log_user() response = self.app.get(base.url(controller='files', action='index', repo_name=testfork['git'], revision='tip', f_path='/')) # check _repo_files_url that will be used to reload as AJAX response.mustcontain('var _repo_files_url = ("/%s/files/");' % testfork['git']) response.mustcontain('<a class="submodule-dir" href="%s" target="_blank"><i class="icon-file-submodule"></i><span>testsubmodule @ ' % clone_url) # check that following a submodule link actually works - and redirects response = self.app.get(base.url(controller='files', action='index', repo_name=testfork['git'], revision='tip', f_path='/testsubmodule'), status=302) assert response.location == clone_url