comparison rhodecode/tests/test_hg_operations.py @ 1529:0b268dd369ec beta

Fixed test_hg_operations test and added concurency test
author Marcin Kuzminski <marcin@python-works.com>
date Fri, 07 Oct 2011 23:17:45 +0200
parents fcc676c6bf3b
children 04027bdb876c
comparison
equal deleted inserted replaced
1528:1d7a621d396f 1529:0b268dd369ec
27 from sqlalchemy import engine_from_config 27 from sqlalchemy import engine_from_config
28 28
29 from rhodecode.lib.utils import add_cache 29 from rhodecode.lib.utils import add_cache
30 from rhodecode.model import init_model 30 from rhodecode.model import init_model
31 from rhodecode.model import meta 31 from rhodecode.model import meta
32 from rhodecode.model.db import User, Repository 32 from rhodecode.model.db import User, Repository, UserLog
33 from rhodecode.lib.auth import get_crypt_password 33 from rhodecode.lib.auth import get_crypt_password
34 34
35 from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO 35 from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO
36 from rhodecode.config.environment import load_environment 36 from rhodecode.config.environment import load_environment
37 37
43 43
44 USER = 'test_admin' 44 USER = 'test_admin'
45 PASS = 'test12' 45 PASS = 'test12'
46 HOST = '127.0.0.1:5000' 46 HOST = '127.0.0.1:5000'
47 DEBUG = bool(int(sys.argv[1])) 47 DEBUG = bool(int(sys.argv[1]))
48 print 'DEBUG:',DEBUG 48 print 'DEBUG:', DEBUG
49 log = logging.getLogger(__name__) 49 log = logging.getLogger(__name__)
50 50
51 51
52 class Command(object): 52 class Command(object):
53 53
68 print stdout, stderr 68 print stdout, stderr
69 return stdout, stderr 69 return stdout, stderr
70 70
71 71
72 def test_wrapp(func): 72 def test_wrapp(func):
73 73
74 def __wrapp(*args,**kwargs): 74 def __wrapp(*args, **kwargs):
75 print '###%s###' %func.__name__ 75 print '###%s###' % func.__name__
76 try: 76 try:
77 res = func(*args,**kwargs) 77 res = func(*args, **kwargs)
78 except: 78 except Exception, e:
79 print '--%s failed--' % func.__name__ 79 print ('###############\n-'
80 return 80 '--%s failed %s--\n'
81 print 'ok' 81 '###############\n' % (func.__name__, e))
82 sys.exit()
83 print '++OK++'
82 return res 84 return res
83 return __wrapp 85 return __wrapp
84 86
85 def get_session(): 87 def get_session():
86 engine = engine_from_config(conf, 'sqlalchemy.db1.') 88 engine = engine_from_config(conf, 'sqlalchemy.db1.')
88 sa = meta.Session 90 sa = meta.Session
89 return sa 91 return sa
90 92
91 93
92 def create_test_user(force=True): 94 def create_test_user(force=True):
93 print 'creating test user' 95 print '\tcreating test user'
94 sa = get_session() 96 sa = get_session()
95 97
96 user = sa.query(User).filter(User.username == USER).scalar() 98 user = sa.query(User).filter(User.username == USER).scalar()
97 99
98 if force and user is not None: 100 if force and user is not None:
99 print 'removing current user' 101 print '\tremoving current user'
100 for repo in sa.query(Repository).filter(Repository.user == user).all(): 102 for repo in sa.query(Repository).filter(Repository.user == user).all():
101 sa.delete(repo) 103 sa.delete(repo)
102 sa.delete(user) 104 sa.delete(user)
103 sa.commit() 105 sa.commit()
104 106
105 if user is None or force: 107 if user is None or force:
106 print 'creating new one' 108 print '\tcreating new one'
107 new_usr = User() 109 new_usr = User()
108 new_usr.username = USER 110 new_usr.username = USER
109 new_usr.password = get_crypt_password(PASS) 111 new_usr.password = get_crypt_password(PASS)
110 new_usr.email = 'mail@mail.com' 112 new_usr.email = 'mail@mail.com'
111 new_usr.name = 'test' 113 new_usr.name = 'test'
113 new_usr.active = True 115 new_usr.active = True
114 new_usr.admin = True 116 new_usr.admin = True
115 sa.add(new_usr) 117 sa.add(new_usr)
116 sa.commit() 118 sa.commit()
117 119
118 print 'done' 120 print '\tdone'
119 121
120 122
121 def create_test_repo(force=True): 123 def create_test_repo(force=True):
122 from rhodecode.model.repo import RepoModel 124 from rhodecode.model.repo import RepoModel
123 sa = get_session() 125 sa = get_session()
128 130
129 131
130 repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar() 132 repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar()
131 133
132 if repo is None: 134 if repo is None:
133 print 'repo not found creating' 135 print '\trepo not found creating'
134 136
135 form_data = {'repo_name':HG_REPO, 137 form_data = {'repo_name':HG_REPO,
136 'repo_type':'hg', 138 'repo_type':'hg',
137 'private':False, 139 'private':False,
138 'clone_uri':'' } 140 'clone_uri':'' }
142 144
143 145
144 def set_anonymous_access(enable=True): 146 def set_anonymous_access(enable=True):
145 sa = get_session() 147 sa = get_session()
146 user = sa.query(User).filter(User.username == 'default').one() 148 user = sa.query(User).filter(User.username == 'default').one()
149 sa.expire(user)
147 user.active = enable 150 user.active = enable
148 sa.add(user) 151 sa.add(user)
149 sa.commit() 152 sa.commit()
150 sa.remove() 153 sa.remove()
151 154 import time;time.sleep(3)
152 print 'anonymous access is now:',enable 155 print '\tanonymous access is now:', enable
153 156
154 157
155 def get_anonymous_access(): 158 def get_anonymous_access():
156 sa = get_session() 159 sa = get_session()
157 obj1 = sa.query(User).filter(User.username == 'default').one() 160 obj1 = sa.query(User).filter(User.username == 'default').one()
171 os.makedirs(path) 174 os.makedirs(path)
172 #print 'made dirs %s' % jn(path) 175 #print 'made dirs %s' % jn(path)
173 except OSError: 176 except OSError:
174 raise 177 raise
175 178
176 print 'checking if anonymous access is enabled' 179 print '\tchecking if anonymous access is enabled'
177 anonymous_access = get_anonymous_access() 180 anonymous_access = get_anonymous_access()
178 if anonymous_access: 181 if anonymous_access:
179 print 'enabled, disabling it ' 182 print '\tenabled, disabling it '
180 set_anonymous_access(enable=False) 183 set_anonymous_access(enable=False)
181 time.sleep(1) 184 time.sleep(1)
182 185
183 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \ 186 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \
184 {'user':USER, 187 {'user':USER,
185 'pass':PASS, 188 'pass':PASS,
186 'host':HOST, 189 'host':HOST,
187 'cloned_repo':HG_REPO, 190 'cloned_repo':HG_REPO,
204 #print 'made dirs %s' % jn(path) 207 #print 'made dirs %s' % jn(path)
205 except OSError: 208 except OSError:
206 raise 209 raise
207 210
208 211
209 print 'checking if anonymous access is enabled' 212 print '\tchecking if anonymous access is enabled'
210 anonymous_access = get_anonymous_access() 213 anonymous_access = get_anonymous_access()
211 if not anonymous_access: 214 if not anonymous_access:
212 print 'not enabled, enabling it ' 215 print '\tnot enabled, enabling it '
213 set_anonymous_access(enable=True) 216 set_anonymous_access(enable=True)
214 time.sleep(1) 217 time.sleep(1)
215 218
216 clone_url = 'http://%(host)s/%(cloned_repo)s %(dest)s' % \ 219 clone_url = 'http://%(host)s/%(cloned_repo)s %(dest)s' % \
217 {'user':USER, 220 {'user':USER,
218 'pass':PASS, 221 'pass':PASS,
219 'host':HOST, 222 'host':HOST,
220 'cloned_repo':HG_REPO, 223 'cloned_repo':HG_REPO,
225 assert """adding file changes""" in stdout, 'no messages about cloning' 228 assert """adding file changes""" in stdout, 'no messages about cloning'
226 assert """abort""" not in stderr , 'got error from clone' 229 assert """abort""" not in stderr , 'got error from clone'
227 230
228 #disable if it was enabled 231 #disable if it was enabled
229 if not anonymous_access: 232 if not anonymous_access:
230 print 'disabling anonymous access' 233 print '\tdisabling anonymous access'
231 set_anonymous_access(enable=False) 234 set_anonymous_access(enable=False)
232 235
233 @test_wrapp 236 @test_wrapp
234 def test_clone_wrong_credentials(): 237 def test_clone_wrong_credentials():
235 cwd = path = jn(TESTS_TMP_PATH, HG_REPO) 238 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
239 os.makedirs(path) 242 os.makedirs(path)
240 #print 'made dirs %s' % jn(path) 243 #print 'made dirs %s' % jn(path)
241 except OSError: 244 except OSError:
242 raise 245 raise
243 246
244 print 'checking if anonymous access is enabled' 247 print '\tchecking if anonymous access is enabled'
245 anonymous_access = get_anonymous_access() 248 anonymous_access = get_anonymous_access()
246 if anonymous_access: 249 if anonymous_access:
247 print 'enabled, disabling it ' 250 print '\tenabled, disabling it '
248 set_anonymous_access(enable=False) 251 set_anonymous_access(enable=False)
249 252
250 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \ 253 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \
251 {'user':USER + 'error', 254 {'user':USER + 'error',
252 'pass':PASS, 255 'pass':PASS,
253 'host':HOST, 256 'host':HOST,
254 'cloned_repo':HG_REPO, 257 'cloned_repo':HG_REPO,
255 'dest':path} 258 'dest':path}
256 259
257 stdout, stderr = Command(cwd).execute('hg clone', clone_url) 260 stdout, stderr = Command(cwd).execute('hg clone', clone_url)
258 261
259 if not """abort: authorization failed""" in stderr: 262 if not """abort: authorization failed""" in stderr:
260 raise Exception('Failure') 263 raise Exception('Failure')
261 264
262 @test_wrapp 265 @test_wrapp
263 def test_pull(): 266 def test_pull():
264 pass 267 pass
265 268
333 added_file = jn(path, 'somefile.py') 336 added_file = jn(path, 'somefile.py')
334 337
335 try: 338 try:
336 shutil.rmtree(path, ignore_errors=True) 339 shutil.rmtree(path, ignore_errors=True)
337 os.makedirs(path) 340 os.makedirs(path)
338 print 'made dirs %s' % jn(path) 341 print '\tmade dirs %s' % jn(path)
339 except OSError: 342 except OSError:
340 raise 343 raise
341 344
342 Command(cwd).execute("""echo '' > %s""" % added_file) 345 Command(cwd).execute("""echo '' > %s""" % added_file)
343 Command(cwd).execute("""hg init %s""" % path) 346 Command(cwd).execute("""hg init %s""" % path)
359 362
360 stdout, stderr = Command(cwd).execute('hg push %s' % clone_url) 363 stdout, stderr = Command(cwd).execute('hg push %s' % clone_url)
361 if not """abort: HTTP Error 403: Forbidden""" in stderr: 364 if not """abort: HTTP Error 403: Forbidden""" in stderr:
362 raise Exception('Failure') 365 raise Exception('Failure')
363 366
367 @test_wrapp
368 def get_logs():
369 sa = get_session()
370 return len(sa.query(UserLog).all())
371
372 @test_wrapp
373 def test_logs(initial):
374 sa = get_session()
375 logs = sa.query(UserLog).all()
376 operations = 7
377 if initial + operations != len(logs):
378 raise Exception("missing number of logs %s vs %s" % (initial, len(logs)))
379
364 380
365 if __name__ == '__main__': 381 if __name__ == '__main__':
366 create_test_user(force=False) 382 create_test_user(force=False)
367 create_test_repo() 383 create_test_repo()
368 384
385 initial_logs = get_logs()
386
369 # test_push_modify_file() 387 # test_push_modify_file()
370 test_clone_with_credentials() 388 test_clone_with_credentials()
371 test_clone_wrong_credentials() 389 test_clone_wrong_credentials()
372 390
373 391
374 test_push_new_file(commits=2, with_clone=True) 392 test_push_new_file(commits=2, with_clone=True)
375 # 393
394 test_clone_anonymous()
376 test_push_wrong_path() 395 test_push_wrong_path()
377 396
378 test_clone_anonymous() 397
379 test_push_wrong_credentials() 398 test_push_wrong_credentials()
399
400 test_logs(initial_logs)