comparison rhodecode/tests/_test_concurency.py @ 1628:de71a4bde097 beta

Some code cleanups and fixes
author Marcin Kuzminski <marcin@python-works.com>
date Mon, 31 Oct 2011 21:42:41 +0200
parents rhodecode/tests/test_concurency.py@2afe9320d5e6
children a404060706c0
comparison
equal deleted inserted replaced
1627:c622e3a85499 1628:de71a4bde097
1 # -*- coding: utf-8 -*-
2 """
3 rhodecode.tests.test_hg_operations
4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
5
6 Test suite for making push/pull operations
7
8 :created_on: Dec 30, 2010
9 :copyright: (C) 2009-2011 Marcin Kuzminski <marcin@python-works.com>
10 :license: GPLv3, see COPYING for more details.
11 """
12 # This program is free software: you can redistribute it and/or modify
13 # it under the terms of the GNU General Public License as published by
14 # the Free Software Foundation, either version 3 of the License, or
15 # (at your option) any later version.
16 #
17 # This program is distributed in the hope that it will be useful,
18 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 # GNU General Public License for more details.
21 #
22 # You should have received a copy of the GNU General Public License
23 # along with this program. If not, see <http://www.gnu.org/licenses/>.
24
25 import os
26 import sys
27 import shutil
28 import logging
29 from os.path import join as jn
30 from os.path import dirname as dn
31
32 from tempfile import _RandomNameSequence
33 from subprocess import Popen, PIPE
34
35 from paste.deploy import appconfig
36 from pylons import config
37 from sqlalchemy import engine_from_config
38
39 from rhodecode.lib.utils import add_cache
40 from rhodecode.model import init_model
41 from rhodecode.model import meta
42 from rhodecode.model.db import User, Repository
43 from rhodecode.lib.auth import get_crypt_password
44
45 from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO
46 from rhodecode.config.environment import load_environment
47
# Directory three levels above this file; ``development.ini`` is looked up
# relative to it.
rel_path = dn(dn(dn(os.path.abspath(__file__))))
conf = appconfig('config:development.ini', relative_to=rel_path)

# The environment has to be loaded before the cache settings are registered.
load_environment(conf.global_conf, conf.local_conf)
add_cache(conf)

# Credentials and address used to build the clone url.
USER, PASS, HOST = 'test_admin', 'test12', '127.0.0.1:5000'
# When true, Command.execute() echoes the command line and its output.
DEBUG = True
log = logging.getLogger(__name__)
59
60
61 class Command(object):
62
63 def __init__(self, cwd):
64 self.cwd = cwd
65
66 def execute(self, cmd, *args):
67 """Runs command on the system with given ``args``.
68 """
69
70 command = cmd + ' ' + ' '.join(args)
71 log.debug('Executing %s' % command)
72 if DEBUG:
73 print command
74 p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd)
75 stdout, stderr = p.communicate()
76 if DEBUG:
77 print stdout, stderr
78 return stdout, stderr
79
def get_session():
    """Create a database session from the loaded configuration.

    An engine is built from the ``sqlalchemy.db1.`` keys of ``conf``, passed
    to ``init_model`` and a new ``meta.Session`` instance is returned.
    """
    init_model(engine_from_config(conf, 'sqlalchemy.db1.'))
    return meta.Session()
85
86
87 def create_test_user(force=True):
88 print 'creating test user'
89 sa = get_session()
90
91 user = sa.query(User).filter(User.username == USER).scalar()
92
93 if force and user is not None:
94 print 'removing current user'
95 for repo in sa.query(Repository).filter(Repository.user == user).all():
96 sa.delete(repo)
97 sa.delete(user)
98 sa.commit()
99
100 if user is None or force:
101 print 'creating new one'
102 new_usr = User()
103 new_usr.username = USER
104 new_usr.password = get_crypt_password(PASS)
105 new_usr.email = 'mail@mail.com'
106 new_usr.name = 'test'
107 new_usr.lastname = 'lasttestname'
108 new_usr.active = True
109 new_usr.admin = True
110 sa.add(new_usr)
111 sa.commit()
112
113 print 'done'
114
115
116 def create_test_repo(force=True):
117 print 'creating test repo'
118 from rhodecode.model.repo import RepoModel
119 sa = get_session()
120
121 user = sa.query(User).filter(User.username == USER).scalar()
122 if user is None:
123 raise Exception('user not found')
124
125
126 repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar()
127
128 if repo is None:
129 print 'repo not found creating'
130
131 form_data = {'repo_name':HG_REPO,
132 'repo_type':'hg',
133 'private':False,
134 'clone_uri':'' }
135 rm = RepoModel(sa)
136 rm.base_path = '/home/hg'
137 rm.create(form_data, user)
138
139 print 'done'
140
def set_anonymous_access(enable=True):
    """Store ``enable`` as the ``active`` flag of the ``default`` user.

    The user is fetched with ``.one()``, so exactly one row with the
    username ``default`` is expected to exist.

    :param enable: new value of the ``active`` attribute
    """
    session = get_session()
    default_user = (session.query(User)
                    .filter(User.username == 'default')
                    .one())
    default_user.active = enable
    session.add(default_user)
    session.commit()
147
def get_anonymous_access():
    """Return the ``active`` flag of the ``default`` user.

    The user is fetched with ``.one()``, so exactly one row with the
    username ``default`` is expected to exist.
    """
    default_user = (get_session().query(User)
                    .filter(User.username == 'default')
                    .one())
    return default_user.active
151
152
153 #==============================================================================
154 # TESTS
155 #==============================================================================
def test_clone_with_credentials(no_errors=False, repo=HG_REPO):
    """Clone ``repo`` over http using the ``USER``/``PASS`` credentials.

    The directory ``TESTS_TMP_PATH/repo`` is wiped and recreated and used as
    working directory for ``hg clone``; the clone destination is that same
    path with a random suffix appended.

    :param no_errors: when it is anything other than ``False`` the checks on
        the command output are skipped
    :param repo: name of the repository to clone
    :raises OSError: when the working directory cannot be created
    """
    path = jn(TESTS_TMP_PATH, repo)
    cwd = path

    # start from a fresh directory; removal errors are ignored, creation
    # errors propagate to the caller
    shutil.rmtree(path, ignore_errors=True)
    os.makedirs(path)

    url_params = {
        'user': USER,
        'pass': PASS,
        'host': HOST,
        'cloned_repo': repo,
        'dest': path + _RandomNameSequence().next(),
    }
    # source url and destination are passed to the shell as one string
    clone_url = ('http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s'
                 % url_params)

    stdout, stderr = Command(cwd).execute('hg clone', clone_url)

    if no_errors is not False:
        return
    assert """adding file changes""" in stdout, 'no messages about cloning'
    assert """abort""" not in stderr, 'got error from clone'
170 'pass':PASS,
171 'host':HOST,
172 'cloned_repo':repo,
173 'dest':path + _RandomNameSequence().next()}
174
175 stdout, stderr = Command(cwd).execute('hg clone', clone_url)
176
177 if no_errors is False:
178 assert """adding file changes""" in stdout, 'no messages about cloning'
179 assert """abort""" not in stderr , 'got error from clone'
180
181 if __name__ == '__main__':
182 try:
183 create_test_user(force=False)
184
185 for i in range(int(sys.argv[2])):
186 test_clone_with_credentials(repo=sys.argv[1])
187
188 except Exception, e:
189 sys.exit('stop on %s' % e)