Mercurial > kallithea
comparison rhodecode/tests/scripts/test_vcs_operations.py @ 2752:6d904a0cd48d beta
added new suite of tests for VCS operations
- locking tests
- push pull tests
- added new ENV option to bypass re-creation of test environment
author | Marcin Kuzminski <marcin@python-works.com> |
---|---|
date | Tue, 28 Aug 2012 09:28:09 +0200 |
parents | rhodecode/tests/scripts/test_scm_operations.py@6341084b7a2f |
children | 15dc5b2ff672 |
comparison
equal
deleted
inserted
replaced
2751:e291f25ea87f | 2752:6d904a0cd48d |
---|---|
1 # -*- coding: utf-8 -*- | |
2 """ | |
3 rhodecode.tests.scripts.test_vcs_operations | |
4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
5 | |
6 Test suite for making push/pull operations. | |
7 Run using:: | |
8 | |
9 RC_WHOOSH_TEST_DISABLE=1 RC_NO_TMP_PATH=1 nosetests rhodecode/tests/scripts/test_vcs_operations.py | |
10 | |
11 :created_on: Dec 30, 2010 | |
12 :author: marcink | |
13 :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com> | |
14 :license: GPLv3, see COPYING for more details. | |
15 """ | |
16 # This program is free software: you can redistribute it and/or modify | |
17 # it under the terms of the GNU General Public License as published by | |
18 # the Free Software Foundation, either version 3 of the License, or | |
19 # (at your option) any later version. | |
20 # | |
21 # This program is distributed in the hope that it will be useful, | |
22 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
23 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
24 # GNU General Public License for more details. | |
25 # | |
26 # You should have received a copy of the GNU General Public License | |
27 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
28 | |
29 import os | |
30 import tempfile | |
31 import unittest | |
32 from os.path import join as jn | |
33 from os.path import dirname as dn | |
34 | |
35 from tempfile import _RandomNameSequence | |
36 from subprocess import Popen, PIPE | |
37 | |
38 from rhodecode.tests import * | |
39 from rhodecode.model.db import User, Repository, UserLog | |
40 from rhodecode.model.meta import Session | |
41 from rhodecode.model.repo import RepoModel | |
42 | |
# When true, Command.execute prints each command line and the output it
# captured from the process.
DEBUG = True
HOST = '127.0.0.1:5000'  # test host
45 | |
46 | |
class Command(object):
    """Run shell commands from a fixed working directory."""

    def __init__(self, cwd):
        # directory every command started through this instance runs in
        self.cwd = cwd

    def execute(self, cmd, *args):
        """
        Runs command on the system with given ``args``.

        :param cmd: command string; it is handed to the shell, so it may
            already contain arguments or redirections
        :param args: additional strings, appended to ``cmd`` separated by
            single spaces
        :returns: ``(stdout, stderr)`` tuple as captured from the process
        """
        command = cmd + ' ' + ' '.join(args)
        if DEBUG:
            # single-argument print call: valid on Python 2 and Python 3
            print('*** CMD %s ***' % command)
        p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd)
        stdout, stderr = p.communicate()
        if DEBUG:
            print('%s %s' % (stdout, stderr))
        return stdout, stderr
65 | |
66 | |
def _get_tmp_dir():
    """
    Create a new temporary directory whose name starts with
    ``rc_integration_test`` and return its path.
    """
    tmp_dir = tempfile.mkdtemp(prefix='rc_integration_test')
    return tmp_dir
69 | |
70 | |
def _construct_url(repo, dest=None, **kwargs):
    """
    Build the ``<url> <dest>`` argument string passed to clone/push commands.

    :param repo: repository name placed after the host part of the URL
    :param dest: destination appended after the URL; when it is None a new
        temporary directory is created and used instead
    :param kwargs: overrides for the template values (``user``, ``passwd``,
        ``host``, ``cloned_repo``)
    :returns: ``'http://user:passwd@host/repo dest'``, or the same string
        without credentials when ``user`` or ``passwd`` is empty
    """
    if dest is None:
        # make temp clone
        dest = _get_tmp_dir()
    params = {
        'user': TEST_USER_ADMIN_LOGIN,
        'passwd': TEST_USER_ADMIN_PASS,
        'host': HOST,
        'cloned_repo': repo,
        'dest': dest,
    }
    params.update(kwargs)
    if params['user'] and params['passwd']:
        template = ('http://%(user)s:%(passwd)s@%(host)s/'
                    '%(cloned_repo)s %(dest)s')
    else:
        # every placeholder needs its leading ``%``; without it the text
        # ``(host)s`` would end up literally in the URL
        template = 'http://%(host)s/%(cloned_repo)s %(dest)s'
    return template % params
88 | |
89 | |
def _add_files_and_push(vcs, DEST, **kwargs):
    """
    Generate a file, add it to the DEST repo, commit to it three times and
    push the result back.

    :param vcs: ``'hg'`` or ``'git'``; selects the client commands and the
        repository (HG_REPO or GIT_REPO) the push goes to
    :param DEST: path of the local clone the file is created in
    :param kwargs: forwarded to ``_construct_url`` (for example ``user`` and
        ``passwd``); a ``clone_url`` entry is used verbatim as push target
    :returns: ``(stdout, stderr)`` of the push command
    :raises ValueError: when ``vcs`` is neither ``'hg'`` nor ``'git'``
    """
    # fail early and clearly instead of running half of the commands and
    # then tripping over an unassigned local
    if vcs not in ('hg', 'git'):
        raise ValueError('unsupported vcs: %r' % (vcs,))

    # commit some stuff into this repo
    cwd = jn(DEST)
    # next() works on both Python 2 and Python 3 iterators
    added_file = jn(cwd, '%ssetup.py' % next(_RandomNameSequence()))
    shell = Command(cwd)
    shell.execute('touch %s' % added_file)
    shell.execute('%s add %s' % (vcs, added_file))

    author = 'Marcin Kuźminski <marcin@python-blog.com>'
    if vcs == 'hg':
        commit_cmd = """hg commit -m 'commited new %s' -u '%s' %s """
    else:
        # ``git ci`` only exists where the user configured such an alias;
        # ``git commit`` is always available
        commit_cmd = """git commit -m 'commited new %s' --author '%s' %s """

    for i in range(3):
        shell.execute("""echo 'added_line%s' >> %s""" % (i, added_file))
        shell.execute(commit_cmd % (i, author, added_file))

    # PUSH it back
    if 'clone_url' in kwargs:
        # explicit target given by the caller, nothing to construct
        clone_url = kwargs['clone_url']
    else:
        # empty dest: a push only needs the URL, no destination path
        kwargs['dest'] = ''
        target_repo = HG_REPO if vcs == 'hg' else GIT_REPO
        clone_url = _construct_url(target_repo, **kwargs)

    if vcs == 'hg':
        return shell.execute('hg push --verbose', clone_url)
    return shell.execute('git push', clone_url + " master")
133 | |
134 | |
def set_anonymous_access(enable=True):
    """
    Set the ``active`` flag of the default user and commit the change.

    :param enable: value stored in the default user's ``active`` attribute
    :raises Exception: when the user, read again after the commit, does not
        carry the requested value
    """
    default_user = User.get_by_username(User.DEFAULT_USER)
    default_user.active = enable
    Session().add(default_user)
    Session().commit()
    print('\tanonymous access is now: %s' % (enable,))

    # read the flag back to verify that the change took effect
    stored_flag = User.get_by_username(User.DEFAULT_USER).active
    if enable != stored_flag:
        raise Exception('Cannot set anonymous access')
143 | |
144 | |
145 #============================================================================== | |
146 # TESTS | |
147 #============================================================================== | |
148 | |
class TestVCSOperations(unittest.TestCase):
    """
    Clone, push and locking scenarios for the hg and git test repositories,
    driven through the command line clients.
    """
    # NOTE: test methods are documented with comments rather than docstrings
    # so that the names reported by the test runner stay unchanged.

    @classmethod
    def setup_class(cls):
        # disable anonymous access
        set_anonymous_access(enable=False)

    def setUp(self):
        # start from unlocked repositories with locking switched off
        for repo_name in (GIT_REPO, HG_REPO):
            repo = Repository.get_by_repo_name(repo_name)
            Repository.unlock(repo)
            repo.enable_locking = False
            Session().add(repo)
            Session().commit()

    @staticmethod
    def _enable_locking(repo_name):
        # switch the locking feature on for the named repository
        repo = Repository.get_by_repo_name(repo_name)
        repo.enable_locking = True
        Session().add(repo)
        Session().commit()

    @staticmethod
    def _admin_user_id():
        # database id of the admin test user
        return User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id

    def test_clone_hg_repo_by_admin(self):
        target = _construct_url(HG_REPO)
        out, err = Command('/tmp').execute('hg clone', target)

        for fragment in ('requesting all changes', 'adding changesets',
                         'adding manifests', 'adding file changes'):
            assert fragment in out

        assert err == ''

    def test_clone_git_repo_by_admin(self):
        target = _construct_url(GIT_REPO)
        out, err = Command('/tmp').execute('git clone', target)

        assert 'Cloning into' in out
        assert err == ''

    def test_clone_wrong_credentials_hg(self):
        target = _construct_url(HG_REPO, passwd='bad!')
        _, err = Command('/tmp').execute('hg clone', target)
        assert 'abort: authorization failed' in err

    def test_clone_wrong_credentials_git(self):
        target = _construct_url(GIT_REPO, passwd='bad!')
        _, err = Command('/tmp').execute('git clone', target)
        assert 'fatal: Authentication failed' in err

    def test_clone_git_dir_as_hg(self):
        target = _construct_url(GIT_REPO)
        _, err = Command('/tmp').execute('hg clone', target)
        assert 'HTTP Error 404: Not Found' in err

    def test_clone_hg_repo_as_git(self):
        target = _construct_url(HG_REPO)
        _, err = Command('/tmp').execute('git clone', target)
        assert 'not found:' in err

    def test_clone_non_existing_path_hg(self):
        target = _construct_url('trololo')
        _, err = Command('/tmp').execute('hg clone', target)
        assert 'HTTP Error 404: Not Found' in err

    def test_clone_non_existing_path_git(self):
        target = _construct_url('trololo')
        _, err = Command('/tmp').execute('git clone', target)
        assert 'not found:' in err

    def test_push_new_file_hg(self):
        workdir = _get_tmp_dir()
        target = _construct_url(HG_REPO, dest=workdir)
        Command('/tmp').execute('hg clone', target)

        out, _ = _add_files_and_push('hg', workdir)

        for fragment in ('pushing to', 'Repository size',
                         'Last revision is now'):
            assert fragment in out

    def test_push_new_file_git(self):
        workdir = _get_tmp_dir()
        target = _construct_url(GIT_REPO, dest=workdir)
        Command('/tmp').execute('git clone', target)

        # commit some stuff into this repo
        _, err = _add_files_and_push('git', workdir)

        # the pushed ref is expected on stderr, not on stdout
        assert 'master -> master' in err

    def test_push_wrong_credentials_hg(self):
        workdir = _get_tmp_dir()
        target = _construct_url(HG_REPO, dest=workdir)
        Command('/tmp').execute('hg clone', target)

        _, err = _add_files_and_push('hg', workdir, user='bad',
                                     passwd='name')

        assert 'abort: authorization failed' in err

    def test_push_wrong_credentials_git(self):
        workdir = _get_tmp_dir()
        target = _construct_url(GIT_REPO, dest=workdir)
        Command('/tmp').execute('git clone', target)

        _, err = _add_files_and_push('git', workdir, user='bad',
                                     passwd='name')

        assert 'fatal: Authentication failed' in err

    def test_push_back_to_wrong_url_hg(self):
        workdir = _get_tmp_dir()
        target = _construct_url(HG_REPO, dest=workdir)
        Command('/tmp').execute('hg clone', target)

        _, err = _add_files_and_push('hg', workdir,
                                     clone_url='http://127.0.0.1:5000/tmp')

        assert 'HTTP Error 404: Not Found' in err

    def test_push_back_to_wrong_url_git(self):
        workdir = _get_tmp_dir()
        target = _construct_url(GIT_REPO, dest=workdir)
        Command('/tmp').execute('git clone', target)

        _, err = _add_files_and_push('git', workdir,
                                     clone_url='http://127.0.0.1:5000/tmp')

        assert 'not found:' in err

    def test_clone_and_create_lock_hg(self):
        self._enable_locking(HG_REPO)
        # clone
        target = _construct_url(HG_REPO)
        Command('/tmp').execute('hg clone', target)

        # check if lock was made
        repo = Repository.get_by_repo_name(HG_REPO)
        assert repo.locked[0] == self._admin_user_id()

    def test_clone_and_create_lock_git(self):
        self._enable_locking(GIT_REPO)
        # clone
        target = _construct_url(GIT_REPO)
        Command('/tmp').execute('git clone', target)

        # check if lock was made
        repo = Repository.get_by_repo_name(GIT_REPO)
        assert repo.locked[0] == self._admin_user_id()

    def test_clone_after_repo_was_locked_hg(self):
        # lock repo
        repo = Repository.get_by_repo_name(HG_REPO)
        Repository.lock(repo, self._admin_user_id())
        # pull fails since repo is locked
        target = _construct_url(HG_REPO)
        _, err = Command('/tmp').execute('hg clone', target)
        expected = (
            """abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
            % (HG_REPO, TEST_USER_ADMIN_LOGIN)
        )
        assert expected in err

    def test_clone_after_repo_was_locked_git(self):
        # lock repo
        repo = Repository.get_by_repo_name(GIT_REPO)
        Repository.lock(repo, self._admin_user_id())
        # pull fails since repo is locked
        target = _construct_url(GIT_REPO)
        _, err = Command('/tmp').execute('git clone', target)
        # TODO: fix this somehow later on GIT. The 423 "Repository `%s`
        # locked by user `%s`" message is what should be asserted here, but
        # even if we throw back 423 to git it makes ANOTHER request and we
        # fail there with 405 :/
        expected = "405 Method Not Allowed"
        assert expected in err

    def test_push_on_locked_repo_by_other_user_hg(self):
        # clone some temp
        workdir = _get_tmp_dir()
        target = _construct_url(HG_REPO, dest=workdir)
        Command('/tmp').execute('hg clone', target)

        # lock repo
        repo = Repository.get_by_repo_name(HG_REPO)
        # grant the regular user write permission so he can actually push
        RepoModel().grant_user_permission(repo=repo,
                                          user=TEST_USER_REGULAR_LOGIN,
                                          perm='repository.write')
        Session().commit()
        Repository.lock(repo, self._admin_user_id())

        # push fails, repo is locked by other user
        _, err = _add_files_and_push('hg', workdir,
                                     user=TEST_USER_REGULAR_LOGIN,
                                     passwd=TEST_USER_REGULAR_PASS)
        expected = (
            """abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
            % (HG_REPO, TEST_USER_ADMIN_LOGIN)
        )
        assert expected in err

#TODO: fix me ! somehow during tests hooks don't get called on GIT
#    def test_push_on_locked_repo_by_other_user_git(self):
#        #clone some temp
#        DEST = _get_tmp_dir()
#        clone_url = _construct_url(GIT_REPO, dest=DEST)
#        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
#
#        #lock repo
#        r = Repository.get_by_repo_name(GIT_REPO)
#        # let this user actually push !
#        RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN,
#                                          perm='repository.write')
#        Session().commit()
#        Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
#
#        #push fails repo is locked by other user !
#        stdout, stderr = _add_files_and_push('git', DEST,
#                                             user=TEST_USER_REGULAR_LOGIN,
#                                             passwd=TEST_USER_REGULAR_PASS)
#        msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
#                % (GIT_REPO, TEST_USER_ADMIN_LOGIN))
#        #TODO: fix this somehow later on GIT, GIT is stupid and even if we throw
#        # back 423 to it, it makes ANOTHER request and we fail there with 405 :/
#        msg = "405 Method Not Allowed"
#        assert msg in stderr

    def test_push_unlocks_repository_hg(self):
        self._enable_locking(HG_REPO)
        # clone some temp
        workdir = _get_tmp_dir()
        target = _construct_url(HG_REPO, dest=workdir)
        Command('/tmp').execute('hg clone', target)

        # check for lock repo after clone
        repo = Repository.get_by_repo_name(HG_REPO)
        assert repo.locked[0] == self._admin_user_id()

        # push is ok and repo is now unlocked
        out, _ = _add_files_and_push('hg', workdir)
        assert ('remote: Released lock on repo `%s`' % HG_REPO) in out
        # the Session has to be cleaned up here before the repository is
        # read again (presumably so that stale lock state is not reused)
        Session.remove()
        repo = Repository.get_by_repo_name(HG_REPO)
        assert repo.locked == [None, None]

#TODO: fix me ! somehow during tests hooks don't get called on GIT
#    def test_push_unlocks_repository_git(self):
#        # enable locking
#        r = Repository.get_by_repo_name(GIT_REPO)
#        r.enable_locking = True
#        Session().add(r)
#        Session().commit()
#        #clone some temp
#        DEST = _get_tmp_dir()
#        clone_url = _construct_url(GIT_REPO, dest=DEST)
#        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
#
#        #check for lock repo after clone
#        r = Repository.get_by_repo_name(GIT_REPO)
#        assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
#
#        #push is ok and repo is now unlocked
#        stdout, stderr = _add_files_and_push('git', DEST)
#        #assert ('remote: Released lock on repo `%s`' % GIT_REPO) in stdout
#        #we need to cleanup the Session Here !
#        Session.remove()
#        r = Repository.get_by_repo_name(GIT_REPO)
#        assert r.locked == [None, None]