changeset 6632:27279bdcb598

tests: let the webserver fixture generate URLs for the vcs tests
author domruf <dominikruf@gmail.com>
date Wed, 22 Mar 2017 23:33:54 +0100
parents d7cdef68240b
children 7f15fb03a183
files kallithea/tests/conftest.py kallithea/tests/other/manual_test_vcs_operations.py
diffstat 2 files changed, 73 insertions(+), 79 deletions(-) [+]
line wrap: on
line diff
--- a/kallithea/tests/conftest.py	Mon May 02 22:41:51 2016 +0200
+++ b/kallithea/tests/conftest.py	Wed Mar 22 23:33:54 2017 +0100
@@ -146,17 +146,27 @@
         yield
 
 
+class MyWSGIServer(WSGIServer):
+    def repo_url(self, repo_name, username=TEST_USER_ADMIN_LOGIN, password=TEST_USER_ADMIN_PASS):
+        """Return URL to repo on this web server."""
+        host, port = self.server_address
+        proto = 'http' if self._server.ssl_context is None else 'https'
+        auth = ''
+        if username is not None:
+            auth = username
+            if password is not None:
+                auth += ':' + password
+        if auth:
+            auth += '@'
+        return '%s://%s%s:%s/%s' % (proto, auth, host, port, repo_name)
+
 @pytest.yield_fixture(scope="session")
 def webserver():
     """Start web server while tests are running.
     Useful for debugging and necessary for vcs operation tests."""
-    server = WSGIServer(application=kallithea.tests.base.testapp)
+    server = MyWSGIServer(application=kallithea.tests.base.testapp)
     server.start()
 
-    # temporary hack for using this server for the vcs tests:
-    from kallithea.tests.other import manual_test_vcs_operations
-    manual_test_vcs_operations.HOST = '%s:%s' % server.server_address
-
     yield server
 
     server.stop()
--- a/kallithea/tests/other/manual_test_vcs_operations.py	Mon May 02 22:41:51 2016 +0200
+++ b/kallithea/tests/other/manual_test_vcs_operations.py	Wed Mar 22 23:33:54 2017 +0100
@@ -81,24 +81,8 @@
     return tempfile.mkdtemp(prefix='rc_integration_test')
 
 
-def _construct_url(repo, **kwargs):
-    """Return a clone url for the provided repo path.
-    Optional named parameters: user, passwd and host."""
-    params = {
-        'user': TEST_USER_ADMIN_LOGIN,
-        'passwd': TEST_USER_ADMIN_PASS,
-        'host': HOST,
-        'cloned_repo': repo,
-    }
-    params.update(**kwargs)
-    if params['user'] and params['passwd']:
-        _url = 'http://%(user)s:%(passwd)s@%(host)s/%(cloned_repo)s' % params
-    else:
-        _url = 'http://(host)s/%(cloned_repo)s' % params
-    return _url
-
-
-def _add_files_and_push(vcs, DEST, ignoreReturnCode=False, **kwargs):
+def _add_files_and_push(webserver, vcs, DEST, ignoreReturnCode=False, files_no=3,
+                        clone_url=None, username=TEST_USER_ADMIN_LOGIN, password=TEST_USER_ADMIN_PASS):
     """
     Generate some files, add it to DEST repo and push back
     vcs is git or hg and defines what VCS we want to make those files for
@@ -118,7 +102,7 @@
         author_str = 'User <%s>' % email
     else:
         author_str = 'User ǝɯɐᴎ <%s>' % email
-    for i in xrange(kwargs.get('files_no', 3)):
+    for i in xrange(files_no):
         cmd = """echo "added_line%s" >> %s""" % (i, added_file)
         Command(cwd).execute(cmd)
         if vcs == 'hg':
@@ -139,9 +123,9 @@
     elif vcs == 'git':
         _REPO = GIT_REPO
 
-    clone_url = _construct_url(_REPO, **kwargs)
-    if 'clone_url' in kwargs:
-        clone_url = kwargs['clone_url']
+    if clone_url is None:
+        clone_url = webserver.repo_url(_REPO, username=username, password=password)
+
     stdout = stderr = None
     if vcs == 'hg':
         stdout, stderr = Command(cwd).execute('hg push --verbose', clone_url, ignoreReturnCode=ignoreReturnCode)
@@ -193,7 +177,7 @@
         Session().commit()
 
     def test_clone_hg_repo_by_admin(self, webserver):
-        clone_url = _construct_url(HG_REPO)
+        clone_url = webserver.repo_url(HG_REPO)
         stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url, _get_tmp_dir())
 
         assert 'requesting all changes' in stdout
@@ -204,51 +188,51 @@
         assert stderr == ''
 
     def test_clone_git_repo_by_admin(self, webserver):
-        clone_url = _construct_url(GIT_REPO)
+        clone_url = webserver.repo_url(GIT_REPO)
         stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url, _get_tmp_dir())
 
         assert 'Cloning into' in stdout + stderr
         assert stderr == '' or stdout == ''
 
     def test_clone_wrong_credentials_hg(self, webserver):
-        clone_url = _construct_url(HG_REPO, passwd='bad!')
+        clone_url = webserver.repo_url(HG_REPO, password='bad!')
         stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
         assert 'abort: authorization failed' in stderr
 
     def test_clone_wrong_credentials_git(self, webserver):
-        clone_url = _construct_url(GIT_REPO, passwd='bad!')
+        clone_url = webserver.repo_url(GIT_REPO, password='bad!')
         stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
         assert 'fatal: Authentication failed' in stderr
 
     def test_clone_git_dir_as_hg(self, webserver):
-        clone_url = _construct_url(GIT_REPO)
+        clone_url = webserver.repo_url(GIT_REPO)
         stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
         assert 'HTTP Error 404: Not Found' in stderr
 
     def test_clone_hg_repo_as_git(self, webserver):
-        clone_url = _construct_url(HG_REPO)
+        clone_url = webserver.repo_url(HG_REPO)
         stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
         assert 'not found' in stderr
 
     def test_clone_non_existing_path_hg(self, webserver):
-        clone_url = _construct_url('trololo')
+        clone_url = webserver.repo_url('trololo')
         stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
         assert 'HTTP Error 404: Not Found' in stderr
 
     def test_clone_non_existing_path_git(self, webserver):
-        clone_url = _construct_url('trololo')
+        clone_url = webserver.repo_url('trololo')
         stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
         assert 'not found' in stderr
 
     def test_push_new_file_hg(self, webserver):
         DEST = _get_tmp_dir()
-        clone_url = _construct_url(HG_REPO)
+        clone_url = webserver.repo_url(HG_REPO)
         stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url, DEST)
 
         fork_name = '%s_fork%s' % (HG_REPO, _RandomNameSequence().next())
         fixture.create_fork(HG_REPO, fork_name)
-        clone_url = _construct_url(fork_name).split()[0]
-        stdout, stderr = _add_files_and_push('hg', DEST, clone_url=clone_url)
+        clone_url = webserver.repo_url(fork_name)
+        stdout, stderr = _add_files_and_push(webserver, 'hg', DEST, clone_url=clone_url)
 
         assert 'pushing to' in stdout
         assert 'Repository size' in stdout
@@ -256,14 +240,14 @@
 
     def test_push_new_file_git(self, webserver):
         DEST = _get_tmp_dir()
-        clone_url = _construct_url(GIT_REPO)
+        clone_url = webserver.repo_url(GIT_REPO)
         stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url, DEST)
 
         # commit some stuff into this repo
         fork_name = '%s_fork%s' % (GIT_REPO, _RandomNameSequence().next())
         fixture.create_fork(GIT_REPO, fork_name)
-        clone_url = _construct_url(fork_name).split()[0]
-        stdout, stderr = _add_files_and_push('git', DEST, clone_url=clone_url)
+        clone_url = webserver.repo_url(fork_name)
+        stdout, stderr = _add_files_and_push(webserver, 'git', DEST, clone_url=clone_url)
         print [(x.repo_full_path,x.repo_path) for x in Repository.query()] # TODO: what is this for
         _check_proper_git_push(stdout, stderr)
 
@@ -278,13 +262,13 @@
         Session().commit()
 
         DEST = _get_tmp_dir()
-        clone_url = _construct_url(HG_REPO)
+        clone_url = webserver.repo_url(HG_REPO)
         stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url, DEST)
 
         fork_name = '%s_fork%s' % (HG_REPO, _RandomNameSequence().next())
         fixture.create_fork(HG_REPO, fork_name)
-        clone_url = _construct_url(fork_name).split()[0]
-        stdout, stderr = _add_files_and_push('hg', DEST, files_no=1, clone_url=clone_url)
+        clone_url = webserver.repo_url(fork_name)
+        stdout, stderr = _add_files_and_push(webserver, 'hg', DEST, files_no=1, clone_url=clone_url)
 
         key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
                                                ==fork_name).all()
@@ -301,14 +285,14 @@
         Session().commit()
 
         DEST = _get_tmp_dir()
-        clone_url = _construct_url(GIT_REPO)
+        clone_url = webserver.repo_url(GIT_REPO)
         stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url, DEST)
 
         # commit some stuff into this repo
         fork_name = '%s_fork%s' % (GIT_REPO, _RandomNameSequence().next())
         fixture.create_fork(GIT_REPO, fork_name)
-        clone_url = _construct_url(fork_name).split()[0]
-        stdout, stderr = _add_files_and_push('git', DEST, files_no=1, clone_url=clone_url)
+        clone_url = webserver.repo_url(fork_name)
+        stdout, stderr = _add_files_and_push(webserver, 'git', DEST, files_no=1, clone_url=clone_url)
         _check_proper_git_push(stdout, stderr)
 
         key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
@@ -317,42 +301,42 @@
 
     def test_push_wrong_credentials_hg(self, webserver):
         DEST = _get_tmp_dir()
-        clone_url = _construct_url(HG_REPO)
+        clone_url = webserver.repo_url(HG_REPO)
         stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url, DEST)
 
-        stdout, stderr = _add_files_and_push('hg', DEST, user='bad',
-                                             passwd='name', ignoreReturnCode=True)
+        stdout, stderr = _add_files_and_push(webserver, 'hg', DEST, username='bad',
+                                             password='name', ignoreReturnCode=True)
 
         assert 'abort: authorization failed' in stderr
 
     def test_push_wrong_credentials_git(self, webserver):
         DEST = _get_tmp_dir()
-        clone_url = _construct_url(GIT_REPO)
+        clone_url = webserver.repo_url(GIT_REPO)
         stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url, DEST)
 
-        stdout, stderr = _add_files_and_push('git', DEST, user='bad',
-                                             passwd='name', ignoreReturnCode=True)
+        stdout, stderr = _add_files_and_push(webserver, 'git', DEST, username='bad',
+                                             password='name', ignoreReturnCode=True)
 
         assert 'fatal: Authentication failed' in stderr
 
     def test_push_back_to_wrong_url_hg(self, webserver):
         DEST = _get_tmp_dir()
-        clone_url = _construct_url(HG_REPO)
+        clone_url = webserver.repo_url(HG_REPO)
         stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url, DEST)
 
-        stdout, stderr = _add_files_and_push('hg', DEST,
-                                    clone_url='http://%s/tmp' % HOST,
+        stdout, stderr = _add_files_and_push(webserver, 'hg', DEST,
+                                    clone_url='http://%s:%s/tmp' % (webserver.server_address[0], webserver.server_address[1]),
                                     ignoreReturnCode = True)
 
         assert 'HTTP Error 404: Not Found' in stderr
 
     def test_push_back_to_wrong_url_git(self, webserver):
         DEST = _get_tmp_dir()
-        clone_url = _construct_url(GIT_REPO)
+        clone_url = webserver.repo_url(GIT_REPO)
         stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url, DEST)
 
-        stdout, stderr = _add_files_and_push('git', DEST,
-                                    clone_url='http://%s/tmp' % HOST,
+        stdout, stderr = _add_files_and_push(webserver, 'git', DEST,
+                                    clone_url='http://%s:%s/tmp' % (webserver.server_address[0], webserver.server_address[1]),
                                     ignoreReturnCode = True)
 
         assert 'not found' in stderr
@@ -363,7 +347,7 @@
         r.enable_locking = True
         Session().commit()
         # clone
-        clone_url = _construct_url(HG_REPO)
+        clone_url = webserver.repo_url(HG_REPO)
         stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url, _get_tmp_dir())
 
         #check if lock was made
@@ -376,7 +360,7 @@
         r.enable_locking = True
         Session().commit()
         # clone
-        clone_url = _construct_url(GIT_REPO)
+        clone_url = webserver.repo_url(GIT_REPO)
         stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url, _get_tmp_dir())
 
         #check if lock was made
@@ -388,7 +372,7 @@
         r = Repository.get_by_repo_name(HG_REPO)
         Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
         #pull fails since repo is locked
-        clone_url = _construct_url(HG_REPO)
+        clone_url = webserver.repo_url(HG_REPO)
         stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
         msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
                 % (HG_REPO, TEST_USER_ADMIN_LOGIN))
@@ -399,7 +383,7 @@
         r = Repository.get_by_repo_name(GIT_REPO)
         Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
         #pull fails since repo is locked
-        clone_url = _construct_url(GIT_REPO)
+        clone_url = webserver.repo_url(GIT_REPO)
         stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
         msg = ("""The requested URL returned error: 423""")
         assert msg in stderr
@@ -407,7 +391,7 @@
     def test_push_on_locked_repo_by_other_user_hg(self, webserver):
         #clone some temp
         DEST = _get_tmp_dir()
-        clone_url = _construct_url(HG_REPO)
+        clone_url = webserver.repo_url(HG_REPO)
         stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url, DEST)
 
         #lock repo
@@ -419,9 +403,9 @@
         Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
 
         #push fails repo is locked by other user !
-        stdout, stderr = _add_files_and_push('hg', DEST,
-                                             user=TEST_USER_REGULAR_LOGIN,
-                                             passwd=TEST_USER_REGULAR_PASS,
+        stdout, stderr = _add_files_and_push(webserver, 'hg', DEST,
+                                             username=TEST_USER_REGULAR_LOGIN,
+                                             password=TEST_USER_REGULAR_PASS,
                                              ignoreReturnCode=True)
         msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
                 % (HG_REPO, TEST_USER_ADMIN_LOGIN))
@@ -433,7 +417,7 @@
 
         #clone some temp
         DEST = _get_tmp_dir()
-        clone_url = _construct_url(GIT_REPO)
+        clone_url = webserver.repo_url(GIT_REPO)
         stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url, DEST)
 
         #lock repo
@@ -445,9 +429,9 @@
         Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
 
         #push fails repo is locked by other user !
-        stdout, stderr = _add_files_and_push('git', DEST,
-                                             user=TEST_USER_REGULAR_LOGIN,
-                                             passwd=TEST_USER_REGULAR_PASS,
+        stdout, stderr = _add_files_and_push(webserver, 'git', DEST,
+                                             username=TEST_USER_REGULAR_LOGIN,
+                                             password=TEST_USER_REGULAR_PASS,
                                              ignoreReturnCode=True)
         err = 'Repository `%s` locked by user `%s`' % (GIT_REPO, TEST_USER_ADMIN_LOGIN)
         assert err in stderr
@@ -469,7 +453,7 @@
         Session().commit()
         #clone some temp
         DEST = _get_tmp_dir()
-        clone_url = _construct_url(fork_name)
+        clone_url = webserver.repo_url(fork_name)
         stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url, DEST)
 
         #check for lock repo after clone
@@ -478,7 +462,7 @@
         assert r.locked[0] == uid
 
         #push is ok and repo is now unlocked
-        stdout, stderr = _add_files_and_push('hg', DEST, clone_url=clone_url.split()[0])
+        stdout, stderr = _add_files_and_push(webserver, 'hg', DEST, clone_url=clone_url)
         assert str('remote: Released lock on repo `%s`' % fork_name) in stdout
         #we need to cleanup the Session Here !
         Session.remove()
@@ -495,7 +479,7 @@
         Session().commit()
         #clone some temp
         DEST = _get_tmp_dir()
-        clone_url = _construct_url(fork_name)
+        clone_url = webserver.repo_url(fork_name)
         stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url, DEST)
 
         #check for lock repo after clone
@@ -503,7 +487,7 @@
         assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
 
         #push is ok and repo is now unlocked
-        stdout, stderr = _add_files_and_push('git', DEST, clone_url=clone_url.split()[0])
+        stdout, stderr = _add_files_and_push(webserver, 'git', DEST, clone_url=clone_url)
         _check_proper_git_push(stdout, stderr)
 
         assert ('remote: Released lock on repo `%s`' % fork_name) in stderr
@@ -517,7 +501,7 @@
         try:
             user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32')
             Session().commit()
-            clone_url = _construct_url(HG_REPO)
+            clone_url = webserver.repo_url(HG_REPO)
             stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
             assert 'abort: HTTP Error 403: Forbidden' in stderr
         finally:
@@ -529,7 +513,7 @@
         # IP permissions are cached, need to wait for the cache in the server process to expire
         time.sleep(1.5)
 
-        clone_url = _construct_url(HG_REPO)
+        clone_url = webserver.repo_url(HG_REPO)
         stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url, _get_tmp_dir())
 
         assert 'requesting all changes' in stdout
@@ -544,7 +528,7 @@
         try:
             user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32')
             Session().commit()
-            clone_url = _construct_url(GIT_REPO)
+            clone_url = webserver.repo_url(GIT_REPO)
             stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
             # The message apparently changed in Git 1.8.3, so match it loosely.
             assert re.search(r'\b403\b', stderr)
@@ -557,7 +541,7 @@
         # IP permissions are cached, need to wait for the cache in the server process to expire
         time.sleep(1.5)
 
-        clone_url = _construct_url(GIT_REPO)
+        clone_url = webserver.repo_url(GIT_REPO)
         stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url, _get_tmp_dir())
 
         assert 'Cloning into' in stdout + stderr