view rhodecode/model/permission.py @ 3181:efe23d6c178c rhodecode-0.0.1.5.2

merged with beta
author Marcin Kuzminski <marcin@python-works.com>
date Mon, 21 Jan 2013 00:49:59 +0100
parents a5f0bc867edc
children 3563bb7b4b82
line wrap: on
line source

# -*- coding: utf-8 -*-
"""
    rhodecode.model.permission
    ~~~~~~~~~~~~~~~~~~~~~~~~~~

    permissions model for RhodeCode

    :created_on: Aug 20, 2010
    :author: marcink
    :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
    :license: GPLv3, see COPYING for more details.
"""
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import logging
import traceback

from sqlalchemy.exc import DatabaseError

from rhodecode.lib.caching_query import FromCache

from rhodecode.model import BaseModel
from rhodecode.model.db import User, Permission, UserToPerm, UserRepoToPerm,\
    UserRepoGroupToPerm

log = logging.getLogger(__name__)


class PermissionModel(BaseModel):
    """
    Permissions model for RhodeCode

    Provides lookups of :class:`Permission` rows (by id or by name) and the
    bulk update of the default permissions assigned to a user.
    """

    cls = Permission

    # Maps the name prefix of a currently assigned default permission to the
    # ``form_result`` key that holds the name of the permission replacing it.
    # Entries are checked in order and the first matching prefix wins.
    _DEFAULT_PERM_FORM_KEYS = (
        ('repository.', 'default_repo_perm'),
        ('group.', 'default_group_perm'),
        ('hg.register.', 'default_register'),
        ('hg.create.', 'default_create'),
        ('hg.fork.', 'default_fork'),
    )

    def get_permission(self, permission_id, cache=False):
        """
        Gets a permission by its id

        :param permission_id: id of permission to get from database
        :param cache: when true, attach the ``sql_cache_short`` cache option
            to the query
        :returns: whatever ``Query.get`` yields for ``permission_id``
        """
        perm = self.sa.query(Permission)
        if cache:
            # NOTE(review): shares the "get_permission_%s" key prefix with
            # get_permission_by_name(); left untouched because other code may
            # rely on these key names -- confirm before changing.
            perm = perm.options(FromCache("sql_cache_short",
                                          "get_permission_%s" % permission_id))
        return perm.get(permission_id)

    def get_permission_by_name(self, name, cache=False):
        """
        Gets a permission by its name

        :param name: permission name to fetch
        :param cache: when true, attach the ``sql_cache_short`` cache option
            to the query
        :returns: result of ``Query.scalar`` for the name filter (``None``
            when no row matches)
        """
        perm = self.sa.query(Permission)\
            .filter(Permission.permission_name == name)
        if cache:
            perm = perm.options(FromCache("sql_cache_short",
                                          "get_permission_%s" % name))
        return perm.scalar()

    def _overwrite_permissions(self, perm_cls, user, prefix, perm_name):
        """
        Point every ``perm_cls`` row of ``user`` at one permission

        :param perm_cls: mapping class to update (``UserRepoToPerm`` or
            ``UserRepoGroupToPerm``)
        :param user: user whose rows are rewritten
        :param prefix: permission name prefix, e.g. ``'repository.'``
        :param perm_name: permission name, with or without ``prefix``
        """
        # take the part after the prefix and re-add the prefix, so that a
        # name given without the prefix resolves to the same permission
        short_name = perm_name.split(prefix)[-1]
        new_perm = self.get_permission_by_name(prefix + short_name)
        rows = self.sa.query(perm_cls)\
            .filter(perm_cls.user == user)\
            .all()
        for row in rows:
            row.permission = new_perm
            self.sa.add(row)

    def update(self, form_result):
        """
        Updates the default permissions of the user named in ``form_result``

        Works in three stages: replaces the user's default permissions with
        the ones chosen in the form, optionally overwrites the user's
        per-repository and per-group permissions, and for the ``default``
        user toggles its ``active`` flag (anonymous access). Objects are only
        added to the session; nothing is committed here.

        :param form_result: mapping that must contain ``perm_user_name``,
            ``overwrite_default_repo`` and ``overwrite_default_group``;
            the ``default_*`` keys listed in ``_DEFAULT_PERM_FORM_KEYS`` are
            read when a matching permission is assigned (or, for the repo and
            group ones, when the corresponding overwrite flag is set), and
            ``anonymous`` is read for the ``default`` user
        :raises Exception: when the user does not exist, or when the number
            of its default permissions differs from
            ``User.DEFAULT_PERMISSIONS``
        :raises DatabaseError: re-raised after logging the traceback
        """
        perm_user_name = form_result['perm_user_name']
        perm_user = self.sa.query(User)\
            .filter(User.username == perm_user_name)\
            .scalar()
        if perm_user is None:
            # without this guard the code below would filter on a NULL user
            # and fail later with a misleading message or an AttributeError
            raise Exception('User `%s` does not exist, cannot update its'
                            ' default permissions' % (perm_user_name,))

        u2p = self.sa.query(UserToPerm)\
            .filter(UserToPerm.user == perm_user)\
            .all()
        if len(u2p) != len(User.DEFAULT_PERMISSIONS):
            raise Exception('Defined: %s should be %s  permissions for default'
                            ' user. This should not happen please verify'
                            ' your database' % (len(u2p), len(User.DEFAULT_PERMISSIONS)))

        try:
            # stage 1 change defaults
            for p in u2p:
                current_name = p.permission.permission_name
                for prefix, form_key in self._DEFAULT_PERM_FORM_KEYS:
                    if current_name.startswith(prefix):
                        p.permission = self.get_permission_by_name(
                            form_result[form_key])
                        self.sa.add(p)
                        break

            # stage 2 update all default permissions for repos/groups if
            # checked; the explicit ``== True`` is kept on purpose so that
            # only a value equal to True triggers the overwrite
            if form_result['overwrite_default_repo'] == True:
                self._overwrite_permissions(UserRepoToPerm, perm_user,
                                            'repository.',
                                            form_result['default_repo_perm'])

            if form_result['overwrite_default_group'] == True:
                self._overwrite_permissions(UserRepoGroupToPerm, perm_user,
                                            'group.',
                                            form_result['default_group_perm'])

            # stage 3 set anonymous access
            if perm_user.username == 'default':
                perm_user.active = bool(form_result['anonymous'])
                self.sa.add(perm_user)

        except (DatabaseError,):
            log.error(traceback.format_exc())
            raise