changeset 6513:ca77c6da2d34

auth: simplify user group permission checks In practice, Kallithea has the 'usergroup.admin' permission imply the 'usergroup.write' permission, which again implies 'usergroup.read'. This codifies this practice by replacing the HasUserGroupPermissionAny "perm function" with the new HasUserGroupLevel function, reducing the risk of errors and saving quite a lot of typing.
author Søren Løvborg <sorenl@unity3d.com>
date Mon, 20 Feb 2017 18:40:32 +0100
parents b4d1e85265c1
children 25623c3c63aa
files kallithea/controllers/admin/user_groups.py kallithea/controllers/api/api.py kallithea/lib/auth.py kallithea/model/repo.py kallithea/model/repo_group.py kallithea/model/scm.py kallithea/model/user_group.py
diffstat 7 files changed, 61 insertions(+), 101 deletions(-) [+]
line wrap: on
line diff
--- a/kallithea/controllers/admin/user_groups.py	Mon Feb 20 19:31:48 2017 +0100
+++ b/kallithea/controllers/admin/user_groups.py	Mon Feb 20 18:40:32 2017 +0100
@@ -45,7 +45,7 @@
     RepoGroupAssignmentError
 from kallithea.lib.utils2 import safe_unicode, safe_int
 from kallithea.lib.auth import LoginRequired, \
-    HasUserGroupPermissionAnyDecorator, HasPermissionAnyDecorator
+    HasUserGroupPermissionLevelDecorator, HasPermissionAnyDecorator
 from kallithea.lib.base import BaseController, render
 from kallithea.model.scm import UserGroupList
 from kallithea.model.user_group import UserGroupModel
@@ -92,7 +92,7 @@
         _list = UserGroup.query() \
                         .order_by(func.lower(UserGroup.users_group_name)) \
                         .all()
-        group_iter = UserGroupList(_list, perm_set=['usergroup.admin'])
+        group_iter = UserGroupList(_list, perm_level='admin')
         user_groups_data = []
         total_records = len(group_iter)
         _tmpl_lookup = kallithea.CONFIG['pylons.app_globals'].mako_lookup
@@ -165,7 +165,7 @@
     def new(self, format='html'):
         return render('admin/user_groups/user_group_add.html')
 
-    @HasUserGroupPermissionAnyDecorator('usergroup.admin')
+    @HasUserGroupPermissionLevelDecorator('admin')
     def update(self, id):
         c.user_group = UserGroup.get_or_404(id)
         c.active = 'settings'
@@ -211,7 +211,7 @@
 
         raise HTTPFound(location=url('edit_users_group', id=id))
 
-    @HasUserGroupPermissionAnyDecorator('usergroup.admin')
+    @HasUserGroupPermissionLevelDecorator('admin')
     def delete(self, id):
         usr_gr = UserGroup.get_or_404(id)
         try:
@@ -226,7 +226,7 @@
                     category='error')
         raise HTTPFound(location=url('users_groups'))
 
-    @HasUserGroupPermissionAnyDecorator('usergroup.admin')
+    @HasUserGroupPermissionLevelDecorator('admin')
     def edit(self, id, format='html'):
         c.user_group = UserGroup.get_or_404(id)
         c.active = 'settings'
@@ -241,7 +241,7 @@
             force_defaults=False
         )
 
-    @HasUserGroupPermissionAnyDecorator('usergroup.admin')
+    @HasUserGroupPermissionLevelDecorator('admin')
     def edit_perms(self, id):
         c.user_group = UserGroup.get_or_404(id)
         c.active = 'perms'
@@ -267,7 +267,7 @@
             force_defaults=False
         )
 
-    @HasUserGroupPermissionAnyDecorator('usergroup.admin')
+    @HasUserGroupPermissionLevelDecorator('admin')
     def update_perms(self, id):
         """
         grant permission for given usergroup
@@ -291,7 +291,7 @@
         h.flash(_('User group permissions updated'), category='success')
         raise HTTPFound(location=url('edit_user_group_perms', id=id))
 
-    @HasUserGroupPermissionAnyDecorator('usergroup.admin')
+    @HasUserGroupPermissionLevelDecorator('admin')
     def delete_perms(self, id):
         try:
             obj_type = request.POST.get('obj_type')
@@ -319,7 +319,7 @@
                     category='error')
             raise HTTPInternalServerError()
 
-    @HasUserGroupPermissionAnyDecorator('usergroup.admin')
+    @HasUserGroupPermissionLevelDecorator('admin')
     def edit_default_perms(self, id):
         c.user_group = UserGroup.get_or_404(id)
         c.active = 'default_perms'
@@ -368,7 +368,7 @@
             force_defaults=False
         )
 
-    @HasUserGroupPermissionAnyDecorator('usergroup.admin')
+    @HasUserGroupPermissionLevelDecorator('admin')
     def update_default_perms(self, id):
         user_group = UserGroup.get_or_404(id)
 
@@ -408,7 +408,7 @@
 
         raise HTTPFound(location=url('edit_user_group_default_perms', id=id))
 
-    @HasUserGroupPermissionAnyDecorator('usergroup.admin')
+    @HasUserGroupPermissionLevelDecorator('admin')
     def edit_advanced(self, id):
         c.user_group = UserGroup.get_or_404(id)
         c.active = 'advanced'
@@ -417,7 +417,7 @@
         return render('admin/user_groups/user_group_edit.html')
 
 
-    @HasUserGroupPermissionAnyDecorator('usergroup.admin')
+    @HasUserGroupPermissionLevelDecorator('admin')
     def edit_members(self, id):
         c.user_group = UserGroup.get_or_404(id)
         c.active = 'members'
--- a/kallithea/controllers/api/api.py	Mon Feb 20 19:31:48 2017 +0100
+++ b/kallithea/controllers/api/api.py	Mon Feb 20 18:40:32 2017 +0100
@@ -36,7 +36,7 @@
 from kallithea.lib.auth import (
     PasswordGenerator, AuthUser, HasPermissionAnyDecorator,
     HasPermissionAnyDecorator, HasPermissionAny, HasRepoPermissionLevel,
-    HasRepoGroupPermissionLevel, HasUserGroupPermissionAny)
+    HasRepoGroupPermissionLevel, HasUserGroupPermissionLevel)
 from kallithea.lib.utils import map_groups, repo2db_mapper
 from kallithea.lib.utils2 import (
     str2bool, time_to_datetime, safe_int, Optional, OAttr)
@@ -820,10 +820,7 @@
         """
         user_group = get_user_group_or_error(usergroupid)
         if not HasPermissionAny('hg.admin')():
-            # check if we have at least read permission for this user group !
-            _perms = ('usergroup.read', 'usergroup.write', 'usergroup.admin',)
-            if not HasUserGroupPermissionAny(*_perms)(
-                    user_group_name=user_group.users_group_name):
+            if not HasUserGroupPermissionLevel('read')(user_group.users_group_name):
                 raise JSONRPCError('user group `%s` does not exist' % (usergroupid,))
 
         data = user_group.get_api_data()
@@ -845,9 +842,7 @@
         """
 
         result = []
-        _perms = ('usergroup.read', 'usergroup.write', 'usergroup.admin',)
-        for user_group in UserGroupList(UserGroup.query().all(),
-                                        perm_set=_perms):
+        for user_group in UserGroupList(UserGroup.query().all(), perm_level='read'):
             result.append(user_group.get_api_data())
         return result
 
@@ -949,10 +944,7 @@
         """
         user_group = get_user_group_or_error(usergroupid)
         if not HasPermissionAny('hg.admin')():
-            # check if we have admin permission for this user group !
-            _perms = ('usergroup.admin',)
-            if not HasUserGroupPermissionAny(*_perms)(
-                    user_group_name=user_group.users_group_name):
+            if not HasUserGroupPermissionLevel('admin')(user_group.users_group_name):
                 raise JSONRPCError('user group `%s` does not exist' % (usergroupid,))
 
         if not isinstance(owner, Optional):
@@ -1006,10 +998,7 @@
         """
         user_group = get_user_group_or_error(usergroupid)
         if not HasPermissionAny('hg.admin')():
-            # check if we have admin permission for this user group !
-            _perms = ('usergroup.admin',)
-            if not HasUserGroupPermissionAny(*_perms)(
-                    user_group_name=user_group.users_group_name):
+            if not HasUserGroupPermissionLevel('admin')(user_group.users_group_name):
                 raise JSONRPCError('user group `%s` does not exist' % (usergroupid,))
 
         try:
@@ -1065,10 +1054,7 @@
         user = get_user_or_error(userid)
         user_group = get_user_group_or_error(usergroupid)
         if not HasPermissionAny('hg.admin')():
-            # check if we have admin permission for this user group !
-            _perms = ('usergroup.admin',)
-            if not HasUserGroupPermissionAny(*_perms)(
-                    user_group_name=user_group.users_group_name):
+            if not HasUserGroupPermissionLevel('admin')(user_group.users_group_name):
                 raise JSONRPCError('user group `%s` does not exist' % (usergroupid,))
 
         try:
@@ -1117,10 +1103,7 @@
         user = get_user_or_error(userid)
         user_group = get_user_group_or_error(usergroupid)
         if not HasPermissionAny('hg.admin')():
-            # check if we have admin permission for this user group !
-            _perms = ('usergroup.admin',)
-            if not HasUserGroupPermissionAny(*_perms)(
-                    user_group_name=user_group.users_group_name):
+            if not HasUserGroupPermissionLevel('admin')(user_group.users_group_name):
                 raise JSONRPCError('user group `%s` does not exist' % (usergroupid,))
 
         try:
@@ -1812,10 +1795,7 @@
             if not HasRepoPermissionLevel('admin')(repo.repo_name):
                 raise JSONRPCError('repository `%s` does not exist' % (repoid,))
 
-            # check if we have at least read permission for this user group !
-            _perms = ('usergroup.read', 'usergroup.write', 'usergroup.admin',)
-            if not HasUserGroupPermissionAny(*_perms)(
-                    user_group_name=user_group.users_group_name):
+            if not HasUserGroupPermissionLevel('read')(user_group.users_group_name):
                 raise JSONRPCError('user group `%s` does not exist' % (usergroupid,))
 
         try:
@@ -1865,10 +1845,7 @@
             if not HasRepoPermissionLevel('admin')(repo.repo_name):
                 raise JSONRPCError('repository `%s` does not exist' % (repoid,))
 
-            # check if we have at least read permission for this user group !
-            _perms = ('usergroup.read', 'usergroup.write', 'usergroup.admin',)
-            if not HasUserGroupPermissionAny(*_perms)(
-                    user_group_name=user_group.users_group_name):
+            if not HasUserGroupPermissionLevel('read')(user_group.users_group_name):
                 raise JSONRPCError('user group `%s` does not exist' % (usergroupid,))
 
         try:
@@ -2245,10 +2222,7 @@
                 raise JSONRPCError(
                     'repository group `%s` does not exist' % (repogroupid,))
 
-            # check if we have at least read permission for this user group !
-            _perms = ('usergroup.read', 'usergroup.write', 'usergroup.admin',)
-            if not HasUserGroupPermissionAny(*_perms)(
-                    user_group_name=user_group.users_group_name):
+            if not HasUserGroupPermissionLevel('read')(user_group.users_group_name):
                 raise JSONRPCError(
                     'user group `%s` does not exist' % (usergroupid,))
 
@@ -2318,10 +2292,7 @@
                 raise JSONRPCError(
                     'repository group `%s` does not exist' % (repogroupid,))
 
-            # check if we have at least read permission for this user group !
-            _perms = ('usergroup.read', 'usergroup.write', 'usergroup.admin',)
-            if not HasUserGroupPermissionAny(*_perms)(
-                    user_group_name=user_group.users_group_name):
+            if not HasUserGroupPermissionLevel('read')(user_group.users_group_name):
                 raise JSONRPCError(
                     'user group `%s` does not exist' % (usergroupid,))
 
--- a/kallithea/lib/auth.py	Mon Feb 20 19:31:48 2017 +0100
+++ b/kallithea/lib/auth.py	Mon Feb 20 18:40:32 2017 +0100
@@ -561,6 +561,18 @@
             self.username, level, repo_group_name, purpose, ok, actual_perm)
         return ok
 
+    def has_user_group_permission_level(self, user_group_name, level, purpose=None):
+        required_perms = {
+            'read': ['usergroup.read', 'usergroup.write', 'usergroup.admin'],
+            'write': ['usergroup.write', 'usergroup.admin'],
+            'admin': ['usergroup.admin'],
+        }[level]
+        actual_perm = self.permissions['user_groups'].get(user_group_name)
+        ok = actual_perm in required_perms
+        log.debug('Checking if user %r can %r user group %r (%s): %s (has %r)',
+            self.username, level, user_group_name, purpose, ok, actual_perm)
+        return ok
+
     @property
     def api_keys(self):
         return self._get_api_keys()
@@ -882,7 +894,7 @@
         return user.has_repository_group_permission_level(repo_group_name, level)
 
 
-class HasUserGroupPermissionAnyDecorator(_PermsDecorator):
+class HasUserGroupPermissionLevelDecorator(_PermsDecorator):
     """
     Checks for access permission for any of given predicates for specific
     user group. In order to fulfill the request any of predicates must be meet
@@ -890,10 +902,8 @@
 
     def check_permissions(self, user):
         user_group_name = get_user_group_slug(request)
-        try:
-            return user.permissions['user_groups'][user_group_name] in self.required_perms
-        except KeyError:
-            return False
+        (level,) = self.required_perms
+        return user.has_user_group_permission_level(user_group_name, level)
 
 
 #==============================================================================
@@ -942,17 +952,11 @@
         return request.user.has_repository_group_permission_level(group_name, level, purpose)
 
 
-class HasUserGroupPermissionAny(_PermsFunction):
+class HasUserGroupPermissionLevel(_PermsFunction):
 
     def __call__(self, user_group_name, purpose=None):
-        try:
-            ok = request.user.permissions['user_groups'][user_group_name] in self.required_perms
-        except KeyError:
-            ok = False
-
-        log.debug('Check %s %s for user group %s (%s): %s' %
-            (request.user.username, self.required_perms, user_group_name, purpose, ok))
-        return ok
+        (level,) = self.required_perms
+        return request.user.has_user_group_permission_level(user_group_name, level, purpose)
 
 
 #==============================================================================
--- a/kallithea/model/repo.py	Mon Feb 20 19:31:48 2017 +0100
+++ b/kallithea/model/repo.py	Mon Feb 20 18:40:32 2017 +0100
@@ -47,7 +47,7 @@
     Statistics, UserGroup, Ui, RepoGroup, RepositoryField
 
 from kallithea.lib import helpers as h
-from kallithea.lib.auth import HasRepoPermissionLevel, HasUserGroupPermissionAny
+from kallithea.lib.auth import HasRepoPermissionLevel, HasUserGroupPermissionLevel
 from kallithea.lib.exceptions import AttachedForksError
 from kallithea.model.scm import UserGroupList
 
@@ -144,9 +144,7 @@
             .order_by(UserGroup.users_group_name) \
             .options(subqueryload(UserGroup.members)) \
             .all()
-        user_groups = UserGroupList(user_groups, perm_set=['usergroup.read',
-                                                           'usergroup.write',
-                                                           'usergroup.admin'])
+        user_groups = UserGroupList(user_groups, perm_level='read')
         return json.dumps([
             {
                 'id': gr.users_group_id,
@@ -468,11 +466,8 @@
                     repo=repo, user=member, perm=perm
                 )
             else:
-                #check if we have permissions to alter this usergroup
-                req_perms = (
-                    'usergroup.read', 'usergroup.write', 'usergroup.admin')
-                if not check_perms or HasUserGroupPermissionAny(*req_perms)(
-                        member):
+                #check if we have permissions to alter this usergroup's access
+                if not check_perms or HasUserGroupPermissionLevel('read')(member):
                     self.grant_user_group_permission(
                         repo=repo, group_name=member, perm=perm
                     )
@@ -483,11 +478,8 @@
                     repo=repo, user=member, perm=perm
                 )
             else:
-                #check if we have permissions to alter this usergroup
-                req_perms = (
-                    'usergroup.read', 'usergroup.write', 'usergroup.admin')
-                if not check_perms or HasUserGroupPermissionAny(*req_perms)(
-                        member):
+                #check if we have permissions to alter this usergroup's access
+                if not check_perms or HasUserGroupPermissionLevel('read')(member):
                     self.grant_user_group_permission(
                         repo=repo, group_name=member, perm=perm
                     )
--- a/kallithea/model/repo_group.py	Mon Feb 20 19:31:48 2017 +0100
+++ b/kallithea/model/repo_group.py	Mon Feb 20 18:40:32 2017 +0100
@@ -187,7 +187,7 @@
                             perms_updates=None, recursive=None,
                             check_perms=True):
         from kallithea.model.repo import RepoModel
-        from kallithea.lib.auth import HasUserGroupPermissionAny
+        from kallithea.lib.auth import HasUserGroupPermissionLevel
 
         if not perms_new:
             perms_new = []
@@ -255,18 +255,16 @@
                     _set_perm_user(obj, user=member, perm=perm)
                 ## set for user group
                 else:
-                    #check if we have permissions to alter this usergroup
-                    req_perms = ('usergroup.read', 'usergroup.write', 'usergroup.admin')
-                    if not check_perms or HasUserGroupPermissionAny(*req_perms)(member):
+                    #check if we have permissions to alter this usergroup's access
+                    if not check_perms or HasUserGroupPermissionLevel('read')(member):
                         _set_perm_group(obj, users_group=member, perm=perm)
             # set new permissions
             for member, perm, member_type in perms_new:
                 if member_type == 'user':
                     _set_perm_user(obj, user=member, perm=perm)
                 else:
-                    #check if we have permissions to alter this usergroup
-                    req_perms = ('usergroup.read', 'usergroup.write', 'usergroup.admin')
-                    if not check_perms or HasUserGroupPermissionAny(*req_perms)(member):
+                    #check if we have permissions to alter this usergroup's access
+                    if not check_perms or HasUserGroupPermissionLevel('read')(member):
                         _set_perm_group(obj, users_group=member, perm=perm)
             updates.append(obj)
             # if it's not recursive call for all,repos,groups
--- a/kallithea/model/scm.py	Mon Feb 20 19:31:48 2017 +0100
+++ b/kallithea/model/scm.py	Mon Feb 20 18:40:32 2017 +0100
@@ -50,7 +50,7 @@
 from kallithea.lib.utils2 import safe_str, safe_unicode, get_server_url, \
     _set_extras
 from kallithea.lib.auth import HasRepoPermissionLevel, HasRepoGroupPermissionLevel, \
-    HasUserGroupPermissionAny, HasPermissionAny, HasPermissionAny
+    HasUserGroupPermissionLevel, HasPermissionAny, HasPermissionAny
 from kallithea.lib.utils import get_filesystem_repos, make_ui, \
     action_logger
 from kallithea.model.base import BaseModel
@@ -132,13 +132,10 @@
 
 class UserGroupList(_PermCheckIterator):
 
-    def __init__(self, db_user_group_list, perm_set=None, extra_kwargs=None):
-        if not perm_set:
-            perm_set = ['usergroup.read', 'usergroup.write', 'usergroup.admin']
-
+    def __init__(self, db_user_group_list, perm_level, extra_kwargs=None):
         super(UserGroupList, self).__init__(obj_list=db_user_group_list,
-                    obj_attr='users_group_name', perm_set=perm_set,
-                    perm_checker=HasUserGroupPermissionAny,
+                    obj_attr='users_group_name', perm_set=[perm_level],
+                    perm_checker=HasUserGroupPermissionLevel,
                     extra_kwargs=extra_kwargs)
 
 
--- a/kallithea/model/user_group.py	Mon Feb 20 19:31:48 2017 +0100
+++ b/kallithea/model/user_group.py	Mon Feb 20 18:40:32 2017 +0100
@@ -57,7 +57,7 @@
 
     def _update_permissions(self, user_group, perms_new=None,
                             perms_updates=None):
-        from kallithea.lib.auth import HasUserGroupPermissionAny
+        from kallithea.lib.auth import HasUserGroupPermissionLevel
         if not perms_new:
             perms_new = []
         if not perms_updates:
@@ -71,9 +71,8 @@
                     user_group=user_group, user=member, perm=perm
                 )
             else:
-                #check if we have permissions to alter this usergroup
-                if HasUserGroupPermissionAny('usergroup.read', 'usergroup.write',
-                                             'usergroup.admin')(member):
+                #check if we have permissions to alter this usergroup's access
+                if HasUserGroupPermissionLevel('read')(member):
                     self.grant_user_group_permission(
                         target_user_group=user_group, user_group=member, perm=perm
                     )
@@ -84,9 +83,8 @@
                     user_group=user_group, user=member, perm=perm
                 )
             else:
-                #check if we have permissions to alter this usergroup
-                if HasUserGroupPermissionAny('usergroup.read', 'usergroup.write',
-                                             'usergroup.admin')(member):
+                #check if we have permissions to alter this usergroup's access
+                if HasUserGroupPermissionLevel('read')(member):
                     self.grant_user_group_permission(
                         target_user_group=user_group, user_group=member, perm=perm
                     )