changeset 2728:6341084b7a2f beta

rewrote test_scm_operations, now run by nosetests
author Marcin Kuzminski <marcin@python-works.com>
date Wed, 22 Aug 2012 02:14:27 +0200
parents 5899fe08f063
children e9e7c40b4f1a
files rhodecode/tests/__init__.py rhodecode/tests/scripts/test_scm_operations.py
diffstat 2 files changed, 145 insertions(+), 312 deletions(-) [+]
line wrap: on
line diff
--- a/rhodecode/tests/__init__.py	Wed Aug 22 00:40:24 2012 +0200
+++ b/rhodecode/tests/__init__.py	Wed Aug 22 02:14:27 2012 +0200
@@ -41,12 +41,13 @@
 __all__ = [
     'parameterized', 'environ', 'url', 'get_new_dir', 'TestController',
     'TESTS_TMP_PATH', 'HG_REPO', 'GIT_REPO', 'NEW_HG_REPO', 'NEW_GIT_REPO',
-    'HG_FORK', 'GIT_FORK', 'TEST_USER_ADMIN_LOGIN', 'TEST_USER_REGULAR_LOGIN',
-    'TEST_USER_REGULAR_PASS', 'TEST_USER_REGULAR_EMAIL',
-    'TEST_USER_REGULAR2_LOGIN', 'TEST_USER_REGULAR2_PASS',
-    'TEST_USER_REGULAR2_EMAIL', 'TEST_HG_REPO', 'TEST_HG_REPO_CLONE',
-    'TEST_HG_REPO_PULL', 'TEST_GIT_REPO', 'TEST_GIT_REPO_CLONE',
-    'TEST_GIT_REPO_PULL', 'HG_REMOTE_REPO', 'GIT_REMOTE_REPO', 'SCM_TESTS',
+    'HG_FORK', 'GIT_FORK', 'TEST_USER_ADMIN_LOGIN', 'TEST_USER_ADMIN_PASS',
+    'TEST_USER_REGULAR_LOGIN', 'TEST_USER_REGULAR_PASS',
+    'TEST_USER_REGULAR_EMAIL', 'TEST_USER_REGULAR2_LOGIN',
+    'TEST_USER_REGULAR2_PASS', 'TEST_USER_REGULAR2_EMAIL', 'TEST_HG_REPO',
+    'TEST_HG_REPO_CLONE', 'TEST_HG_REPO_PULL', 'TEST_GIT_REPO',
+    'TEST_GIT_REPO_CLONE', 'TEST_GIT_REPO_PULL', 'HG_REMOTE_REPO',
+    'GIT_REMOTE_REPO', 'SCM_TESTS',
 ]
 
 # Invoke websetup with the current config file
--- a/rhodecode/tests/scripts/test_scm_operations.py	Wed Aug 22 00:40:24 2012 +0200
+++ b/rhodecode/tests/scripts/test_scm_operations.py	Wed Aug 22 02:14:27 2012 +0200
@@ -1,9 +1,12 @@
 # -*- coding: utf-8 -*-
 """
-    rhodecode.tests.test_hg_operations
-    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+    rhodecode.tests.test_scm_operations
+    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
-    Test suite for making push/pull operations
+    Test suite for making push/pull operations.
+    Run using::
+
+     RC_WHOOSH_TEST_DISABLE=1 nosetests rhodecode/tests/scripts/test_scm_operations.py
 
     :created_on: Dec 30, 2010
     :author: marcink
@@ -24,47 +27,19 @@
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
 import os
-import time
-import sys
-import shutil
-import logging
-
+import tempfile
 from os.path import join as jn
 from os.path import dirname as dn
 
 from tempfile import _RandomNameSequence
 from subprocess import Popen, PIPE
 
-from paste.deploy import appconfig
-from pylons import config
-from sqlalchemy import engine_from_config
-
-from rhodecode.lib.utils import add_cache
-from rhodecode.model import init_model
-from rhodecode.model import meta
+from rhodecode.tests import *
 from rhodecode.model.db import User, Repository, UserLog
-from rhodecode.lib.auth import get_crypt_password
-
-from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO
-from rhodecode.config.environment import load_environment
-
-rel_path = dn(dn(dn(os.path.abspath(__file__))))
+from rhodecode.model.meta import Session
 
-conf = appconfig('config:%s' % sys.argv[1], relative_to=rel_path)
-load_environment(conf.global_conf, conf.local_conf)
-
-add_cache(conf)
-
-USER = 'test_admin'
-PASS = 'test12'
-HOST = '127.0.0.1:5000'
-DEBUG = False
-print 'DEBUG:', DEBUG
-log = logging.getLogger(__name__)
-
-engine = engine_from_config(conf, 'sqlalchemy.db1.')
-init_model(engine)
-sa = meta.Session()
+DEBUG = True
+HOST = '127.0.0.1:5000'  # test host
 
 
 class Command(object):
@@ -73,13 +48,13 @@
         self.cwd = cwd
 
     def execute(self, cmd, *args):
-        """Runs command on the system with given ``args``.
+        """
+        Runs command on the system with given ``args``.
         """
 
         command = cmd + ' ' + ' '.join(args)
-        log.debug('Executing %s' % command)
         if DEBUG:
-            print command
+            print '*** CMD %s ***' % command
         p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd)
         stdout, stderr = p.communicate()
         if DEBUG:
@@ -87,324 +62,181 @@
         return stdout, stderr
 
 
-def test_wrapp(func):
-
-    def __wrapp(*args, **kwargs):
-        print '>>>%s' % func.__name__
-        try:
-            res = func(*args, **kwargs)
-        except Exception, e:
-            print ('###############\n-'
-                   '--%s failed %s--\n'
-                   '###############\n' % (func.__name__, e))
-            sys.exit()
-        print '++OK++'
-        return res
-    return __wrapp
+def _get_tmp_dir():
+    return tempfile.mkdtemp(prefix='rc_integration_test')
 
 
-def create_test_user(force=True):
-    print '\tcreating test user'
-
-    user = User.get_by_username(USER)
-
-    if force and user is not None:
-        print '\tremoving current user'
-        for repo in Repository.query().filter(Repository.user == user).all():
-            sa.delete(repo)
-        sa.delete(user)
-        sa.commit()
-
-    if user is None or force:
-        print '\tcreating new one'
-        new_usr = User()
-        new_usr.username = USER
-        new_usr.password = get_crypt_password(PASS)
-        new_usr.email = 'mail@mail.com'
-        new_usr.name = 'test'
-        new_usr.lastname = 'lasttestname'
-        new_usr.active = True
-        new_usr.admin = True
-        sa.add(new_usr)
-        sa.commit()
-
-    print '\tdone'
-
-
-def create_test_repo(force=True):
-    from rhodecode.model.repo import RepoModel
-
-    user = User.get_by_username(USER)
-    if user is None:
-        raise Exception('user not found')
-
-    repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar()
-
-    if repo is None:
-        print '\trepo not found creating'
-
-        form_data = {'repo_name':HG_REPO,
-                     'repo_type':'hg',
-                     'private':False,
-                     'clone_uri':'' }
-        rm = RepoModel(sa)
-        rm.base_path = '/home/hg'
-        rm.create(form_data, user)
+def _construct_url(repo, dest=None, **kwargs):
+    if dest is None:
+        #make temp clone
+        dest = _get_tmp_dir()
+    params = {
+        'user': TEST_USER_ADMIN_LOGIN,
+        'passwd': TEST_USER_ADMIN_PASS,
+        'host': HOST,
+        'cloned_repo': repo,
+        'dest': dest
+    }
+    params.update(**kwargs)
+    if params['user'] and params['passwd']:
+        _url = 'http://%(user)s:%(passwd)s@%(host)s/%(cloned_repo)s %(dest)s' % params
+    else:
+        _url = 'http://(host)s/%(cloned_repo)s %(dest)s' % params
+    return _url
 
 
 def set_anonymous_access(enable=True):
-    user = User.get_by_username('default')
+    user = User.get_by_username(User.DEFAULT_USER)
     user.active = enable
-    sa.add(user)
-    sa.commit()
+    Session().add(user)
+    Session().commit()
     print '\tanonymous access is now:', enable
-    if enable != User.get_by_username('default').active:
+    if enable != User.get_by_username(User.DEFAULT_USER).active:
         raise Exception('Cannot set anonymous access')
 
 
-def get_anonymous_access():
-    user = User.get_by_username('default')
-    return user.active
+def setup_module():
+    #DISABLE ANONYMOUS ACCESS
+    set_anonymous_access(False)
+
+
+def test_clone_hg_repo_by_admin():
+    clone_url = _construct_url(HG_REPO)
+    stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
+
+    assert 'requesting all changes' in stdout
+    assert 'adding changesets' in stdout
+    assert 'adding manifests' in stdout
+    assert 'adding file changes' in stdout
+
+    assert stderr == ''
 
 
-#==============================================================================
-# TESTS
-#==============================================================================
-@test_wrapp
-def test_clone_with_credentials(no_errors=False):
-    cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
-
-    try:
-        shutil.rmtree(path, ignore_errors=True)
-        os.makedirs(path)
-        #print 'made dirs %s' % jn(path)
-    except OSError:
-        raise
+def test_clone_git_repo_by_admin():
+    clone_url = _construct_url(GIT_REPO)
+    stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 
-    print '\tchecking if anonymous access is enabled'
-    anonymous_access = get_anonymous_access()
-    if anonymous_access:
-        print '\tenabled, disabling it '
-        set_anonymous_access(enable=False)
-
-    clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \
-                  {'user':USER,
-                   'pass':PASS,
-                   'host':HOST,
-                   'cloned_repo':HG_REPO,
-                   'dest':path}
-
-    stdout, stderr = Command(cwd).execute('hg clone', clone_url)
-
-    if no_errors is False:
-        assert """adding file changes""" in stdout, 'no messages about cloning'
-        assert """abort""" not in stderr , 'got error from clone'
+    assert 'Cloning into' in stdout
+    assert stderr == ''
 
 
-@test_wrapp
-def test_clone_anonymous():
-    cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
+def test_clone_wrong_credentials_hg():
+    clone_url = _construct_url(HG_REPO, passwd='bad!')
+    stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
+    assert 'abort: authorization failed' in stderr
 
-    try:
-        shutil.rmtree(path, ignore_errors=True)
-        os.makedirs(path)
-        #print 'made dirs %s' % jn(path)
-    except OSError:
-        raise
+
+def test_clone_wrong_credentials_git():
+    clone_url = _construct_url(GIT_REPO, passwd='bad!')
+    stdout, stderr = Command('/tmp').execute('git clone', clone_url)
+    assert 'fatal: Authentication failed' in stderr
 
 
-    print '\tchecking if anonymous access is enabled'
-    anonymous_access = get_anonymous_access()
-    if not anonymous_access:
-        print '\tnot enabled, enabling it '
-        set_anonymous_access(enable=True)
+def test_clone_git_dir_as_hg():
+    clone_url = _construct_url(GIT_REPO)
+    stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
+    assert 'HTTP Error 404: Not Found' in stderr
+
 
-    clone_url = 'http://%(host)s/%(cloned_repo)s %(dest)s' % \
-                  {'user':USER,
-                   'pass':PASS,
-                   'host':HOST,
-                   'cloned_repo':HG_REPO,
-                   'dest':path}
+def test_clone_hg_repo_as_git():
+    clone_url = _construct_url(HG_REPO)
+    stdout, stderr = Command('/tmp').execute('git clone', clone_url)
+    assert 'not found: did you run git update-server-info on the server' in stderr
 
-    stdout, stderr = Command(cwd).execute('hg clone', clone_url)
 
-    assert """adding file changes""" in stdout, 'no messages about cloning'
-    assert """abort""" not in stderr , 'got error from clone'
-
-    #disable if it was enabled
-    if not anonymous_access:
-        print '\tdisabling anonymous access'
-        set_anonymous_access(enable=False)
+def test_clone_non_existing_path_hg():
+    clone_url = _construct_url('trololo')
+    stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
+    assert 'HTTP Error 404: Not Found' in stderr
 
 
-@test_wrapp
-def test_clone_wrong_credentials():
-    cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
-
-    try:
-        shutil.rmtree(path, ignore_errors=True)
-        os.makedirs(path)
-        #print 'made dirs %s' % jn(path)
-    except OSError:
-        raise
-
-    print '\tchecking if anonymous access is enabled'
-    anonymous_access = get_anonymous_access()
-    if anonymous_access:
-        print '\tenabled, disabling it '
-        set_anonymous_access(enable=False)
-
-    clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \
-                  {'user':USER + 'error',
-                   'pass':PASS,
-                   'host':HOST,
-                   'cloned_repo':HG_REPO,
-                   'dest':path}
-
-    stdout, stderr = Command(cwd).execute('hg clone', clone_url)
-
-    if not """abort: authorization failed"""  in stderr:
-        raise Exception('Failure')
+def test_clone_non_existing_path_git():
+    clone_url = _construct_url('trololo')
+    stdout, stderr = Command('/tmp').execute('git clone', clone_url)
+    assert 'not found: did you run git update-server-info on the server' in stderr
 
 
-@test_wrapp
-def test_pull():
-    pass
-
-
-@test_wrapp
-def test_push_modify_file(f_name='setup.py'):
-    cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
-    modified_file = jn(TESTS_TMP_PATH, HG_REPO, f_name)
-    for i in xrange(5):
-        cmd = """echo 'added_line%s' >> %s""" % (i, modified_file)
-        Command(cwd).execute(cmd)
+def test_push_new_file_hg():
+    DEST = _get_tmp_dir()
+    clone_url = _construct_url(HG_REPO, dest=DEST)
+    stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
 
-        cmd = """hg ci -m 'changed file %s' %s """ % (i, modified_file)
-        Command(cwd).execute(cmd)
-
-    Command(cwd).execute('hg push %s' % jn(TESTS_TMP_PATH, HG_REPO))
-
-
-@test_wrapp
-def test_push_new_file(commits=15, with_clone=True):
-
-    if with_clone:
-        test_clone_with_credentials(no_errors=True)
-
-    cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
+    # commit some stuff into this repo
+    cwd = path = jn(DEST)
     added_file = jn(path, '%ssetupążźć.py' % _RandomNameSequence().next())
-
     Command(cwd).execute('touch %s' % added_file)
-
     Command(cwd).execute('hg add %s' % added_file)
 
-    for i in xrange(commits):
+    for i in xrange(3):
         cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
         Command(cwd).execute(cmd)
 
-        cmd = """hg ci -m 'commited new %s' -u '%s' %s """ % (i,
-                                'Marcin Kuźminski <marcin@python-blog.com>',
-                                added_file)
+        cmd = """hg ci -m 'commited new %s' -u '%s' %s """ % (
+                i,
+                'Marcin Kuźminski <marcin@python-blog.com>',
+                added_file
+        )
         Command(cwd).execute(cmd)
+    # PUSH it back
+    clone_url = _construct_url(HG_REPO, dest='')
+    stdout, stderr = Command(cwd).execute('hg push --verbose', clone_url)
 
-    push_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
-                  {'user':USER,
-                   'pass':PASS,
-                   'host':HOST,
-                   'cloned_repo':HG_REPO,
-                   'dest':jn(TESTS_TMP_PATH, HG_REPO)}
-
-    Command(cwd).execute('hg push --verbose --debug %s' % push_url)
+    assert 'pushing to' in stdout
+    assert 'Repository size' in stdout
+    assert 'Last revision is now' in stdout
 
 
-@test_wrapp
-def test_push_wrong_credentials():
-    cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
-    clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
-                  {'user':USER + 'xxx',
-                   'pass':PASS,
-                   'host':HOST,
-                   'cloned_repo':HG_REPO,
-                   'dest':jn(TESTS_TMP_PATH, HG_REPO)}
-
-    modified_file = jn(TESTS_TMP_PATH, HG_REPO, 'setup.py')
-    for i in xrange(5):
-        cmd = """echo 'added_line%s' >> %s""" % (i, modified_file)
-        Command(cwd).execute(cmd)
-
-        cmd = """hg ci -m 'commited %s' %s """ % (i, modified_file)
-        Command(cwd).execute(cmd)
+def test_push_new_file_git():
+    DEST = _get_tmp_dir()
+    clone_url = _construct_url(GIT_REPO, dest=DEST)
+    stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 
-    Command(cwd).execute('hg push %s' % clone_url)
-
-
-@test_wrapp
-def test_push_wrong_path():
-    cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
-    added_file = jn(path, 'somefile.py')
+    # commit some stuff into this repo
+    cwd = path = jn(DEST)
+    added_file = jn(path, '%ssetupążźć.py' % _RandomNameSequence().next())
+    Command(cwd).execute('touch %s' % added_file)
+    Command(cwd).execute('git add %s' % added_file)
 
-    try:
-        shutil.rmtree(path, ignore_errors=True)
-        os.makedirs(path)
-        print '\tmade dirs %s' % jn(path)
-    except OSError:
-        raise
-
-    Command(cwd).execute("""echo '' > %s""" % added_file)
-    Command(cwd).execute("""hg init %s""" % path)
-    Command(cwd).execute("""hg add %s""" % added_file)
-
-    for i in xrange(2):
+    for i in xrange(3):
         cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
         Command(cwd).execute(cmd)
 
-        cmd = """hg ci -m 'commited new %s' %s """ % (i, added_file)
+        cmd = """git ci -m 'commited new %s' --author '%s' %s """ % (
+                i,
+                'Marcin Kuźminski <marcin@python-blog.com>',
+                added_file
+        )
         Command(cwd).execute(cmd)
+    # PUSH it back
+    clone_url = _construct_url(GIT_REPO, dest='')
+    stdout, stderr = Command(cwd).execute('git push --verbose', clone_url)
 
-    clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
-                  {'user':USER,
-                   'pass':PASS,
-                   'host':HOST,
-                   'cloned_repo':HG_REPO + '_error',
-                   'dest':jn(TESTS_TMP_PATH, HG_REPO)}
-
-    stdout, stderr = Command(cwd).execute('hg push %s' % clone_url)
-    if not """abort: HTTP Error 403: Forbidden"""  in stderr:
-        raise Exception('Failure')
+    #WTF git stderr ?!
+    assert 'master -> master' in stderr
 
 
-@test_wrapp
-def get_logs():
-    return UserLog.query().all()
+def test_push_modify_existing_file_hg():
+    assert 0
 
 
-@test_wrapp
-def test_logs(initial):
-    logs = UserLog.query().all()
-    operations = 4
-    if len(initial) + operations != len(logs):
-        raise Exception("missing number of logs initial:%s vs current:%s" % \
-                            (len(initial), len(logs)))
+def test_push_modify_existing_file_git():
+    assert 0
+
+
+def test_push_wrong_credentials_hg():
+    assert 0
 
 
-if __name__ == '__main__':
-    create_test_user(force=False)
-    create_test_repo()
+def test_push_wrong_credentials_git():
+    assert 0
+
+
+def test_push_back_to_wrong_url_hg():
+    assert 0
 
-    initial_logs = get_logs()
-    print 'initial activity logs: %s' % len(initial_logs)
-    s = time.time()
-    #test_push_modify_file()
-    test_clone_with_credentials()
-    test_clone_wrong_credentials()
 
-    test_push_new_file(commits=2, with_clone=True)
-
-    test_clone_anonymous()
-    test_push_wrong_path()
+def test_push_back_to_wrong_url_git():
+    assert 0
 
-    test_push_wrong_credentials()
 
-    test_logs(initial_logs)
-    print 'finished ok in %.3f' % (time.time() - s)
+#TODO: write all locking tests