changeset 1596:089ef495e9dd beta

fixed test_hg_push operations
author Marcin Kuzminski <marcin@python-works.com>
date Mon, 24 Oct 2011 23:28:04 +0200
parents 7cd8fd4d1e38
children 019026a8cf67
files rhodecode/tests/test_hg_operations.py
diffstat 1 files changed, 22 insertions(+), 33 deletions(-) [+]
line wrap: on
line diff
--- a/rhodecode/tests/test_hg_operations.py	Mon Oct 24 23:07:06 2011 +0200
+++ b/rhodecode/tests/test_hg_operations.py	Mon Oct 24 23:28:04 2011 +0200
@@ -48,7 +48,8 @@
 from rhodecode.config.environment import load_environment
 
 rel_path = dn(dn(dn(os.path.abspath(__file__))))
-conf = appconfig('config:development.ini', relative_to=rel_path)
+
+conf = appconfig('config:%s' % sys.argv[1], relative_to=rel_path)
 load_environment(conf.global_conf, conf.local_conf)
 
 add_cache(conf)
@@ -56,10 +57,13 @@
 USER = 'test_admin'
 PASS = 'test12'
 HOST = '127.0.0.1:5000'
-DEBUG = True if sys.argv[1:] else False
+DEBUG = False
 print 'DEBUG:', DEBUG
 log = logging.getLogger(__name__)
 
+engine = engine_from_config(conf, 'sqlalchemy.db1.')
+init_model(engine)
+sa = meta.Session
 
 class Command(object):
 
@@ -96,22 +100,15 @@
         return res
     return __wrapp
 
-def get_session():
-    engine = engine_from_config(conf, 'sqlalchemy.db1.')
-    init_model(engine)
-    sa = meta.Session
-    return sa
-
 
 def create_test_user(force=True):
     print '\tcreating test user'
-    sa = get_session()
 
-    user = sa.query(User).filter(User.username == USER).scalar()
+    user = User.get_by_username(USER)
 
     if force and user is not None:
         print '\tremoving current user'
-        for repo in sa.query(Repository).filter(Repository.user == user).all():
+        for repo in Repository.query().filter(Repository.user == user).all():
             sa.delete(repo)
         sa.delete(user)
         sa.commit()
@@ -134,9 +131,8 @@
 
 def create_test_repo(force=True):
     from rhodecode.model.repo import RepoModel
-    sa = get_session()
 
-    user = sa.query(User).filter(User.username == USER).scalar()
+    user = User.get_by_username(USER)
     if user is None:
         raise Exception('user not found')
 
@@ -156,22 +152,17 @@
 
 
 def set_anonymous_access(enable=True):
-    sa = get_session()
-    user = sa.query(User).filter(User.username == 'default').one()
-    sa.expire(user)
+    user = User.get_by_username('default')
     user.active = enable
     sa.add(user)
     sa.commit()
-    sa.remove()
-    import time;time.sleep(3)
     print '\tanonymous access is now:', enable
-
+    if enable != User.get_by_username('default').active:
+        raise Exception('Cannot set anonymous access')
 
 def get_anonymous_access():
-    sa = get_session()
-    obj1 = sa.query(User).filter(User.username == 'default').one()
-    sa.expire(obj1)
-    return obj1.active
+    user = User.get_by_username('default')
+    return user.active
 
 
 #==============================================================================
@@ -378,16 +369,15 @@
 
 @test_wrapp
 def get_logs():
-    sa = get_session()
-    return len(sa.query(UserLog).all())
+    return UserLog.query().all()
 
 @test_wrapp
 def test_logs(initial):
-    sa = get_session()
-    logs = sa.query(UserLog).all()
-    operations = 7
-    if initial + operations != len(logs):
-        raise Exception("missing number of logs %s vs %s" % (initial, len(logs)))
+    logs = UserLog.query().all()
+    operations = 4
+    if len(initial) + operations != len(logs):
+        raise Exception("missing number of logs initial:%s vs current:%s" % \
+                            (len(initial), len(logs)))
 
 
 if __name__ == '__main__':
@@ -395,18 +385,17 @@
     create_test_repo()
 
     initial_logs = get_logs()
+    print 'initial activity logs: %s' % len(initial_logs)
 
-#    test_push_modify_file()
+    #test_push_modify_file()
     test_clone_with_credentials()
     test_clone_wrong_credentials()
 
-
     test_push_new_file(commits=2, with_clone=True)
 
     test_clone_anonymous()
     test_push_wrong_path()
 
-
     test_push_wrong_credentials()
 
     test_logs(initial_logs)