comparison rhodecode/tests/test_hg_operations.py @ 909:1f0e37c0854d beta

fixed hg operations test script
author Marcin Kuzminski <marcin@python-works.com>
date Mon, 03 Jan 2011 00:30:05 +0100
parents a7efcee0f399
children 811fa5d45de8
comparison
legend: equal | deleted | inserted | replaced
left column: 908:de560c47dd03 | right column: 909:1f0e37c0854d
24 PASS = 'test12' 24 PASS = 'test12'
25 HOST = '127.0.0.1:5000' 25 HOST = '127.0.0.1:5000'
26 26
27 log = logging.getLogger(__name__) 27 log = logging.getLogger(__name__)
28 28
29 def __execute_cmd(cmd, *args): 29
30 """Runs command on the system with given ``args``. 30 class Command(object):
31 """ 31
32 32 def __init__(self, cwd):
33 command = cmd + ' ' + ' '.join(args) 33 self.cwd = cwd
34 log.debug('Executing %s' % command) 34
35 print command 35 def execute(self, cmd, *args):
36 p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE) 36 """Runs command on the system with given ``args``.
37 stdout, stderr = p.communicate() 37 """
38 print stdout, stderr 38
39 return stdout, stderr 39 command = cmd + ' ' + ' '.join(args)
40 log.debug('Executing %s' % command)
41 print command
42 p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd)
43 stdout, stderr = p.communicate()
44 print stdout, stderr
45 return stdout, stderr
40 46
41 47
42 #=============================================================================== 48 #===============================================================================
43 # TESTS 49 # TESTS
44 #=============================================================================== 50 #===============================================================================
45 def test_clone(): 51 def test_clone():
46 #rm leftovers 52 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
47 try: 53
48 log.debug('removing old directory') 54 try:
49 shutil.rmtree(jn(TESTS_TMP_PATH, HG_REPO)) 55 shutil.rmtree(path, ignore_errors=True)
50 except OSError: 56 os.makedirs(path)
51 pass 57 #print 'made dirs %s' % jn(path)
58 except OSError:
59 raise
60
52 61
53 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \ 62 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \
54 {'user':USER, 63 {'user':USER,
55 'pass':PASS, 64 'pass':PASS,
56 'host':HOST, 65 'host':HOST,
57 'cloned_repo':HG_REPO, 66 'cloned_repo':HG_REPO,
58 'dest':jn(TESTS_TMP_PATH, HG_REPO)} 67 'dest':path}
59 68
60 stdout, stderr = __execute_cmd('hg clone', clone_url) 69 stdout, stderr = Command(cwd).execute('hg clone', clone_url)
70
71 assert """adding file changes""" in stdout, 'no messages about cloning'
72 assert """abort""" not in stderr , 'got error from clone'
73
74
75
76 def test_clone_anonymous_ok():
77 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
78
79 try:
80 shutil.rmtree(path, ignore_errors=True)
81 os.makedirs(path)
82 #print 'made dirs %s' % jn(path)
83 except OSError:
84 raise
85
86
87 clone_url = 'http://%(host)s/%(cloned_repo)s %(dest)s' % \
88 {'user':USER,
89 'pass':PASS,
90 'host':HOST,
91 'cloned_repo':HG_REPO,
92 'dest':path}
93
94 stdout, stderr = Command(cwd).execute('hg clone', clone_url)
95 print stdout, stderr
96 assert """adding file changes""" in stdout, 'no messages about cloning'
97 assert """abort""" not in stderr , 'got error from clone'
98
99 def test_clone_wrong_credentials():
100 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
101
102 try:
103 shutil.rmtree(path, ignore_errors=True)
104 os.makedirs(path)
105 #print 'made dirs %s' % jn(path)
106 except OSError:
107 raise
108
109
110 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \
111 {'user':USER + 'error',
112 'pass':PASS,
113 'host':HOST,
114 'cloned_repo':HG_REPO,
115 'dest':path}
116
117 stdout, stderr = Command(cwd).execute('hg clone', clone_url)
118
119 assert """abort: authorization failed""" in stderr , 'no error from wrong credentials'
120
61 121
62 def test_pull(): 122 def test_pull():
63 pass 123 pass
64 124
65 def test_push(): 125 def test_push():
66 126
67 modified_file = jn(TESTS_TMP_PATH, HG_REPO, 'setup.py') 127 modified_file = jn(TESTS_TMP_PATH, HG_REPO, 'setup.py')
68 for i in xrange(5): 128 for i in xrange(5):
69 cmd = """echo 'added_line%s' >> %s""" % (i, modified_file) 129 cmd = """echo 'added_line%s' >> %s""" % (i, modified_file)
70 __execute_cmd(cmd) 130 Command(cwd).execute(cmd)
71 131
72 cmd = """hg ci -m 'changed file %s' %s """ % (i, modified_file) 132 cmd = """hg ci -m 'changed file %s' %s """ % (i, modified_file)
73 __execute_cmd(cmd) 133 Command(cwd).execute(cmd)
74 134
75 __execute_cmd('hg push %s' % jn(TESTS_TMP_PATH, HG_REPO)) 135 Command(cwd).execute('hg push %s' % jn(TESTS_TMP_PATH, HG_REPO))
76 136
77 def test_push_new_file(): 137 def test_push_new_file():
78 added_file = jn(TESTS_TMP_PATH, HG_REPO, 'setup.py') 138
79 139 test_clone()
80 __execute_cmd('touch %s' % added_file) 140
81 141 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
82 __execute_cmd('hg add %s' % added_file) 142 added_file = jn(path, 'setup.py')
143
144 Command(cwd).execute('touch %s' % added_file)
145
146 Command(cwd).execute('hg add %s' % added_file)
83 147
84 for i in xrange(15): 148 for i in xrange(15):
85 cmd = """echo 'added_line%s' >> %s""" % (i, added_file) 149 cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
86 __execute_cmd(cmd) 150 Command(cwd).execute(cmd)
87 151
88 cmd = """hg ci -m 'commited new %s' %s """ % (i, added_file) 152 cmd = """hg ci -m 'commited new %s' %s """ % (i, added_file)
89 __execute_cmd(cmd) 153 Command(cwd).execute(cmd)
90 154
91 __execute_cmd('hg push %s' % jn(TESTS_TMP_PATH, HG_REPO)) 155 push_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
156 {'user':USER,
157 'pass':PASS,
158 'host':HOST,
159 'cloned_repo':HG_REPO,
160 'dest':jn(TESTS_TMP_PATH, HG_REPO)}
161
162 Command(cwd).execute('hg push %s' % push_url)
92 163
93 def test_push_wrong_credentials(): 164 def test_push_wrong_credentials():
94 165
95 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \ 166 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
96 {'user':USER + 'xxx', 167 {'user':USER + 'xxx',
100 'dest':jn(TESTS_TMP_PATH, HG_REPO)} 171 'dest':jn(TESTS_TMP_PATH, HG_REPO)}
101 172
102 modified_file = jn(TESTS_TMP_PATH, HG_REPO, 'setup.py') 173 modified_file = jn(TESTS_TMP_PATH, HG_REPO, 'setup.py')
103 for i in xrange(5): 174 for i in xrange(5):
104 cmd = """echo 'added_line%s' >> %s""" % (i, modified_file) 175 cmd = """echo 'added_line%s' >> %s""" % (i, modified_file)
105 __execute_cmd(cmd) 176 Command(cwd).execute(cmd)
106 177
107 cmd = """hg ci -m 'commited %s' %s """ % (i, modified_file) 178 cmd = """hg ci -m 'commited %s' %s """ % (i, modified_file)
108 __execute_cmd(cmd) 179 Command(cwd).execute(cmd)
109 180
110 __execute_cmd('hg push %s' % clone_url) 181 Command(cwd).execute('hg push %s' % clone_url)
111 182
112 def test_push_wrong_path(): 183 def test_push_wrong_path():
113 added_file = jn(TESTS_TMP_PATH, HG_REPO, 'somefile.py') 184 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
114 185 added_file = jn(path, 'somefile.py')
115 try: 186
116 os.makedirs(jn(TESTS_TMP_PATH, HG_REPO)) 187 try:
117 except OSError: 188 shutil.rmtree(path, ignore_errors=True)
118 pass 189 os.makedirs(path)
119 190 print 'made dirs %s' % jn(path)
120 __execute_cmd("""echo '' > %s""" % added_file) 191 except OSError:
121 192 raise
122 __execute_cmd("""hg add %s""" % added_file) 193
194 Command(cwd).execute("""echo '' > %s""" % added_file)
195 Command(cwd).execute("""hg init %s""" % path)
196 Command(cwd).execute("""hg add %s""" % added_file)
123 197
124 for i in xrange(2): 198 for i in xrange(2):
125 cmd = """echo 'added_line%s' >> %s""" % (i, added_file) 199 cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
126 __execute_cmd(cmd) 200 Command(cwd).execute(cmd)
127 201
128 cmd = """hg ci -m 'commited new %s' %s """ % (i, added_file) 202 cmd = """hg ci -m 'commited new %s' %s """ % (i, added_file)
129 __execute_cmd(cmd) 203 Command(cwd).execute(cmd)
130 204
131 __execute_cmd('hg push %s' % jn(TESTS_TMP_PATH, HG_REPO + '_error')) 205 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
206 {'user':USER,
207 'pass':PASS,
208 'host':HOST,
209 'cloned_repo':HG_REPO + '_error',
210 'dest':jn(TESTS_TMP_PATH, HG_REPO)}
211
212 stdout, stderr = Command(cwd).execute('hg push %s' % clone_url)
213 assert """abort: HTTP Error 403: Forbidden""" in stderr
214
132 215
133 if __name__ == '__main__': 216 if __name__ == '__main__':
134 test_clone() 217 test_clone()
135 test_push_wrong_path() 218 test_clone_wrong_credentials()
136 test_push_wrong_credentials() 219 ##test_clone_anonymous_ok()
137 test_push_new_file() 220
138 221 #test_push_new_file()
222 #test_push_wrong_path()
223 #test_push_wrong_credentials()
224
225