Mercurial > kallithea
changeset 7294:a569b523f86a
repos: introduce low level check of clone URIs to prevent direct file system access to local repos
This is already checked in web form validation, but also check at low level to
make sure API access enforce the same invariants.
This issue was found and reported by
Kacper Szurek
https://security.szurek.pl/
author | Mads Kiilerich <mads@kiilerich.com> |
---|---|
date | Mon, 07 May 2018 11:38:40 +0200 |
parents | 9ff917d87291 |
children | d314edb04d11 |
files | kallithea/model/repo.py kallithea/tests/api/api_base.py |
diffstat | 2 files changed, 39 insertions(+), 27 deletions(-) [+] |
line wrap: on
line diff
--- a/kallithea/model/repo.py Mon May 07 11:38:13 2018 +0200 +++ b/kallithea/model/repo.py Mon May 07 11:38:40 2018 +0200 @@ -33,7 +33,7 @@ from datetime import datetime from sqlalchemy.orm import subqueryload -from kallithea.lib.utils import make_ui +from kallithea.lib.utils import make_ui, is_valid_repo_uri from kallithea.lib.vcs.backends import get_backend from kallithea.lib.utils2 import LazyProperty, safe_str, safe_unicode, \ remove_prefix, obfuscate_url_pw, get_current_authuser @@ -307,6 +307,10 @@ setattr(cur_repo, remove_prefix(k, 'repo_'), kwargs[k]) clone_uri = kwargs.get('clone_uri') if clone_uri is not None and clone_uri != cur_repo.clone_uri_hidden: + # clone_uri is modified - if given a value, check it is valid + if clone_uri != '': + # will raise exception on error + is_valid_repo_uri(cur_repo.repo_type, clone_uri, make_ui('db', clear_session=False)) cur_repo.clone_uri = clone_uri if 'repo_name' in kwargs: @@ -369,6 +373,9 @@ new_repo.group = repo_group new_repo.description = description or repo_name new_repo.private = private + if clone_uri: + # will raise exception on error + is_valid_repo_uri(repo_type, clone_uri, make_ui('db', clear_session=False)) new_repo.clone_uri = clone_uri new_repo.landing_rev = landing_rev @@ -638,11 +645,7 @@ Makes repository on filesystem. Operation is group aware, meaning that it will create a repository within a group, and alter the paths accordingly to the group location. - :param repo_name: - :param alias: - :param parent: - :param clone_uri: - :param repo_store_location: + Note: clone_uri is low level and not validated - it might be a file system path used for validated cloning """ from kallithea.lib.utils import is_valid_repo, is_valid_repo_group from kallithea.model.scm import ScmModel
--- a/kallithea/tests/api/api_base.py Mon May 07 11:38:13 2018 +0200 +++ b/kallithea/tests/api/api_base.py Mon May 07 11:38:40 2018 +0200 @@ -279,8 +279,12 @@ self._compare_error(id_, expected, given=response.body) def test_api_pull_remote(self): + # Note: pulling from local repos is a mis-feature - it will bypass access control + # ... but ok, if the path already has been set in the database repo_name = u'test_pull' r = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE) + # hack around that clone_uri can't be set to to a local path + # (as shown by test_api_create_repo_clone_uri_local) r.clone_uri = os.path.join(Ui.get_by_key('paths', '/').ui_value, self.REPO) Session().commit() @@ -1042,7 +1046,8 @@ fixture.destroy_repo(repo_name) def test_api_create_repo_clone_uri_local(self): - # FIXME: cloning from local repo is a mis-feature - it will bypass access control + # cloning from local repo was a mis-feature - it would bypass access control + # TODO: introduce other test coverage of actual remote cloning clone_uri = os.path.join(TESTS_TMP_PATH, self.REPO) repo_name = u'api-repo' id_, params = _build_data(self.apikey, 'create_repo', @@ -1052,12 +1057,8 @@ clone_uri=clone_uri, ) response = api_call(self, params) - expected = { - 'msg': 'Created new repository `%s`' % repo_name, - 'success': True, - 'task': None, - } - self._compare_ok(id_, expected, given=response.body) + expected = "failed to create repository `%s`" % repo_name + self._compare_error(id_, expected, given=response.body) fixture.destroy_repo(repo_name) def test_api_create_repo_and_repo_group(self): @@ -1202,8 +1203,8 @@ @parametrize('changing_attr,updates', [ ('owner', {'owner': TEST_USER_REGULAR_LOGIN}), ('description', {'description': u'new description'}), - ('clone_uri', {'clone_uri': 'http://example.com/repo'}), - ('clone_uri', {'clone_uri': '/repo'}), # FIXME: pulling from local repo is a mis-feature - it will bypass access control + ('clone_uri', {'clone_uri': 'http://example.com/repo'}), # will fail - pulling from non-existing repo should fail + ('clone_uri', {'clone_uri': '/repo'}), # will fail - pulling from local repo was a mis-feature - it would bypass access control ('clone_uri', {'clone_uri': None}), ('landing_rev', {'landing_rev': 'branch:master'}), ('enable_statistics', {'enable_statistics': True}), @@ -1226,11 +1227,15 @@ if changing_attr == 'repo_group': repo_name = u'/'.join([updates['group'], repo_name]) try: - expected = { - 'msg': 'updated repo ID:%s %s' % (repo.repo_id, repo_name), - 'repository': repo.get_api_data() - } - self._compare_ok(id_, expected, given=response.body) + if changing_attr == 'clone_uri' and updates['clone_uri']: + expected = u'failed to update repo `%s`' % repo_name + self._compare_error(id_, expected, given=response.body) + else: + expected = { + 'msg': 'updated repo ID:%s %s' % (repo.repo_id, repo_name), + 'repository': repo.get_api_data() + } + self._compare_ok(id_, expected, given=response.body) finally: fixture.destroy_repo(repo_name) if changing_attr == 'repo_group': @@ -1239,8 +1244,8 @@ @parametrize('changing_attr,updates', [ ('owner', {'owner': TEST_USER_REGULAR_LOGIN}), ('description', {'description': u'new description'}), - ('clone_uri', {'clone_uri': 'http://example.com/repo'}), - ('clone_uri', {'clone_uri': '/repo'}), # FIXME: pulling from local repo is a mis-feature - it will bypass access control + ('clone_uri', {'clone_uri': 'http://example.com/repo'}), # will fail - pulling from non-existing repo should fail + ('clone_uri', {'clone_uri': '/repo'}), # will fail - pulling from local repo was a mis-feature - it would bypass access control ('clone_uri', {'clone_uri': None}), ('landing_rev', {'landing_rev': 'branch:master'}), ('enable_statistics', {'enable_statistics': True}), @@ -1265,11 +1270,15 @@ if changing_attr == 'repo_group': repo_name = u'/'.join([updates['group'], repo_name.rsplit('/', 1)[-1]]) try: - expected = { - 'msg': 'updated repo ID:%s %s' % (repo.repo_id, repo_name), - 'repository': repo.get_api_data() - } - self._compare_ok(id_, expected, given=response.body) + if changing_attr == 'clone_uri' and updates['clone_uri']: + expected = u'failed to update repo `%s`' % repo_name + self._compare_error(id_, expected, given=response.body) + else: + expected = { + 'msg': 'updated repo ID:%s %s' % (repo.repo_id, repo_name), + 'repository': repo.get_api_data() + } + self._compare_ok(id_, expected, given=response.body) finally: fixture.destroy_repo(repo_name) if changing_attr == 'repo_group':