changeset 8513:7643d8ecbb20 stable

api: fix repo group permission check for repo creation hg.create.repository is only controlling whether all users have write access at top level. For other repo locations, check for write permission to the repo group. Note: This also covers "repo creation" by forking or by moving another repo.
author Mads Kiilerich <mads@kiilerich.com>
date Sat, 02 Jan 2021 23:41:37 +0100
parents 9a28233045b9
children 7f3515800bd8
files kallithea/controllers/api/api.py kallithea/templates/admin/permissions/permissions_globals.html kallithea/tests/api/api_base.py
diffstat 3 files changed, 39 insertions(+), 34 deletions(-) [+]
line wrap: on
line diff
--- a/kallithea/controllers/api/api.py	Sat Jan 02 20:21:02 2021 +0100
+++ b/kallithea/controllers/api/api.py	Sat Jan 02 23:41:37 2021 +0100
@@ -1167,7 +1167,7 @@
                 'failed to get repo: `%s` nodes' % repo.repo_name
             )
 
-    @HasPermissionAnyDecorator('hg.admin', 'hg.create.repository')
+    # permission check inside
     def create_repo(self, repo_name, owner=Optional(OAttr('apiuser')),
                     repo_type=Optional('hg'), description=Optional(''),
                     private=Optional(False), clone_uri=Optional(None),
@@ -1224,6 +1224,19 @@
           }
 
         """
+        group_name = None
+        repo_name_parts = repo_name.split('/')
+        if len(repo_name_parts) > 1:
+            group_name = '/'.join(repo_name_parts[:-1])
+            repo_group = RepoGroup.get_by_group_name(group_name)
+            if repo_group is None:
+                raise JSONRPCError("repo group `%s` not found" % group_name)
+            if not(HasPermissionAny('hg.admin')() or HasRepoGroupPermissionLevel('write')(group_name)):
+                raise JSONRPCError("no permission to create repo in %s" % group_name)
+        else:
+            if not HasPermissionAny('hg.admin', 'hg.create.repository')():
+                raise JSONRPCError("no permission to create top level repo")
+
         if not HasPermissionAny('hg.admin')():
             if not isinstance(owner, Optional):
                 # forbid setting owner for non-admins
@@ -1254,13 +1267,6 @@
         copy_permissions = Optional.extract(copy_permissions)
 
         try:
-            repo_name_parts = repo_name.split('/')
-            group_name = None
-            if len(repo_name_parts) > 1:
-                group_name = '/'.join(repo_name_parts[:-1])
-                repo_group = RepoGroup.get_by_group_name(group_name)
-                if repo_group is None:
-                    raise JSONRPCError("repo group `%s` not found" % group_name)
             data = dict(
                 repo_name=repo_name_parts[-1],
                 repo_name_full=repo_name,
@@ -1334,6 +1340,9 @@
         repo_group = group
         if not isinstance(repo_group, Optional):
             repo_group = get_repo_group_or_error(repo_group)
+            if repo_group.group_id != repo.group_id:
+                if not(HasPermissionAny('hg.admin')() or HasRepoGroupPermissionLevel('write')(repo_group.group_name)):
+                    raise JSONRPCError("no permission to create (or move) repo in %s" % repo_group.group_name)
             repo_group = repo_group.group_id
         try:
             store_update(updates, name, 'repo_name')
@@ -1356,6 +1365,7 @@
             log.error(traceback.format_exc())
             raise JSONRPCError('failed to update repo `%s`' % repoid)
 
+    # permission check inside
     @HasPermissionAnyDecorator('hg.admin', 'hg.fork.repository')
     def fork_repo(self, repoid, fork_name,
                   owner=Optional(OAttr('apiuser')),
@@ -1410,6 +1420,19 @@
             type_ = 'fork' if _repo.fork else 'repo'
             raise JSONRPCError("%s `%s` already exist" % (type_, fork_name))
 
+        group_name = None
+        fork_name_parts = fork_name.split('/')
+        if len(fork_name_parts) > 1:
+            group_name = '/'.join(fork_name_parts[:-1])
+            repo_group = RepoGroup.get_by_group_name(group_name)
+            if repo_group is None:
+                raise JSONRPCError("repo group `%s` not found" % group_name)
+            if not(HasPermissionAny('hg.admin')() or HasRepoGroupPermissionLevel('write')(group_name)):
+                raise JSONRPCError("no permission to create repo in %s" % group_name)
+        else:
+            if not HasPermissionAny('hg.admin', 'hg.create.repository')():
+                raise JSONRPCError("no permission to create top level repo")
+
         if HasPermissionAny('hg.admin')():
             pass
         elif HasRepoPermissionLevel('read')(repo.repo_name):
@@ -1418,9 +1441,6 @@
                 raise JSONRPCError(
                     'Only Kallithea admin can specify `owner` param'
                 )
-
-            if not HasPermissionAny('hg.create.repository')():
-                raise JSONRPCError('no permission to create repositories')
         else:
             raise JSONRPCError('repository `%s` does not exist' % (repoid,))
 
@@ -1430,14 +1450,6 @@
         owner = get_user_or_error(owner)
 
         try:
-            fork_name_parts = fork_name.split('/')
-            group_name = None
-            if len(fork_name_parts) > 1:
-                group_name = '/'.join(fork_name_parts[:-1])
-                repo_group = RepoGroup.get_by_group_name(group_name)
-                if repo_group is None:
-                    raise JSONRPCError("repo group `%s` not found" % group_name)
-
             form_data = dict(
                 repo_name=fork_name_parts[-1],
                 repo_name_full=fork_name,
--- a/kallithea/templates/admin/permissions/permissions_globals.html	Sat Jan 02 20:21:02 2021 +0100
+++ b/kallithea/templates/admin/permissions/permissions_globals.html	Sat Jan 02 23:41:37 2021 +0100
@@ -54,7 +54,6 @@
                 <div>
                     ${h.select('default_repo_create','',c.repo_create_choices,class_='form-control')}
                     <span class="help-block">${_('Enable this to allow non-admins to create repositories at the top level.')}</span>
-                    <span class="help-block">${_('Note: This will also give all users API access to create repositories everywhere. That might change in future versions.')}</span>
                 </div>
             </div>
             <div class="form-group">
--- a/kallithea/tests/api/api_base.py	Sat Jan 02 20:21:02 2021 +0100
+++ b/kallithea/tests/api/api_base.py	Sat Jan 02 23:41:37 2021 +0100
@@ -916,20 +916,11 @@
         )
         response = api_call(self, params)
 
-        # Current result when API access control is different from Web:
-        ret = {
-            'msg': 'Created new repository `%s`' % repo_name,
-            'success': True,
-            'task': None,
-        }
-        expected = ret
-        self._compare_ok(id_, expected, given=response.body)
+        # API access control match Web access control:
+        expected = 'no permission to create repo in test_repo_group/api-repo-repo'
+        self._compare_error(id_, expected, given=response.body)
+
         fixture.destroy_repo(repo_name)
-
-        # Expected and arguably more correct result:
-        #expected = 'failed to create repository `%s`' % repo_name
-        #self._compare_error(id_, expected, given=response.body)
-
         fixture.destroy_repo_group(repo_group_name)
 
     def test_api_create_repo_unknown_owner(self):
@@ -1298,6 +1289,9 @@
         '%s/api-repo-fork' % TEST_REPO_GROUP,
     ])
     def test_api_fork_repo_non_admin(self, fork_name):
+        RepoGroupModel().grant_user_permission(TEST_REPO_GROUP,
+                                               self.TEST_USER_LOGIN,
+                                               'group.write')
         id_, params = _build_data(self.apikey_regular, 'fork_repo',
                                   repoid=self.REPO,
                                   fork_name=fork_name,
@@ -1364,7 +1358,7 @@
                                   fork_name=fork_name,
         )
         response = api_call(self, params)
-        expected = 'no permission to create repositories'
+        expected = 'no permission to create top level repo'
         self._compare_error(id_, expected, given=response.body)
         fixture.destroy_repo(fork_name)