Mercurial > kallithea
comparison rhodecode/tests/test_hg_operations.py @ 1008:a9421a8a874f beta
updated testing script for hg operations
author | Marcin Kuzminski <marcin@python-works.com> |
---|---|
date | Wed, 09 Feb 2011 00:36:12 +0100 |
parents | 7f4943c90876 |
children | 15b60f83420c |
comparison
equal
deleted
inserted
replaced
1007:6faef09309e0 | 1008:a9421a8a874f |
---|---|
23 from sqlalchemy import engine_from_config | 23 from sqlalchemy import engine_from_config |
24 | 24 |
25 from rhodecode.lib.utils import add_cache | 25 from rhodecode.lib.utils import add_cache |
26 from rhodecode.model import init_model | 26 from rhodecode.model import init_model |
27 from rhodecode.model import meta | 27 from rhodecode.model import meta |
28 from rhodecode.model.db import User | 28 from rhodecode.model.db import User, Repository |
29 from rhodecode.lib.auth import get_crypt_password | 29 from rhodecode.lib.auth import get_crypt_password |
30 | 30 |
31 from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO | 31 from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO |
32 from rhodecode.config.environment import load_environment | 32 from rhodecode.config.environment import load_environment |
33 | 33 |
68 sa = meta.Session() | 68 sa = meta.Session() |
69 return sa | 69 return sa |
70 | 70 |
71 | 71 |
72 def create_test_user(force=True): | 72 def create_test_user(force=True): |
73 print 'creating test user' | |
73 sa = get_session() | 74 sa = get_session() |
74 | 75 |
75 user = sa.query(User).filter(User.username == USER).scalar() | 76 user = sa.query(User).filter(User.username == USER).scalar() |
76 | 77 |
77 if force and user: | 78 if force and user is not None: |
79 print 'removing current user' | |
78 sa.delete(user) | 80 sa.delete(user) |
79 sa.commit() | 81 sa.commit() |
80 | 82 |
81 if user is None or force: | 83 if user is None or force: |
84 print 'creating new one' | |
82 new_usr = User() | 85 new_usr = User() |
83 new_usr.username = USER | 86 new_usr.username = USER |
84 new_usr.password = get_crypt_password(PASS) | 87 new_usr.password = get_crypt_password(PASS) |
85 new_usr.email = 'mail@mail.com' | 88 new_usr.email = 'mail@mail.com' |
86 new_usr.name = 'test' | 89 new_usr.name = 'test' |
87 new_usr.lastname = 'lasttestname' | 90 new_usr.lastname = 'lasttestname' |
88 new_usr.active = True | 91 new_usr.active = True |
89 | 92 new_usr.admin = True |
90 sa.add(new_usr) | 93 sa.add(new_usr) |
91 sa.commit() | 94 sa.commit() |
92 | 95 |
93 | 96 print 'done' |
94 | 97 |
98 | |
99 def create_test_repo(force=True): | |
100 from rhodecode.model.repo import RepoModel | |
101 sa = get_session() | |
102 | |
103 user = sa.query(User).filter(User.username == USER).scalar() | |
104 if user is None: | |
105 raise Exception('user not found') | |
106 | |
107 | |
108 repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar() | |
109 | |
110 if repo is None: | |
111 print 'repo not found creating' | |
112 | |
113 form_data = {'repo_name':HG_REPO, | |
114 'repo_type':'hg', | |
115 'private':False, } | |
116 rm = RepoModel(sa) | |
117 rm.base_path = '/home/hg' | |
118 rm.create(form_data, user) | |
95 | 119 |
96 #============================================================================== | 120 #============================================================================== |
97 # TESTS | 121 # TESTS |
98 #============================================================================== | 122 #============================================================================== |
99 def test_clone(): | 123 def test_clone(no_errors=False): |
100 cwd = path = jn(TESTS_TMP_PATH, HG_REPO) | 124 cwd = path = jn(TESTS_TMP_PATH, HG_REPO) |
101 | 125 |
102 try: | 126 try: |
103 shutil.rmtree(path, ignore_errors=True) | 127 shutil.rmtree(path, ignore_errors=True) |
104 os.makedirs(path) | 128 os.makedirs(path) |
114 'cloned_repo':HG_REPO, | 138 'cloned_repo':HG_REPO, |
115 'dest':path} | 139 'dest':path} |
116 | 140 |
117 stdout, stderr = Command(cwd).execute('hg clone', clone_url) | 141 stdout, stderr = Command(cwd).execute('hg clone', clone_url) |
118 | 142 |
119 assert """adding file changes""" in stdout, 'no messages about cloning' | 143 if no_errors is False: |
120 assert """abort""" not in stderr , 'got error from clone' | 144 assert """adding file changes""" in stdout, 'no messages about cloning' |
145 assert """abort""" not in stderr , 'got error from clone' | |
121 | 146 |
122 | 147 |
123 | 148 |
124 def test_clone_anonymous_ok(): | 149 def test_clone_anonymous_ok(): |
125 cwd = path = jn(TESTS_TMP_PATH, HG_REPO) | 150 cwd = path = jn(TESTS_TMP_PATH, HG_REPO) |
168 | 193 |
169 | 194 |
170 def test_pull(): | 195 def test_pull(): |
171 pass | 196 pass |
172 | 197 |
173 def test_push(): | 198 def test_push_modify_file(f_name='setup.py'): |
174 | 199 cwd = path = jn(TESTS_TMP_PATH, HG_REPO) |
175 modified_file = jn(TESTS_TMP_PATH, HG_REPO, 'setup.py') | 200 modified_file = jn(TESTS_TMP_PATH, HG_REPO, f_name) |
176 for i in xrange(5): | 201 for i in xrange(5): |
177 cmd = """echo 'added_line%s' >> %s""" % (i, modified_file) | 202 cmd = """echo 'added_line%s' >> %s""" % (i, modified_file) |
178 Command(cwd).execute(cmd) | 203 Command(cwd).execute(cmd) |
179 | 204 |
180 cmd = """hg ci -m 'changed file %s' %s """ % (i, modified_file) | 205 cmd = """hg ci -m 'changed file %s' %s """ % (i, modified_file) |
182 | 207 |
183 Command(cwd).execute('hg push %s' % jn(TESTS_TMP_PATH, HG_REPO)) | 208 Command(cwd).execute('hg push %s' % jn(TESTS_TMP_PATH, HG_REPO)) |
184 | 209 |
185 def test_push_new_file(commits=15): | 210 def test_push_new_file(commits=15): |
186 | 211 |
187 test_clone() | 212 test_clone(no_errors=True) |
188 | 213 |
189 cwd = path = jn(TESTS_TMP_PATH, HG_REPO) | 214 cwd = path = jn(TESTS_TMP_PATH, HG_REPO) |
190 added_file = jn(path, '%ssetup.py' % _RandomNameSequence().next()) | 215 added_file = jn(path, '%ssetup.py' % _RandomNameSequence().next()) |
191 | 216 |
192 Command(cwd).execute('touch %s' % added_file) | 217 Command(cwd).execute('touch %s' % added_file) |
208 'dest':jn(TESTS_TMP_PATH, HG_REPO)} | 233 'dest':jn(TESTS_TMP_PATH, HG_REPO)} |
209 | 234 |
210 Command(cwd).execute('hg push --verbose --debug %s' % push_url) | 235 Command(cwd).execute('hg push --verbose --debug %s' % push_url) |
211 | 236 |
212 def test_push_wrong_credentials(): | 237 def test_push_wrong_credentials(): |
213 | 238 cwd = path = jn(TESTS_TMP_PATH, HG_REPO) |
214 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \ | 239 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \ |
215 {'user':USER + 'xxx', | 240 {'user':USER + 'xxx', |
216 'pass':PASS, | 241 'pass':PASS, |
217 'host':HOST, | 242 'host':HOST, |
218 'cloned_repo':HG_REPO, | 243 'cloned_repo':HG_REPO, |
260 stdout, stderr = Command(cwd).execute('hg push %s' % clone_url) | 285 stdout, stderr = Command(cwd).execute('hg push %s' % clone_url) |
261 assert """abort: HTTP Error 403: Forbidden""" in stderr | 286 assert """abort: HTTP Error 403: Forbidden""" in stderr |
262 | 287 |
263 | 288 |
264 if __name__ == '__main__': | 289 if __name__ == '__main__': |
265 #create_test_user() | 290 create_test_user(force=False) |
266 test_clone() | 291 create_test_repo() |
267 test_clone_anonymous_ok() | 292 #test_push_modify_file() |
293 #test_clone() | |
294 #test_clone_anonymous_ok() | |
268 | 295 |
269 #test_clone_wrong_credentials() | 296 #test_clone_wrong_credentials() |
270 | 297 |
271 #test_pull() | 298 #test_pull() |
272 #test_push_new_file(3) | 299 test_push_new_file(commits=3) |
273 #test_push_wrong_path() | 300 #test_push_wrong_path() |
274 #test_push_wrong_credentials() | 301 #test_push_wrong_credentials() |
275 | 302 |
276 | 303 |