changeset 7312:fa3365c94064 stable

repos: introduce low level check of clone URIs to prevent direct file system access to local repos This is already checked in web form validation, but also check at low level to make sure API access enforce the same invariants. This issue was found and reported by Kacper Szurek https://security.szurek.pl/
author Mads Kiilerich <mads@kiilerich.com>
date Mon, 07 May 2018 11:38:40 +0200
parents 02e0d2d469bf
children eeb8ddecaee2
files kallithea/model/repo.py kallithea/tests/api/api_base.py
diffstat 2 files changed, 39 insertions(+), 27 deletions(-) [+]
line wrap: on
line diff
--- a/kallithea/model/repo.py	Mon May 07 11:38:13 2018 +0200
+++ b/kallithea/model/repo.py	Mon May 07 11:38:40 2018 +0200
@@ -33,7 +33,7 @@
 from datetime import datetime
 from sqlalchemy.orm import subqueryload
 
-from kallithea.lib.utils import make_ui
+from kallithea.lib.utils import make_ui, is_valid_repo_uri
 from kallithea.lib.vcs.backends import get_backend
 from kallithea.lib.compat import json
 from kallithea.lib.utils2 import LazyProperty, safe_str, safe_unicode, \
@@ -335,6 +335,10 @@
                     setattr(cur_repo, remove_prefix(k, 'repo_'), kwargs[k])
             clone_uri = kwargs.get('clone_uri')
             if clone_uri is not None and clone_uri != cur_repo.clone_uri_hidden:
+                # clone_uri is modified - if given a value, check it is valid
+                if clone_uri != '':
+                    # will raise exception on error
+                    is_valid_repo_uri(cur_repo.repo_type, clone_uri, make_ui('db', clear_session=False))
                 cur_repo.clone_uri = clone_uri
 
             if 'repo_name' in kwargs:
@@ -399,6 +403,9 @@
             new_repo.group = repo_group
             new_repo.description = description or repo_name
             new_repo.private = private
+            if clone_uri:
+                # will raise exception on error
+                is_valid_repo_uri(repo_type, clone_uri, make_ui('db', clear_session=False))
             new_repo.clone_uri = clone_uri
             new_repo.landing_rev = landing_rev
 
@@ -677,11 +684,7 @@
         Makes repository on filesystem. Operation is group aware, meaning that it will create
         a repository within a group, and alter the paths accordingly to the group location.
 
-        :param repo_name:
-        :param alias:
-        :param parent:
-        :param clone_uri:
-        :param repo_store_location:
+        Note: clone_uri is low level and not validated - it might be a file system path used for validated cloning
         """
         from kallithea.lib.utils import is_valid_repo, is_valid_repo_group
         from kallithea.model.scm import ScmModel
--- a/kallithea/tests/api/api_base.py	Mon May 07 11:38:13 2018 +0200
+++ b/kallithea/tests/api/api_base.py	Mon May 07 11:38:40 2018 +0200
@@ -278,8 +278,12 @@
         self._compare_error(id_, expected, given=response.body)
 
     def test_api_pull(self):
+        # Note: pulling from local repos is a mis-feature - it will bypass access control
+        # ... but ok, if the path already has been set in the database
         repo_name = 'test_pull'
         r = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
+        # hack around that clone_uri can't be set to to a local path
+        # (as shown by test_api_create_repo_clone_uri_local)
         r.clone_uri = os.path.join(TESTS_TMP_PATH, self.REPO)
         Session.add(r)
         Session.commit()
@@ -995,7 +999,8 @@
         fixture.destroy_repo(repo_name)
 
     def test_api_create_repo_clone_uri_local(self):
-        # FIXME: cloning from local repo is a mis-feature - it will bypass access control
+        # cloning from local repo was a mis-feature - it would bypass access control
+        # TODO: introduce other test coverage of actual remote cloning
         clone_uri = os.path.join(TESTS_TMP_PATH, self.REPO)
         repo_name = u'api-repo'
         id_, params = _build_data(self.apikey, 'create_repo',
@@ -1005,12 +1010,8 @@
                                   clone_uri=clone_uri,
         )
         response = api_call(self, params)
-        expected = {
-            'msg': 'Created new repository `%s`' % repo_name,
-            'success': True,
-            'task': None,
-        }
-        self._compare_ok(id_, expected, given=response.body)
+        expected = "failed to create repository `%s`" % repo_name
+        self._compare_error(id_, expected, given=response.body)
         fixture.destroy_repo(repo_name)
 
     def test_api_create_repo_and_repo_group(self):
@@ -1157,8 +1158,8 @@
         ('description', {'description': 'new description'}),
         ('active', {'active': True}),
         ('active', {'active': False}),
-        ('clone_uri', {'clone_uri': 'http://example.com/repo'}),
-        ('clone_uri', {'clone_uri': '/repo'}), # FIXME: pulling from local repo is a mis-feature - it will bypass access control
+        ('clone_uri', {'clone_uri': 'http://example.com/repo'}), # will fail - pulling from non-existing repo should fail
+        ('clone_uri', {'clone_uri': '/repo'}), # will fail - pulling from local repo was a mis-feature - it would bypass access control
         ('clone_uri', {'clone_uri': None}),
         ('landing_rev', {'landing_rev': 'branch:master'}),
         ('enable_statistics', {'enable_statistics': True}),
@@ -1181,11 +1182,15 @@
         if changing_attr == 'repo_group':
             repo_name = '/'.join([updates['group'], repo_name])
         try:
-            expected = {
-                'msg': 'updated repo ID:%s %s' % (repo.repo_id, repo_name),
-                'repository': repo.get_api_data()
-            }
-            self._compare_ok(id_, expected, given=response.body)
+            if changing_attr == 'clone_uri' and updates['clone_uri']:
+                expected = u'failed to update repo `%s`' % repo_name
+                self._compare_error(id_, expected, given=response.body)
+            else:
+                expected = {
+                    'msg': 'updated repo ID:%s %s' % (repo.repo_id, repo_name),
+                    'repository': repo.get_api_data()
+                }
+                self._compare_ok(id_, expected, given=response.body)
         finally:
             fixture.destroy_repo(repo_name)
             if changing_attr == 'repo_group':
@@ -1196,8 +1201,8 @@
         ('description', {'description': u'new description'}),
         ('active', {'active': True}),
         ('active', {'active': False}),
-        ('clone_uri', {'clone_uri': 'http://example.com/repo'}),
-        ('clone_uri', {'clone_uri': '/repo'}), # FIXME: pulling from local repo is a mis-feature - it will bypass access control
+        ('clone_uri', {'clone_uri': 'http://example.com/repo'}), # will fail - pulling from non-existing repo should fail
+        ('clone_uri', {'clone_uri': '/repo'}), # will fail - pulling from local repo was a mis-feature - it would bypass access control
         ('clone_uri', {'clone_uri': None}),
         ('landing_rev', {'landing_rev': 'branch:master'}),
         ('enable_statistics', {'enable_statistics': True}),
@@ -1222,11 +1227,15 @@
         if changing_attr == 'repo_group':
             repo_name = u'/'.join([updates['group'], repo_name.rsplit('/', 1)[-1]])
         try:
-            expected = {
-                'msg': 'updated repo ID:%s %s' % (repo.repo_id, repo_name),
-                'repository': repo.get_api_data()
-            }
-            self._compare_ok(id_, expected, given=response.body)
+            if changing_attr == 'clone_uri' and updates['clone_uri']:
+                expected = u'failed to update repo `%s`' % repo_name
+                self._compare_error(id_, expected, given=response.body)
+            else:
+                expected = {
+                    'msg': 'updated repo ID:%s %s' % (repo.repo_id, repo_name),
+                    'repository': repo.get_api_data()
+                }
+                self._compare_ok(id_, expected, given=response.body)
         finally:
             fixture.destroy_repo(repo_name)
             if changing_attr == 'repo_group':