Mercurial > kallithea
comparison rhodecode/tests/test_hg_operations.py @ 1529:0b268dd369ec beta
Fixed test_hg_operations test and added concurency test
author | Marcin Kuzminski <marcin@python-works.com> |
---|---|
date | Fri, 07 Oct 2011 23:17:45 +0200 |
parents | fcc676c6bf3b |
children | 04027bdb876c |
comparison
equal
deleted
inserted
replaced
1528:1d7a621d396f | 1529:0b268dd369ec |
---|---|
27 from sqlalchemy import engine_from_config | 27 from sqlalchemy import engine_from_config |
28 | 28 |
29 from rhodecode.lib.utils import add_cache | 29 from rhodecode.lib.utils import add_cache |
30 from rhodecode.model import init_model | 30 from rhodecode.model import init_model |
31 from rhodecode.model import meta | 31 from rhodecode.model import meta |
32 from rhodecode.model.db import User, Repository | 32 from rhodecode.model.db import User, Repository, UserLog |
33 from rhodecode.lib.auth import get_crypt_password | 33 from rhodecode.lib.auth import get_crypt_password |
34 | 34 |
35 from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO | 35 from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO |
36 from rhodecode.config.environment import load_environment | 36 from rhodecode.config.environment import load_environment |
37 | 37 |
43 | 43 |
44 USER = 'test_admin' | 44 USER = 'test_admin' |
45 PASS = 'test12' | 45 PASS = 'test12' |
46 HOST = '127.0.0.1:5000' | 46 HOST = '127.0.0.1:5000' |
47 DEBUG = bool(int(sys.argv[1])) | 47 DEBUG = bool(int(sys.argv[1])) |
48 print 'DEBUG:',DEBUG | 48 print 'DEBUG:', DEBUG |
49 log = logging.getLogger(__name__) | 49 log = logging.getLogger(__name__) |
50 | 50 |
51 | 51 |
52 class Command(object): | 52 class Command(object): |
53 | 53 |
68 print stdout, stderr | 68 print stdout, stderr |
69 return stdout, stderr | 69 return stdout, stderr |
70 | 70 |
71 | 71 |
72 def test_wrapp(func): | 72 def test_wrapp(func): |
73 | 73 |
74 def __wrapp(*args,**kwargs): | 74 def __wrapp(*args, **kwargs): |
75 print '###%s###' %func.__name__ | 75 print '###%s###' % func.__name__ |
76 try: | 76 try: |
77 res = func(*args,**kwargs) | 77 res = func(*args, **kwargs) |
78 except: | 78 except Exception, e: |
79 print '--%s failed--' % func.__name__ | 79 print ('###############\n-' |
80 return | 80 '--%s failed %s--\n' |
81 print 'ok' | 81 '###############\n' % (func.__name__, e)) |
82 sys.exit() | |
83 print '++OK++' | |
82 return res | 84 return res |
83 return __wrapp | 85 return __wrapp |
84 | 86 |
85 def get_session(): | 87 def get_session(): |
86 engine = engine_from_config(conf, 'sqlalchemy.db1.') | 88 engine = engine_from_config(conf, 'sqlalchemy.db1.') |
88 sa = meta.Session | 90 sa = meta.Session |
89 return sa | 91 return sa |
90 | 92 |
91 | 93 |
92 def create_test_user(force=True): | 94 def create_test_user(force=True): |
93 print 'creating test user' | 95 print '\tcreating test user' |
94 sa = get_session() | 96 sa = get_session() |
95 | 97 |
96 user = sa.query(User).filter(User.username == USER).scalar() | 98 user = sa.query(User).filter(User.username == USER).scalar() |
97 | 99 |
98 if force and user is not None: | 100 if force and user is not None: |
99 print 'removing current user' | 101 print '\tremoving current user' |
100 for repo in sa.query(Repository).filter(Repository.user == user).all(): | 102 for repo in sa.query(Repository).filter(Repository.user == user).all(): |
101 sa.delete(repo) | 103 sa.delete(repo) |
102 sa.delete(user) | 104 sa.delete(user) |
103 sa.commit() | 105 sa.commit() |
104 | 106 |
105 if user is None or force: | 107 if user is None or force: |
106 print 'creating new one' | 108 print '\tcreating new one' |
107 new_usr = User() | 109 new_usr = User() |
108 new_usr.username = USER | 110 new_usr.username = USER |
109 new_usr.password = get_crypt_password(PASS) | 111 new_usr.password = get_crypt_password(PASS) |
110 new_usr.email = 'mail@mail.com' | 112 new_usr.email = 'mail@mail.com' |
111 new_usr.name = 'test' | 113 new_usr.name = 'test' |
113 new_usr.active = True | 115 new_usr.active = True |
114 new_usr.admin = True | 116 new_usr.admin = True |
115 sa.add(new_usr) | 117 sa.add(new_usr) |
116 sa.commit() | 118 sa.commit() |
117 | 119 |
118 print 'done' | 120 print '\tdone' |
119 | 121 |
120 | 122 |
121 def create_test_repo(force=True): | 123 def create_test_repo(force=True): |
122 from rhodecode.model.repo import RepoModel | 124 from rhodecode.model.repo import RepoModel |
123 sa = get_session() | 125 sa = get_session() |
128 | 130 |
129 | 131 |
130 repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar() | 132 repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar() |
131 | 133 |
132 if repo is None: | 134 if repo is None: |
133 print 'repo not found creating' | 135 print '\trepo not found creating' |
134 | 136 |
135 form_data = {'repo_name':HG_REPO, | 137 form_data = {'repo_name':HG_REPO, |
136 'repo_type':'hg', | 138 'repo_type':'hg', |
137 'private':False, | 139 'private':False, |
138 'clone_uri':'' } | 140 'clone_uri':'' } |
142 | 144 |
143 | 145 |
144 def set_anonymous_access(enable=True): | 146 def set_anonymous_access(enable=True): |
145 sa = get_session() | 147 sa = get_session() |
146 user = sa.query(User).filter(User.username == 'default').one() | 148 user = sa.query(User).filter(User.username == 'default').one() |
149 sa.expire(user) | |
147 user.active = enable | 150 user.active = enable |
148 sa.add(user) | 151 sa.add(user) |
149 sa.commit() | 152 sa.commit() |
150 sa.remove() | 153 sa.remove() |
151 | 154 import time;time.sleep(3) |
152 print 'anonymous access is now:',enable | 155 print '\tanonymous access is now:', enable |
153 | 156 |
154 | 157 |
155 def get_anonymous_access(): | 158 def get_anonymous_access(): |
156 sa = get_session() | 159 sa = get_session() |
157 obj1 = sa.query(User).filter(User.username == 'default').one() | 160 obj1 = sa.query(User).filter(User.username == 'default').one() |
171 os.makedirs(path) | 174 os.makedirs(path) |
172 #print 'made dirs %s' % jn(path) | 175 #print 'made dirs %s' % jn(path) |
173 except OSError: | 176 except OSError: |
174 raise | 177 raise |
175 | 178 |
176 print 'checking if anonymous access is enabled' | 179 print '\tchecking if anonymous access is enabled' |
177 anonymous_access = get_anonymous_access() | 180 anonymous_access = get_anonymous_access() |
178 if anonymous_access: | 181 if anonymous_access: |
179 print 'enabled, disabling it ' | 182 print '\tenabled, disabling it ' |
180 set_anonymous_access(enable=False) | 183 set_anonymous_access(enable=False) |
181 time.sleep(1) | 184 time.sleep(1) |
182 | 185 |
183 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \ | 186 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \ |
184 {'user':USER, | 187 {'user':USER, |
185 'pass':PASS, | 188 'pass':PASS, |
186 'host':HOST, | 189 'host':HOST, |
187 'cloned_repo':HG_REPO, | 190 'cloned_repo':HG_REPO, |
204 #print 'made dirs %s' % jn(path) | 207 #print 'made dirs %s' % jn(path) |
205 except OSError: | 208 except OSError: |
206 raise | 209 raise |
207 | 210 |
208 | 211 |
209 print 'checking if anonymous access is enabled' | 212 print '\tchecking if anonymous access is enabled' |
210 anonymous_access = get_anonymous_access() | 213 anonymous_access = get_anonymous_access() |
211 if not anonymous_access: | 214 if not anonymous_access: |
212 print 'not enabled, enabling it ' | 215 print '\tnot enabled, enabling it ' |
213 set_anonymous_access(enable=True) | 216 set_anonymous_access(enable=True) |
214 time.sleep(1) | 217 time.sleep(1) |
215 | 218 |
216 clone_url = 'http://%(host)s/%(cloned_repo)s %(dest)s' % \ | 219 clone_url = 'http://%(host)s/%(cloned_repo)s %(dest)s' % \ |
217 {'user':USER, | 220 {'user':USER, |
218 'pass':PASS, | 221 'pass':PASS, |
219 'host':HOST, | 222 'host':HOST, |
220 'cloned_repo':HG_REPO, | 223 'cloned_repo':HG_REPO, |
225 assert """adding file changes""" in stdout, 'no messages about cloning' | 228 assert """adding file changes""" in stdout, 'no messages about cloning' |
226 assert """abort""" not in stderr , 'got error from clone' | 229 assert """abort""" not in stderr , 'got error from clone' |
227 | 230 |
228 #disable if it was enabled | 231 #disable if it was enabled |
229 if not anonymous_access: | 232 if not anonymous_access: |
230 print 'disabling anonymous access' | 233 print '\tdisabling anonymous access' |
231 set_anonymous_access(enable=False) | 234 set_anonymous_access(enable=False) |
232 | 235 |
233 @test_wrapp | 236 @test_wrapp |
234 def test_clone_wrong_credentials(): | 237 def test_clone_wrong_credentials(): |
235 cwd = path = jn(TESTS_TMP_PATH, HG_REPO) | 238 cwd = path = jn(TESTS_TMP_PATH, HG_REPO) |
239 os.makedirs(path) | 242 os.makedirs(path) |
240 #print 'made dirs %s' % jn(path) | 243 #print 'made dirs %s' % jn(path) |
241 except OSError: | 244 except OSError: |
242 raise | 245 raise |
243 | 246 |
244 print 'checking if anonymous access is enabled' | 247 print '\tchecking if anonymous access is enabled' |
245 anonymous_access = get_anonymous_access() | 248 anonymous_access = get_anonymous_access() |
246 if anonymous_access: | 249 if anonymous_access: |
247 print 'enabled, disabling it ' | 250 print '\tenabled, disabling it ' |
248 set_anonymous_access(enable=False) | 251 set_anonymous_access(enable=False) |
249 | 252 |
250 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \ | 253 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \ |
251 {'user':USER + 'error', | 254 {'user':USER + 'error', |
252 'pass':PASS, | 255 'pass':PASS, |
253 'host':HOST, | 256 'host':HOST, |
254 'cloned_repo':HG_REPO, | 257 'cloned_repo':HG_REPO, |
255 'dest':path} | 258 'dest':path} |
256 | 259 |
257 stdout, stderr = Command(cwd).execute('hg clone', clone_url) | 260 stdout, stderr = Command(cwd).execute('hg clone', clone_url) |
258 | 261 |
259 if not """abort: authorization failed""" in stderr: | 262 if not """abort: authorization failed""" in stderr: |
260 raise Exception('Failure') | 263 raise Exception('Failure') |
261 | 264 |
262 @test_wrapp | 265 @test_wrapp |
263 def test_pull(): | 266 def test_pull(): |
264 pass | 267 pass |
265 | 268 |
333 added_file = jn(path, 'somefile.py') | 336 added_file = jn(path, 'somefile.py') |
334 | 337 |
335 try: | 338 try: |
336 shutil.rmtree(path, ignore_errors=True) | 339 shutil.rmtree(path, ignore_errors=True) |
337 os.makedirs(path) | 340 os.makedirs(path) |
338 print 'made dirs %s' % jn(path) | 341 print '\tmade dirs %s' % jn(path) |
339 except OSError: | 342 except OSError: |
340 raise | 343 raise |
341 | 344 |
342 Command(cwd).execute("""echo '' > %s""" % added_file) | 345 Command(cwd).execute("""echo '' > %s""" % added_file) |
343 Command(cwd).execute("""hg init %s""" % path) | 346 Command(cwd).execute("""hg init %s""" % path) |
359 | 362 |
360 stdout, stderr = Command(cwd).execute('hg push %s' % clone_url) | 363 stdout, stderr = Command(cwd).execute('hg push %s' % clone_url) |
361 if not """abort: HTTP Error 403: Forbidden""" in stderr: | 364 if not """abort: HTTP Error 403: Forbidden""" in stderr: |
362 raise Exception('Failure') | 365 raise Exception('Failure') |
363 | 366 |
367 @test_wrapp | |
368 def get_logs(): | |
369 sa = get_session() | |
370 return len(sa.query(UserLog).all()) | |
371 | |
372 @test_wrapp | |
373 def test_logs(initial): | |
374 sa = get_session() | |
375 logs = sa.query(UserLog).all() | |
376 operations = 7 | |
377 if initial + operations != len(logs): | |
378 raise Exception("missing number of logs %s vs %s" % (initial, len(logs))) | |
379 | |
364 | 380 |
365 if __name__ == '__main__': | 381 if __name__ == '__main__': |
366 create_test_user(force=False) | 382 create_test_user(force=False) |
367 create_test_repo() | 383 create_test_repo() |
368 | 384 |
385 initial_logs = get_logs() | |
386 | |
369 # test_push_modify_file() | 387 # test_push_modify_file() |
370 test_clone_with_credentials() | 388 test_clone_with_credentials() |
371 test_clone_wrong_credentials() | 389 test_clone_wrong_credentials() |
372 | 390 |
373 | 391 |
374 test_push_new_file(commits=2, with_clone=True) | 392 test_push_new_file(commits=2, with_clone=True) |
375 # | 393 |
394 test_clone_anonymous() | |
376 test_push_wrong_path() | 395 test_push_wrong_path() |
377 | 396 |
378 test_clone_anonymous() | 397 |
379 test_push_wrong_credentials() | 398 test_push_wrong_credentials() |
399 | |
400 test_logs(initial_logs) |