# HG changeset patch # User Marcin Kuzminski # Date 1317330871 -10800 # Node ID fcc676c6bf3bbe4483ea10fdc37aa2af24c2755c # Parent 71738535ed78261bef4c77b9f737dff70c44af48 updated test_hg_operation script diff -r 71738535ed78 -r fcc676c6bf3b rhodecode/tests/test_hg_operations.py --- a/rhodecode/tests/test_hg_operations.py Thu Sep 29 23:57:44 2011 +0300 +++ b/rhodecode/tests/test_hg_operations.py Fri Sep 30 00:14:31 2011 +0300 @@ -11,8 +11,11 @@ """ import os +import time +import sys import shutil import logging + from os.path import join as jn from os.path import dirname as dn @@ -41,7 +44,8 @@ USER = 'test_admin' PASS = 'test12' HOST = '127.0.0.1:5000' -DEBUG = True +DEBUG = bool(int(sys.argv[1])) +print 'DEBUG:',DEBUG log = logging.getLogger(__name__) @@ -64,10 +68,24 @@ print stdout, stderr return stdout, stderr + +def test_wrapp(func): + + def __wrapp(*args,**kwargs): + print '###%s###' %func.__name__ + try: + res = func(*args,**kwargs) + except: + print '--%s failed--' % func.__name__ + return + print 'ok' + return res + return __wrapp + def get_session(): engine = engine_from_config(conf, 'sqlalchemy.db1.') init_model(engine) - sa = meta.Session() + sa = meta.Session return sa @@ -129,16 +147,23 @@ user.active = enable sa.add(user) sa.commit() + sa.remove() + + print 'anonymous access is now:',enable + def get_anonymous_access(): sa = get_session() - return sa.query(User).filter(User.username == 'default').one().active + obj1 = sa.query(User).filter(User.username == 'default').one() + sa.expire(obj1) + return obj1.active #============================================================================== # TESTS #============================================================================== -def test_clone(no_errors=False): +@test_wrapp +def test_clone_with_credentials(no_errors=False): cwd = path = jn(TESTS_TMP_PATH, HG_REPO) try: @@ -148,7 +173,13 @@ except OSError: raise - + print 'checking if anonymous access is enabled' + anonymous_access = get_anonymous_access() + if anonymous_access: + print 'enabled, disabling it ' + set_anonymous_access(enable=False) + time.sleep(1) + clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \ {'user':USER, 'pass':PASS, @@ -163,8 +194,8 @@ assert """abort""" not in stderr , 'got error from clone' - -def test_clone_anonymous_ok(): +@test_wrapp +def test_clone_anonymous(): cwd = path = jn(TESTS_TMP_PATH, HG_REPO) try: @@ -180,7 +211,8 @@ if not anonymous_access: print 'not enabled, enabling it ' set_anonymous_access(enable=True) - + time.sleep(1) + clone_url = 'http://%(host)s/%(cloned_repo)s %(dest)s' % \ {'user':USER, 'pass':PASS, @@ -189,8 +221,6 @@ 'dest':path} stdout, stderr = Command(cwd).execute('hg clone', clone_url) - print stdout, stderr - assert """adding file changes""" in stdout, 'no messages about cloning' assert """abort""" not in stderr , 'got error from clone' @@ -200,7 +230,7 @@ print 'disabling anonymous access' set_anonymous_access(enable=False) - +@test_wrapp def test_clone_wrong_credentials(): cwd = path = jn(TESTS_TMP_PATH, HG_REPO) @@ -211,7 +241,12 @@ except OSError: raise - + print 'checking if anonymous access is enabled' + anonymous_access = get_anonymous_access() + if anonymous_access: + print 'enabled, disabling it ' + set_anonymous_access(enable=False) + clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \ {'user':USER + 'error', 'pass':PASS, @@ -221,12 +256,14 @@ stdout, stderr = Command(cwd).execute('hg clone', clone_url) - assert """abort: authorization failed""" in stderr , 'no error from wrong credentials' + if not """abort: authorization failed""" in stderr: + raise Exception('Failure') - +@test_wrapp def test_pull(): pass +@test_wrapp def test_push_modify_file(f_name='setup.py'): cwd = path = jn(TESTS_TMP_PATH, HG_REPO) modified_file = jn(TESTS_TMP_PATH, HG_REPO, f_name) @@ -239,10 +276,11 @@ Command(cwd).execute('hg push %s' % jn(TESTS_TMP_PATH, HG_REPO)) +@test_wrapp def test_push_new_file(commits=15, with_clone=True): if with_clone: - test_clone(no_errors=True) + test_clone_with_credentials(no_errors=True) cwd = path = jn(TESTS_TMP_PATH, HG_REPO) added_file = jn(path, '%ssetupążźć.py' % _RandomNameSequence().next()) @@ -269,6 +307,7 @@ Command(cwd).execute('hg push --verbose --debug %s' % push_url) +@test_wrapp def test_push_wrong_credentials(): cwd = path = jn(TESTS_TMP_PATH, HG_REPO) clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \ @@ -288,6 +327,7 @@ Command(cwd).execute('hg push %s' % clone_url) +@test_wrapp def test_push_wrong_path(): cwd = path = jn(TESTS_TMP_PATH, HG_REPO) added_file = jn(path, 'somefile.py') @@ -318,20 +358,22 @@ 'dest':jn(TESTS_TMP_PATH, HG_REPO)} stdout, stderr = Command(cwd).execute('hg push %s' % clone_url) - assert """abort: HTTP Error 403: Forbidden""" in stderr + if not """abort: HTTP Error 403: Forbidden""" in stderr: + raise Exception('Failure') if __name__ == '__main__': create_test_user(force=False) create_test_repo() - #test_push_modify_file() - #test_clone() - #test_clone_anonymous_ok() + +# test_push_modify_file() + test_clone_with_credentials() + test_clone_wrong_credentials() - #test_clone_wrong_credentials() - test_pull() test_push_new_file(commits=2, with_clone=True) - - #test_push_wrong_path() - #test_push_wrong_credentials() +# + test_push_wrong_path() + + test_clone_anonymous() + test_push_wrong_credentials() \ No newline at end of file