changeset 6512:b4d1e85265c1

auth: simplify repository group permission checks In practice, Kallithea has the 'group.admin' permission imply the 'group.write' permission, which again implies 'group.read'. This codifies this practice by replacing HasRepoGroupPermissionAny "perm function" with the new HasRepoGroupLevel function, reducing the risk of errors and saving quite a lot of typing.
author Søren Løvborg <sorenl@unity3d.com>
date Mon, 20 Feb 2017 19:31:48 +0100
parents a17c8e5f6712
children ca77c6da2d34
files kallithea/controllers/admin/repo_groups.py kallithea/controllers/admin/repos.py kallithea/controllers/api/api.py kallithea/controllers/forks.py kallithea/lib/auth.py kallithea/lib/helpers.py kallithea/model/scm.py kallithea/model/validators.py kallithea/templates/index_base.html
diffstat 9 files changed, 59 insertions(+), 66 deletions(-) [+]
line wrap: on
line diff
--- a/kallithea/controllers/admin/repo_groups.py	Mon Feb 20 17:23:25 2017 +0100
+++ b/kallithea/controllers/admin/repo_groups.py	Mon Feb 20 19:31:48 2017 +0100
@@ -41,7 +41,7 @@
 from kallithea.lib import helpers as h
 from kallithea.lib.compat import json
 from kallithea.lib.auth import LoginRequired, \
-    HasRepoGroupPermissionAnyDecorator, HasRepoGroupPermissionAny, \
+    HasRepoGroupPermissionLevelDecorator, HasRepoGroupPermissionLevel, \
     HasPermissionAny
 from kallithea.lib.base import BaseController, render
 from kallithea.model.db import RepoGroup, Repository
@@ -68,7 +68,7 @@
         exclude is used for not moving group to itself TODO: also exclude descendants
         Note: only admin can create top level groups
         """
-        repo_groups = AvailableRepoGroupChoices([], ['group.admin'], extras)
+        repo_groups = AvailableRepoGroupChoices([], 'admin', extras)
         exclude_group_ids = set(rg.group_id for rg in exclude)
         c.repo_groups = [rg for rg in repo_groups
                          if rg[0] not in exclude_group_ids]
@@ -110,7 +110,7 @@
 
     def index(self, format='html'):
         _list = RepoGroup.query(sorted=True).all()
-        group_iter = RepoGroupList(_list, perm_set=['group.admin'])
+        group_iter = RepoGroupList(_list, perm_level='admin')
         repo_groups_data = []
         total_records = len(group_iter)
         _tmpl_lookup = kallithea.CONFIG['pylons.app_globals'].mako_lookup
@@ -197,7 +197,7 @@
             group_id = safe_int(request.GET.get('parent_group'))
             group = RepoGroup.get(group_id) if group_id else None
             group_name = group.group_name if group else None
-            if HasRepoGroupPermissionAny('group.admin')(group_name, 'group create'):
+            if HasRepoGroupPermissionLevel('admin')(group_name, 'group create'):
                 pass
             else:
                 raise HTTPForbidden()
@@ -205,7 +205,7 @@
         self.__load_defaults()
         return render('admin/repo_groups/repo_group_add.html')
 
-    @HasRepoGroupPermissionAnyDecorator('group.admin')
+    @HasRepoGroupPermissionLevelDecorator('admin')
     def update(self, group_name):
         c.repo_group = RepoGroup.guess_instance(group_name)
         self.__load_defaults(extras=[c.repo_group.parent_group],
@@ -251,7 +251,7 @@
 
         raise HTTPFound(location=url('edit_repo_group', group_name=group_name))
 
-    @HasRepoGroupPermissionAnyDecorator('group.admin')
+    @HasRepoGroupPermissionLevelDecorator('admin')
     def delete(self, group_name):
         gr = c.repo_group = RepoGroup.guess_instance(group_name)
         repos = gr.repositories.all()
@@ -292,8 +292,7 @@
             return self.show(group_name)
         raise HTTPNotFound
 
-    @HasRepoGroupPermissionAnyDecorator('group.read', 'group.write',
-                                         'group.admin')
+    @HasRepoGroupPermissionLevelDecorator('read')
     def show(self, group_name):
         c.active = 'settings'
 
@@ -310,7 +309,7 @@
 
         return render('admin/repo_groups/repo_group_show.html')
 
-    @HasRepoGroupPermissionAnyDecorator('group.admin')
+    @HasRepoGroupPermissionLevelDecorator('admin')
     def edit(self, group_name):
         c.active = 'settings'
 
@@ -326,14 +325,14 @@
             force_defaults=False
         )
 
-    @HasRepoGroupPermissionAnyDecorator('group.admin')
+    @HasRepoGroupPermissionLevelDecorator('admin')
     def edit_repo_group_advanced(self, group_name):
         c.active = 'advanced'
         c.repo_group = RepoGroup.guess_instance(group_name)
 
         return render('admin/repo_groups/repo_group_edit.html')
 
-    @HasRepoGroupPermissionAnyDecorator('group.admin')
+    @HasRepoGroupPermissionLevelDecorator('admin')
     def edit_repo_group_perms(self, group_name):
         c.active = 'perms'
         c.repo_group = RepoGroup.guess_instance(group_name)
@@ -347,7 +346,7 @@
             force_defaults=False
         )
 
-    @HasRepoGroupPermissionAnyDecorator('group.admin')
+    @HasRepoGroupPermissionLevelDecorator('admin')
     def update_perms(self, group_name):
         """
         Update permissions for given repository group
@@ -378,7 +377,7 @@
         h.flash(_('Repository group permissions updated'), category='success')
         raise HTTPFound(location=url('edit_repo_group_perms', group_name=group_name))
 
-    @HasRepoGroupPermissionAnyDecorator('group.admin')
+    @HasRepoGroupPermissionLevelDecorator('admin')
     def delete_perms(self, group_name):
         try:
             obj_type = request.POST.get('obj_type')
--- a/kallithea/controllers/admin/repos.py	Mon Feb 20 17:23:25 2017 +0100
+++ b/kallithea/controllers/admin/repos.py	Mon Feb 20 19:31:48 2017 +0100
@@ -76,12 +76,13 @@
 
     def __load_defaults(self, repo=None):
         top_perms = ['hg.create.repository']
-        repo_group_perms = ['group.admin']
         if HasPermissionAny('hg.create.write_on_repogroup.true')():
-            repo_group_perms.append('group.write')
+            repo_group_perm_level = 'write'
+        else:
+            repo_group_perm_level = 'admin'
         extras = [] if repo is None else [repo.group]
 
-        c.repo_groups = AvailableRepoGroupChoices(top_perms, repo_group_perms, extras)
+        c.repo_groups = AvailableRepoGroupChoices(top_perms, repo_group_perm_level, extras)
 
         c.landing_revs_choices, c.landing_revs = ScmModel().get_repo_landing_revs(repo)
 
--- a/kallithea/controllers/api/api.py	Mon Feb 20 17:23:25 2017 +0100
+++ b/kallithea/controllers/api/api.py	Mon Feb 20 19:31:48 2017 +0100
@@ -36,7 +36,7 @@
 from kallithea.lib.auth import (
     PasswordGenerator, AuthUser, HasPermissionAnyDecorator,
     HasPermissionAnyDecorator, HasPermissionAny, HasRepoPermissionLevel,
-    HasRepoGroupPermissionAny, HasUserGroupPermissionAny)
+    HasRepoGroupPermissionLevel, HasUserGroupPermissionAny)
 from kallithea.lib.utils import map_groups, repo2db_mapper
 from kallithea.lib.utils2 import (
     str2bool, time_to_datetime, safe_int, Optional, OAttr)
@@ -2111,8 +2111,7 @@
         repo_group = get_repo_group_or_error(repogroupid)
 
         if not HasPermissionAny('hg.admin')():
-            # check if we have admin permission for this repo group !
-            if not HasRepoGroupPermissionAny('group.admin')(group_name=repo_group.group_name):
+            if not HasRepoGroupPermissionLevel('admin')(repo_group.group_name):
                 raise JSONRPCError('repository group `%s` does not exist' % (repogroupid,))
 
         user = get_user_or_error(userid)
@@ -2175,8 +2174,7 @@
         repo_group = get_repo_group_or_error(repogroupid)
 
         if not HasPermissionAny('hg.admin')():
-            # check if we have admin permission for this repo group !
-            if not HasRepoGroupPermissionAny('group.admin')(group_name=repo_group.group_name):
+            if not HasRepoGroupPermissionLevel('admin')(repo_group.group_name):
                 raise JSONRPCError('repository group `%s` does not exist' % (repogroupid,))
 
         user = get_user_or_error(userid)
@@ -2243,10 +2241,7 @@
         perm = get_perm_or_error(perm, prefix='group.')
         user_group = get_user_group_or_error(usergroupid)
         if not HasPermissionAny('hg.admin')():
-            # check if we have admin permission for this repo group !
-            _perms = ('group.admin',)
-            if not HasRepoGroupPermissionAny(*_perms)(
-                    group_name=repo_group.group_name):
+            if not HasRepoGroupPermissionLevel('admin')(repo_group.group_name):
                 raise JSONRPCError(
                     'repository group `%s` does not exist' % (repogroupid,))
 
@@ -2319,10 +2314,7 @@
         repo_group = get_repo_group_or_error(repogroupid)
         user_group = get_user_group_or_error(usergroupid)
         if not HasPermissionAny('hg.admin')():
-            # check if we have admin permission for this repo group !
-            _perms = ('group.admin',)
-            if not HasRepoGroupPermissionAny(*_perms)(
-                    group_name=repo_group.group_name):
+            if not HasRepoGroupPermissionLevel('admin')(repo_group.group_name):
                 raise JSONRPCError(
                     'repository group `%s` does not exist' % (repogroupid,))
 
--- a/kallithea/controllers/forks.py	Mon Feb 20 17:23:25 2017 +0100
+++ b/kallithea/controllers/forks.py	Mon Feb 20 19:31:48 2017 +0100
@@ -56,10 +56,11 @@
         super(ForksController, self).__before__()
 
     def __load_defaults(self):
-        repo_group_perms = ['group.admin']
         if HasPermissionAny('hg.create.write_on_repogroup.true')():
-            repo_group_perms.append('group.write')
-        c.repo_groups = AvailableRepoGroupChoices(['hg.create.repository'], repo_group_perms)
+            repo_group_perm_level = 'write'
+        else:
+            repo_group_perm_level = 'admin'
+        c.repo_groups = AvailableRepoGroupChoices(['hg.create.repository'], repo_group_perm_level)
 
         c.landing_revs_choices, c.landing_revs = ScmModel().get_repo_landing_revs()
 
--- a/kallithea/lib/auth.py	Mon Feb 20 17:23:25 2017 +0100
+++ b/kallithea/lib/auth.py	Mon Feb 20 19:31:48 2017 +0100
@@ -549,6 +549,18 @@
             self.username, level, repo_name, purpose, ok, actual_perm)
         return ok
 
+    def has_repository_group_permission_level(self, repo_group_name, level, purpose=None):
+        required_perms = {
+            'read': ['group.read', 'group.write', 'group.admin'],
+            'write': ['group.write', 'group.admin'],
+            'admin': ['group.admin'],
+        }[level]
+        actual_perm = self.permissions['repositories_groups'].get(repo_group_name)
+        ok = actual_perm in required_perms
+        log.debug('Checking if user %r can %r repo group %r (%s): %s (has %r)',
+            self.username, level, repo_group_name, purpose, ok, actual_perm)
+        return ok
+
     @property
     def api_keys(self):
         return self._get_api_keys()
@@ -859,17 +871,15 @@
         return user.has_repository_permission_level(repo_name, level)
 
 
-class HasRepoGroupPermissionAnyDecorator(_PermsDecorator):
+class HasRepoGroupPermissionLevelDecorator(_PermsDecorator):
     """
     Checks the user has any of given permissions for the requested repository group.
     """
 
     def check_permissions(self, user):
         repo_group_name = get_repo_group_slug(request)
-        try:
-            return user.permissions['repositories_groups'][repo_group_name] in self.required_perms
-        except KeyError:
-            return False
+        (level,) = self.required_perms
+        return user.has_repository_group_permission_level(repo_group_name, level)
 
 
 class HasUserGroupPermissionAnyDecorator(_PermsDecorator):
@@ -925,17 +935,11 @@
         return request.user.has_repository_permission_level(repo_name, level, purpose)
 
 
-class HasRepoGroupPermissionAny(_PermsFunction):
+class HasRepoGroupPermissionLevel(_PermsFunction):
 
     def __call__(self, group_name, purpose=None):
-        try:
-            ok = request.user.permissions['repositories_groups'][group_name] in self.required_perms
-        except KeyError:
-            ok = False
-
-        log.debug('Check %s for %s for repo group %s (%s): %s' %
-            (request.user.username, self.required_perms, group_name, purpose, ok))
-        return ok
+        (level,) = self.required_perms
+        return request.user.has_repository_group_permission_level(group_name, level, purpose)
 
 
 class HasUserGroupPermissionAny(_PermsFunction):
--- a/kallithea/lib/helpers.py	Mon Feb 20 17:23:25 2017 +0100
+++ b/kallithea/lib/helpers.py	Mon Feb 20 19:31:48 2017 +0100
@@ -778,7 +778,7 @@
 # PERMS
 #==============================================================================
 from kallithea.lib.auth import HasPermissionAny, \
-    HasRepoPermissionLevel, HasRepoGroupPermissionAny
+    HasRepoPermissionLevel, HasRepoGroupPermissionLevel
 
 
 #==============================================================================
--- a/kallithea/model/scm.py	Mon Feb 20 17:23:25 2017 +0100
+++ b/kallithea/model/scm.py	Mon Feb 20 19:31:48 2017 +0100
@@ -49,7 +49,7 @@
 from kallithea.lib import helpers as h
 from kallithea.lib.utils2 import safe_str, safe_unicode, get_server_url, \
     _set_extras
-from kallithea.lib.auth import HasRepoPermissionLevel, HasRepoGroupPermissionAny, \
+from kallithea.lib.auth import HasRepoPermissionLevel, HasRepoGroupPermissionLevel, \
     HasUserGroupPermissionAny, HasPermissionAny, HasPermissionAny
 from kallithea.lib.utils import get_filesystem_repos, make_ui, \
     action_logger
@@ -123,13 +123,10 @@
 
 class RepoGroupList(_PermCheckIterator):
 
-    def __init__(self, db_repo_group_list, perm_set=None, extra_kwargs=None):
-        if not perm_set:
-            perm_set = ['group.read', 'group.write', 'group.admin']
-
+    def __init__(self, db_repo_group_list, perm_level, extra_kwargs=None):
         super(RepoGroupList, self).__init__(obj_list=db_repo_group_list,
-                    obj_attr='group_name', perm_set=perm_set,
-                    perm_checker=HasRepoGroupPermissionAny,
+                    obj_attr='group_name', perm_set=[perm_level],
+                    perm_checker=HasRepoGroupPermissionLevel,
                     extra_kwargs=extra_kwargs)
 
 
@@ -222,7 +219,7 @@
         if groups is None:
             groups = RepoGroup.query() \
                 .filter(RepoGroup.parent_group_id == None).all()
-        return RepoGroupList(groups)
+        return RepoGroupList(groups, perm_level='read')
 
     def mark_for_invalidation(self, repo_name):
         """
@@ -784,7 +781,7 @@
             else:
                 log.debug('skipping writing hook file')
 
-def AvailableRepoGroupChoices(top_perms, repo_group_perms, extras=()):
+def AvailableRepoGroupChoices(top_perms, repo_group_perm_level, extras=()):
     """Return group_id,string tuples with choices for all the repo groups where
     the user has the necessary permissions.
 
@@ -794,7 +791,7 @@
     if HasPermissionAny('hg.admin')('available repo groups'):
         groups.append(None)
     else:
-        groups = list(RepoGroupList(groups, perm_set=repo_group_perms))
+        groups = list(RepoGroupList(groups, perm_level=repo_group_perm_level))
         if top_perms and HasPermissionAny(*top_perms)('available repo groups'):
             groups.append(None)
         for extra in extras:
--- a/kallithea/model/validators.py	Mon Feb 20 17:23:25 2017 +0100
+++ b/kallithea/model/validators.py	Mon Feb 20 19:31:48 2017 +0100
@@ -35,7 +35,7 @@
 from kallithea.model.db import RepoGroup, Repository, UserGroup, User
 from kallithea.lib.exceptions import LdapImportError
 from kallithea.config.routing import ADMIN_PREFIX
-from kallithea.lib.auth import HasRepoGroupPermissionAny, HasPermissionAny
+from kallithea.lib.auth import HasRepoGroupPermissionLevel, HasPermissionAny
 
 # silence warnings and pylint
 UnicodeString, OneOf, Int, Number, Regex, Email, Bool, StringBoolean, Set, \
@@ -502,9 +502,9 @@
 
             # create repositories with write permission on group is set to true
             create_on_write = HasPermissionAny('hg.create.write_on_repogroup.true')()
-            group_admin = HasRepoGroupPermissionAny('group.admin')(gr_name,
+            group_admin = HasRepoGroupPermissionLevel('admin')(gr_name,
                                             'can write into group validator')
-            group_write = HasRepoGroupPermissionAny('group.write')(gr_name,
+            group_write = HasRepoGroupPermissionLevel('write')(gr_name,
                                             'can write into group validator')
             forbidden = not (group_admin or (group_write and create_on_write))
             can_create_repos = HasPermissionAny('hg.admin', 'hg.create.repository')
@@ -555,8 +555,7 @@
                 return
 
             forbidden_in_root = gr is None and not can_create_in_root
-            val = HasRepoGroupPermissionAny('group.admin')
-            forbidden = not val(gr_name, 'can create group validator')
+            forbidden = not HasRepoGroupPermissionLevel('admin')(gr_name, 'can create group validator')
             if forbidden_in_root or forbidden:
                 msg = self.message('permission_denied', state)
                 raise formencode.Invalid(msg, value, state,
--- a/kallithea/templates/index_base.html	Mon Feb 20 17:23:25 2017 +0100
+++ b/kallithea/templates/index_base.html	Mon Feb 20 19:31:48 2017 +0100
@@ -18,13 +18,13 @@
                     gr_name = c.group.group_name if c.group else None
                     # create repositories with write permission on group is set to true
                     create_on_write = h.HasPermissionAny('hg.create.write_on_repogroup.true')()
-                    group_admin = h.HasRepoGroupPermissionAny('group.admin')(gr_name, 'can write into group index page')
-                    group_write = h.HasRepoGroupPermissionAny('group.write')(gr_name, 'can write into group index page')
+                    group_admin = h.HasRepoGroupPermissionLevel('admin')(gr_name, 'can write into group index page')
+                    group_write = h.HasRepoGroupPermissionLevel('write')(gr_name, 'can write into group index page')
                 %>
                 %if h.HasPermissionAny('hg.admin','hg.create.repository')() or (group_admin or (group_write and create_on_write)):
                   %if c.group:
                         <a href="${h.url('new_repo',parent_group=c.group.group_id)}" class="btn btn-default btn-xs"><i class="icon-plus"></i> ${_('Add Repository')}</a>
-                        %if h.HasPermissionAny('hg.admin')() or h.HasRepoGroupPermissionAny('group.admin')(c.group.group_name):
+                        %if h.HasPermissionAny('hg.admin')() or h.HasRepoGroupPermissionLevel('admin')(c.group.group_name):
                             <a href="${h.url('new_repos_group', parent_group=c.group.group_id)}" class="btn btn-default btn-xs"><i class="icon-plus"></i> ${_('Add Repository Group')}</a>
                         %endif
                   %else:
@@ -34,7 +34,7 @@
                     %endif
                   %endif
                 %endif
-                %if c.group and h.HasRepoGroupPermissionAny('group.admin')(c.group.group_name):
+                %if c.group and h.HasRepoGroupPermissionLevel('admin')(c.group.group_name):
                     <a href="${h.url('edit_repo_group',group_name=c.group.group_name)}" title="${_('You have admin right to this group, and can edit it')}" class="btn btn-default btn-xs"><i class="icon-pencil"></i> ${_('Edit Repository Group')}</a>
                 %endif
                 </li>