view kallithea/tests/other/test_vcs_operations.py @ 7676:902cbe668001

tests: run the same vcs tests both for hg and git using http ... and soon also ssh Parametrize vcs tests and remove duplicate code. Refactored by Mads Kiilerich.
author domruf <dominikruf@gmail.com>
date Tue, 13 Mar 2018 23:52:10 +0100
parents 99edd97366e3
children cffb5e5bf7d6
line wrap: on
line source

# -*- coding: utf-8 -*-
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""
Test suite for vcs push/pull operations.

The tests need Git > 1.8.1.

This file was forked by the Kallithea project in July 2014.
Original author and date, and relevant copyright and licensing information is below:
:created_on: Dec 30, 2010
:author: marcink
:copyright: (c) 2013 RhodeCode GmbH, and others.
:license: GPLv3, see LICENSE.md for more details.

"""

import os
import re
import tempfile
import time
import urllib2
import json
from tempfile import _RandomNameSequence
from subprocess import Popen, PIPE

import pytest

from kallithea.tests.base import *
from kallithea.tests.fixture import Fixture
from kallithea.model.db import User, Repository, UserIpMap, CacheInvalidation, Ui, UserLog
from kallithea.model.meta import Session
from kallithea.model.repo import RepoModel
from kallithea.model.user import UserModel

DEBUG = True
HOST = '127.0.0.1:4999'  # test host

fixture = Fixture()


# Parameterize different kinds of VCS testing - both the kind of VCS and the
# access method (HTTP/SSH)

class HttpVcsTest(object):
    @staticmethod
    def repo_url_param(webserver, repo_name, **kwargs):
        return webserver.repo_url(repo_name, **kwargs)

class HgVcsTest(object):
    repo_type = 'hg'
    repo_name = HG_REPO

class GitVcsTest(object):
    repo_type = 'git'
    repo_name = GIT_REPO

class HgHttpVcsTest(HgVcsTest, HttpVcsTest):
    pass

class GitHttpVcsTest(GitVcsTest, HttpVcsTest):
    pass

parametrize_vcs_test = parametrize('vt', [
    HgHttpVcsTest,
    GitHttpVcsTest,
])
parametrize_vcs_test_hg = parametrize('vt', [
    HgHttpVcsTest,
])
parametrize_vcs_test_http = parametrize('vt', [
    HgHttpVcsTest,
    GitHttpVcsTest,
])

class Command(object):

    def __init__(self, cwd):
        self.cwd = cwd

    def execute(self, *args, **environ):
        """
        Runs command on the system with given ``args`` using simple space
        join without safe quoting.
        """
        command = ' '.join(args)
        ignoreReturnCode = environ.pop('ignoreReturnCode', False)
        if DEBUG:
            print '*** CMD %s ***' % command
        testenv = dict(os.environ)
        testenv['LANG'] = 'en_US.UTF-8'
        testenv['LANGUAGE'] = 'en_US:en'
        testenv['HGPLAIN'] = ''
        testenv['HGRCPATH'] = ''
        testenv.update(environ)
        p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd, env=testenv)
        stdout, stderr = p.communicate()
        if DEBUG:
            if stdout:
                print 'stdout:', stdout
            if stderr:
                print 'stderr:', stderr
        if not ignoreReturnCode:
            assert p.returncode == 0
        return stdout, stderr


def _get_tmp_dir(prefix='vcs_operations-', suffix=''):
    return tempfile.mkdtemp(dir=TESTS_TMP_PATH, prefix=prefix, suffix=suffix)


def _add_files(vcs, dest_dir, files_no=3):
    """
    Generate some files, add it to dest_dir repo and push back
    vcs is git or hg and defines what VCS we want to make those files for

    :param vcs:
    :param dest_dir:
    """
    added_file = '%ssetup.py' % _RandomNameSequence().next()
    open(os.path.join(dest_dir, added_file), 'a').close()
    Command(dest_dir).execute(vcs, 'add', added_file)

    email = 'me@example.com'
    if os.name == 'nt':
        author_str = 'User <%s>' % email
    else:
        author_str = 'User ǝɯɐᴎ <%s>' % email
    for i in xrange(files_no):
        cmd = """echo "added_line%s" >> %s""" % (i, added_file)
        Command(dest_dir).execute(cmd)
        if vcs == 'hg':
            cmd = """hg commit -m "committed new %s" -u "%s" "%s" """ % (
                i, author_str, added_file
            )
        elif vcs == 'git':
            cmd = """git commit -m "committed new %s" --author "%s" "%s" """ % (
                i, author_str, added_file
            )
        # git commit needs EMAIL on some machines
        Command(dest_dir).execute(cmd, EMAIL=email)

def _add_files_and_push(webserver, vcs, dest_dir, ignoreReturnCode=False, files_no=3,
                            clone_url=None, username=TEST_USER_ADMIN_LOGIN, password=TEST_USER_ADMIN_PASS):
    _add_files(vcs, dest_dir, files_no=files_no)
    # PUSH it back
    _REPO = None
    if vcs == 'hg':
        _REPO = HG_REPO
    elif vcs == 'git':
        _REPO = GIT_REPO

    if clone_url is None:
        clone_url = webserver.repo_url(_REPO, username=username, password=password)

    stdout = stderr = None
    if vcs == 'hg':
        stdout, stderr = Command(dest_dir).execute('hg push --verbose', clone_url, ignoreReturnCode=ignoreReturnCode)
    elif vcs == 'git':
        stdout, stderr = Command(dest_dir).execute('git push --verbose', clone_url, "master", ignoreReturnCode=ignoreReturnCode)

    return stdout, stderr


def _check_outgoing(vcs, cwd, clone_url=''):
    if vcs == 'hg':
        # hg removes the password from default URLs, so we have to provide it here via the clone_url
        return Command(cwd).execute('hg -q outgoing', clone_url, ignoreReturnCode=True)
    elif vcs == 'git':
        Command(cwd).execute('git remote update')
        return Command(cwd).execute('git log origin/master..master')


def set_anonymous_access(enable=True):
    user = User.get_default_user()
    user.active = enable
    Session().commit()
    if enable != User.get_default_user().active:
        raise Exception('Cannot set anonymous access')


#==============================================================================
# TESTS
#==============================================================================


def _check_proper_git_push(stdout, stderr):
    # WTF Git stderr is output ?!
    assert 'fatal' not in stderr
    assert 'rejected' not in stderr
    assert 'Pushing to' in stderr
    assert 'master -> master' in stderr


@pytest.mark.usefixtures("test_context_fixture")
class TestVCSOperations(TestController):

    @classmethod
    def setup_class(cls):
        # DISABLE ANONYMOUS ACCESS
        set_anonymous_access(False)

    @pytest.fixture()
    def testhook_cleanup(self):
        yield
        # remove hook
        for hook in ['prechangegroup', 'pretxnchangegroup', 'preoutgoing', 'changegroup', 'outgoing', 'incoming']:
            entry = Ui.get_by_key('hooks', '%s.testhook' % hook)
            if entry:
                Session().delete(entry)
        Session().commit()

    @pytest.fixture(scope="module")
    def testfork(self):
        # create fork so the repo stays untouched
        git_fork_name = u'%s_fork%s' % (GIT_REPO, _RandomNameSequence().next())
        fixture.create_fork(GIT_REPO, git_fork_name)
        hg_fork_name = u'%s_fork%s' % (HG_REPO, _RandomNameSequence().next())
        fixture.create_fork(HG_REPO, hg_fork_name)
        return {'git': git_fork_name, 'hg': hg_fork_name}

    @parametrize_vcs_test
    def test_clone_repo_by_admin(self, webserver, vt):
        clone_url = webserver.repo_url(vt.repo_name)
        stdout, stderr = Command(TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, _get_tmp_dir())

        if vt.repo_type == 'git':
            assert 'Cloning into' in stdout + stderr
            assert stderr == '' or stdout == ''
        elif vt.repo_type == 'hg':
            assert 'requesting all changes' in stdout
            assert 'adding changesets' in stdout
            assert 'adding manifests' in stdout
            assert 'adding file changes' in stdout
            assert stderr == ''

    @parametrize_vcs_test_http
    def test_clone_wrong_credentials(self, webserver, vt):
        clone_url = webserver.repo_url(vt.repo_name, password='bad!')
        stdout, stderr = Command(TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
        if vt.repo_type == 'git':
            assert 'fatal: Authentication failed' in stderr
        elif vt.repo_type == 'hg':
            assert 'abort: authorization failed' in stderr

    def test_clone_git_dir_as_hg(self, webserver):
        clone_url = webserver.repo_url(GIT_REPO)
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
        assert 'HTTP Error 404: Not Found' in stderr

    def test_clone_hg_repo_as_git(self, webserver):
        clone_url = webserver.repo_url(HG_REPO)
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
        assert 'not found' in stderr

    @parametrize_vcs_test
    def test_clone_non_existing_path(self, webserver, vt):
        clone_url = webserver.repo_url('trololo')
        stdout, stderr = Command(TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
        if vt.repo_type == 'git':
            assert 'not found' in stderr
        elif vt.repo_type == 'hg':
            assert 'HTTP Error 404: Not Found' in stderr

    # TODO: use @parametrize_vcs_test and run on hg
    def test_push_new_repo_git(self, webserver):
        # Clear the log so we know what is added
        UserLog.query().delete()
        Session().commit()

        # Create an empty server repo using the API
        repo_name = u'new_git_%s' % _RandomNameSequence().next()
        usr = User.get_by_username(TEST_USER_ADMIN_LOGIN)
        params = {
            "id": 7,
            "api_key": usr.api_key,
            "method": 'create_repo',
            "args": dict(repo_name=repo_name,
                         owner=TEST_USER_ADMIN_LOGIN,
                         repo_type='git'),
        }
        req = urllib2.Request(
            'http://%s:%s/_admin/api' % webserver.server_address,
            data=json.dumps(params),
            headers={'content-type': 'application/json'})
        response = urllib2.urlopen(req)
        result = json.loads(response.read())
        # Expect something like:
        # {u'result': {u'msg': u'Created new repository `new_git_XXX`', u'task': None, u'success': True}, u'id': 7, u'error': None}
        assert result[u'result'][u'success']

        # Create local clone of the empty server repo
        local_clone_dir = _get_tmp_dir()
        clone_url = webserver.repo_url(repo_name)
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, local_clone_dir, ignoreReturnCode=True)

        # Make 3 commits and push to the empty server repo.
        # The server repo doesn't have any other heads than the
        # refs/heads/master we are pushing, but the `git log` in the push hook
        # should still list the 3 commits.
        stdout, stderr = _add_files_and_push(webserver, 'git', local_clone_dir, clone_url=clone_url)
        _check_proper_git_push(stdout, stderr)

        # Verify that we got the right events in UserLog. Expect something like:
        # <UserLog('id:new_git_XXX:started_following_repo')>
        # <UserLog('id:new_git_XXX:user_created_repo')>
        # <UserLog('id:new_git_XXX:pull')>
        # <UserLog('id:new_git_XXX:push:aed9d4c1732a1927da3be42c47eb9afdc200d427,d38b083a07af10a9f44193486959a96a23db78da,4841ff9a2b385bec995f4679ef649adb3f437622')>
        uls = list(UserLog.query().order_by(UserLog.user_log_id))
        assert len(uls) == 4
        assert uls[0].action == 'started_following_repo'
        assert uls[1].action == 'user_created_repo'
        assert uls[2].action == 'pull'
        assert uls[3].action.startswith(u'push:')
        assert uls[3].action.count(',') == 2 # expect 3 commits

    @parametrize_vcs_test
    def test_push_new_file(self, webserver, testfork, vt):
        dest_dir = _get_tmp_dir()
        clone_url = webserver.repo_url(vt.repo_name)
        stdout, stderr = Command(TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, dest_dir)

        clone_url = webserver.repo_url(testfork[vt.repo_type])
        stdout, stderr = _add_files_and_push(webserver, vt.repo_type, dest_dir, clone_url=clone_url)

        if vt.repo_type == 'git':
            print [(x.repo_full_path, x.repo_path) for x in Repository.query()]  # TODO: what is this for
            _check_proper_git_push(stdout, stderr)
        elif vt.repo_type == 'hg':
            assert 'pushing to' in stdout
            assert 'Repository size' in stdout
            assert 'Last revision is now' in stdout

    @parametrize_vcs_test
    def test_push_invalidates_cache(self, webserver, testfork, vt):
        key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
                                               == vt.repo_name).scalar()
        if not key:
            key = CacheInvalidation(vt.repo_name, vt.repo_name)
            Session().add(key)

        key.cache_active = True
        Session().commit()

        dest_dir = _get_tmp_dir()
        clone_url = webserver.repo_url(testfork[vt.repo_type])
        stdout, stderr = Command(TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, dest_dir)

        stdout, stderr = _add_files_and_push(webserver, vt.repo_type, dest_dir, files_no=1, clone_url=clone_url)

        if vt.repo_type == 'git':
            _check_proper_git_push(stdout, stderr)
        key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
                                               == testfork[vt.repo_type]).all()
        assert key == []

    @parametrize_vcs_test_http
    def test_push_wrong_credentials(self, webserver, vt):
        dest_dir = _get_tmp_dir()
        clone_url = webserver.repo_url(vt.repo_name)
        stdout, stderr = Command(TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, dest_dir)

        stdout, stderr = _add_files_and_push(webserver, vt.repo_type, dest_dir, username='bad',
                                             password='name', ignoreReturnCode=True)

        if vt.repo_type == 'git':
            assert 'fatal: Authentication failed' in stderr
        elif vt.repo_type == 'hg':
            assert 'abort: authorization failed' in stderr

    @parametrize_vcs_test
    def test_push_with_readonly_credentials(self, webserver, vt):
        dest_dir = _get_tmp_dir()
        clone_url = webserver.repo_url(vt.repo_name, username=TEST_USER_REGULAR_LOGIN, password=TEST_USER_REGULAR_PASS)
        stdout, stderr = Command(TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, dest_dir)

        stdout, stderr = _add_files_and_push(webserver, vt.repo_type, dest_dir, username=TEST_USER_REGULAR_LOGIN,
                                             password=TEST_USER_REGULAR_PASS, ignoreReturnCode=True)

        if vt.repo_type == 'git':
            assert 'The requested URL returned error: 403' in stderr
        elif vt.repo_type == 'hg':
            assert 'abort: HTTP Error 403: Forbidden' in stderr

    @parametrize_vcs_test
    def test_push_back_to_wrong_url(self, webserver, vt):
        dest_dir = _get_tmp_dir()
        clone_url = webserver.repo_url(vt.repo_name)
        stdout, stderr = Command(TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, dest_dir)

        stdout, stderr = _add_files_and_push(
            webserver, vt.repo_type, dest_dir, clone_url='http://%s:%s/tmp' % (
                webserver.server_address[0], webserver.server_address[1]),
            ignoreReturnCode=True)

        if vt.repo_type == 'git':
            assert 'not found' in stderr
        elif vt.repo_type == 'hg':
            assert 'HTTP Error 404: Not Found' in stderr

    @parametrize_vcs_test
    def test_ip_restriction(self, webserver, vt):
        user_model = UserModel()
        try:
            # Add IP constraint that excludes the test context:
            user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32')
            Session().commit()
            # IP permissions are cached, need to wait for the cache in the server process to expire
            time.sleep(1.5)
            clone_url = webserver.repo_url(vt.repo_name)
            stdout, stderr = Command(TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
            if vt.repo_type == 'git':
                # The message apparently changed in Git 1.8.3, so match it loosely.
                assert re.search(r'\b403\b', stderr)
            elif vt.repo_type == 'hg':
                assert 'abort: HTTP Error 403: Forbidden' in stderr
        finally:
            # release IP restrictions
            for ip in UserIpMap.query():
                UserIpMap.delete(ip.ip_id)
            Session().commit()
            # IP permissions are cached, need to wait for the cache in the server process to expire
            time.sleep(1.5)

        clone_url = webserver.repo_url(vt.repo_name)
        stdout, stderr = Command(TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, _get_tmp_dir())

        if vt.repo_type == 'git':
            assert 'Cloning into' in stdout + stderr
            assert stderr == '' or stdout == ''
        elif vt.repo_type == 'hg':
            assert 'requesting all changes' in stdout
            assert 'adding changesets' in stdout
            assert 'adding manifests' in stdout
            assert 'adding file changes' in stdout

            assert stderr == ''

    @parametrize_vcs_test_hg # git hooks doesn't work like hg hooks
    def test_custom_hooks_preoutgoing(self, testhook_cleanup, webserver, testfork, vt):
        # set prechangegroup to failing hook (returns True)
        Ui.create_or_update_hook('preoutgoing.testhook', 'python:kallithea.tests.fixture.failing_test_hook')
        Session().commit()
        # clone repo
        clone_url = webserver.repo_url(testfork[vt.repo_type], username=TEST_USER_ADMIN_LOGIN, password=TEST_USER_ADMIN_PASS)
        dest_dir = _get_tmp_dir()
        stdout, stderr = Command(TESTS_TMP_PATH) \
            .execute(vt.repo_type, 'clone', clone_url, dest_dir, ignoreReturnCode=True)
        if vt.repo_type == 'hg':
            assert 'preoutgoing.testhook hook failed' in stdout
        elif vt.repo_type == 'git':
            assert 'error: 406' in stderr

    @parametrize_vcs_test_hg # git hooks doesn't work like hg hooks
    def test_custom_hooks_prechangegroup(self, testhook_cleanup, webserver, testfork, vt):
        # set prechangegroup to failing hook (returns True)
        Ui.create_or_update_hook('prechangegroup.testhook', 'python:kallithea.tests.fixture.failing_test_hook')
        Session().commit()
        # clone repo
        clone_url = webserver.repo_url(testfork[vt.repo_type], username=TEST_USER_ADMIN_LOGIN, password=TEST_USER_ADMIN_PASS)
        dest_dir = _get_tmp_dir()
        stdout, stderr = Command(TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, dest_dir)

        stdout, stderr = _add_files_and_push(webserver, vt.repo_type, dest_dir,
                                             username=TEST_USER_ADMIN_LOGIN,
                                             password=TEST_USER_ADMIN_PASS,
                                             ignoreReturnCode=True)
        assert 'failing_test_hook failed' in stdout + stderr
        assert 'Traceback' not in stdout + stderr
        assert 'prechangegroup.testhook hook failed' in stdout + stderr
        # there are still outgoing changesets
        stdout, stderr = _check_outgoing(vt.repo_type, dest_dir, clone_url)
        assert stdout != ''

        # set prechangegroup hook to exception throwing method
        Ui.create_or_update_hook('prechangegroup.testhook', 'python:kallithea.tests.fixture.exception_test_hook')
        Session().commit()
        # re-try to push
        stdout, stderr = Command(dest_dir).execute('%s push' % vt.repo_type, clone_url, ignoreReturnCode=True)
        if vt.repo_type == 'hg':
            # like with 'hg serve...' 'HTTP Error 500: INTERNAL SERVER ERROR' should be returned
            assert 'HTTP Error 500: INTERNAL SERVER ERROR' in stderr
        elif vt.repo_type == 'git':
            assert 'exception_test_hook threw an exception' in stderr
        # there are still outgoing changesets
        stdout, stderr = _check_outgoing(vt.repo_type, dest_dir, clone_url)
        assert stdout != ''

        # set prechangegroup hook to method that returns False
        Ui.create_or_update_hook('prechangegroup.testhook', 'python:kallithea.tests.fixture.passing_test_hook')
        Session().commit()
        # re-try to push
        stdout, stderr = Command(dest_dir).execute('%s push' % vt.repo_type, clone_url, ignoreReturnCode=True)
        assert 'passing_test_hook succeeded' in stdout + stderr
        assert 'Traceback' not in stdout + stderr
        assert 'prechangegroup.testhook hook failed' not in stdout + stderr
        # no more outgoing changesets
        stdout, stderr = _check_outgoing(vt.repo_type, dest_dir, clone_url)
        assert stdout == ''
        assert stderr == ''

    def test_add_submodule_git(self, webserver, testfork):
        dest_dir = _get_tmp_dir()
        clone_url = webserver.repo_url(GIT_REPO)

        fork_url = webserver.repo_url(testfork['git'])

        # add submodule
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', fork_url, dest_dir)
        stdout, stderr = Command(dest_dir).execute('git submodule add', clone_url, 'testsubmodule')
        stdout, stderr = Command(dest_dir).execute('git commit -am "added testsubmodule pointing to', clone_url, '"', EMAIL=TEST_USER_ADMIN_EMAIL)
        stdout, stderr = Command(dest_dir).execute('git push', fork_url, 'master')

        # check for testsubmodule link in files page
        self.log_user()
        response = self.app.get(url(controller='files', action='index',
                                    repo_name=testfork['git'],
                                    revision='tip',
                                    f_path='/'))
        response.mustcontain('<a class="submodule-dir" href="%s" target="_blank"><i class="icon-file-submodule"></i><span>testsubmodule @ ' % clone_url)

        # check that following a submodule link actually works - and redirects
        response = self.app.get(url(controller='files', action='index',
                                    repo_name=testfork['git'],
                                    revision='tip',
                                    f_path='/testsubmodule'),
                                status=302)
        assert response.location == clone_url