view rhodecode/tests/other/ @ 4116:ffd45b185016 rhodecode-2.2.5-gpl

Imported some of the GPLv3'd changes from RhodeCode v2.2.5. This imports changes between changesets 21af6c4eab3d and 6177597791c2 in RhodeCode's original repository, including only changes to Python files and HTML. RhodeCode clearly licensed its changes to these files under GPLv3 in their /LICENSE file, which states the following: The Python code and integrated HTML are licensed under the GPLv3 license. (See: or for an online copy of that LICENSE file) Conservancy reviewed these changes and confirmed that they can be licensed as a whole to the Kallithea project under GPLv3-only. While some of the contents committed herein are clearly licensed GPLv3-or-later, on the whole we must assume they are GPLv3-only, since the statement above from RhodeCode indicates that they intend GPLv3-only as their license, per GPLv3§14 and other relevant sections of GPLv3.
author Bradley M. Kuhn <>
date Wed, 02 Jul 2014 19:03:13 -0400
parents 5067d6e826a5
children 7e5f8c12a3fc
line wrap: on
line source

# -*- coding: utf-8 -*-
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""
Test suite for making push/pull operations.

Run it with the command below, after starting the test server with
``paster serve test.ini``::

 RC_WHOOSH_TEST_DISABLE=1 RC_NO_TMP_PATH=1 nosetests rhodecode/tests/other/

You must have git > 1.8.1 for the tests to work properly.

:created_on: Dec 30, 2010
:author: marcink
:copyright: (c) 2013 RhodeCode GmbH.
:license: GPLv3, see LICENSE for more details.
"""


import tempfile
import time
from os.path import join as jn

from tempfile import _RandomNameSequence
from subprocess import Popen, PIPE

from rhodecode.tests import *
from rhodecode.model.db import User, Repository, UserIpMap, CacheInvalidation
from rhodecode.model.meta import Session
from rhodecode.model.repo import RepoModel
from rhodecode.model.user import UserModel

# When true, Command.execute prints every shell command and its output.
DEBUG = True
# Host part interpolated into the URLs built by _construct_url.
# NOTE(review): empty in this view, which produces URLs without a host --
# confirm the intended "host:port" of the running test server.
HOST = ''  # test host

class Command(object):
    """Run shell commands from a fixed working directory."""

    def __init__(self, cwd):
        """Remember ``cwd``, the directory every command is executed in."""
        self.cwd = cwd

    def execute(self, cmd, *args):
        """
        Runs command on the system with given ``args``.

        :param cmd: command to run (may already contain arguments)
        :param args: extra strings appended to ``cmd``, separated by spaces
        :returns: ``(stdout, stderr)`` captured from the finished process
        """
        # The line is handed to the shell unescaped; acceptable only because
        # the callers in this module pass test-controlled values.
        command = cmd + ' ' + ' '.join(args)
        if DEBUG:
            # single-argument print(...) behaves the same on Python 2 and 3
            print('*** CMD %s ***' % command)
        p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd)
        stdout, stderr = p.communicate()
        if DEBUG:
            print('%s %s' % (stdout, stderr))
        return stdout, stderr

def _get_tmp_dir():
    return tempfile.mkdtemp(prefix='rc_integration_test')

def _construct_url(repo, dest=None, **kwargs):
    """
    Build the ``<url> <destination>`` argument pair for a clone/push command.

    :param repo: repository name, used as the URL path
    :param dest: local destination; a fresh temp dir is created when ``None``
    :param kwargs: overrides for the URL parts (``user``, ``passwd``,
        ``host``, ``cloned_repo``); other keys are carried along but unused
    :returns: ``'http://[user:passwd@]host/repo dest'``
    """
    if dest is None:
        # make temp clone
        dest = _get_tmp_dir()
    params = {
        'user': TEST_USER_ADMIN_LOGIN,
        'passwd': TEST_USER_ADMIN_PASS,
        'host': HOST,
        'cloned_repo': repo,
        'dest': dest,
    }
    # callers pass e.g. passwd='bad!' to obtain a URL with wrong credentials
    params.update(kwargs)
    if params['user'] and params['passwd']:
        _url = 'http://%(user)s:%(passwd)s@%(host)s/%(cloned_repo)s %(dest)s' % params
    else:
        # no credentials: anonymous URL (the host placeholder needs its '%')
        _url = 'http://%(host)s/%(cloned_repo)s %(dest)s' % params
    return _url

def _add_files_and_push(vcs, DEST, **kwargs):
    """
    Generate some files, add it to DEST repo and push back
    vcs is git or hg and defines what VCS we want to make those files for

    :param vcs: ``'hg'`` or ``'git'``
    :param DEST: path of the local working copy to commit in
    :param kwargs: ``files_no`` -- number of commits to create (default 3);
        ``clone_url`` -- explicit push target replacing the generated one;
        all keys are also forwarded to ``_construct_url``
    :returns: ``(stdout, stderr)`` of the push command, or ``(None, None)``
        when ``vcs`` is neither ``'hg'`` nor ``'git'``
    """
    # commit some stuff into this repo
    cwd = path = jn(DEST)
    # A non-ASCII variant of the name ('%ssetupążźć.py') was disabled earlier.
    # NOTE(review): the active format string is empty in this view; the
    # ASCII form of the disabled variant is assumed -- confirm.
    added_file = jn(path, '%ssetup.py' % next(_RandomNameSequence()))
    Command(cwd).execute('touch %s' % added_file)
    Command(cwd).execute('%s add %s' % (vcs, added_file))

    # NOTE(review): the e-mail parts of the strings below look stripped.
    author_str = 'Marcin Kuźminski <>'
    for i in range(kwargs.get('files_no', 3)):
        # change the file first, so that there is something to commit
        Command(cwd).execute("""echo 'added_line%s' >> %s""" % (i, added_file))
        cmd = None
        if vcs == 'hg':
            cmd = """hg commit -m 'commited new %s' -u '%s' %s """ % (
                i, author_str, added_file
            )
        elif vcs == 'git':
            cmd = """EMAIL="" git commit -m 'commited new %s' --author '%s' %s """ % (
                i, author_str, added_file
            )
        if cmd is not None:
            Command(cwd).execute(cmd)

    # PUSH it back
    _REPO = None
    if vcs == 'hg':
        _REPO = HG_REPO
    elif vcs == 'git':
        _REPO = GIT_REPO

    # an empty destination leaves just the URL (plus a trailing space)
    kwargs['dest'] = ''
    clone_url = _construct_url(_REPO, **kwargs)
    if 'clone_url' in kwargs:
        clone_url = kwargs['clone_url']
    stdout = stderr = None
    if vcs == 'hg':
        stdout, stderr = Command(cwd).execute('hg push --verbose', clone_url)
    elif vcs == 'git':
        stdout, stderr = Command(cwd).execute('git push --verbose', clone_url + " master")

    return stdout, stderr

def set_anonymous_access(enable=True):
    """
    Enable or disable the default (anonymous) user.

    :param enable: value to store in the default user's ``active`` flag
    :raises Exception: when reading the user back shows a different value
    """
    user = User.get_by_username(User.DEFAULT_USER)
    user.active = enable
    # NOTE(review): committing through Session is assumed to be required so
    # that the separately running test server observes the change -- confirm.
    Session().add(user)
    Session().commit()
    print('\tanonymous access is now: %s' % (enable,))
    if enable != User.get_by_username(User.DEFAULT_USER).active:
        raise Exception('Cannot set anonymous access')


def _check_proper_git_push(stdout, stderr):
    #WTF GIT stderr is output ?!
    assert 'fatal' not in stderr
    assert 'rejected' not in stderr
    assert 'Pushing to' in stderr
    assert 'master -> master' in stderr

class TestVCSOperations(BaseTestCase):

    @classmethod
    def setup_class(cls):
        """
        Switch anonymous access off once, before any test of the class runs.

        NOTE(review): the body is absent in this view of the file. It is
        restored on the assumption that the wrong-credentials tests rely on
        anonymous access being disabled -- confirm.
        """
        set_anonymous_access(False)

    def setUp(self):
        """
        Reset both test repositories before every test: release a lock left
        behind by a previous test and turn the locking feature off.
        """
        for repo_name in (GIT_REPO, HG_REPO):
            r = Repository.get_by_repo_name(repo_name)
            # NOTE(review): assumes Repository.unlock exists as the
            # counterpart of the Repository.lock used by the tests -- confirm.
            Repository.unlock(r)
            r.enable_locking = False
            # commit, otherwise the separately running server never sees it
            Session().add(r)
            Session().commit()

    def test_clone_hg_repo_by_admin(self):
        """Cloning the Mercurial repo with admin credentials succeeds."""
        url = _construct_url(HG_REPO)
        out, err = Command('/tmp').execute('hg clone', url)

        expected_lines = (
            'requesting all changes',
            'adding changesets',
            'adding manifests',
            'adding file changes',
        )
        for line in expected_lines:
            assert line in out

        assert err == ''

    def test_clone_git_repo_by_admin(self):
        """Cloning the Git repo with admin credentials succeeds."""
        url = _construct_url(GIT_REPO)
        out, err = Command('/tmp').execute('git clone', url)

        assert 'Cloning into' in out
        assert err == ''

    def test_clone_wrong_credentials_hg(self):
        """A Mercurial clone with a bad password is refused."""
        url = _construct_url(HG_REPO, passwd='bad!')
        _out, err = Command('/tmp').execute('hg clone', url)
        assert 'abort: authorization failed' in err

    def test_clone_wrong_credentials_git(self):
        """A Git clone with a bad password is refused."""
        url = _construct_url(GIT_REPO, passwd='bad!')
        _out, err = Command('/tmp').execute('git clone', url)
        assert 'fatal: Authentication failed' in err

    def test_clone_git_dir_as_hg(self):
        """Cloning the Git repo with the hg client ends in a 404."""
        url = _construct_url(GIT_REPO)
        _out, err = Command('/tmp').execute('hg clone', url)
        assert 'HTTP Error 404: Not Found' in err

    def test_clone_hg_repo_as_git(self):
        """Cloning the Mercurial repo with the git client reports 'not found'."""
        url = _construct_url(HG_REPO)
        _out, err = Command('/tmp').execute('git clone', url)
        assert 'not found' in err

    def test_clone_non_existing_path_hg(self):
        """An hg clone of a repository name that does not exist ends in a 404."""
        url = _construct_url('trololo')
        _out, err = Command('/tmp').execute('hg clone', url)
        assert 'HTTP Error 404: Not Found' in err

    def test_clone_non_existing_path_git(self):
        """A git clone of a repository name that does not exist reports 'not found'."""
        url = _construct_url('trololo')
        _out, err = Command('/tmp').execute('git clone', url)
        assert 'not found' in err

    def test_push_new_file_hg(self):
        """Commits made in a fresh Mercurial clone can be pushed back."""
        workdir = _get_tmp_dir()
        url = _construct_url(HG_REPO, dest=workdir)
        Command('/tmp').execute('hg clone', url)

        out, _err = _add_files_and_push('hg', workdir)

        for fragment in ('pushing to', 'Repository size',
                         'Last revision is now'):
            assert fragment in out

    def test_push_new_file_git(self):
        """Commits made in a fresh Git clone can be pushed back."""
        workdir = _get_tmp_dir()
        url = _construct_url(GIT_REPO, dest=workdir)
        Command('/tmp').execute('git clone', url)

        # commit some stuff into this repo
        out, err = _add_files_and_push('git', workdir)

        # debugging aid: show where every known repository lives
        repo_paths = [(x.repo_full_path, x.repo_path)
                      for x in Repository.get_all()]
        print(repo_paths)
        _check_proper_git_push(out, err)

    def test_push_invalidates_cache_hg(self):
        """A push to the Mercurial repo must deactivate its cache key."""
        # NOTE(review): the tail of both filter expressions is missing in
        # this view; comparing with HG_REPO follows from the key created
        # below, the .scalar()/.one() terminators are assumptions -- confirm.
        key = CacheInvalidation.query().filter(
            CacheInvalidation.cache_key == HG_REPO).scalar()
        if not key:
            key = CacheInvalidation(HG_REPO, HG_REPO)

        key.cache_active = True
        # commit, so the server process starts from an active cache key
        Session().add(key)
        Session().commit()

        DEST = _get_tmp_dir()
        clone_url = _construct_url(HG_REPO, dest=DEST)
        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)

        stdout, stderr = _add_files_and_push('hg', DEST, files_no=1)

        key = CacheInvalidation.query().filter(
            CacheInvalidation.cache_key == HG_REPO).one()
        self.assertEqual(key.cache_active, False)

    def test_push_invalidates_cache_git(self):
        """A push to the Git repo must deactivate its cache key."""
        # NOTE(review): the tail of both filter expressions is missing in
        # this view; comparing with GIT_REPO follows from the key created
        # below, the .scalar()/.one() terminators are assumptions -- confirm.
        key = CacheInvalidation.query().filter(
            CacheInvalidation.cache_key == GIT_REPO).scalar()
        if not key:
            key = CacheInvalidation(GIT_REPO, GIT_REPO)

        key.cache_active = True
        # commit, so the server process starts from an active cache key
        Session().add(key)
        Session().commit()

        DEST = _get_tmp_dir()
        clone_url = _construct_url(GIT_REPO, dest=DEST)
        stdout, stderr = Command('/tmp').execute('git clone', clone_url)

        # commit some stuff into this repo
        stdout, stderr = _add_files_and_push('git', DEST, files_no=1)
        _check_proper_git_push(stdout, stderr)

        key = CacheInvalidation.query().filter(
            CacheInvalidation.cache_key == GIT_REPO).one()
        # debugging aid: dump all cache keys
        print(CacheInvalidation.get_all())
        self.assertEqual(key.cache_active, False)

    def test_push_wrong_credentials_hg(self):
        """An hg push with unknown credentials is rejected."""
        DEST = _get_tmp_dir()
        clone_url = _construct_url(HG_REPO, dest=DEST)
        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)

        # NOTE(review): the call's continuation is missing in this view; any
        # wrong password serves the purpose -- confirm the original value.
        stdout, stderr = _add_files_and_push('hg', DEST, user='bad',
                                             passwd='name')

        assert 'abort: authorization failed' in stderr

    def test_push_wrong_credentials_git(self):
        """A git push with unknown credentials is rejected."""
        DEST = _get_tmp_dir()
        clone_url = _construct_url(GIT_REPO, dest=DEST)
        stdout, stderr = Command('/tmp').execute('git clone', clone_url)

        # NOTE(review): the call's continuation is missing in this view; any
        # wrong password serves the purpose -- confirm the original value.
        stdout, stderr = _add_files_and_push('git', DEST, user='bad',
                                             passwd='name')

        assert 'fatal: Authentication failed' in stderr

    def test_push_back_to_wrong_url_hg(self):
        """An hg push to a URL that is not a repository ends in a 404."""
        DEST = _get_tmp_dir()
        clone_url = _construct_url(HG_REPO, dest=DEST)
        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)

        # NOTE(review): the target URL is missing in this view; a path on the
        # test host that is not a repository is assumed -- confirm.
        stdout, stderr = _add_files_and_push('hg', DEST,
                                             clone_url='http://%s/tmp' % HOST)

        assert 'HTTP Error 404: Not Found' in stderr

    def test_push_back_to_wrong_url_git(self):
        """A git push to a URL that is not a repository reports 'not found'."""
        DEST = _get_tmp_dir()
        clone_url = _construct_url(GIT_REPO, dest=DEST)
        stdout, stderr = Command('/tmp').execute('git clone', clone_url)

        # NOTE(review): the target URL is missing in this view; a path on the
        # test host that is not a repository is assumed -- confirm.
        stdout, stderr = _add_files_and_push('git', DEST,
                                             clone_url='http://%s/tmp' % HOST)

        assert 'not found' in stderr

    def test_clone_and_create_lock_hg(self):
        """Cloning a locking-enabled Mercurial repo locks it for the cloning user."""
        # enable locking
        r = Repository.get_by_repo_name(HG_REPO)
        r.enable_locking = True
        # commit, otherwise the separately running server never sees the flag
        Session().add(r)
        Session().commit()
        # clone
        clone_url = _construct_url(HG_REPO)
        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)

        # check if lock was made
        r = Repository.get_by_repo_name(HG_REPO)
        assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id

    def test_clone_and_create_lock_git(self):
        """Cloning a locking-enabled Git repo locks it for the cloning user."""
        # enable locking
        r = Repository.get_by_repo_name(GIT_REPO)
        r.enable_locking = True
        # commit, otherwise the separately running server never sees the flag
        Session().add(r)
        Session().commit()
        # clone
        clone_url = _construct_url(GIT_REPO)
        stdout, stderr = Command('/tmp').execute('git clone', clone_url)

        # check if lock was made
        r = Repository.get_by_repo_name(GIT_REPO)
        assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id

    def test_clone_after_repo_was_locked_hg(self):
        """An hg clone of an already locked repo is refused with HTTP 423."""
        # lock repo
        repo = Repository.get_by_repo_name(HG_REPO)
        admin_id = User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
        Repository.lock(repo, admin_id)

        # pull fails since repo is locked
        url = _construct_url(HG_REPO)
        _out, err = Command('/tmp').execute('hg clone', url)
        template = """abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
        expected = template % (HG_REPO, TEST_USER_ADMIN_LOGIN)
        assert expected in err

    def test_clone_after_repo_was_locked_git(self):
        """A git clone of an already locked repo is refused with error 423."""
        # lock repo
        repo = Repository.get_by_repo_name(GIT_REPO)
        admin_id = User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
        Repository.lock(repo, admin_id)

        # pull fails since repo is locked
        url = _construct_url(GIT_REPO)
        _out, err = Command('/tmp').execute('git clone', url)
        expected = """The requested URL returned error: 423"""
        assert expected in err

    def test_push_on_locked_repo_by_other_user_hg(self):
        """An hg push by a regular user fails while the admin holds the lock."""
        # clone some temp
        DEST = _get_tmp_dir()
        clone_url = _construct_url(HG_REPO, dest=DEST)
        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)

        # lock repo
        r = Repository.get_by_repo_name(HG_REPO)
        # let this user actually push !
        # NOTE(review): the permission argument is missing in this view;
        # 'repository.write' is assumed as the level needed to push -- confirm.
        RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN,
                                          perm='repository.write')
        Session().commit()
        Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)

        # push fails repo is locked by other user !
        # NOTE(review): credentials of the regular user are assumed for the
        # missing arguments (TEST_USER_REGULAR_PASS from rhodecode.tests).
        stdout, stderr = _add_files_and_push('hg', DEST,
                                             user=TEST_USER_REGULAR_LOGIN,
                                             passwd=TEST_USER_REGULAR_PASS)
        msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
                % (HG_REPO, TEST_USER_ADMIN_LOGIN))
        assert msg in stderr

    def test_push_on_locked_repo_by_other_user_git(self):
        """A git push by a regular user fails while the admin holds the lock."""
        # clone some temp
        DEST = _get_tmp_dir()
        clone_url = _construct_url(GIT_REPO, dest=DEST)
        stdout, stderr = Command('/tmp').execute('git clone', clone_url)

        # lock repo
        r = Repository.get_by_repo_name(GIT_REPO)
        # let this user actually push !
        # NOTE(review): the permission argument is missing in this view;
        # 'repository.write' is assumed as the level needed to push -- confirm.
        RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN,
                                          perm='repository.write')
        Session().commit()
        Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)

        # push fails repo is locked by other user !
        # NOTE(review): credentials of the regular user are assumed for the
        # missing arguments (TEST_USER_REGULAR_PASS from rhodecode.tests).
        stdout, stderr = _add_files_and_push('git', DEST,
                                             user=TEST_USER_REGULAR_LOGIN,
                                             passwd=TEST_USER_REGULAR_PASS)
        err = 'Repository `%s` locked by user `%s`' % (GIT_REPO, TEST_USER_ADMIN_LOGIN)
        assert err in stderr

        # TODO: fix this somehow later on GIT. Even if we throw back 423, git
        # makes ANOTHER request and we fail there with 405, so the complete
        # "abort: HTTP Error 423: ..." line (or "405 Method Not Allowed")
        # is deliberately not asserted here.

    def test_push_unlocks_repository_hg(self):
        """An hg push by the lock holder releases the lock taken by the clone."""
        # enable locking
        r = Repository.get_by_repo_name(HG_REPO)
        r.enable_locking = True
        # commit, otherwise the separately running server never sees the flag
        Session().add(r)
        Session().commit()
        # clone some temp
        DEST = _get_tmp_dir()
        clone_url = _construct_url(HG_REPO, dest=DEST)
        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)

        # check for lock repo after clone
        r = Repository.get_by_repo_name(HG_REPO)
        uid = User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
        assert r.locked[0] == uid

        # push is ok and repo is now unlocked
        stdout, stderr = _add_files_and_push('hg', DEST)
        assert ('remote: Released lock on repo `%s`' % HG_REPO) in stdout
        # we need to cleanup the Session Here !
        # NOTE(review): the cleanup statement itself is missing in this view;
        # discarding the scoped session (so the next query re-reads the
        # database) is assumed -- confirm.
        Session.remove()
        r = Repository.get_by_repo_name(HG_REPO)
        assert r.locked == [None, None]

    #TODO: fix me ! somehow during tests hooks don't get called on GIT
    def test_push_unlocks_repository_git(self):
        """A git push by the lock holder releases the lock taken by the clone."""
        # enable locking
        r = Repository.get_by_repo_name(GIT_REPO)
        r.enable_locking = True
        # commit, otherwise the separately running server never sees the flag
        Session().add(r)
        Session().commit()
        # clone some temp
        DEST = _get_tmp_dir()
        clone_url = _construct_url(GIT_REPO, dest=DEST)
        stdout, stderr = Command('/tmp').execute('git clone', clone_url)

        # check for lock repo after clone
        r = Repository.get_by_repo_name(GIT_REPO)
        assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id

        # push is ok and repo is now unlocked
        stdout, stderr = _add_files_and_push('git', DEST)
        _check_proper_git_push(stdout, stderr)

        # The 'remote: Released lock on repo' line is not asserted for git.
        # we need to cleanup the Session Here !
        # NOTE(review): the cleanup statement itself is missing in this view;
        # discarding the scoped session (so the next query re-reads the
        # database) is assumed -- confirm.
        Session.remove()
        r = Repository.get_by_repo_name(GIT_REPO)
        assert r.locked == [None, None]

    def test_ip_restriction_hg(self):
        """
        An hg clone is forbidden while the admin is restricted to a foreign
        IP range, and works again once the restriction is removed.
        """
        user_model = UserModel()
        try:
            # NOTE(review): the address literal is empty in this view; a
            # range that excludes the test client is assumed -- confirm.
            user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32')
            Session().commit()
            clone_url = _construct_url(HG_REPO)
            stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
            assert 'abort: HTTP Error 403: Forbidden' in stderr
        finally:
            # release IP restrictions, even when the assertion above failed
            for ip in UserIpMap.getAll():
                Session().delete(ip)
            Session().commit()

        # NOTE(review): a short pause for the server to pick up the removed
        # restriction is assumed (``time`` is imported but otherwise unused).
        time.sleep(2)
        clone_url = _construct_url(HG_REPO)
        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)

        assert 'requesting all changes' in stdout
        assert 'adding changesets' in stdout
        assert 'adding manifests' in stdout
        assert 'adding file changes' in stdout

        assert stderr == ''

    def test_ip_restriction_git(self):
        """
        A git clone is forbidden while the admin is restricted to a foreign
        IP range, and works again once the restriction is removed.
        """
        user_model = UserModel()
        try:
            # NOTE(review): the address literal is empty in this view; a
            # range that excludes the test client is assumed -- confirm.
            user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32')
            Session().commit()
            clone_url = _construct_url(GIT_REPO)
            stdout, stderr = Command('/tmp').execute('git clone', clone_url)
            msg = ("""The requested URL returned error: 403""")
            assert msg in stderr
        finally:
            # release IP restrictions, even when the assertion above failed
            for ip in UserIpMap.getAll():
                Session().delete(ip)
            Session().commit()

        # NOTE(review): a short pause for the server to pick up the removed
        # restriction is assumed (``time`` is imported but otherwise unused).
        time.sleep(2)
        clone_url = _construct_url(GIT_REPO)
        stdout, stderr = Command('/tmp').execute('git clone', clone_url)

        assert 'Cloning into' in stdout
        assert stderr == ''