# HG changeset patch # User Marcin Kuzminski # Date 1345594467 -7200 # Node ID 6341084b7a2f3259e83cadd3af9e136cb3227c73 # Parent 5899fe08f063332c2756d60372bcaddd41fd5960 rewrote test_scm_operations, now run by nosetests diff -r 5899fe08f063 -r 6341084b7a2f rhodecode/tests/__init__.py --- a/rhodecode/tests/__init__.py Wed Aug 22 00:40:24 2012 +0200 +++ b/rhodecode/tests/__init__.py Wed Aug 22 02:14:27 2012 +0200 @@ -41,12 +41,13 @@ __all__ = [ 'parameterized', 'environ', 'url', 'get_new_dir', 'TestController', 'TESTS_TMP_PATH', 'HG_REPO', 'GIT_REPO', 'NEW_HG_REPO', 'NEW_GIT_REPO', - 'HG_FORK', 'GIT_FORK', 'TEST_USER_ADMIN_LOGIN', 'TEST_USER_REGULAR_LOGIN', - 'TEST_USER_REGULAR_PASS', 'TEST_USER_REGULAR_EMAIL', - 'TEST_USER_REGULAR2_LOGIN', 'TEST_USER_REGULAR2_PASS', - 'TEST_USER_REGULAR2_EMAIL', 'TEST_HG_REPO', 'TEST_HG_REPO_CLONE', - 'TEST_HG_REPO_PULL', 'TEST_GIT_REPO', 'TEST_GIT_REPO_CLONE', - 'TEST_GIT_REPO_PULL', 'HG_REMOTE_REPO', 'GIT_REMOTE_REPO', 'SCM_TESTS', + 'HG_FORK', 'GIT_FORK', 'TEST_USER_ADMIN_LOGIN', 'TEST_USER_ADMIN_PASS', + 'TEST_USER_REGULAR_LOGIN', 'TEST_USER_REGULAR_PASS', + 'TEST_USER_REGULAR_EMAIL', 'TEST_USER_REGULAR2_LOGIN', + 'TEST_USER_REGULAR2_PASS', 'TEST_USER_REGULAR2_EMAIL', 'TEST_HG_REPO', + 'TEST_HG_REPO_CLONE', 'TEST_HG_REPO_PULL', 'TEST_GIT_REPO', + 'TEST_GIT_REPO_CLONE', 'TEST_GIT_REPO_PULL', 'HG_REMOTE_REPO', + 'GIT_REMOTE_REPO', 'SCM_TESTS', ] # Invoke websetup with the current config file diff -r 5899fe08f063 -r 6341084b7a2f rhodecode/tests/scripts/test_scm_operations.py --- a/rhodecode/tests/scripts/test_scm_operations.py Wed Aug 22 00:40:24 2012 +0200 +++ b/rhodecode/tests/scripts/test_scm_operations.py Wed Aug 22 02:14:27 2012 +0200 @@ -1,9 +1,12 @@ # -*- coding: utf-8 -*- """ - rhodecode.tests.test_hg_operations - ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + rhodecode.tests.test_scm_operations + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - Test suite for making push/pull operations + Test suite for making push/pull operations. + Run using:: + + RC_WHOOSH_TEST_DISABLE=1 nosetests rhodecode/tests/scripts/test_scm_operations.py :created_on: Dec 30, 2010 :author: marcink @@ -24,47 +27,19 @@ # along with this program. If not, see . import os -import time -import sys -import shutil -import logging - +import tempfile from os.path import join as jn from os.path import dirname as dn from tempfile import _RandomNameSequence from subprocess import Popen, PIPE -from paste.deploy import appconfig -from pylons import config -from sqlalchemy import engine_from_config - -from rhodecode.lib.utils import add_cache -from rhodecode.model import init_model -from rhodecode.model import meta +from rhodecode.tests import * from rhodecode.model.db import User, Repository, UserLog -from rhodecode.lib.auth import get_crypt_password - -from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO -from rhodecode.config.environment import load_environment - -rel_path = dn(dn(dn(os.path.abspath(__file__)))) +from rhodecode.model.meta import Session -conf = appconfig('config:%s' % sys.argv[1], relative_to=rel_path) -load_environment(conf.global_conf, conf.local_conf) - -add_cache(conf) - -USER = 'test_admin' -PASS = 'test12' -HOST = '127.0.0.1:5000' -DEBUG = False -print 'DEBUG:', DEBUG -log = logging.getLogger(__name__) - -engine = engine_from_config(conf, 'sqlalchemy.db1.') -init_model(engine) -sa = meta.Session() +DEBUG = True +HOST = '127.0.0.1:5000' # test host class Command(object): @@ -73,13 +48,13 @@ self.cwd = cwd def execute(self, cmd, *args): - """Runs command on the system with given ``args``. + """ + Runs command on the system with given ``args``. """ command = cmd + ' ' + ' '.join(args) - log.debug('Executing %s' % command) if DEBUG: - print command + print '*** CMD %s ***' % command p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd) stdout, stderr = p.communicate() if DEBUG: @@ -87,324 +62,181 @@ return stdout, stderr -def test_wrapp(func): - - def __wrapp(*args, **kwargs): - print '>>>%s' % func.__name__ - try: - res = func(*args, **kwargs) - except Exception, e: - print ('###############\n-' - '--%s failed %s--\n' - '###############\n' % (func.__name__, e)) - sys.exit() - print '++OK++' - return res - return __wrapp +def _get_tmp_dir(): + return tempfile.mkdtemp(prefix='rc_integration_test') -def create_test_user(force=True): - print '\tcreating test user' - - user = User.get_by_username(USER) - - if force and user is not None: - print '\tremoving current user' - for repo in Repository.query().filter(Repository.user == user).all(): - sa.delete(repo) - sa.delete(user) - sa.commit() - - if user is None or force: - print '\tcreating new one' - new_usr = User() - new_usr.username = USER - new_usr.password = get_crypt_password(PASS) - new_usr.email = 'mail@mail.com' - new_usr.name = 'test' - new_usr.lastname = 'lasttestname' - new_usr.active = True - new_usr.admin = True - sa.add(new_usr) - sa.commit() - - print '\tdone' - - -def create_test_repo(force=True): - from rhodecode.model.repo import RepoModel - - user = User.get_by_username(USER) - if user is None: - raise Exception('user not found') - - repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar() - - if repo is None: - print '\trepo not found creating' - - form_data = {'repo_name':HG_REPO, - 'repo_type':'hg', - 'private':False, - 'clone_uri':'' } - rm = RepoModel(sa) - rm.base_path = '/home/hg' - rm.create(form_data, user) +def _construct_url(repo, dest=None, **kwargs): + if dest is None: + #make temp clone + dest = _get_tmp_dir() + params = { + 'user': TEST_USER_ADMIN_LOGIN, + 'passwd': TEST_USER_ADMIN_PASS, + 'host': HOST, + 'cloned_repo': repo, + 'dest': dest + } + params.update(**kwargs) + if params['user'] and params['passwd']: + _url = 'http://%(user)s:%(passwd)s@%(host)s/%(cloned_repo)s %(dest)s' % params + else: + _url = 'http://(host)s/%(cloned_repo)s %(dest)s' % params + return _url def set_anonymous_access(enable=True): - user = User.get_by_username('default') + user = User.get_by_username(User.DEFAULT_USER) user.active = enable - sa.add(user) - sa.commit() + Session().add(user) + Session().commit() print '\tanonymous access is now:', enable - if enable != User.get_by_username('default').active: + if enable != User.get_by_username(User.DEFAULT_USER).active: raise Exception('Cannot set anonymous access') -def get_anonymous_access(): - user = User.get_by_username('default') - return user.active +def setup_module(): + #DISABLE ANONYMOUS ACCESS + set_anonymous_access(False) + + +def test_clone_hg_repo_by_admin(): + clone_url = _construct_url(HG_REPO) + stdout, stderr = Command('/tmp').execute('hg clone', clone_url) + + assert 'requesting all changes' in stdout + assert 'adding changesets' in stdout + assert 'adding manifests' in stdout + assert 'adding file changes' in stdout + + assert stderr == '' -#============================================================================== -# TESTS -#============================================================================== -@test_wrapp -def test_clone_with_credentials(no_errors=False): - cwd = path = jn(TESTS_TMP_PATH, HG_REPO) - - try: - shutil.rmtree(path, ignore_errors=True) - os.makedirs(path) - #print 'made dirs %s' % jn(path) - except OSError: - raise +def test_clone_git_repo_by_admin(): + clone_url = _construct_url(GIT_REPO) + stdout, stderr = Command('/tmp').execute('git clone', clone_url) - print '\tchecking if anonymous access is enabled' - anonymous_access = get_anonymous_access() - if anonymous_access: - print '\tenabled, disabling it ' - set_anonymous_access(enable=False) - - clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \ - {'user':USER, - 'pass':PASS, - 'host':HOST, - 'cloned_repo':HG_REPO, - 'dest':path} - - stdout, stderr = Command(cwd).execute('hg clone', clone_url) - - if no_errors is False: - assert """adding file changes""" in stdout, 'no messages about cloning' - assert """abort""" not in stderr , 'got error from clone' + assert 'Cloning into' in stdout + assert stderr == '' -@test_wrapp -def test_clone_anonymous(): - cwd = path = jn(TESTS_TMP_PATH, HG_REPO) +def test_clone_wrong_credentials_hg(): + clone_url = _construct_url(HG_REPO, passwd='bad!') + stdout, stderr = Command('/tmp').execute('hg clone', clone_url) + assert 'abort: authorization failed' in stderr - try: - shutil.rmtree(path, ignore_errors=True) - os.makedirs(path) - #print 'made dirs %s' % jn(path) - except OSError: - raise + +def test_clone_wrong_credentials_git(): + clone_url = _construct_url(GIT_REPO, passwd='bad!') + stdout, stderr = Command('/tmp').execute('git clone', clone_url) + assert 'fatal: Authentication failed' in stderr - print '\tchecking if anonymous access is enabled' - anonymous_access = get_anonymous_access() - if not anonymous_access: - print '\tnot enabled, enabling it ' - set_anonymous_access(enable=True) +def test_clone_git_dir_as_hg(): + clone_url = _construct_url(GIT_REPO) + stdout, stderr = Command('/tmp').execute('hg clone', clone_url) + assert 'HTTP Error 404: Not Found' in stderr + - clone_url = 'http://%(host)s/%(cloned_repo)s %(dest)s' % \ - {'user':USER, - 'pass':PASS, - 'host':HOST, - 'cloned_repo':HG_REPO, - 'dest':path} +def test_clone_hg_repo_as_git(): + clone_url = _construct_url(HG_REPO) + stdout, stderr = Command('/tmp').execute('git clone', clone_url) + assert 'not found: did you run git update-server-info on the server' in stderr - stdout, stderr = Command(cwd).execute('hg clone', clone_url) - assert """adding file changes""" in stdout, 'no messages about cloning' - assert """abort""" not in stderr , 'got error from clone' - - #disable if it was enabled - if not anonymous_access: - print '\tdisabling anonymous access' - set_anonymous_access(enable=False) +def test_clone_non_existing_path_hg(): + clone_url = _construct_url('trololo') + stdout, stderr = Command('/tmp').execute('hg clone', clone_url) + assert 'HTTP Error 404: Not Found' in stderr -@test_wrapp -def test_clone_wrong_credentials(): - cwd = path = jn(TESTS_TMP_PATH, HG_REPO) - - try: - shutil.rmtree(path, ignore_errors=True) - os.makedirs(path) - #print 'made dirs %s' % jn(path) - except OSError: - raise - - print '\tchecking if anonymous access is enabled' - anonymous_access = get_anonymous_access() - if anonymous_access: - print '\tenabled, disabling it ' - set_anonymous_access(enable=False) - - clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \ - {'user':USER + 'error', - 'pass':PASS, - 'host':HOST, - 'cloned_repo':HG_REPO, - 'dest':path} - - stdout, stderr = Command(cwd).execute('hg clone', clone_url) - - if not """abort: authorization failed""" in stderr: - raise Exception('Failure') +def test_clone_non_existing_path_git(): + clone_url = _construct_url('trololo') + stdout, stderr = Command('/tmp').execute('git clone', clone_url) + assert 'not found: did you run git update-server-info on the server' in stderr -@test_wrapp -def test_pull(): - pass - - -@test_wrapp -def test_push_modify_file(f_name='setup.py'): - cwd = path = jn(TESTS_TMP_PATH, HG_REPO) - modified_file = jn(TESTS_TMP_PATH, HG_REPO, f_name) - for i in xrange(5): - cmd = """echo 'added_line%s' >> %s""" % (i, modified_file) - Command(cwd).execute(cmd) +def test_push_new_file_hg(): + DEST = _get_tmp_dir() + clone_url = _construct_url(HG_REPO, dest=DEST) + stdout, stderr = Command('/tmp').execute('hg clone', clone_url) - cmd = """hg ci -m 'changed file %s' %s """ % (i, modified_file) - Command(cwd).execute(cmd) - - Command(cwd).execute('hg push %s' % jn(TESTS_TMP_PATH, HG_REPO)) - - -@test_wrapp -def test_push_new_file(commits=15, with_clone=True): - - if with_clone: - test_clone_with_credentials(no_errors=True) - - cwd = path = jn(TESTS_TMP_PATH, HG_REPO) + # commit some stuff into this repo + cwd = path = jn(DEST) added_file = jn(path, '%ssetupążźć.py' % _RandomNameSequence().next()) - Command(cwd).execute('touch %s' % added_file) - Command(cwd).execute('hg add %s' % added_file) - for i in xrange(commits): + for i in xrange(3): cmd = """echo 'added_line%s' >> %s""" % (i, added_file) Command(cwd).execute(cmd) - cmd = """hg ci -m 'commited new %s' -u '%s' %s """ % (i, - 'Marcin Kuźminski ', - added_file) + cmd = """hg ci -m 'commited new %s' -u '%s' %s """ % ( + i, + 'Marcin Kuźminski ', + added_file + ) Command(cwd).execute(cmd) + # PUSH it back + clone_url = _construct_url(HG_REPO, dest='') + stdout, stderr = Command(cwd).execute('hg push --verbose', clone_url) - push_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \ - {'user':USER, - 'pass':PASS, - 'host':HOST, - 'cloned_repo':HG_REPO, - 'dest':jn(TESTS_TMP_PATH, HG_REPO)} - - Command(cwd).execute('hg push --verbose --debug %s' % push_url) + assert 'pushing to' in stdout + assert 'Repository size' in stdout + assert 'Last revision is now' in stdout -@test_wrapp -def test_push_wrong_credentials(): - cwd = path = jn(TESTS_TMP_PATH, HG_REPO) - clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \ - {'user':USER + 'xxx', - 'pass':PASS, - 'host':HOST, - 'cloned_repo':HG_REPO, - 'dest':jn(TESTS_TMP_PATH, HG_REPO)} - - modified_file = jn(TESTS_TMP_PATH, HG_REPO, 'setup.py') - for i in xrange(5): - cmd = """echo 'added_line%s' >> %s""" % (i, modified_file) - Command(cwd).execute(cmd) - - cmd = """hg ci -m 'commited %s' %s """ % (i, modified_file) - Command(cwd).execute(cmd) +def test_push_new_file_git(): + DEST = _get_tmp_dir() + clone_url = _construct_url(GIT_REPO, dest=DEST) + stdout, stderr = Command('/tmp').execute('git clone', clone_url) - Command(cwd).execute('hg push %s' % clone_url) - - -@test_wrapp -def test_push_wrong_path(): - cwd = path = jn(TESTS_TMP_PATH, HG_REPO) - added_file = jn(path, 'somefile.py') + # commit some stuff into this repo + cwd = path = jn(DEST) + added_file = jn(path, '%ssetupążźć.py' % _RandomNameSequence().next()) + Command(cwd).execute('touch %s' % added_file) + Command(cwd).execute('git add %s' % added_file) - try: - shutil.rmtree(path, ignore_errors=True) - os.makedirs(path) - print '\tmade dirs %s' % jn(path) - except OSError: - raise - - Command(cwd).execute("""echo '' > %s""" % added_file) - Command(cwd).execute("""hg init %s""" % path) - Command(cwd).execute("""hg add %s""" % added_file) - - for i in xrange(2): + for i in xrange(3): cmd = """echo 'added_line%s' >> %s""" % (i, added_file) Command(cwd).execute(cmd) - cmd = """hg ci -m 'commited new %s' %s """ % (i, added_file) + cmd = """git ci -m 'commited new %s' --author '%s' %s """ % ( + i, + 'Marcin Kuźminski ', + added_file + ) Command(cwd).execute(cmd) + # PUSH it back + clone_url = _construct_url(GIT_REPO, dest='') + stdout, stderr = Command(cwd).execute('git push --verbose', clone_url) - clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \ - {'user':USER, - 'pass':PASS, - 'host':HOST, - 'cloned_repo':HG_REPO + '_error', - 'dest':jn(TESTS_TMP_PATH, HG_REPO)} - - stdout, stderr = Command(cwd).execute('hg push %s' % clone_url) - if not """abort: HTTP Error 403: Forbidden""" in stderr: - raise Exception('Failure') + #WTF git stderr ?! + assert 'master -> master' in stderr -@test_wrapp -def get_logs(): - return UserLog.query().all() +def test_push_modify_existing_file_hg(): + assert 0 -@test_wrapp -def test_logs(initial): - logs = UserLog.query().all() - operations = 4 - if len(initial) + operations != len(logs): - raise Exception("missing number of logs initial:%s vs current:%s" % \ - (len(initial), len(logs))) +def test_push_modify_existing_file_git(): + assert 0 + + +def test_push_wrong_credentials_hg(): + assert 0 -if __name__ == '__main__': - create_test_user(force=False) - create_test_repo() +def test_push_wrong_credentials_git(): + assert 0 + + +def test_push_back_to_wrong_url_hg(): + assert 0 - initial_logs = get_logs() - print 'initial activity logs: %s' % len(initial_logs) - s = time.time() - #test_push_modify_file() - test_clone_with_credentials() - test_clone_wrong_credentials() - test_push_new_file(commits=2, with_clone=True) - - test_clone_anonymous() - test_push_wrong_path() +def test_push_back_to_wrong_url_git(): + assert 0 - test_push_wrong_credentials() - test_logs(initial_logs) - print 'finished ok in %.3f' % (time.time() - s) +#TODO: write all locking tests