view rhodecode/tests/models/test_permissions.py @ 4116:ffd45b185016 rhodecode-2.2.5-gpl

Imported some of the GPLv3'd changes from RhodeCode v2.2.5. This imports changes between changesets 21af6c4eab3d and 6177597791c2 in RhodeCode's original repository, including only changes to Python files and HTML. RhodeCode clearly licensed its changes to these files under GPLv3 in their /LICENSE file, which states the following: The Python code and integrated HTML are licensed under the GPLv3 license. (See: https://code.rhodecode.com/rhodecode/files/v2.2.5/LICENSE or http://web.archive.org/web/20140512193334/https://code.rhodecode.com/rhodecode/files/f3b123159901f15426d18e3dc395e8369f70ebe0/LICENSE for an online copy of that LICENSE file) Conservancy reviewed these changes and confirmed that they can be licensed as a whole to the Kallithea project under GPLv3-only. While some of the contents committed herein are clearly licensed GPLv3-or-later, on the whole we must assume the are GPLv3-only, since the statement above from RhodeCode indicates that they intend GPLv3-only as their license, per GPLv3ยง14 and other relevant sections of GPLv3.
author Bradley M. Kuhn <bkuhn@sfconservancy.org>
date Wed, 02 Jul 2014 19:03:13 -0400
parents a5888ca796b5
children da3c57422ee6
line wrap: on
line source

from rhodecode.tests import *
from rhodecode.tests.fixture import Fixture
from rhodecode.model.repo_group import RepoGroupModel
from rhodecode.model.repo import RepoModel
from rhodecode.model.db import RepoGroup, User, UserGroupRepoGroupToPerm,\
    Permission, UserToPerm
from rhodecode.model.user import UserModel

from rhodecode.model.meta import Session
from rhodecode.model.user_group import UserGroupModel
from rhodecode.lib.auth import AuthUser
from rhodecode.model.permission import PermissionModel


fixture = Fixture()


class TestPermissions(BaseTestCase):
    def __init__(self, methodName='runTest'):
        super(TestPermissions, self).__init__(methodName=methodName)

    @classmethod
    def setUpClass(cls):
        #recreate default user to get a clean start
        PermissionModel().create_default_permissions(user=User.DEFAULT_USER,
                                                     force=True)
        Session().commit()

    def setUp(self):
        self.u1 = UserModel().create_or_update(
            username=u'u1', password=u'qweqwe',
            email=u'u1@rhodecode.org', firstname=u'u1', lastname=u'u1'
        )
        self.u2 = UserModel().create_or_update(
            username=u'u2', password=u'qweqwe',
            email=u'u2@rhodecode.org', firstname=u'u2', lastname=u'u2'
        )
        self.u3 = UserModel().create_or_update(
            username=u'u3', password=u'qweqwe',
            email=u'u3@rhodecode.org', firstname=u'u3', lastname=u'u3'
        )
        self.anon = User.get_default_user()
        self.a1 = UserModel().create_or_update(
            username=u'a1', password=u'qweqwe',
            email=u'a1@rhodecode.org', firstname=u'a1', lastname=u'a1', admin=True
        )
        Session().commit()

    def tearDown(self):
        if hasattr(self, 'test_repo'):
            RepoModel().delete(repo=self.test_repo)

        UserModel().delete(self.u1)
        UserModel().delete(self.u2)
        UserModel().delete(self.u3)
        UserModel().delete(self.a1)
        if hasattr(self, 'g1'):
            RepoGroupModel().delete(self.g1.group_id)
        if hasattr(self, 'g2'):
            RepoGroupModel().delete(self.g2.group_id)

        if hasattr(self, 'ug1'):
            UserGroupModel().delete(self.ug1, force=True)

        Session().commit()

    def test_default_perms_set(self):
        u1_auth = AuthUser(user_id=self.u1.user_id)
        perms = {
            'repositories_groups': {},
            'global': set([u'hg.create.repository', u'repository.read',
                           u'hg.register.manual_activate']),
            'repositories': {u'vcs_test_hg': u'repository.read'}
        }
        self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
                         perms['repositories'][HG_REPO])
        new_perm = 'repository.write'
        RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1,
                                          perm=new_perm)
        Session().commit()

        u1_auth = AuthUser(user_id=self.u1.user_id)
        self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
                         new_perm)

    def test_default_admin_perms_set(self):
        a1_auth = AuthUser(user_id=self.a1.user_id)
        perms = {
            'repositories_groups': {},
            'global': set([u'hg.admin', 'hg.create.write_on_repogroup.true']),
            'repositories': {u'vcs_test_hg': u'repository.admin'}
        }
        self.assertEqual(a1_auth.permissions['repositories'][HG_REPO],
                         perms['repositories'][HG_REPO])
        new_perm = 'repository.write'
        RepoModel().grant_user_permission(repo=HG_REPO, user=self.a1,
                                          perm=new_perm)
        Session().commit()
        # cannot really downgrade admins permissions !? they still gets set as
        # admin !
        u1_auth = AuthUser(user_id=self.a1.user_id)
        self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
                         perms['repositories'][HG_REPO])

    def test_default_group_perms(self):
        self.g1 = fixture.create_repo_group('test1', skip_if_exists=True)
        self.g2 = fixture.create_repo_group('test2', skip_if_exists=True)
        u1_auth = AuthUser(user_id=self.u1.user_id)
        perms = {
            'repositories_groups': {u'test1': 'group.read', u'test2': 'group.read'},
            'global': set(Permission.DEFAULT_USER_PERMISSIONS),
            'repositories': {u'vcs_test_hg': u'repository.read'}
        }
        self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
                         perms['repositories'][HG_REPO])
        self.assertEqual(u1_auth.permissions['repositories_groups'],
                         perms['repositories_groups'])
        self.assertEqual(u1_auth.permissions['global'],
                         perms['global'])

    def test_default_admin_group_perms(self):
        self.g1 = fixture.create_repo_group('test1', skip_if_exists=True)
        self.g2 = fixture.create_repo_group('test2', skip_if_exists=True)
        a1_auth = AuthUser(user_id=self.a1.user_id)
        perms = {
            'repositories_groups': {u'test1': 'group.admin', u'test2': 'group.admin'},
            'global': set(['hg.admin', 'hg.create.write_on_repogroup.true']),
            'repositories': {u'vcs_test_hg': 'repository.admin'}
        }

        self.assertEqual(a1_auth.permissions['repositories'][HG_REPO],
                         perms['repositories'][HG_REPO])
        self.assertEqual(a1_auth.permissions['repositories_groups'],
                         perms['repositories_groups'])

    def test_propagated_permission_from_users_group_by_explicit_perms_exist(self):
        # make group
        self.ug1 = fixture.create_user_group('G1')
        UserGroupModel().add_user_to_group(self.ug1, self.u1)

        # set permission to lower
        new_perm = 'repository.none'
        RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1, perm=new_perm)
        Session().commit()
        u1_auth = AuthUser(user_id=self.u1.user_id)
        self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
                         new_perm)

        # grant perm for group this should not override permission from user
        # since it has explicitly set
        new_perm_gr = 'repository.write'
        RepoModel().grant_user_group_permission(repo=HG_REPO,
                                                 group_name=self.ug1,
                                                 perm=new_perm_gr)
        # check perms
        u1_auth = AuthUser(user_id=self.u1.user_id)
        perms = {
            'repositories_groups': {},
            'global': set([u'hg.create.repository', u'repository.read',
                           u'hg.register.manual_activate']),
            'repositories': {u'vcs_test_hg': u'repository.read'}
        }
        self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
                         new_perm)
        self.assertEqual(u1_auth.permissions['repositories_groups'],
                         perms['repositories_groups'])

    def test_propagated_permission_from_users_group(self):
        # make group
        self.ug1 = fixture.create_user_group('G1')
        UserGroupModel().add_user_to_group(self.ug1, self.u3)

        # grant perm for group this should override default permission from user
        new_perm_gr = 'repository.write'
        RepoModel().grant_user_group_permission(repo=HG_REPO,
                                                 group_name=self.ug1,
                                                 perm=new_perm_gr)
        # check perms
        u3_auth = AuthUser(user_id=self.u3.user_id)
        perms = {
            'repositories_groups': {},
            'global': set([u'hg.create.repository', u'repository.read',
                           u'hg.register.manual_activate']),
            'repositories': {u'vcs_test_hg': u'repository.read'}
        }
        self.assertEqual(u3_auth.permissions['repositories'][HG_REPO],
                         new_perm_gr)
        self.assertEqual(u3_auth.permissions['repositories_groups'],
                         perms['repositories_groups'])

    def test_propagated_permission_from_users_group_lower_weight(self):
        # make group
        self.ug1 = fixture.create_user_group('G1')
        # add user to group
        UserGroupModel().add_user_to_group(self.ug1, self.u1)

        # set permission to lower
        new_perm_h = 'repository.write'
        RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1,
                                          perm=new_perm_h)
        Session().commit()
        u1_auth = AuthUser(user_id=self.u1.user_id)
        self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
                         new_perm_h)

        # grant perm for group this should NOT override permission from user
        # since it's lower than granted
        new_perm_l = 'repository.read'
        RepoModel().grant_user_group_permission(repo=HG_REPO,
                                                 group_name=self.ug1,
                                                 perm=new_perm_l)
        # check perms
        u1_auth = AuthUser(user_id=self.u1.user_id)
        perms = {
            'repositories_groups': {},
            'global': set([u'hg.create.repository', u'repository.read',
                           u'hg.register.manual_activate']),
            'repositories': {u'vcs_test_hg': u'repository.write'}
        }
        self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
                         new_perm_h)
        self.assertEqual(u1_auth.permissions['repositories_groups'],
                         perms['repositories_groups'])

    def test_repo_in_group_permissions(self):
        self.g1 = fixture.create_repo_group('group1', skip_if_exists=True)
        self.g2 = fixture.create_repo_group('group2', skip_if_exists=True)
        # both perms should be read !
        u1_auth = AuthUser(user_id=self.u1.user_id)
        self.assertEqual(u1_auth.permissions['repositories_groups'],
                         {u'group1': u'group.read', u'group2': u'group.read'})

        a1_auth = AuthUser(user_id=self.anon.user_id)
        self.assertEqual(a1_auth.permissions['repositories_groups'],
                 {u'group1': u'group.read', u'group2': u'group.read'})

        #Change perms to none for both groups
        RepoGroupModel().grant_user_permission(repo_group=self.g1,
                                               user=self.anon,
                                               perm='group.none')
        RepoGroupModel().grant_user_permission(repo_group=self.g2,
                                               user=self.anon,
                                               perm='group.none')

        u1_auth = AuthUser(user_id=self.u1.user_id)
        self.assertEqual(u1_auth.permissions['repositories_groups'],
                 {u'group1': u'group.none', u'group2': u'group.none'})

        a1_auth = AuthUser(user_id=self.anon.user_id)
        self.assertEqual(a1_auth.permissions['repositories_groups'],
                 {u'group1': u'group.none', u'group2': u'group.none'})

        # add repo to group
        name = RepoGroup.url_sep().join([self.g1.group_name, 'test_perm'])
        self.test_repo = fixture.create_repo(name=name,
                                             repo_type='hg',
                                             repo_group=self.g1,
                                             cur_user=self.u1,)

        u1_auth = AuthUser(user_id=self.u1.user_id)
        self.assertEqual(u1_auth.permissions['repositories_groups'],
                 {u'group1': u'group.none', u'group2': u'group.none'})

        a1_auth = AuthUser(user_id=self.anon.user_id)
        self.assertEqual(a1_auth.permissions['repositories_groups'],
                 {u'group1': u'group.none', u'group2': u'group.none'})

        #grant permission for u2 !
        RepoGroupModel().grant_user_permission(repo_group=self.g1, user=self.u2,
                                               perm='group.read')
        RepoGroupModel().grant_user_permission(repo_group=self.g2, user=self.u2,
                                               perm='group.read')
        Session().commit()
        self.assertNotEqual(self.u1, self.u2)
        #u1 and anon should have not change perms while u2 should !
        u1_auth = AuthUser(user_id=self.u1.user_id)
        self.assertEqual(u1_auth.permissions['repositories_groups'],
                 {u'group1': u'group.none', u'group2': u'group.none'})

        u2_auth = AuthUser(user_id=self.u2.user_id)
        self.assertEqual(u2_auth.permissions['repositories_groups'],
                 {u'group1': u'group.read', u'group2': u'group.read'})

        a1_auth = AuthUser(user_id=self.anon.user_id)
        self.assertEqual(a1_auth.permissions['repositories_groups'],
                 {u'group1': u'group.none', u'group2': u'group.none'})

    def test_repo_group_user_as_user_group_member(self):
        # create Group1
        self.g1 = fixture.create_repo_group('group1', skip_if_exists=True)
        a1_auth = AuthUser(user_id=self.anon.user_id)

        self.assertEqual(a1_auth.permissions['repositories_groups'],
                         {u'group1': u'group.read'})

        # set default permission to none
        RepoGroupModel().grant_user_permission(repo_group=self.g1,
                                               user=self.anon,
                                               perm='group.none')
        # make group
        self.ug1 = fixture.create_user_group('G1')
        # add user to group
        UserGroupModel().add_user_to_group(self.ug1, self.u1)
        Session().commit()

        # check if user is in the group
        membrs = [x.user_id for x in UserGroupModel().get(self.ug1.users_group_id).members]
        self.assertEqual(membrs, [self.u1.user_id])
        # add some user to that group

        # check his permissions
        a1_auth = AuthUser(user_id=self.anon.user_id)
        self.assertEqual(a1_auth.permissions['repositories_groups'],
                         {u'group1': u'group.none'})

        u1_auth = AuthUser(user_id=self.u1.user_id)
        self.assertEqual(u1_auth.permissions['repositories_groups'],
                         {u'group1': u'group.none'})

        # grant ug1 read permissions for
        RepoGroupModel().grant_user_group_permission(repo_group=self.g1,
                                                      group_name=self.ug1,
                                                      perm='group.read')
        Session().commit()
        # check if the
        obj = Session().query(UserGroupRepoGroupToPerm)\
            .filter(UserGroupRepoGroupToPerm.group == self.g1)\
            .filter(UserGroupRepoGroupToPerm.users_group == self.ug1)\
            .scalar()
        self.assertEqual(obj.permission.permission_name, 'group.read')

        a1_auth = AuthUser(user_id=self.anon.user_id)

        self.assertEqual(a1_auth.permissions['repositories_groups'],
                         {u'group1': u'group.none'})

        u1_auth = AuthUser(user_id=self.u1.user_id)
        self.assertEqual(u1_auth.permissions['repositories_groups'],
                         {u'group1': u'group.read'})

    def test_inherited_permissions_from_default_on_user_enabled(self):
        user_model = UserModel()
        # enable fork and create on default user
        usr = 'default'
        user_model.revoke_perm(usr, 'hg.create.none')
        user_model.grant_perm(usr, 'hg.create.repository')
        user_model.revoke_perm(usr, 'hg.fork.none')
        user_model.grant_perm(usr, 'hg.fork.repository')
        # make sure inherit flag is turned on
        self.u1.inherit_default_permissions = True
        Session().commit()
        u1_auth = AuthUser(user_id=self.u1.user_id)
        # this user will have inherited permissions from default user
        self.assertEqual(u1_auth.permissions['global'],
                         set(['hg.create.repository', 'hg.fork.repository',
                              'hg.register.manual_activate',
                              'hg.extern_activate.auto',
                              'repository.read', 'group.read',
                              'usergroup.read', 'hg.create.write_on_repogroup.true']))

    def test_inherited_permissions_from_default_on_user_disabled(self):
        user_model = UserModel()
        # disable fork and create on default user
        usr = 'default'
        user_model.revoke_perm(usr, 'hg.create.repository')
        user_model.grant_perm(usr, 'hg.create.none')
        user_model.revoke_perm(usr, 'hg.fork.repository')
        user_model.grant_perm(usr, 'hg.fork.none')
        # make sure inherit flag is turned on
        self.u1.inherit_default_permissions = True
        Session().commit()
        u1_auth = AuthUser(user_id=self.u1.user_id)
        # this user will have inherited permissions from default user
        self.assertEqual(u1_auth.permissions['global'],
                         set(['hg.create.none', 'hg.fork.none',
                              'hg.register.manual_activate',
                              'hg.extern_activate.auto',
                              'repository.read', 'group.read',
                              'usergroup.read', 'hg.create.write_on_repogroup.true']))

    def test_non_inherited_permissions_from_default_on_user_enabled(self):
        user_model = UserModel()
        # enable fork and create on default user
        usr = 'default'
        user_model.revoke_perm(usr, 'hg.create.none')
        user_model.grant_perm(usr, 'hg.create.repository')
        user_model.revoke_perm(usr, 'hg.fork.none')
        user_model.grant_perm(usr, 'hg.fork.repository')

        #disable global perms on specific user
        user_model.revoke_perm(self.u1, 'hg.create.repository')
        user_model.grant_perm(self.u1, 'hg.create.none')
        user_model.revoke_perm(self.u1, 'hg.fork.repository')
        user_model.grant_perm(self.u1, 'hg.fork.none')

        # make sure inherit flag is turned off
        self.u1.inherit_default_permissions = False
        Session().commit()
        u1_auth = AuthUser(user_id=self.u1.user_id)
        # this user will have non inherited permissions from he's
        # explicitly set permissions
        self.assertEqual(u1_auth.permissions['global'],
                         set(['hg.create.none', 'hg.fork.none',
                              'hg.register.manual_activate',
                              'hg.extern_activate.auto',
                              'repository.read', 'group.read',
                              'usergroup.read', 'hg.create.write_on_repogroup.true']))

    def test_non_inherited_permissions_from_default_on_user_disabled(self):
        user_model = UserModel()
        # disable fork and create on default user
        usr = 'default'
        user_model.revoke_perm(usr, 'hg.create.repository')
        user_model.grant_perm(usr, 'hg.create.none')
        user_model.revoke_perm(usr, 'hg.fork.repository')
        user_model.grant_perm(usr, 'hg.fork.none')

        #enable global perms on specific user
        user_model.revoke_perm(self.u1, 'hg.create.none')
        user_model.grant_perm(self.u1, 'hg.create.repository')
        user_model.revoke_perm(self.u1, 'hg.fork.none')
        user_model.grant_perm(self.u1, 'hg.fork.repository')

        # make sure inherit flag is turned off
        self.u1.inherit_default_permissions = False
        Session().commit()
        u1_auth = AuthUser(user_id=self.u1.user_id)
        # this user will have non inherited permissions from he's
        # explicitly set permissions
        self.assertEqual(u1_auth.permissions['global'],
                         set(['hg.create.repository', 'hg.fork.repository',
                              'hg.register.manual_activate',
                              'hg.extern_activate.auto',
                              'repository.read', 'group.read',
                              'usergroup.read', 'hg.create.write_on_repogroup.true']))

    def test_owner_permissions_doesnot_get_overwritten_by_group(self):
        #create repo as USER,
        self.test_repo = fixture.create_repo(name='myownrepo',
                                             repo_type='hg',
                                             cur_user=self.u1)

        #he has permissions of admin as owner
        u1_auth = AuthUser(user_id=self.u1.user_id)
        self.assertEqual(u1_auth.permissions['repositories']['myownrepo'],
                         'repository.admin')
        #set his permission as user group, he should still be admin
        self.ug1 = fixture.create_user_group('G1')
        UserGroupModel().add_user_to_group(self.ug1, self.u1)
        RepoModel().grant_user_group_permission(self.test_repo,
                                                 group_name=self.ug1,
                                                 perm='repository.none')

        Session().commit()
        u1_auth = AuthUser(user_id=self.u1.user_id)
        self.assertEqual(u1_auth.permissions['repositories']['myownrepo'],
                         'repository.admin')

    def test_owner_permissions_doesnot_get_overwritten_by_others(self):
        #create repo as USER,
        self.test_repo = fixture.create_repo(name='myownrepo',
                                             repo_type='hg',
                                             cur_user=self.u1)

        #he has permissions of admin as owner
        u1_auth = AuthUser(user_id=self.u1.user_id)
        self.assertEqual(u1_auth.permissions['repositories']['myownrepo'],
                         'repository.admin')
        #set his permission as user, he should still be admin
        RepoModel().grant_user_permission(self.test_repo, user=self.u1,
                                          perm='repository.none')
        Session().commit()
        u1_auth = AuthUser(user_id=self.u1.user_id)
        self.assertEqual(u1_auth.permissions['repositories']['myownrepo'],
                         'repository.admin')

    def _test_def_perm_equal(self, user, change_factor=0):
        perms = UserToPerm.query()\
                .filter(UserToPerm.user == user)\
                .all()
        self.assertEqual(len(perms),
                         len(Permission.DEFAULT_USER_PERMISSIONS,)+change_factor,
                         msg=perms)

    def test_set_default_permissions(self):
        PermissionModel().create_default_permissions(user=self.u1)
        self._test_def_perm_equal(user=self.u1)

    def test_set_default_permissions_after_one_is_missing(self):
        PermissionModel().create_default_permissions(user=self.u1)
        self._test_def_perm_equal(user=self.u1)
        #now we delete one, it should be re-created after another call
        perms = UserToPerm.query()\
                .filter(UserToPerm.user == self.u1)\
                .all()
        Session().delete(perms[0])
        Session().commit()

        self._test_def_perm_equal(user=self.u1, change_factor=-1)

        #create missing one !
        PermissionModel().create_default_permissions(user=self.u1)
        self._test_def_perm_equal(user=self.u1)

    @parameterized.expand([
        ('repository.read', 'repository.none'),
        ('group.read', 'group.none'),
        ('usergroup.read', 'usergroup.none'),
        ('hg.create.repository', 'hg.create.none'),
        ('hg.fork.repository', 'hg.fork.none'),
        ('hg.register.manual_activate', 'hg.register.auto_activate',)
    ])
    def test_set_default_permissions_after_modification(self, perm, modify_to):
        PermissionModel().create_default_permissions(user=self.u1)
        self._test_def_perm_equal(user=self.u1)

        old = Permission.get_by_key(perm)
        new = Permission.get_by_key(modify_to)
        self.assertNotEqual(old, None)
        self.assertNotEqual(new, None)

        #now modify permissions
        p = UserToPerm.query()\
                .filter(UserToPerm.user == self.u1)\
                .filter(UserToPerm.permission == old)\
                .one()
        p.permission = new
        Session().add(p)
        Session().commit()

        PermissionModel().create_default_permissions(user=self.u1)
        self._test_def_perm_equal(user=self.u1)