# HG changeset patch # User domruf # Date 1490222034 -3600 # Node ID 27279bdcb598c591f20f3c1c44e6a29b76b6a00b # Parent d7cdef68240b845cdeb4f6be45e054e9ccbda3d0 tests: let the webserver fixture generate URLs for the vcs tests diff -r d7cdef68240b -r 27279bdcb598 kallithea/tests/conftest.py --- a/kallithea/tests/conftest.py Mon May 02 22:41:51 2016 +0200 +++ b/kallithea/tests/conftest.py Wed Mar 22 23:33:54 2017 +0100 @@ -146,17 +146,27 @@ yield +class MyWSGIServer(WSGIServer): + def repo_url(self, repo_name, username=TEST_USER_ADMIN_LOGIN, password=TEST_USER_ADMIN_PASS): + """Return URL to repo on this web server.""" + host, port = self.server_address + proto = 'http' if self._server.ssl_context is None else 'https' + auth = '' + if username is not None: + auth = username + if password is not None: + auth += ':' + password + if auth: + auth += '@' + return '%s://%s%s:%s/%s' % (proto, auth, host, port, repo_name) + @pytest.yield_fixture(scope="session") def webserver(): """Start web server while tests are running. Useful for debugging and necessary for vcs operation tests.""" - server = WSGIServer(application=kallithea.tests.base.testapp) + server = MyWSGIServer(application=kallithea.tests.base.testapp) server.start() - # temporary hack for using this server for the vcs tests: - from kallithea.tests.other import manual_test_vcs_operations - manual_test_vcs_operations.HOST = '%s:%s' % server.server_address - yield server server.stop() diff -r d7cdef68240b -r 27279bdcb598 kallithea/tests/other/manual_test_vcs_operations.py --- a/kallithea/tests/other/manual_test_vcs_operations.py Mon May 02 22:41:51 2016 +0200 +++ b/kallithea/tests/other/manual_test_vcs_operations.py Wed Mar 22 23:33:54 2017 +0100 @@ -81,24 +81,8 @@ return tempfile.mkdtemp(prefix='rc_integration_test') -def _construct_url(repo, **kwargs): - """Return a clone url for the provided repo path. - Optional named parameters: user, passwd and host.""" - params = { - 'user': TEST_USER_ADMIN_LOGIN, - 'passwd': TEST_USER_ADMIN_PASS, - 'host': HOST, - 'cloned_repo': repo, - } - params.update(**kwargs) - if params['user'] and params['passwd']: - _url = 'http://%(user)s:%(passwd)s@%(host)s/%(cloned_repo)s' % params - else: - _url = 'http://(host)s/%(cloned_repo)s' % params - return _url - - -def _add_files_and_push(vcs, DEST, ignoreReturnCode=False, **kwargs): +def _add_files_and_push(webserver, vcs, DEST, ignoreReturnCode=False, files_no=3, + clone_url=None, username=TEST_USER_ADMIN_LOGIN, password=TEST_USER_ADMIN_PASS): """ Generate some files, add it to DEST repo and push back vcs is git or hg and defines what VCS we want to make those files for @@ -118,7 +102,7 @@ author_str = 'User <%s>' % email else: author_str = 'User ǝɯɐᴎ <%s>' % email - for i in xrange(kwargs.get('files_no', 3)): + for i in xrange(files_no): cmd = """echo "added_line%s" >> %s""" % (i, added_file) Command(cwd).execute(cmd) if vcs == 'hg': @@ -139,9 +123,9 @@ elif vcs == 'git': _REPO = GIT_REPO - clone_url = _construct_url(_REPO, **kwargs) - if 'clone_url' in kwargs: - clone_url = kwargs['clone_url'] + if clone_url is None: + clone_url = webserver.repo_url(_REPO, username=username, password=password) + stdout = stderr = None if vcs == 'hg': stdout, stderr = Command(cwd).execute('hg push --verbose', clone_url, ignoreReturnCode=ignoreReturnCode) @@ -193,7 +177,7 @@ Session().commit() def test_clone_hg_repo_by_admin(self, webserver): - clone_url = _construct_url(HG_REPO) + clone_url = webserver.repo_url(HG_REPO) stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url, _get_tmp_dir()) assert 'requesting all changes' in stdout @@ -204,51 +188,51 @@ assert stderr == '' def test_clone_git_repo_by_admin(self, webserver): - clone_url = _construct_url(GIT_REPO) + clone_url = webserver.repo_url(GIT_REPO) stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url, _get_tmp_dir()) assert 'Cloning into' in stdout + stderr assert stderr == '' or stdout == '' def test_clone_wrong_credentials_hg(self, webserver): - clone_url = _construct_url(HG_REPO, passwd='bad!') + clone_url = webserver.repo_url(HG_REPO, password='bad!') stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True) assert 'abort: authorization failed' in stderr def test_clone_wrong_credentials_git(self, webserver): - clone_url = _construct_url(GIT_REPO, passwd='bad!') + clone_url = webserver.repo_url(GIT_REPO, password='bad!') stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True) assert 'fatal: Authentication failed' in stderr def test_clone_git_dir_as_hg(self, webserver): - clone_url = _construct_url(GIT_REPO) + clone_url = webserver.repo_url(GIT_REPO) stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True) assert 'HTTP Error 404: Not Found' in stderr def test_clone_hg_repo_as_git(self, webserver): - clone_url = _construct_url(HG_REPO) + clone_url = webserver.repo_url(HG_REPO) stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True) assert 'not found' in stderr def test_clone_non_existing_path_hg(self, webserver): - clone_url = _construct_url('trololo') + clone_url = webserver.repo_url('trololo') stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True) assert 'HTTP Error 404: Not Found' in stderr def test_clone_non_existing_path_git(self, webserver): - clone_url = _construct_url('trololo') + clone_url = webserver.repo_url('trololo') stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True) assert 'not found' in stderr def test_push_new_file_hg(self, webserver): DEST = _get_tmp_dir() - clone_url = _construct_url(HG_REPO) + clone_url = webserver.repo_url(HG_REPO) stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url, DEST) fork_name = '%s_fork%s' % (HG_REPO, _RandomNameSequence().next()) fixture.create_fork(HG_REPO, fork_name) - clone_url = _construct_url(fork_name).split()[0] - stdout, stderr = _add_files_and_push('hg', DEST, clone_url=clone_url) + clone_url = webserver.repo_url(fork_name) + stdout, stderr = _add_files_and_push(webserver, 'hg', DEST, clone_url=clone_url) assert 'pushing to' in stdout assert 'Repository size' in stdout @@ -256,14 +240,14 @@ def test_push_new_file_git(self, webserver): DEST = _get_tmp_dir() - clone_url = _construct_url(GIT_REPO) + clone_url = webserver.repo_url(GIT_REPO) stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url, DEST) # commit some stuff into this repo fork_name = '%s_fork%s' % (GIT_REPO, _RandomNameSequence().next()) fixture.create_fork(GIT_REPO, fork_name) - clone_url = _construct_url(fork_name).split()[0] - stdout, stderr = _add_files_and_push('git', DEST, clone_url=clone_url) + clone_url = webserver.repo_url(fork_name) + stdout, stderr = _add_files_and_push(webserver, 'git', DEST, clone_url=clone_url) print [(x.repo_full_path,x.repo_path) for x in Repository.query()] # TODO: what is this for _check_proper_git_push(stdout, stderr) @@ -278,13 +262,13 @@ Session().commit() DEST = _get_tmp_dir() - clone_url = _construct_url(HG_REPO) + clone_url = webserver.repo_url(HG_REPO) stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url, DEST) fork_name = '%s_fork%s' % (HG_REPO, _RandomNameSequence().next()) fixture.create_fork(HG_REPO, fork_name) - clone_url = _construct_url(fork_name).split()[0] - stdout, stderr = _add_files_and_push('hg', DEST, files_no=1, clone_url=clone_url) + clone_url = webserver.repo_url(fork_name) + stdout, stderr = _add_files_and_push(webserver, 'hg', DEST, files_no=1, clone_url=clone_url) key = CacheInvalidation.query().filter(CacheInvalidation.cache_key ==fork_name).all() @@ -301,14 +285,14 @@ Session().commit() DEST = _get_tmp_dir() - clone_url = _construct_url(GIT_REPO) + clone_url = webserver.repo_url(GIT_REPO) stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url, DEST) # commit some stuff into this repo fork_name = '%s_fork%s' % (GIT_REPO, _RandomNameSequence().next()) fixture.create_fork(GIT_REPO, fork_name) - clone_url = _construct_url(fork_name).split()[0] - stdout, stderr = _add_files_and_push('git', DEST, files_no=1, clone_url=clone_url) + clone_url = webserver.repo_url(fork_name) + stdout, stderr = _add_files_and_push(webserver, 'git', DEST, files_no=1, clone_url=clone_url) _check_proper_git_push(stdout, stderr) key = CacheInvalidation.query().filter(CacheInvalidation.cache_key @@ -317,42 +301,42 @@ def test_push_wrong_credentials_hg(self, webserver): DEST = _get_tmp_dir() - clone_url = _construct_url(HG_REPO) + clone_url = webserver.repo_url(HG_REPO) stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url, DEST) - stdout, stderr = _add_files_and_push('hg', DEST, user='bad', - passwd='name', ignoreReturnCode=True) + stdout, stderr = _add_files_and_push(webserver, 'hg', DEST, username='bad', + password='name', ignoreReturnCode=True) assert 'abort: authorization failed' in stderr def test_push_wrong_credentials_git(self, webserver): DEST = _get_tmp_dir() - clone_url = _construct_url(GIT_REPO) + clone_url = webserver.repo_url(GIT_REPO) stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url, DEST) - stdout, stderr = _add_files_and_push('git', DEST, user='bad', - passwd='name', ignoreReturnCode=True) + stdout, stderr = _add_files_and_push(webserver, 'git', DEST, username='bad', + password='name', ignoreReturnCode=True) assert 'fatal: Authentication failed' in stderr def test_push_back_to_wrong_url_hg(self, webserver): DEST = _get_tmp_dir() - clone_url = _construct_url(HG_REPO) + clone_url = webserver.repo_url(HG_REPO) stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url, DEST) - stdout, stderr = _add_files_and_push('hg', DEST, - clone_url='http://%s/tmp' % HOST, + stdout, stderr = _add_files_and_push(webserver, 'hg', DEST, + clone_url='http://%s:%s/tmp' % (webserver.server_address[0], webserver.server_address[1]), ignoreReturnCode = True) assert 'HTTP Error 404: Not Found' in stderr def test_push_back_to_wrong_url_git(self, webserver): DEST = _get_tmp_dir() - clone_url = _construct_url(GIT_REPO) + clone_url = webserver.repo_url(GIT_REPO) stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url, DEST) - stdout, stderr = _add_files_and_push('git', DEST, - clone_url='http://%s/tmp' % HOST, + stdout, stderr = _add_files_and_push(webserver, 'git', DEST, + clone_url='http://%s:%s/tmp' % (webserver.server_address[0], webserver.server_address[1]), ignoreReturnCode = True) assert 'not found' in stderr @@ -363,7 +347,7 @@ r.enable_locking = True Session().commit() # clone - clone_url = _construct_url(HG_REPO) + clone_url = webserver.repo_url(HG_REPO) stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url, _get_tmp_dir()) #check if lock was made @@ -376,7 +360,7 @@ r.enable_locking = True Session().commit() # clone - clone_url = _construct_url(GIT_REPO) + clone_url = webserver.repo_url(GIT_REPO) stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url, _get_tmp_dir()) #check if lock was made @@ -388,7 +372,7 @@ r = Repository.get_by_repo_name(HG_REPO) Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id) #pull fails since repo is locked - clone_url = _construct_url(HG_REPO) + clone_url = webserver.repo_url(HG_REPO) stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True) msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`""" % (HG_REPO, TEST_USER_ADMIN_LOGIN)) @@ -399,7 +383,7 @@ r = Repository.get_by_repo_name(GIT_REPO) Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id) #pull fails since repo is locked - clone_url = _construct_url(GIT_REPO) + clone_url = webserver.repo_url(GIT_REPO) stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True) msg = ("""The requested URL returned error: 423""") assert msg in stderr @@ -407,7 +391,7 @@ def test_push_on_locked_repo_by_other_user_hg(self, webserver): #clone some temp DEST = _get_tmp_dir() - clone_url = _construct_url(HG_REPO) + clone_url = webserver.repo_url(HG_REPO) stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url, DEST) #lock repo @@ -419,9 +403,9 @@ Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id) #push fails repo is locked by other user ! - stdout, stderr = _add_files_and_push('hg', DEST, - user=TEST_USER_REGULAR_LOGIN, - passwd=TEST_USER_REGULAR_PASS, + stdout, stderr = _add_files_and_push(webserver, 'hg', DEST, + username=TEST_USER_REGULAR_LOGIN, + password=TEST_USER_REGULAR_PASS, ignoreReturnCode=True) msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`""" % (HG_REPO, TEST_USER_ADMIN_LOGIN)) @@ -433,7 +417,7 @@ #clone some temp DEST = _get_tmp_dir() - clone_url = _construct_url(GIT_REPO) + clone_url = webserver.repo_url(GIT_REPO) stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url, DEST) #lock repo @@ -445,9 +429,9 @@ Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id) #push fails repo is locked by other user ! - stdout, stderr = _add_files_and_push('git', DEST, - user=TEST_USER_REGULAR_LOGIN, - passwd=TEST_USER_REGULAR_PASS, + stdout, stderr = _add_files_and_push(webserver, 'git', DEST, + username=TEST_USER_REGULAR_LOGIN, + password=TEST_USER_REGULAR_PASS, ignoreReturnCode=True) err = 'Repository `%s` locked by user `%s`' % (GIT_REPO, TEST_USER_ADMIN_LOGIN) assert err in stderr @@ -469,7 +453,7 @@ Session().commit() #clone some temp DEST = _get_tmp_dir() - clone_url = _construct_url(fork_name) + clone_url = webserver.repo_url(fork_name) stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url, DEST) #check for lock repo after clone @@ -478,7 +462,7 @@ assert r.locked[0] == uid #push is ok and repo is now unlocked - stdout, stderr = _add_files_and_push('hg', DEST, clone_url=clone_url.split()[0]) + stdout, stderr = _add_files_and_push(webserver, 'hg', DEST, clone_url=clone_url) assert str('remote: Released lock on repo `%s`' % fork_name) in stdout #we need to cleanup the Session Here ! Session.remove() @@ -495,7 +479,7 @@ Session().commit() #clone some temp DEST = _get_tmp_dir() - clone_url = _construct_url(fork_name) + clone_url = webserver.repo_url(fork_name) stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url, DEST) #check for lock repo after clone @@ -503,7 +487,7 @@ assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id #push is ok and repo is now unlocked - stdout, stderr = _add_files_and_push('git', DEST, clone_url=clone_url.split()[0]) + stdout, stderr = _add_files_and_push(webserver, 'git', DEST, clone_url=clone_url) _check_proper_git_push(stdout, stderr) assert ('remote: Released lock on repo `%s`' % fork_name) in stderr @@ -517,7 +501,7 @@ try: user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32') Session().commit() - clone_url = _construct_url(HG_REPO) + clone_url = webserver.repo_url(HG_REPO) stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True) assert 'abort: HTTP Error 403: Forbidden' in stderr finally: @@ -529,7 +513,7 @@ # IP permissions are cached, need to wait for the cache in the server process to expire time.sleep(1.5) - clone_url = _construct_url(HG_REPO) + clone_url = webserver.repo_url(HG_REPO) stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url, _get_tmp_dir()) assert 'requesting all changes' in stdout @@ -544,7 +528,7 @@ try: user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32') Session().commit() - clone_url = _construct_url(GIT_REPO) + clone_url = webserver.repo_url(GIT_REPO) stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True) # The message apparently changed in Git 1.8.3, so match it loosely. assert re.search(r'\b403\b', stderr) @@ -557,7 +541,7 @@ # IP permissions are cached, need to wait for the cache in the server process to expire time.sleep(1.5) - clone_url = _construct_url(GIT_REPO) + clone_url = webserver.repo_url(GIT_REPO) stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url, _get_tmp_dir()) assert 'Cloning into' in stdout + stderr