Mercurial > kallithea
changeset 2820:c0cc8f8a71b0 beta
Permissions on group can be set in recursive mode setting defined permission to all children
- more explicit permissions
- fixes for empty values in permission form
author | Marcin Kuzminski <marcin@python-works.com> |
---|---|
date | Fri, 07 Sep 2012 02:20:02 +0200 |
parents | bbaf0b86a1fe |
children | 9c90be87ae05 |
files | rhodecode/controllers/admin/repos_groups.py rhodecode/model/forms.py rhodecode/model/repo.py rhodecode/model/repos_group.py rhodecode/model/user.py rhodecode/model/validators.py rhodecode/templates/admin/repos_groups/repos_group_edit_perms.html rhodecode/tests/functional/test_compare.py rhodecode/tests/models/common.py rhodecode/tests/models/test_permissions.py rhodecode/tests/models/test_repos_groups.py rhodecode/tests/models/test_user_permissions_on_groups.py rhodecode/tests/models/test_users_group_permissions_on_groups.py |
diffstat | 13 files changed, 618 insertions(+), 66 deletions(-) [+] |
line wrap: on
line diff
--- a/rhodecode/controllers/admin/repos_groups.py Thu Sep 06 17:59:45 2012 +0200 +++ b/rhodecode/controllers/admin/repos_groups.py Fri Sep 07 02:20:02 2012 +0200 @@ -45,6 +45,7 @@ from rhodecode.model.meta import Session from rhodecode.model.repo import RepoModel from webob.exc import HTTPInternalServerError, HTTPNotFound +from rhodecode.lib.utils2 import str2bool log = logging.getLogger(__name__) @@ -162,7 +163,7 @@ Session().commit() h.flash(_('updated repos group %s') \ % form_result['group_name'], category='success') - #TODO: in futureaction_logger(, '', '', '', self.sa) + #TODO: in future action_logger(, '', '', '', self.sa) except formencode.Invalid, errors: return htmlfill.render( @@ -227,10 +228,11 @@ :param group_name: """ - try: - ReposGroupModel().revoke_user_permission( - repos_group=group_name, user=request.POST['user_id'] + recursive = str2bool(request.POST.get('recursive', False)) + ReposGroupModel().delete_permission( + repos_group=group_name, obj=request.POST['user_id'], + obj_type='user', recursive=recursive ) Session().commit() except Exception: @@ -248,9 +250,10 @@ """ try: - ReposGroupModel().revoke_users_group_permission( - repos_group=group_name, - group_name=request.POST['users_group_id'] + recursive = str2bool(request.POST.get('recursive', False)) + ReposGroupModel().delete_permission( + repos_group=group_name, obj=request.POST['users_group_id'], + obj_type='users_group', recursive=recursive ) Session().commit() except Exception:
--- a/rhodecode/model/forms.py Thu Sep 06 17:59:45 2012 +0200 +++ b/rhodecode/model/forms.py Fri Sep 07 02:20:02 2012 +0200 @@ -128,6 +128,7 @@ testValueList=True, if_missing=None, not_empty=False) enable_locking = v.StringBoolean(if_missing=False) + recursive = v.StringBoolean(if_missing=False) chained_validators = [v.ValidReposGroup(edit, old_data), v.ValidPerms('group')]
--- a/rhodecode/model/repo.py Thu Sep 06 17:59:45 2012 +0200 +++ b/rhodecode/model/repo.py Fri Sep 07 02:20:02 2012 +0200 @@ -368,6 +368,7 @@ obj.user = user obj.permission = permission self.sa.add(obj) + log.debug('Granted perm %s to %s on %s' % (perm, user, repo)) def revoke_user_permission(self, repo, user): """ @@ -383,8 +384,10 @@ obj = self.sa.query(UserRepoToPerm)\ .filter(UserRepoToPerm.repository == repo)\ .filter(UserRepoToPerm.user == user)\ - .one() - self.sa.delete(obj) + .scalar() + if obj: + self.sa.delete(obj) + log.debug('Revoked perm on %s on %s' % (repo, user)) def grant_users_group_permission(self, repo, group_name, perm): """ @@ -414,6 +417,7 @@ obj.users_group = group_name obj.permission = permission self.sa.add(obj) + log.debug('Granted perm %s to %s on %s' % (perm, group_name, repo)) def revoke_users_group_permission(self, repo, group_name): """ @@ -429,8 +433,10 @@ obj = self.sa.query(UsersGroupRepoToPerm)\ .filter(UsersGroupRepoToPerm.repository == repo)\ .filter(UsersGroupRepoToPerm.users_group == group_name)\ - .one() - self.sa.delete(obj) + .scalar() + if obj: + self.sa.delete(obj) + log.debug('Revoked perm to %s on %s' % (repo, group_name)) def delete_stats(self, repo_name): """
--- a/rhodecode/model/repos_group.py Thu Sep 06 17:59:45 2012 +0200 +++ b/rhodecode/model/repos_group.py Fri Sep 07 02:20:02 2012 +0200 @@ -32,7 +32,7 @@ from rhodecode.model import BaseModel from rhodecode.model.db import RepoGroup, RhodeCodeUi, UserRepoGroupToPerm, \ - User, Permission, UsersGroupRepoGroupToPerm, UsersGroup + User, Permission, UsersGroupRepoGroupToPerm, UsersGroup, Repository log = logging.getLogger(__name__) @@ -115,11 +115,12 @@ 'existing dir %s' % new_path) shutil.move(old_path, new_path) - def __delete_group(self, group): + def __delete_group(self, group, force_delete=False): """ Deletes a group from a filesystem :param group: instance of group from database + :param force_delete: use shutil rmtree to remove all objects """ paths = group.full_path.split(RepoGroup.url_sep()) paths = os.sep.join(paths) @@ -127,7 +128,10 @@ rm_path = os.path.join(self.repos_path, paths) if os.path.isdir(rm_path): # delete only if that path really exists - os.rmdir(rm_path) + if force_delete: + shutil.rmtree(rm_path) + else: + os.rmdir(rm_path) # this raises an exception when there are still objects inside def create(self, group_name, group_description, parent=None, just_db=False): try: @@ -150,32 +154,79 @@ log.error(traceback.format_exc()) raise + def _update_permissions(self, repos_group, perms_new=None, + perms_updates=None, recursive=False): + from rhodecode.model.repo import RepoModel + if not perms_new: + perms_new = [] + if not perms_updates: + perms_updates = [] + + def _set_perm_user(obj, user, perm): + if isinstance(obj, RepoGroup): + ReposGroupModel().grant_user_permission( + repos_group=obj, user=user, perm=perm + ) + elif isinstance(obj, Repository): + # we set group permission but we have to switch to repo + # permission + perm = perm.replace('group.', 'repository.') + RepoModel().grant_user_permission( + repo=obj, user=user, perm=perm + ) + + def _set_perm_group(obj, users_group, perm): + if isinstance(obj, RepoGroup): + ReposGroupModel().grant_users_group_permission( + repos_group=obj, group_name=users_group, perm=perm + ) + elif isinstance(obj, Repository): + # we set group permission but we have to switch to repo + # permission + perm = perm.replace('group.', 'repository.') + RepoModel().grant_users_group_permission( + repo=obj, group_name=users_group, perm=perm + ) + updates = [] + log.debug('Now updating permissions for %s in recursive mode:%s' + % (repos_group, recursive)) + + for obj in repos_group.recursive_groups_and_repos(): + if not recursive: + obj = repos_group + + # update permissions + for member, perm, member_type in perms_updates: + ## set for user + if member_type == 'user': + # this updates also current one if found + _set_perm_user(obj, user=member, perm=perm) + ## set for users group + else: + _set_perm_group(obj, users_group=member, perm=perm) + # set new permissions + for member, perm, member_type in perms_new: + if member_type == 'user': + _set_perm_user(obj, user=member, perm=perm) + else: + _set_perm_group(obj, users_group=member, perm=perm) + updates.append(obj) + #if it's not recursive call + # break the loop and don't proceed with other changes + if not recursive: + break + return updates + def update(self, repos_group_id, form_data): try: repos_group = RepoGroup.get(repos_group_id) - - # update permissions - for member, perm, member_type in form_data['perms_updates']: - if member_type == 'user': - # this updates also current one if found - ReposGroupModel().grant_user_permission( - repos_group=repos_group, user=member, perm=perm - ) - else: - ReposGroupModel().grant_users_group_permission( - repos_group=repos_group, group_name=member, perm=perm - ) - # set new permissions - for member, perm, member_type in form_data['perms_new']: - if member_type == 'user': - ReposGroupModel().grant_user_permission( - repos_group=repos_group, user=member, perm=perm - ) - else: - ReposGroupModel().grant_users_group_permission( - repos_group=repos_group, group_name=member, perm=perm - ) + recursive = form_data['recursive'] + # iterate over all members(if in recursive mode) of this groups and + # set the permissions ! + # this can be potentially heavy operation + self._update_permissions(repos_group, form_data['perms_new'], + form_data['perms_updates'], recursive) old_path = repos_group.full_path @@ -191,7 +242,6 @@ # iterate over all members of this groups and set the locking ! # this can be potentially heavy operation - for obj in repos_group.recursive_groups_and_repos(): #set the value from it's parent obj.enable_locking = repos_group.enable_locking @@ -210,15 +260,54 @@ log.error(traceback.format_exc()) raise - def delete(self, repos_group): + def delete(self, repos_group, force_delete=False): repos_group = self._get_repos_group(repos_group) try: self.sa.delete(repos_group) - self.__delete_group(repos_group) + self.__delete_group(repos_group, force_delete) except: log.exception('Error removing repos_group %s' % repos_group) raise + def delete_permission(self, repos_group, obj, obj_type, recursive): + """ + Revokes permission for repos_group for given obj(user or users_group), + obj_type can be user or users group + + :param repos_group: + :param obj: user or users group id + :param obj_type: user or users group type + :param recursive: recurse to all children of group + """ + from rhodecode.model.repo import RepoModel + repos_group = self._get_repos_group(repos_group) + + for el in repos_group.recursive_groups_and_repos(): + if not recursive: + # if we don't recurse set the permission on only the top level + # object + el = repos_group + + if isinstance(el, RepoGroup): + if obj_type == 'user': + ReposGroupModel().revoke_user_permission(el, user=obj) + elif obj_type == 'users_group': + ReposGroupModel().revoke_users_group_permission(el, group_name=obj) + else: + raise Exception('undefined object type %s' % obj_type) + elif isinstance(el, Repository): + if obj_type == 'user': + RepoModel().revoke_user_permission(el, user=obj) + elif obj_type == 'users_group': + RepoModel().revoke_users_group_permission(el, group_name=obj) + else: + raise Exception('undefined object type %s' % obj_type) + + #if it's not recursive call + # break the loop and don't proceed with other changes + if not recursive: + break + def grant_user_permission(self, repos_group, user, perm): """ Grant permission for user on given repositories group, or update @@ -246,6 +335,7 @@ obj.user = user obj.permission = permission self.sa.add(obj) + log.debug('Granted perm %s to %s on %s' % (perm, user, repos_group)) def revoke_user_permission(self, repos_group, user): """ @@ -262,8 +352,10 @@ obj = self.sa.query(UserRepoGroupToPerm)\ .filter(UserRepoGroupToPerm.user == user)\ .filter(UserRepoGroupToPerm.group == repos_group)\ - .one() - self.sa.delete(obj) + .scalar() + if obj: + self.sa.delete(obj) + log.debug('Revoked perm on %s on %s' % (repos_group, user)) def grant_users_group_permission(self, repos_group, group_name, perm): """ @@ -294,6 +386,7 @@ obj.users_group = group_name obj.permission = permission self.sa.add(obj) + log.debug('Granted perm %s to %s on %s' % (perm, group_name, repos_group)) def revoke_users_group_permission(self, repos_group, group_name): """ @@ -310,5 +403,7 @@ obj = self.sa.query(UsersGroupRepoGroupToPerm)\ .filter(UsersGroupRepoGroupToPerm.group == repos_group)\ .filter(UsersGroupRepoGroupToPerm.users_group == group_name)\ - .one() - self.sa.delete(obj) + .scalar() + if obj: + self.sa.delete(obj) + log.debug('Revoked perm to %s on %s' % (repos_group, group_name))
--- a/rhodecode/model/user.py Thu Sep 06 17:59:45 2012 +0200 +++ b/rhodecode/model/user.py Fri Sep 07 02:20:02 2012 +0200 @@ -564,7 +564,7 @@ rg_k = perm.UserRepoGroupToPerm.group.group_name p = perm.Permission.permission_name cur_perm = user.permissions[GK][rg_k] - if PERM_WEIGHTS[p] > PERM_WEIGHTS[cur_perm]: + if PERM_WEIGHTS[p] > PERM_WEIGHTS[cur_perm] or 1: # disable check user.permissions[GK][rg_k] = p # REPO GROUP + USER GROUP @@ -588,7 +588,7 @@ cur_perm = user.permissions[GK][g_k] # overwrite permission only if it's greater than permission # given from other sources - if PERM_WEIGHTS[p] > PERM_WEIGHTS[cur_perm]: + if PERM_WEIGHTS[p] > PERM_WEIGHTS[cur_perm] or 1: # disable check user.permissions[GK][g_k] = p return user
--- a/rhodecode/model/validators.py Thu Sep 06 17:59:45 2012 +0200 +++ b/rhodecode/model/validators.py Fri Sep 07 02:20:02 2012 +0200 @@ -499,9 +499,9 @@ # fill new permissions in order of how they were added for k in sorted(map(int, new_perms_group.keys())): perm_dict = new_perms_group[str(k)] - new_member = perm_dict['name'] - new_perm = perm_dict['perm'] - new_type = perm_dict['type'] + new_member = perm_dict.get('name') + new_perm = perm_dict.get('perm') + new_type = perm_dict.get('type') if new_member and new_perm and new_type: perms_new.add((new_member, new_perm, new_type))
--- a/rhodecode/templates/admin/repos_groups/repos_group_edit_perms.html Thu Sep 06 17:59:45 2012 +0200 +++ b/rhodecode/templates/admin/repos_groups/repos_group_edit_perms.html Fri Sep 07 02:20:02 2012 +0200 @@ -68,6 +68,12 @@ </span> </td> </tr> + <tr> + <td colspan="6"> + ${h.checkbox('recursive',value="True", label=_('apply to parents'))} + <span class="help-block">${_('Set or revoke permission to all children of that group, including repositories and other groups')}</span> + </td> + </tr> </table> <script type="text/javascript"> function ajaxActionUser(user_id, field_id) { @@ -81,7 +87,8 @@ alert("${_('Failed to remove user')}"); }, }; - var postData = '_method=delete&user_id=' + user_id; + var recursive = YUD.get('recursive').checked; + var postData = '_method=delete&recursive={0}&user_id={1}'.format(recursive,user_id); var request = YAHOO.util.Connect.asyncRequest('POST', sUrl, callback, postData); }; @@ -96,7 +103,8 @@ alert("${_('Failed to remove users group')}"); }, }; - var postData = '_method=delete&users_group_id='+users_group_id; + var recursive = YUD.get('recursive').checked; + var postData = '_method=delete&recursive={0}&users_group_id={1}'.format(recursive,users_group_id); var request = YAHOO.util.Connect.asyncRequest('POST', sUrl, callback, postData); };
--- a/rhodecode/tests/functional/test_compare.py Thu Sep 06 17:59:45 2012 +0200 +++ b/rhodecode/tests/functional/test_compare.py Fri Sep 07 02:20:02 2012 +0200 @@ -291,4 +291,3 @@ finally: RepoModel().delete(r2_id) RepoModel().delete(r1_id) -
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rhodecode/tests/models/common.py Fri Sep 07 02:20:02 2012 +0200 @@ -0,0 +1,116 @@ +import os +import unittest +import functools +from rhodecode.tests import * + + +from rhodecode.model.repos_group import ReposGroupModel +from rhodecode.model.repo import RepoModel +from rhodecode.model.db import RepoGroup, Repository, User +from rhodecode.model.user import UserModel + +from rhodecode.lib.auth import AuthUser +from rhodecode.model.meta import Session + + +def _make_group(path, desc='desc', parent_id=None, + skip_if_exists=False): + + gr = RepoGroup.get_by_group_name(path) + if gr and skip_if_exists: + return gr + if isinstance(parent_id, RepoGroup): + parent_id = parent_id.group_id + gr = ReposGroupModel().create(path, desc, parent_id) + return gr + + +def _make_repo(name, repos_group=None, repo_type='hg'): + return RepoModel().create_repo(name, repo_type, 'desc', + TEST_USER_ADMIN_LOGIN, + repos_group=repos_group) + + +def _destroy_project_tree(test_u1_id): + Session.remove() + repos_group = RepoGroup.get_by_group_name(group_name='g0') + for el in reversed(repos_group.recursive_groups_and_repos()): + if isinstance(el, Repository): + RepoModel().delete(el) + elif isinstance(el, RepoGroup): + ReposGroupModel().delete(el, force_delete=True) + + u = User.get(test_u1_id) + Session().delete(u) + Session().commit() + + +def _create_project_tree(): + """ + Creates a tree of groups and repositories to test permissions + + structure + [g0] - group `g0` with 3 subgroups + | + |__[g0_1] group g0_1 with 2 groups 0 repos + | | + | |__[g0_1_1] group g0_1_1 with 1 group 2 repos + | | |__<g0/g0_1/g0_1_1/g0_1_1_r1> + | | |__<g0/g0_1/g0_1_1/g0_1_1_r2> + | |__<g0/g0_1/g0_1_r1> + | + |__[g0_2] 2 repos + | | + | |__<g0/g0_2/g0_2_r1> + | |__<g0/g0_2/g0_2_r2> + | + |__[g0_3] 1 repo + | + |_<g0/g0_3/g0_3_r1> + + """ + test_u1 = UserModel().create_or_update( + username=u'test_u1', password=u'qweqwe', + email=u'test_u1@rhodecode.org', firstname=u'test_u1', lastname=u'test_u1' + ) + g0 = _make_group('g0') + g0_1 = _make_group('g0_1', parent_id=g0) + g0_1_1 = _make_group('g0_1_1', parent_id=g0_1) + g0_1_1_r1 = _make_repo('g0/g0_1/g0_1_1/g0_1_1_r1', repos_group=g0_1_1) + g0_1_1_r2 = _make_repo('g0/g0_1/g0_1_1/g0_1_1_r2', repos_group=g0_1_1) + g0_1_r1 = _make_repo('g0/g0_1/g0_1_r1', repos_group=g0_1) + g0_2 = _make_group('g0_2', parent_id=g0) + g0_2_r1 = _make_repo('g0/g0_2/g0_2_r1', repos_group=g0_2) + g0_2_r2 = _make_repo('g0/g0_2/g0_2_r2', repos_group=g0_2) + g0_3 = _make_group('g0_3', parent_id=g0) + g0_3_r1 = _make_repo('g0/g0_3/g0_3_r1', repos_group=g0_3) + return test_u1 + + +def expected_count(group_name, objects=False): + repos_group = RepoGroup.get_by_group_name(group_name=group_name) + objs = repos_group.recursive_groups_and_repos() + if objects: + return objs + return len(objs) + + +def _check_expected_count(items, repo_items, expected): + should_be = len(items + repo_items) + there_are = len(expected) + assert should_be == there_are, ('%s != %s' % ((items + repo_items), expected)) + + +def check_tree_perms(obj_name, repo_perm, prefix, expected_perm): + assert repo_perm == expected_perm, ('obj:`%s` got perm:`%s` should:`%s`' + % (obj_name, repo_perm, expected_perm)) + + +def _get_perms(filter_='', recursive=True, key=None, test_u1_id=None): + test_u1 = AuthUser(user_id=test_u1_id) + for k, v in test_u1.permissions[key].items(): + if recursive and k.startswith(filter_): + yield k, v + elif not recursive: + if k == filter_: + yield k, v
--- a/rhodecode/tests/models/test_permissions.py Thu Sep 06 17:59:45 2012 +0200 +++ b/rhodecode/tests/models/test_permissions.py Fri Sep 07 02:20:02 2012 +0200 @@ -1,7 +1,7 @@ import os import unittest from rhodecode.tests import * - +from rhodecode.tests.models.common import _make_group from rhodecode.model.repos_group import ReposGroupModel from rhodecode.model.repo import RepoModel from rhodecode.model.db import RepoGroup, User, UsersGroupRepoGroupToPerm @@ -12,16 +12,6 @@ from rhodecode.lib.auth import AuthUser -def _make_group(path, desc='desc', parent_id=None, - skip_if_exists=False): - - gr = RepoGroup.get_by_group_name(path) - if gr and skip_if_exists: - return gr - - gr = ReposGroupModel().create(path, desc, parent_id) - return gr - class TestPermissions(unittest.TestCase): def __init__(self, methodName='runTest'):
--- a/rhodecode/tests/models/test_repos_groups.py Thu Sep 06 17:59:45 2012 +0200 +++ b/rhodecode/tests/models/test_repos_groups.py Fri Sep 07 02:20:02 2012 +0200 @@ -4,7 +4,7 @@ from rhodecode.model.repos_group import ReposGroupModel from rhodecode.model.repo import RepoModel -from rhodecode.model.db import RepoGroup, User +from rhodecode.model.db import RepoGroup, User, Repository from rhodecode.model.meta import Session from sqlalchemy.exc import IntegrityError @@ -15,7 +15,8 @@ gr = RepoGroup.get_by_group_name(path) if gr and skip_if_exists: return gr - + if isinstance(parent_id, RepoGroup): + parent_id = parent_id.group_id gr = ReposGroupModel().create(path, desc, parent_id) return gr @@ -54,7 +55,8 @@ group_parent_id=parent_id, perms_updates=[], perms_new=[], - enable_locking=False + enable_locking=False, + recursive=False ) gr = ReposGroupModel().update(id_, form_data) return gr @@ -132,7 +134,8 @@ repo_type='hg', clone_uri=None, landing_rev='tip', - enable_locking=False) + enable_locking=False, + recursive=False) cur_user = User.get_by_username(TEST_USER_ADMIN_LOGIN) r = RepoModel().create(form_data, cur_user)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rhodecode/tests/models/test_user_permissions_on_groups.py Fri Sep 07 02:20:02 2012 +0200 @@ -0,0 +1,161 @@ +import os +import unittest +import functools +from rhodecode.tests import * + +from rhodecode.model.repos_group import ReposGroupModel +from rhodecode.model.db import RepoGroup, Repository, User + +from rhodecode.model.meta import Session +from nose.tools import with_setup +from rhodecode.tests.models.common import _create_project_tree, check_tree_perms, \ + _get_perms, _check_expected_count, expected_count, _destroy_project_tree +from rhodecode.model.repo import RepoModel + + +test_u1_id = None +_get_repo_perms = None +_get_group_perms = None + + +def permissions_setup_func(group_name='g0', perm='group.read', recursive=True): + """ + Resets all permissions to perm attribute + """ + repos_group = RepoGroup.get_by_group_name(group_name=group_name) + if not repos_group: + raise Exception('Cannot get group %s' % group_name) + perms_updates = [[test_u1_id, perm, 'user']] + ReposGroupModel()._update_permissions(repos_group, + perms_updates=perms_updates, + recursive=recursive) + Session().commit() + + +def setup_module(): + global test_u1_id, _get_repo_perms, _get_group_perms + test_u1 = _create_project_tree() + Session().commit() + test_u1_id = test_u1.user_id + _get_repo_perms = functools.partial(_get_perms, key='repositories', + test_u1_id=test_u1_id) + _get_group_perms = functools.partial(_get_perms, key='repositories_groups', + test_u1_id=test_u1_id) + + +def teardown_module(): + _destroy_project_tree(test_u1_id) + + +@with_setup(permissions_setup_func) +def test_user_permissions_on_group_without_recursive_mode(): + # set permission to g0 non-recursive mode + recursive = False + group = 'g0' + permissions_setup_func(group, 'group.write', recursive=recursive) + + items = [x for x in _get_repo_perms(group, recursive)] + expected = 0 + assert len(items) == expected, ' %s != %s' % (len(items), expected) + for name, perm in items: + yield check_tree_perms, name, perm, group, 'repository.read' + + items = [x for x in _get_group_perms(group, recursive)] + expected = 1 + assert len(items) == expected, ' %s != %s' % (len(items), expected) + for name, perm in items: + yield check_tree_perms, name, perm, group, 'group.write' + + +@with_setup(permissions_setup_func) +def test_user_permissions_on_group_without_recursive_mode_subgroup(): + # set permission to g0 non-recursive mode + recursive = False + group = 'g0/g0_1' + permissions_setup_func(group, 'group.write', recursive=recursive) + + items = [x for x in _get_repo_perms(group, recursive)] + expected = 0 + assert len(items) == expected, ' %s != %s' % (len(items), expected) + for name, perm in items: + yield check_tree_perms, name, perm, group, 'repository.read' + + items = [x for x in _get_group_perms(group, recursive)] + expected = 1 + assert len(items) == expected, ' %s != %s' % (len(items), expected) + for name, perm in items: + yield check_tree_perms, name, perm, group, 'group.write' + + +@with_setup(permissions_setup_func) +def test_user_permissions_on_group_with_recursive_mode(): + + # set permission to g0 recursive mode, all children including + # other repos and groups should have this permission now set ! + recursive = True + group = 'g0' + permissions_setup_func(group, 'group.write', recursive=recursive) + + repo_items = [x for x in _get_repo_perms(group, recursive)] + items = [x for x in _get_group_perms(group, recursive)] + _check_expected_count(items, repo_items, expected_count(group, True)) + + for name, perm in repo_items: + yield check_tree_perms, name, perm, group, 'repository.write' + + for name, perm in items: + yield check_tree_perms, name, perm, group, 'group.write' + + +@with_setup(permissions_setup_func) +def test_user_permissions_on_group_with_recursive_mode_inner_group(): + ## set permission to g0_3 group to none + recursive = True + group = 'g0/g0_3' + permissions_setup_func(group, 'group.none', recursive=recursive) + + repo_items = [x for x in _get_repo_perms(group, recursive)] + items = [x for x in _get_group_perms(group, recursive)] + _check_expected_count(items, repo_items, expected_count(group, True)) + + for name, perm in repo_items: + yield check_tree_perms, name, perm, group, 'repository.none' + + for name, perm in items: + yield check_tree_perms, name, perm, group, 'group.none' + + +@with_setup(permissions_setup_func) +def test_user_permissions_on_group_with_recursive_mode_deepest(): + ## set permission to g0_3 group to none + recursive = True + group = 'g0/g0_1/g0_1_1' + permissions_setup_func(group, 'group.write', recursive=recursive) + + repo_items = [x for x in _get_repo_perms(group, recursive)] + items = [x for x in _get_group_perms(group, recursive)] + _check_expected_count(items, repo_items, expected_count(group, True)) + + for name, perm in repo_items: + yield check_tree_perms, name, perm, group, 'repository.write' + + for name, perm in items: + yield check_tree_perms, name, perm, group, 'group.write' + + +@with_setup(permissions_setup_func) +def test_user_permissions_on_group_with_recursive_mode_only_with_repos(): + ## set permission to g0_3 group to none + recursive = True + group = 'g0/g0_2' + permissions_setup_func(group, 'group.admin', recursive=recursive) + + repo_items = [x for x in _get_repo_perms(group, recursive)] + items = [x for x in _get_group_perms(group, recursive)] + _check_expected_count(items, repo_items, expected_count(group, True)) + + for name, perm in repo_items: + yield check_tree_perms, name, perm, group, 'repository.admin' + + for name, perm in items: + yield check_tree_perms, name, perm, group, 'group.admin'
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rhodecode/tests/models/test_users_group_permissions_on_groups.py Fri Sep 07 02:20:02 2012 +0200 @@ -0,0 +1,170 @@ +import os +import unittest +import functools +from rhodecode.tests import * + +from rhodecode.model.repos_group import ReposGroupModel +from rhodecode.model.db import RepoGroup, Repository, User + +from rhodecode.model.meta import Session +from nose.tools import with_setup +from rhodecode.tests.models.common import _create_project_tree, check_tree_perms, \ + _get_perms, _check_expected_count, expected_count, _destroy_project_tree +from rhodecode.model.users_group import UsersGroupModel +from rhodecode.model.repo import RepoModel + + +test_u2_id = None +test_u2_gr_id = None +_get_repo_perms = None +_get_group_perms = None + + +def permissions_setup_func(group_name='g0', perm='group.read', recursive=True): + """ + Resets all permissions to perm attribute + """ + repos_group = RepoGroup.get_by_group_name(group_name=group_name) + if not repos_group: + raise Exception('Cannot get group %s' % group_name) + perms_updates = [[test_u2_gr_id, perm, 'users_group']] + ReposGroupModel()._update_permissions(repos_group, + perms_updates=perms_updates, + recursive=recursive) + Session().commit() + + +def setup_module(): + global test_u2_id, test_u2_gr_id, _get_repo_perms, _get_group_perms + test_u2 = _create_project_tree() + Session().commit() + test_u2_id = test_u2.user_id + + gr1 = UsersGroupModel().create(name='perms_group_1') + Session().commit() + test_u2_gr_id = gr1.users_group_id + UsersGroupModel().add_user_to_group(gr1, user=test_u2_id) + Session().commit() + + _get_repo_perms = functools.partial(_get_perms, key='repositories', + test_u1_id=test_u2_id) + _get_group_perms = functools.partial(_get_perms, key='repositories_groups', + test_u1_id=test_u2_id) + + +def teardown_module(): + _destroy_project_tree(test_u2_id) + + +@with_setup(permissions_setup_func) +def test_user_permissions_on_group_without_recursive_mode(): + # set permission to g0 non-recursive mode + recursive = False + group = 'g0' + permissions_setup_func(group, 'group.write', recursive=recursive) + + items = [x for x in _get_repo_perms(group, recursive)] + expected = 0 + assert len(items) == expected, ' %s != %s' % (len(items), expected) + for name, perm in items: + yield check_tree_perms, name, perm, group, 'repository.read' + + items = [x for x in _get_group_perms(group, recursive)] + expected = 1 + assert len(items) == expected, ' %s != %s' % (len(items), expected) + for name, perm in items: + yield check_tree_perms, name, perm, group, 'group.write' + + +@with_setup(permissions_setup_func) +def test_user_permissions_on_group_without_recursive_mode_subgroup(): + # set permission to g0 non-recursive mode + recursive = False + group = 'g0/g0_1' + permissions_setup_func(group, 'group.write', recursive=recursive) + + items = [x for x in _get_repo_perms(group, recursive)] + expected = 0 + assert len(items) == expected, ' %s != %s' % (len(items), expected) + for name, perm in items: + yield check_tree_perms, name, perm, group, 'repository.read' + + items = [x for x in _get_group_perms(group, recursive)] + expected = 1 + assert len(items) == expected, ' %s != %s' % (len(items), expected) + for name, perm in items: + yield check_tree_perms, name, perm, group, 'group.write' + + +@with_setup(permissions_setup_func) +def test_user_permissions_on_group_with_recursive_mode(): + + # set permission to g0 recursive mode, all children including + # other repos and groups should have this permission now set ! + recursive = True + group = 'g0' + permissions_setup_func(group, 'group.write', recursive=recursive) + + repo_items = [x for x in _get_repo_perms(group, recursive)] + items = [x for x in _get_group_perms(group, recursive)] + _check_expected_count(items, repo_items, expected_count(group, True)) + + for name, perm in repo_items: + yield check_tree_perms, name, perm, group, 'repository.write' + + for name, perm in items: + yield check_tree_perms, name, perm, group, 'group.write' + + +@with_setup(permissions_setup_func) +def test_user_permissions_on_group_with_recursive_mode_inner_group(): + ## set permission to g0_3 group to none + recursive = True + group = 'g0/g0_3' + permissions_setup_func(group, 'group.none', recursive=recursive) + + repo_items = [x for x in _get_repo_perms(group, recursive)] + items = [x for x in _get_group_perms(group, recursive)] + _check_expected_count(items, repo_items, expected_count(group, True)) + + for name, perm in repo_items: + yield check_tree_perms, name, perm, group, 'repository.none' + + for name, perm in items: + yield check_tree_perms, name, perm, group, 'group.none' + + +@with_setup(permissions_setup_func) +def test_user_permissions_on_group_with_recursive_mode_deepest(): + ## set permission to g0_3 group to none + recursive = True + group = 'g0/g0_1/g0_1_1' + permissions_setup_func(group, 'group.write', recursive=recursive) + + repo_items = [x for x in _get_repo_perms(group, recursive)] + items = [x for x in _get_group_perms(group, recursive)] + _check_expected_count(items, repo_items, expected_count(group, True)) + + for name, perm in repo_items: + yield check_tree_perms, name, perm, group, 'repository.write' + + for name, perm in items: + yield check_tree_perms, name, perm, group, 'group.write' + + +@with_setup(permissions_setup_func) +def test_user_permissions_on_group_with_recursive_mode_only_with_repos(): + ## set permission to g0_3 group to none + recursive = True + group = 'g0/g0_2' + permissions_setup_func(group, 'group.admin', recursive=recursive) + + repo_items = [x for x in _get_repo_perms(group, recursive)] + items = [x for x in _get_group_perms(group, recursive)] + _check_expected_count(items, repo_items, expected_count(group, True)) + + for name, perm in repo_items: + yield check_tree_perms, name, perm, group, 'repository.admin' + + for name, perm in items: + yield check_tree_perms, name, perm, group, 'group.admin'