Mercurial > kallithea
view kallithea/model/user_group.py @ 8966:59185ce619c3 i18n
i18n: pl: reintroduce malformed translation removed by 19506ee31c1c
author | Mads Kiilerich <mads@kiilerich.com> |
---|---|
date | Mon, 12 Dec 2022 18:28:10 +0100 |
parents | 5e46f73f0d1c |
children |
line wrap: on
line source
# -*- coding: utf-8 -*- # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. """ kallithea.model.user_group ~~~~~~~~~~~~~~~~~~~~~~~~~~ user group model for Kallithea This file was forked by the Kallithea project in July 2014. Original author and date, and relevant copyright and licensing information is below: :created_on: Oct 1, 2011 :author: nvinot, marcink """ import logging import traceback from kallithea.lib.exceptions import RepoGroupAssignmentError, UserGroupsAssignedException from kallithea.model import db, meta log = logging.getLogger(__name__) class UserGroupModel(object): def _create_default_perms(self, user_group): # create default permission default_perm = 'usergroup.read' def_user = db.User.get_default_user() for p in def_user.user_perms: if p.permission.permission_name.startswith('usergroup.'): default_perm = p.permission.permission_name break user_group_to_perm = db.UserUserGroupToPerm() user_group_to_perm.permission = db.Permission.get_by_key(default_perm) user_group_to_perm.user_group = user_group user_group_to_perm.user_id = def_user.user_id meta.Session().add(user_group_to_perm) return user_group_to_perm def _update_permissions(self, user_group, perms_new=None, perms_updates=None): from kallithea.lib.auth import HasUserGroupPermissionLevel if not perms_new: perms_new = [] if not perms_updates: perms_updates = [] # update permissions for member, perm, member_type in perms_updates: if member_type == 'user': # this updates existing one self.grant_user_permission( user_group=user_group, user=member, perm=perm ) else: # check if we have permissions to alter this usergroup's access if HasUserGroupPermissionLevel('read')(member): self.grant_user_group_permission( target_user_group=user_group, user_group=member, perm=perm ) # set new permissions for member, perm, member_type in perms_new: if member_type == 'user': self.grant_user_permission( user_group=user_group, user=member, perm=perm ) else: # check if we have permissions to alter this usergroup's access if HasUserGroupPermissionLevel('read')(member): self.grant_user_group_permission( target_user_group=user_group, user_group=member, perm=perm ) def get(self, user_group_id): return db.UserGroup.get(user_group_id) def get_group(self, user_group): return db.UserGroup.guess_instance(user_group) def get_by_name(self, name, case_insensitive=False): return db.UserGroup.get_by_group_name(name, case_insensitive=case_insensitive) def create(self, name, description, owner, active=True, group_data=None): try: new_user_group = db.UserGroup() new_user_group.owner = db.User.guess_instance(owner) new_user_group.users_group_name = name new_user_group.user_group_description = description new_user_group.users_group_active = active if group_data: new_user_group.group_data = group_data meta.Session().add(new_user_group) self._create_default_perms(new_user_group) self.grant_user_permission(user_group=new_user_group, user=owner, perm='usergroup.admin') return new_user_group except Exception: log.error(traceback.format_exc()) raise def update(self, user_group, form_data): try: user_group = db.UserGroup.guess_instance(user_group) for k, v in form_data.items(): if k == 'users_group_members': members_list = [] if v: v = [v] if isinstance(v, str) else v for u_id in set(v): member = db.UserGroupMember(user_group.users_group_id, u_id) members_list.append(member) meta.Session().add(member) user_group.members = members_list setattr(user_group, k, v) # Flush to make db assign users_group_member_id to newly # created UserGroupMembers. meta.Session().flush() except Exception: log.error(traceback.format_exc()) raise def delete(self, user_group, force=False): """ Deletes user group, unless force flag is used raises exception if there are members in that group, else deletes group and users :param user_group: :param force: """ user_group = db.UserGroup.guess_instance(user_group) try: # check if this group is not assigned to repo assigned_groups = db.UserGroupRepoToPerm.query() \ .filter(db.UserGroupRepoToPerm.users_group == user_group).all() assigned_groups = [x.repository.repo_name for x in assigned_groups] if assigned_groups and not force: raise UserGroupsAssignedException( 'User Group assigned to %s' % ", ".join(assigned_groups)) meta.Session().delete(user_group) except Exception: log.error(traceback.format_exc()) raise def add_user_to_group(self, user_group, user): """Return True if user already is in the group - else return the new UserGroupMember""" user_group = db.UserGroup.guess_instance(user_group) user = db.User.guess_instance(user) for m in user_group.members: u = m.user if u.user_id == user.user_id: # user already in the group, skip return True try: user_group_member = db.UserGroupMember() user_group_member.user = user user_group_member.users_group = user_group user_group.members.append(user_group_member) user.group_member.append(user_group_member) meta.Session().add(user_group_member) return user_group_member except Exception: log.error(traceback.format_exc()) raise def remove_user_from_group(self, user_group, user): user_group = db.UserGroup.guess_instance(user_group) user = db.User.guess_instance(user) user_group_member = None for m in user_group.members: if m.user_id == user.user_id: # Found this user's membership row user_group_member = m break if user_group_member: try: meta.Session().delete(user_group_member) return True except Exception: log.error(traceback.format_exc()) raise else: # User isn't in that group return False def has_perm(self, user_group, perm): user_group = db.UserGroup.guess_instance(user_group) perm = db.Permission.guess_instance(perm) return db.UserGroupToPerm.query() \ .filter(db.UserGroupToPerm.users_group == user_group) \ .filter(db.UserGroupToPerm.permission == perm).scalar() is not None def grant_perm(self, user_group, perm): user_group = db.UserGroup.guess_instance(user_group) perm = db.Permission.guess_instance(perm) # if this permission is already granted skip it _perm = db.UserGroupToPerm.query() \ .filter(db.UserGroupToPerm.users_group == user_group) \ .filter(db.UserGroupToPerm.permission == perm) \ .scalar() if _perm: return new = db.UserGroupToPerm() new.users_group = user_group new.permission = perm meta.Session().add(new) return new def revoke_perm(self, user_group, perm): user_group = db.UserGroup.guess_instance(user_group) perm = db.Permission.guess_instance(perm) obj = db.UserGroupToPerm.query() \ .filter(db.UserGroupToPerm.users_group == user_group) \ .filter(db.UserGroupToPerm.permission == perm).scalar() if obj is not None: meta.Session().delete(obj) def grant_user_permission(self, user_group, user, perm): """ Grant permission for user on given user group, or update existing one if found :param user_group: Instance of UserGroup, users_group_id, or users_group_name :param user: Instance of User, user_id or username :param perm: Instance of Permission, or permission_name """ user_group = db.UserGroup.guess_instance(user_group) user = db.User.guess_instance(user) permission = db.Permission.guess_instance(perm) # check if we have that permission already obj = db.UserUserGroupToPerm.query() \ .filter(db.UserUserGroupToPerm.user == user) \ .filter(db.UserUserGroupToPerm.user_group == user_group) \ .scalar() if obj is None: # create new ! obj = db.UserUserGroupToPerm() meta.Session().add(obj) obj.user_group = user_group obj.user = user obj.permission = permission log.debug('Granted perm %s to %s on %s', perm, user, user_group) return obj def revoke_user_permission(self, user_group, user): """ Revoke permission for user on given repository group :param user_group: Instance of RepoGroup, repositories_group_id, or repositories_group name :param user: Instance of User, user_id or username """ user_group = db.UserGroup.guess_instance(user_group) user = db.User.guess_instance(user) obj = db.UserUserGroupToPerm.query() \ .filter(db.UserUserGroupToPerm.user == user) \ .filter(db.UserUserGroupToPerm.user_group == user_group) \ .scalar() if obj is not None: meta.Session().delete(obj) log.debug('Revoked perm on %s on %s', user_group, user) def grant_user_group_permission(self, target_user_group, user_group, perm): """ Grant user group permission for given target_user_group :param target_user_group: :param user_group: :param perm: """ target_user_group = db.UserGroup.guess_instance(target_user_group) user_group = db.UserGroup.guess_instance(user_group) permission = db.Permission.guess_instance(perm) # forbid assigning same user group to itself if target_user_group == user_group: raise RepoGroupAssignmentError('target repo:%s cannot be ' 'assigned to itself' % target_user_group) # check if we have that permission already obj = db.UserGroupUserGroupToPerm.query() \ .filter(db.UserGroupUserGroupToPerm.target_user_group == target_user_group) \ .filter(db.UserGroupUserGroupToPerm.user_group == user_group) \ .scalar() if obj is None: # create new ! obj = db.UserGroupUserGroupToPerm() meta.Session().add(obj) obj.user_group = user_group obj.target_user_group = target_user_group obj.permission = permission log.debug('Granted perm %s to %s on %s', perm, target_user_group, user_group) return obj def revoke_user_group_permission(self, target_user_group, user_group): """ Revoke user group permission for given target_user_group :param target_user_group: :param user_group: """ target_user_group = db.UserGroup.guess_instance(target_user_group) user_group = db.UserGroup.guess_instance(user_group) obj = db.UserGroupUserGroupToPerm.query() \ .filter(db.UserGroupUserGroupToPerm.target_user_group == target_user_group) \ .filter(db.UserGroupUserGroupToPerm.user_group == user_group) \ .scalar() if obj is not None: meta.Session().delete(obj) log.debug('Revoked perm on %s on %s', target_user_group, user_group) def enforce_groups(self, user, groups, extern_type=None): user = db.User.guess_instance(user) log.debug('Enforcing groups %s on user %s', user, groups) current_groups = user.group_member # find the external created groups externals = [x.users_group for x in current_groups if 'extern_type' in x.users_group.group_data] # calculate from what groups user should be removed # externals that are not in groups for gr in externals: if gr.users_group_name not in groups: log.debug('Removing user %s from user group %s', user, gr) self.remove_user_from_group(gr, user) # now we calculate in which groups user should be == groups params owner = db.User.get_first_admin().username for gr in set(groups): existing_group = db.UserGroup.get_by_group_name(gr) if not existing_group: desc = 'Automatically created from plugin:%s' % extern_type # we use first admin account to set the owner of the group existing_group = UserGroupModel().create(gr, desc, owner, group_data={'extern_type': extern_type}) # we can only add users to special groups created via plugins managed = 'extern_type' in existing_group.group_data if managed: log.debug('Adding user %s to user group %s', user, gr) UserGroupModel().add_user_to_group(existing_group, user) else: log.debug('Skipping addition to group %s since it is ' 'not managed by auth plugins' % gr)