comparison rhodecode/tests/test_concurency.py @ 1529:0b268dd369ec beta

Fixed test_hg_operations test and added concurency test
author Marcin Kuzminski <marcin@python-works.com>
date Fri, 07 Oct 2011 23:17:45 +0200
parents
children 2afe9320d5e6
comparison
equal deleted inserted replaced
1528:1d7a621d396f 1529:0b268dd369ec
1 # -*- coding: utf-8 -*-
2 """
3 rhodecode.tests.test_hg_operations
4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
5
6 Test suite for making push/pull operations
7
8 :created_on: Dec 30, 2010
9 :copyright: (c) 2010 by marcink.
10 :license: LICENSE_NAME, see LICENSE_FILE for more details.
11 """
12
13 import os
14 import sys
15 import shutil
16 import logging
17 from os.path import join as jn
18 from os.path import dirname as dn
19
20 from tempfile import _RandomNameSequence
21 from subprocess import Popen, PIPE
22
23 from paste.deploy import appconfig
24 from pylons import config
25 from sqlalchemy import engine_from_config
26
27 from rhodecode.lib.utils import add_cache
28 from rhodecode.model import init_model
29 from rhodecode.model import meta
30 from rhodecode.model.db import User, Repository
31 from rhodecode.lib.auth import get_crypt_password
32
33 from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO
34 from rhodecode.config.environment import load_environment
35
36 rel_path = dn(dn(dn(os.path.abspath(__file__))))
37 conf = appconfig('config:development.ini', relative_to=rel_path)
38 load_environment(conf.global_conf, conf.local_conf)
39
40 add_cache(conf)
41
42 USER = 'test_admin'
43 PASS = 'test12'
44 HOST = '127.0.0.1:5000'
45 DEBUG = True
46 log = logging.getLogger(__name__)
47
48
49 class Command(object):
50
51 def __init__(self, cwd):
52 self.cwd = cwd
53
54 def execute(self, cmd, *args):
55 """Runs command on the system with given ``args``.
56 """
57
58 command = cmd + ' ' + ' '.join(args)
59 log.debug('Executing %s' % command)
60 if DEBUG:
61 print command
62 p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd)
63 stdout, stderr = p.communicate()
64 if DEBUG:
65 print stdout, stderr
66 return stdout, stderr
67
68 def get_session():
69 engine = engine_from_config(conf, 'sqlalchemy.db1.')
70 init_model(engine)
71 sa = meta.Session()
72 return sa
73
74
75 def create_test_user(force=True):
76 print 'creating test user'
77 sa = get_session()
78
79 user = sa.query(User).filter(User.username == USER).scalar()
80
81 if force and user is not None:
82 print 'removing current user'
83 for repo in sa.query(Repository).filter(Repository.user == user).all():
84 sa.delete(repo)
85 sa.delete(user)
86 sa.commit()
87
88 if user is None or force:
89 print 'creating new one'
90 new_usr = User()
91 new_usr.username = USER
92 new_usr.password = get_crypt_password(PASS)
93 new_usr.email = 'mail@mail.com'
94 new_usr.name = 'test'
95 new_usr.lastname = 'lasttestname'
96 new_usr.active = True
97 new_usr.admin = True
98 sa.add(new_usr)
99 sa.commit()
100
101 print 'done'
102
103
104 def create_test_repo(force=True):
105 print 'creating test repo'
106 from rhodecode.model.repo import RepoModel
107 sa = get_session()
108
109 user = sa.query(User).filter(User.username == USER).scalar()
110 if user is None:
111 raise Exception('user not found')
112
113
114 repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar()
115
116 if repo is None:
117 print 'repo not found creating'
118
119 form_data = {'repo_name':HG_REPO,
120 'repo_type':'hg',
121 'private':False,
122 'clone_uri':'' }
123 rm = RepoModel(sa)
124 rm.base_path = '/home/hg'
125 rm.create(form_data, user)
126
127 print 'done'
128
129 def set_anonymous_access(enable=True):
130 sa = get_session()
131 user = sa.query(User).filter(User.username == 'default').one()
132 user.active = enable
133 sa.add(user)
134 sa.commit()
135
136 def get_anonymous_access():
137 sa = get_session()
138 return sa.query(User).filter(User.username == 'default').one().active
139
140
141 #==============================================================================
142 # TESTS
143 #==============================================================================
144 def test_clone_with_credentials(no_errors=False,repo=HG_REPO):
145 cwd = path = jn(TESTS_TMP_PATH, repo)
146
147
148 try:
149 shutil.rmtree(path, ignore_errors=True)
150 os.makedirs(path)
151 #print 'made dirs %s' % jn(path)
152 except OSError:
153 raise
154
155
156 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \
157 {'user':USER,
158 'pass':PASS,
159 'host':HOST,
160 'cloned_repo':repo,
161 'dest':path+_RandomNameSequence().next()}
162
163 stdout, stderr = Command(cwd).execute('hg clone', clone_url)
164
165 if no_errors is False:
166 assert """adding file changes""" in stdout, 'no messages about cloning'
167 assert """abort""" not in stderr , 'got error from clone'
168
169 if __name__ == '__main__':
170 try:
171 create_test_user(force=False)
172
173 for i in range(int(sys.argv[2])):
174 test_clone_with_credentials(repo=sys.argv[1])
175
176 except Exception,e:
177 sys.exit('stop on %s' % e)