view rhodecode/model/users_group.py @ 1757:2aa7f454204e beta

fixes #298, ldap email addresses created by rhodecode automatically during first login didn't get converted to lower case, which lead to lookup failures and than wrong checks for uniqueness. Fixed that by putting a setter on db model column that will enforce converting to lowercase.
author Marcin Kuzminski <marcin@python-works.com>
date Tue, 06 Dec 2011 01:18:27 +0200
parents 8ecc6b8229a5
children cf51bbfb120e
line wrap: on
line source

# -*- coding: utf-8 -*-
"""
    rhodecode.model.users_group
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~

    users group model for RhodeCode

    :created_on: Oct 1, 2011
    :author: nvinot
    :copyright: (C) 2011-2011 Nicolas Vinot <aeris@imirhil.fr>
    :license: GPLv3, see COPYING for more details.
"""
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import logging
import traceback

from rhodecode.model import BaseModel
from rhodecode.model.db import UsersGroupMember, UsersGroup,\
    UsersGroupRepoToPerm, Permission, UsersGroupToPerm
from rhodecode.lib.exceptions import UsersGroupsAssignedException

log = logging.getLogger(__name__)


class UsersGroupModel(BaseModel):

    def __get_users_group(self, users_group):
        return self._get_instance(UsersGroup, users_group)

    def get(self, users_group_id, cache=False):
        return UsersGroup.get(users_group_id)

    def get_by_name(self, name, cache=False, case_insensitive=False):
        return UsersGroup.get_by_group_name(name, cache, case_insensitive)

    def create(self, name, active=True):
        try:
            new = UsersGroup()
            new.users_group_name = name
            new.users_group_active = active
            self.sa.add(new)
            return new
        except:
            log.error(traceback.format_exc())
            raise

    def update(self, users_group, form_data):

        try:
            users_group = self.__get_users_group(users_group)

            for k, v in form_data.items():
                if k == 'users_group_members':
                    users_group.members = []
                    self.sa.flush()
                    members_list = []
                    if v:
                        v = [v] if isinstance(v, basestring) else v
                        for u_id in set(v):
                            member = UsersGroupMember(users_group.users_group_id, u_id)
                            members_list.append(member)
                    setattr(users_group, 'members', members_list)
                setattr(users_group, k, v)

            self.sa.add(users_group)
        except:
            log.error(traceback.format_exc())
            raise

    def delete(self, users_group):
        try:
            users_group = self.__get_users_group(users_group)
            
            # check if this group is not assigned to repo
            assigned_groups = UsersGroupRepoToPerm.query()\
                .filter(UsersGroupRepoToPerm.users_group == users_group).all()

            if assigned_groups:
                raise UsersGroupsAssignedException('RepoGroup assigned to %s' %
                                                   assigned_groups)
            
            self.sa.delete(users_group)
        except:
            log.error(traceback.format_exc())
            raise

    def add_user_to_group(self, users_group, user):
        for m in users_group.members:
            u = m.user
            if u.user_id == user.user_id:
                return m

        try:
            users_group_member = UsersGroupMember()
            users_group_member.user = user
            users_group_member.users_group = users_group

            users_group.members.append(users_group_member)
            user.group_member.append(users_group_member)

            self.sa.add(users_group_member)
            return users_group_member
        except:
            log.error(traceback.format_exc())
            raise

    def has_perm(self, users_group, perm):
        if not isinstance(perm, Permission):
            raise Exception('perm needs to be an instance of Permission class')

        users_group = self.__get_users_group(users_group)

        return UsersGroupToPerm.query()\
            .filter(UsersGroupToPerm.users_group == users_group)\
            .filter(UsersGroupToPerm.permission == perm).scalar() is not None

    def grant_perm(self, users_group, perm):
        if not isinstance(perm, Permission):
            raise Exception('perm needs to be an instance of Permission class')

        users_group = self.__get_users_group(users_group)

        new = UsersGroupToPerm()
        new.users_group = users_group
        new.permission = perm
        self.sa.add(new)


    def revoke_perm(self, users_group, perm):
        if not isinstance(perm, Permission):
            raise Exception('perm needs to be an instance of Permission class')
        
        users_group = self.__get_users_group(users_group)
        
        obj = UsersGroupToPerm.query()\
            .filter(UsersGroupToPerm.users_group == users_group)\
            .filter(UsersGroupToPerm.permission == perm).one()
        self.sa.delete(obj)