view kallithea/tests/other/test_vcs_operations.py @ 8984:55715fe0a8e1 stable

meta: update copyrights
author Mads Kiilerich <mads@kiilerich.com>
date Fri, 31 Mar 2023 21:17:02 +0200
parents 01e123180339
children d6d3cb5991e2
line wrap: on
line source

# -*- coding: utf-8 -*-
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""
Test suite for vcs push/pull operations.

The tests need Git > 1.8.1.

This file was forked by the Kallithea project in July 2014.
Original author and date, and relevant copyright and licensing information is below:
:created_on: Dec 30, 2010
:author: marcink
:copyright: (c) 2013 RhodeCode GmbH, and others.
:license: GPLv3, see LICENSE.md for more details.

"""

import json
import os
import re
import tempfile
import time
import urllib.request
from subprocess import PIPE, Popen
from tempfile import _RandomNameSequence

import pytest

import kallithea
from kallithea.lib.utils2 import ascii_bytes, safe_str
from kallithea.model import db, meta
from kallithea.model.ssh_key import SshKeyModel
from kallithea.model.user import UserModel
from kallithea.tests import base
from kallithea.tests.fixture import Fixture


DEBUG = True
HOST = '127.0.0.1:4999'  # test host

fixture = Fixture()


# Parameterize different kinds of VCS testing - both the kind of VCS and the
# access method (HTTP/SSH)

# Mixin for using HTTP and SSH URLs
class HttpVcsTest(object):
    @staticmethod
    def repo_url_param(webserver, repo_name, **kwargs):
        return webserver.repo_url(repo_name, **kwargs)

class SshVcsTest(object):
    public_keys = {
        base.TEST_USER_REGULAR_LOGIN: 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQC6Ycnc2oUZHQnQwuqgZqTTdMDZD7ataf3JM7oG2Fw8JR6cdmz4QZLe5mfDwaFwG2pWHLRpVqzfrD/Pn3rIO++bgCJH5ydczrl1WScfryV1hYMJ/4EzLGM657J1/q5EI+b9SntKjf4ax+KP322L0TNQGbZUHLbfG2MwHMrYBQpHUQ== kallithea@localhost',
        base.TEST_USER_ADMIN_LOGIN: 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQC6Ycnc2oUZHQnQwuqgZqTTdMDZD7ataf3JM7oG2Fw8JR6cdmz4QZLe5mfDwaFwG2pWHLRpVqzfrD/Pn3rIO++bgCJH5ydczrl1WScfryV1hYMJ/4EzLGM657J1/q5EI+b9SntKjf4ax+KP322L0TNQGbZUHLbfG2MwHMrYBQpHUq== kallithea@localhost',
    }

    @classmethod
    def repo_url_param(cls, webserver, repo_name, username=base.TEST_USER_ADMIN_LOGIN, password=base.TEST_USER_ADMIN_PASS, client_ip=base.IP_ADDR):
        user = db.User.get_by_username(username)
        if user.ssh_keys:
            ssh_key = user.ssh_keys[0]
        else:
            sshkeymodel = SshKeyModel()
            ssh_key = sshkeymodel.create(user, 'test key', cls.public_keys[user.username])
            meta.Session().commit()

        return cls._ssh_param(repo_name, user, ssh_key, client_ip)

# Mixins for using Mercurial and Git
class HgVcsTest(object):
    repo_type = 'hg'
    repo_name = base.HG_REPO

class GitVcsTest(object):
    repo_type = 'git'
    repo_name = base.GIT_REPO

# Combine mixins to give the combinations we want to parameterize tests with
class HgHttpVcsTest(HgVcsTest, HttpVcsTest):
    pass

class GitHttpVcsTest(GitVcsTest, HttpVcsTest):
    pass

class HgSshVcsTest(HgVcsTest, SshVcsTest):
    @staticmethod
    def _ssh_param(repo_name, user, ssh_key, client_ip):
        # Specify a custom ssh command on the command line
        return r"""--config ui.ssh="bash -c 'SSH_ORIGINAL_COMMAND=\"\$2\" SSH_CONNECTION=\"%s 1024 127.0.0.1 22\" kallithea-cli ssh-serve -c %s %s %s' --" ssh://someuser@somehost/%s""" % (
            client_ip,
            kallithea.CONFIG['__file__'],
            user.user_id,
            ssh_key.user_ssh_key_id,
            repo_name)

class GitSshVcsTest(GitVcsTest, SshVcsTest):
    @staticmethod
    def _ssh_param(repo_name, user, ssh_key, client_ip):
        # Set a custom ssh command in the global environment
        os.environ['GIT_SSH_COMMAND'] = r"""bash -c 'SSH_ORIGINAL_COMMAND="$2" SSH_CONNECTION="%s 1024 127.0.0.1 22" kallithea-cli ssh-serve -c %s %s %s' --""" % (
            client_ip,
            kallithea.CONFIG['__file__'],
            user.user_id,
            ssh_key.user_ssh_key_id)
        return "ssh://someuser@somehost/%s""" % repo_name

parametrize_vcs_test = base.parametrize('vt', [
    HgHttpVcsTest,
    GitHttpVcsTest,
    HgSshVcsTest,
    GitSshVcsTest,
])
parametrize_vcs_test_hg = base.parametrize('vt', [
    HgHttpVcsTest,
    HgSshVcsTest,
])
parametrize_vcs_test_http = base.parametrize('vt', [
    HgHttpVcsTest,
    GitHttpVcsTest,
])

class Command(object):

    def __init__(self, cwd):
        self.cwd = cwd

    def execute(self, *args, **environ):
        """
        Runs command on the system with given ``args`` using simple space
        join without safe quoting.
        """
        command = ' '.join(args)
        ignoreReturnCode = environ.pop('ignoreReturnCode', False)
        if DEBUG:
            print('*** CMD %s ***' % command)
        testenv = dict(os.environ)
        testenv['LANG'] = 'en_US.UTF-8'
        testenv['LANGUAGE'] = 'en_US:en'
        testenv['HGPLAIN'] = ''
        testenv['HGRCPATH'] = ''
        testenv.update(environ)
        p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd, env=testenv)
        stdout, stderr = p.communicate()
        if DEBUG:
            if stdout:
                print('stdout:', stdout)
            if stderr:
                print('stderr:', stderr)
        if not ignoreReturnCode:
            assert p.returncode == 0
        return safe_str(stdout), safe_str(stderr)


def _get_tmp_dir(prefix='vcs_operations-', suffix=''):
    return tempfile.mkdtemp(dir=base.TESTS_TMP_PATH, prefix=prefix, suffix=suffix)


def _add_files(vcs, dest_dir, files_no=3):
    """
    Generate some files, add it to dest_dir repo and push back
    vcs is git or hg and defines what VCS we want to make those files for

    :param vcs:
    :param dest_dir:
    """
    added_file = '%ssetup.py' % next(_RandomNameSequence())
    open(os.path.join(dest_dir, added_file), 'a').close()
    Command(dest_dir).execute(vcs, 'add', added_file)

    email = 'me@example.com'
    if os.name == 'nt':
        author_str = 'User <%s>' % email
    else:
        author_str = 'User ǝɯɐᴎ <%s>' % email
    for i in range(files_no):
        cmd = """echo "added_line%s" >> %s""" % (i, added_file)
        Command(dest_dir).execute(cmd)
        if vcs == 'hg':
            cmd = """hg commit -m "committed new %s" -u "%s" "%s" """ % (
                i, author_str, added_file
            )
        elif vcs == 'git':
            cmd = """git commit -m "committed new %s" --author "%s" "%s" """ % (
                i, author_str, added_file
            )
        # git commit needs EMAIL on some machines
        Command(dest_dir).execute(cmd, EMAIL=email)

def _add_files_and_push(webserver, vt, dest_dir, clone_url, ignoreReturnCode=False, files_no=3):
    _add_files(vt.repo_type, dest_dir, files_no=files_no)
    # PUSH it back
    stdout = stderr = None
    if vt.repo_type == 'hg':
        stdout, stderr = Command(dest_dir).execute('hg push -f --verbose', clone_url, ignoreReturnCode=ignoreReturnCode)
    elif vt.repo_type == 'git':
        stdout, stderr = Command(dest_dir).execute('git push -f --verbose', clone_url, "master", ignoreReturnCode=ignoreReturnCode)

    return stdout, stderr


def _check_outgoing(vcs, cwd, clone_url):
    if vcs == 'hg':
        # hg removes the password from default URLs, so we have to provide it here via the clone_url
        return Command(cwd).execute('hg -q outgoing', clone_url, ignoreReturnCode=True)
    elif vcs == 'git':
        Command(cwd).execute('git remote update')
        return Command(cwd).execute('git log origin/master..master')


def set_anonymous_access(enable=True):
    user = db.User.get_default_user()
    user.active = enable
    meta.Session().commit()
    if enable != db.User.get_default_user().active:
        raise Exception('Cannot set anonymous access')


#==============================================================================
# TESTS
#==============================================================================


def _check_proper_git_push(stdout, stderr):
    assert 'fatal' not in stderr
    assert 'rejected' not in stderr
    assert 'Pushing to' in stderr
    assert 'master -> master' in stderr


@pytest.mark.usefixtures("test_context_fixture")
class TestVCSOperations(base.TestController):

    @classmethod
    def setup_class(cls):
        # DISABLE ANONYMOUS ACCESS
        set_anonymous_access(False)

    @pytest.fixture()
    def testhook_cleanup(self):
        yield
        # remove hook
        for hook in ['prechangegroup', 'pretxnchangegroup', 'preoutgoing', 'changegroup', 'outgoing', 'incoming']:
            entry = db.Ui.get_by_key('hooks', '%s.testhook' % hook)
            if entry:
                meta.Session().delete(entry)
        meta.Session().commit()

    @pytest.fixture(scope="module")
    def testfork(self):
        # create fork so the repo stays untouched
        git_fork_name = '%s_fork%s' % (base.GIT_REPO, next(_RandomNameSequence()))
        fixture.create_fork(base.GIT_REPO, git_fork_name)
        hg_fork_name = '%s_fork%s' % (base.HG_REPO, next(_RandomNameSequence()))
        fixture.create_fork(base.HG_REPO, hg_fork_name)
        return {'git': git_fork_name, 'hg': hg_fork_name}

    @parametrize_vcs_test
    def test_clone_repo_by_admin(self, webserver, vt):
        clone_url = vt.repo_url_param(webserver, vt.repo_name)
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, _get_tmp_dir())

        if vt.repo_type == 'git':
            assert 'Cloning into' in stdout + stderr
            assert stderr == '' or stdout == ''
        elif vt.repo_type == 'hg':
            assert 'requesting all changes' in stdout
            assert 'adding changesets' in stdout
            assert 'adding manifests' in stdout
            assert 'adding file changes' in stdout
            assert stderr == ''

    @parametrize_vcs_test_http
    def test_clone_wrong_credentials(self, webserver, vt):
        clone_url = vt.repo_url_param(webserver, vt.repo_name, password='bad!')
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
        if vt.repo_type == 'git':
            assert 'fatal: Authentication failed' in stderr
        elif vt.repo_type == 'hg':
            assert 'abort: authorization failed' in stderr

    def test_clone_git_dir_as_hg(self, webserver):
        clone_url = HgHttpVcsTest.repo_url_param(webserver, base.GIT_REPO)
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute('hg clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
        assert 'HTTP Error 404: Not Found' in stderr or "not a valid repository" in stdout and 'abort:' in stderr

    def test_clone_hg_repo_as_git(self, webserver):
        clone_url = GitHttpVcsTest.repo_url_param(webserver, base.HG_REPO)
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute('git clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
        assert 'not found' in stderr

    @parametrize_vcs_test
    def test_clone_non_existing_path(self, webserver, vt):
        clone_url = vt.repo_url_param(webserver, 'trololo')
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
        if vt.repo_type == 'git':
            assert 'not found' in stderr or 'abort: Access to %r denied' % 'trololo' in stderr
        elif vt.repo_type == 'hg':
            assert 'HTTP Error 404: Not Found' in stderr or 'abort: no suitable response from remote hg' in stderr and 'remote: abort: Access to %r denied' % 'trololo' in stdout + stderr

    @parametrize_vcs_test
    def test_push_new_repo(self, webserver, vt):
        # Clear the log so we know what is added
        db.UserLog.query().delete()
        meta.Session().commit()

        # Create an empty server repo using the API
        repo_name = 'new_%s_%s' % (vt.repo_type, next(_RandomNameSequence()))
        usr = db.User.get_by_username(base.TEST_USER_ADMIN_LOGIN)
        params = {
            "id": 7,
            "api_key": usr.api_key,
            "method": 'create_repo',
            "args": dict(repo_name=repo_name,
                         owner=base.TEST_USER_ADMIN_LOGIN,
                         repo_type=vt.repo_type),
        }
        req = urllib.request.Request(
            'http://%s:%s/_admin/api' % webserver.server_address,
            data=ascii_bytes(json.dumps(params)),
            headers={'content-type': 'application/json'})
        response = urllib.request.urlopen(req)
        result = json.loads(response.read())
        # Expect something like:
        # {u'result': {u'msg': u'Created new repository `new_XXX`', u'task': None, u'success': True}, u'id': 7, u'error': None}
        assert result['result']['success']

        # Create local clone of the empty server repo
        local_clone_dir = _get_tmp_dir()
        clone_url = vt.repo_url_param(webserver, repo_name)
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, local_clone_dir)

        # Make 3 commits and push to the empty server repo.
        # The server repo doesn't have any other heads than the
        # refs/heads/master we are pushing, but the `git log` in the push hook
        # should still list the 3 commits.
        stdout, stderr = _add_files_and_push(webserver, vt, local_clone_dir, clone_url=clone_url)
        if vt.repo_type == 'git':
            _check_proper_git_push(stdout, stderr)
        elif vt.repo_type == 'hg':
            assert 'pushing to ' in stdout
            assert 'remote: added ' in stdout

        # Verify that we got the right events in UserLog. Expect something like:
        # <UserLog('id:new_git_XXX:started_following_repo')>
        # <UserLog('id:new_git_XXX:user_created_repo')>
        # <UserLog('id:new_git_XXX:pull')>
        # <UserLog('id:new_git_XXX:push:aed9d4c1732a1927da3be42c47eb9afdc200d427,d38b083a07af10a9f44193486959a96a23db78da,4841ff9a2b385bec995f4679ef649adb3f437622')>
        meta.Session.close()  # make sure SA fetches all new log entries (apparently only needed for MariaDB/MySQL ...)
        action_parts = [ul.action.split(':', 1) for ul in db.UserLog.query().order_by(db.UserLog.user_log_id)]
        assert [(t[0], (t[1].count(',') + 1) if len(t) == 2 else 0) for t in action_parts] == ([
            ('started_following_repo', 0),
            ('user_created_repo', 0),
            ('pull', 0),
            ('push', 3)]
            if vt.repo_type == 'git' else [
            ('started_following_repo', 0),
            ('user_created_repo', 0),
            # (u'pull', 0), # Mercurial outgoing hook is not called for empty clones
            ('push', 3)])

    @parametrize_vcs_test
    def test_push_new_file(self, webserver, testfork, vt):
        db.UserLog.query().delete()
        meta.Session().commit()

        dest_dir = _get_tmp_dir()
        clone_url = vt.repo_url_param(webserver, vt.repo_name)
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, dest_dir)

        clone_url = vt.repo_url_param(webserver, testfork[vt.repo_type])
        stdout, stderr = _add_files_and_push(webserver, vt, dest_dir, clone_url=clone_url)

        if vt.repo_type == 'git':
            _check_proper_git_push(stdout, stderr)
        elif vt.repo_type == 'hg':
            assert 'pushing to' in stdout
            assert 'Repository size' in stdout
            assert 'Last revision is now' in stdout

        meta.Session.close()  # make sure SA fetches all new log entries (apparently only needed for MariaDB/MySQL ...)
        action_parts = [ul.action.split(':', 1) for ul in db.UserLog.query().order_by(db.UserLog.user_log_id)]
        assert [(t[0], (t[1].count(',') + 1) if len(t) == 2 else 0) for t in action_parts] == \
            [('pull', 0), ('push', 3)]

    @parametrize_vcs_test
    def test_pull(self, webserver, testfork, vt):
        db.UserLog.query().delete()
        meta.Session().commit()

        dest_dir = _get_tmp_dir()
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'init', dest_dir)

        clone_url = vt.repo_url_param(webserver, vt.repo_name)
        stdout, stderr = Command(dest_dir).execute(vt.repo_type, 'pull', clone_url)
        meta.Session.close()  # make sure SA fetches all new log entries (apparently only needed for MariaDB/MySQL ...)

        if vt.repo_type == 'git':
            assert 'FETCH_HEAD' in stderr
        elif vt.repo_type == 'hg':
            assert 'new changesets' in stdout

        action_parts = [ul.action for ul in db.UserLog.query().order_by(db.UserLog.user_log_id)]
        assert action_parts == ['pull']

        # Test handling of URLs with extra '/' around repo_name
        stdout, stderr = Command(dest_dir).execute(vt.repo_type, 'pull', clone_url.replace('/' + vt.repo_name, '/./%s/' % vt.repo_name), ignoreReturnCode=True)
        if issubclass(vt, HttpVcsTest):
            if vt.repo_type == 'git':
                # NOTE: when pulling from http://hostname/./vcs_test_git/ , the git client will normalize that and issue an HTTP request to /vcs_test_git/info/refs
                assert 'Already up to date.' in stdout
            else:
                assert vt.repo_type == 'hg'
                assert "abort: HTTP Error 404: Not Found" in stderr
        else:
            assert issubclass(vt, SshVcsTest)
            if vt.repo_type == 'git':
                assert "abort: Access to './%s' denied" % vt.repo_name in stderr
            else:
                assert "abort: Access to './%s' denied" % vt.repo_name in stdout + stderr

        stdout, stderr = Command(dest_dir).execute(vt.repo_type, 'pull', clone_url.replace('/' + vt.repo_name, '/%s/' % vt.repo_name), ignoreReturnCode=True)
        if vt.repo_type == 'git':
            assert 'Already up to date.' in stdout
        else:
            assert vt.repo_type == 'hg'
            assert "no changes found" in stdout
        assert "denied" not in stderr
        assert "denied" not in stdout
        assert "404" not in stdout

    @parametrize_vcs_test
    def test_push_invalidates_cache(self, webserver, testfork, vt):
        pre_cached_tip = [repo.get_api_data()['last_changeset']['short_id'] for repo in db.Repository.query().filter(db.Repository.repo_name == testfork[vt.repo_type])]

        dest_dir = _get_tmp_dir()
        clone_url = vt.repo_url_param(webserver, testfork[vt.repo_type])
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, dest_dir)

        stdout, stderr = _add_files_and_push(webserver, vt, dest_dir, files_no=1, clone_url=clone_url)

        if vt.repo_type == 'git':
            _check_proper_git_push(stdout, stderr)

        meta.Session.close()  # expire session to make sure SA fetches new Repository instances after last_changeset has been updated by server side hook in another process
        post_cached_tip = [repo.get_api_data()['last_changeset']['short_id'] for repo in db.Repository.query().filter(db.Repository.repo_name == testfork[vt.repo_type])]
        assert pre_cached_tip != post_cached_tip

    @parametrize_vcs_test_http
    def test_push_wrong_credentials(self, webserver, vt):
        dest_dir = _get_tmp_dir()
        clone_url = vt.repo_url_param(webserver, vt.repo_name)
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, dest_dir)

        clone_url = webserver.repo_url(vt.repo_name, username='bad', password='name')
        stdout, stderr = _add_files_and_push(webserver, vt, dest_dir,
                                             clone_url=clone_url, ignoreReturnCode=True)

        if vt.repo_type == 'git':
            assert 'fatal: Authentication failed' in stderr
        elif vt.repo_type == 'hg':
            assert 'abort: authorization failed' in stderr

    @parametrize_vcs_test
    def test_push_with_readonly_credentials(self, webserver, vt):
        db.UserLog.query().delete()
        meta.Session().commit()

        dest_dir = _get_tmp_dir()
        clone_url = vt.repo_url_param(webserver, vt.repo_name, username=base.TEST_USER_REGULAR_LOGIN, password=base.TEST_USER_REGULAR_PASS)
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, dest_dir)

        stdout, stderr = _add_files_and_push(webserver, vt, dest_dir, ignoreReturnCode=True, clone_url=clone_url)

        if vt.repo_type == 'git':
            assert 'The requested URL returned error: 403' in stderr or 'abort: Push access to %r denied' % str(vt.repo_name) in stderr
        elif vt.repo_type == 'hg':
            assert 'abort: HTTP Error 403: Forbidden' in stderr or 'abort: push failed on remote' in stderr and 'remote: Push access to %r denied' % str(vt.repo_name) in stdout

        meta.Session.close()  # make sure SA fetches all new log entries (apparently only needed for MariaDB/MySQL ...)
        action_parts = [ul.action.split(':', 1) for ul in db.UserLog.query().order_by(db.UserLog.user_log_id)]
        assert [(t[0], (t[1].count(',') + 1) if len(t) == 2 else 0) for t in action_parts] == \
            [('pull', 0)]

    @parametrize_vcs_test
    def test_push_back_to_wrong_url(self, webserver, vt):
        dest_dir = _get_tmp_dir()
        clone_url = vt.repo_url_param(webserver, vt.repo_name)
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, dest_dir)

        stdout, stderr = _add_files_and_push(
            webserver, vt, dest_dir, clone_url='http://%s:%s/tmp' % (
                webserver.server_address[0], webserver.server_address[1]),
            ignoreReturnCode=True)

        if vt.repo_type == 'git':
            assert 'not found' in stderr
        elif vt.repo_type == 'hg':
            assert 'HTTP Error 404: Not Found' in stderr

    @parametrize_vcs_test
    def test_ip_restriction(self, webserver, vt):
        user_model = UserModel()
        try:
            # Add IP constraint that excludes the test context:
            user_model.add_extra_ip(base.TEST_USER_ADMIN_LOGIN, '10.10.10.10/32')
            meta.Session().commit()
            # IP permissions are cached, need to wait for the cache in the server process to expire
            time.sleep(1.5)
            clone_url = vt.repo_url_param(webserver, vt.repo_name)
            stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
            if vt.repo_type == 'git':
                # The message apparently changed in Git 1.8.3, so match it loosely.
                assert re.search(r'\b403\b', stderr) or 'abort: User test_admin from 127.0.0.127 cannot be authorized' in stderr
            elif vt.repo_type == 'hg':
                assert 'abort: HTTP Error 403: Forbidden' in stderr or 'remote: abort: User test_admin from 127.0.0.127 cannot be authorized' in stdout + stderr
        finally:
            # release IP restrictions
            for ip in db.UserIpMap.query():
                db.UserIpMap.delete(ip.ip_id)
            meta.Session().commit()
            # IP permissions are cached, need to wait for the cache in the server process to expire
            time.sleep(1.5)

        clone_url = vt.repo_url_param(webserver, vt.repo_name)
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, _get_tmp_dir())

        if vt.repo_type == 'git':
            assert 'Cloning into' in stdout + stderr
            assert stderr == '' or stdout == ''
        elif vt.repo_type == 'hg':
            assert 'requesting all changes' in stdout
            assert 'adding changesets' in stdout
            assert 'adding manifests' in stdout
            assert 'adding file changes' in stdout

            assert stderr == ''

    @parametrize_vcs_test_hg # git hooks doesn't work like hg hooks
    def test_custom_hooks_preoutgoing(self, testhook_cleanup, webserver, testfork, vt):
        # set prechangegroup to failing hook (returns True)
        db.Ui.create_or_update_hook('preoutgoing.testhook', 'python:kallithea.tests.fixture.failing_test_hook')
        meta.Session().commit()
        # clone repo
        clone_url = vt.repo_url_param(webserver, testfork[vt.repo_type], username=base.TEST_USER_ADMIN_LOGIN, password=base.TEST_USER_ADMIN_PASS)
        dest_dir = _get_tmp_dir()
        stdout, stderr = Command(base.TESTS_TMP_PATH) \
            .execute(vt.repo_type, 'clone', clone_url, dest_dir, ignoreReturnCode=True)
        if vt.repo_type == 'hg':
            assert 'preoutgoing.testhook hook failed' in stdout + stderr
        elif vt.repo_type == 'git':
            assert 'error: 406' in stderr

    @parametrize_vcs_test_hg # git hooks doesn't work like hg hooks
    def test_custom_hooks_prechangegroup(self, testhook_cleanup, webserver, testfork, vt):
        # set prechangegroup to failing hook (returns exit code 1)
        db.Ui.create_or_update_hook('prechangegroup.testhook', 'python:kallithea.tests.fixture.failing_test_hook')
        meta.Session().commit()
        # clone repo
        clone_url = vt.repo_url_param(webserver, testfork[vt.repo_type], username=base.TEST_USER_ADMIN_LOGIN, password=base.TEST_USER_ADMIN_PASS)
        dest_dir = _get_tmp_dir()
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, dest_dir)

        stdout, stderr = _add_files_and_push(webserver, vt, dest_dir, clone_url,
                                             ignoreReturnCode=True)
        assert 'failing_test_hook failed' in stdout + stderr
        assert 'Traceback' not in stdout + stderr
        assert 'prechangegroup.testhook hook failed' in stdout + stderr
        # there are still outgoing changesets
        stdout, stderr = _check_outgoing(vt.repo_type, dest_dir, clone_url)
        assert stdout != ''

        # set prechangegroup hook to exception throwing method
        db.Ui.create_or_update_hook('prechangegroup.testhook', 'python:kallithea.tests.fixture.exception_test_hook')
        meta.Session().commit()
        # re-try to push
        stdout, stderr = Command(dest_dir).execute('%s push' % vt.repo_type, clone_url, ignoreReturnCode=True)
        if vt is HgHttpVcsTest:
            # like with 'hg serve...' 'HTTP Error 500: INTERNAL SERVER ERROR' should be returned
            assert 'HTTP Error 500: INTERNAL SERVER ERROR' in stderr
        elif vt is HgSshVcsTest:
            assert 'remote: Exception: exception_test_hook threw an exception' in stdout
        else: assert False
        # there are still outgoing changesets
        stdout, stderr = _check_outgoing(vt.repo_type, dest_dir, clone_url)
        assert stdout != ''

        # set prechangegroup hook to method that returns False
        db.Ui.create_or_update_hook('prechangegroup.testhook', 'python:kallithea.tests.fixture.passing_test_hook')
        meta.Session().commit()
        # re-try to push
        stdout, stderr = Command(dest_dir).execute('%s push' % vt.repo_type, clone_url, ignoreReturnCode=True)
        assert 'passing_test_hook succeeded' in stdout + stderr
        assert 'Traceback' not in stdout + stderr
        assert 'prechangegroup.testhook hook failed' not in stdout + stderr
        # no more outgoing changesets
        stdout, stderr = _check_outgoing(vt.repo_type, dest_dir, clone_url)
        assert stdout == ''
        assert stderr == ''

    def test_add_submodule_git(self, webserver, testfork):
        dest_dir = _get_tmp_dir()
        clone_url = GitHttpVcsTest.repo_url_param(webserver, base.GIT_REPO)

        fork_url = GitHttpVcsTest.repo_url_param(webserver, testfork['git'])

        # add submodule
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute('git clone', fork_url, dest_dir)
        stdout, stderr = Command(dest_dir).execute('git submodule add', clone_url, 'testsubmodule')
        stdout, stderr = Command(dest_dir).execute('git commit -am "added testsubmodule pointing to', clone_url, '"', EMAIL=base.TEST_USER_ADMIN_EMAIL)
        stdout, stderr = Command(dest_dir).execute('git push', fork_url, 'master')

        # check for testsubmodule link in files page
        self.log_user()
        response = self.app.get(base.url(controller='files', action='index',
                                    repo_name=testfork['git'],
                                    revision='tip',
                                    f_path='/'))
        # check _repo_files_url that will be used to reload as AJAX
        response.mustcontain('var _repo_files_url = ("/%s/files/");' % testfork['git'])

        response.mustcontain('<a class="submodule-dir" href="%s" target="_blank"><i class="icon-file-submodule"></i><span>testsubmodule @ ' % clone_url)

        # check that following a submodule link actually works - and redirects
        response = self.app.get(base.url(controller='files', action='index',
                                    repo_name=testfork['git'],
                                    revision='tip',
                                    f_path='/testsubmodule'),
                                status=302)
        assert response.location == clone_url