Mercurial > kallithea
view kallithea/model/user.py @ 5424:b361974171ea
spelling: fix inappropriate use of title case
These are not titles or labels, and should not be title cased.
author | Søren Løvborg <sorenl@unity3d.com> |
---|---|
date | Fri, 14 Aug 2015 17:21:26 +0200 |
parents | 0210d0b769d4 |
children | f629e9a0c376 |
line wrap: on
line source
# -*- coding: utf-8 -*- # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. """ kallithea.model.user ~~~~~~~~~~~~~~~~~~~~ users model for Kallithea This file was forked by the Kallithea project in July 2014. Original author and date, and relevant copyright and licensing information is below: :created_on: Apr 9, 2010 :author: marcink :copyright: (c) 2013 RhodeCode GmbH, and others. :license: GPLv3, see LICENSE.md for more details. """ import logging import traceback from pylons.i18n.translation import _ from sqlalchemy.exc import DatabaseError from kallithea import EXTERN_TYPE_INTERNAL from kallithea.lib.utils2 import safe_unicode, generate_api_key, get_current_authuser from kallithea.lib.caching_query import FromCache from kallithea.model import BaseModel from kallithea.model.db import User, UserToPerm, Notification, \ UserEmailMap, UserIpMap from kallithea.lib.exceptions import DefaultUserException, \ UserOwnsReposException from kallithea.model.meta import Session log = logging.getLogger(__name__) class UserModel(BaseModel): cls = User def get(self, user_id, cache=False): user = self.sa.query(User) if cache: user = user.options(FromCache("sql_cache_short", "get_user_%s" % user_id)) return user.get(user_id) def get_user(self, user): return self._get_user(user) def create(self, form_data, cur_user=None): if not cur_user: cur_user = getattr(get_current_authuser(), 'username', None) from kallithea.lib.hooks import log_create_user, \ check_allowed_create_user _fd = form_data user_data = { 'username': _fd['username'], 'password': _fd['password'], 'email': _fd['email'], 'firstname': _fd['firstname'], 'lastname': _fd['lastname'], 'active': _fd['active'], 'admin': False } # raises UserCreationError if it's not allowed check_allowed_create_user(user_data, cur_user) from kallithea.lib.auth import get_crypt_password new_user = User() for k, v in form_data.items(): if k == 'password': v = get_crypt_password(v) if k == 'firstname': k = 'name' setattr(new_user, k, v) new_user.api_key = generate_api_key() self.sa.add(new_user) log_create_user(new_user.get_dict(), cur_user) return new_user def create_or_update(self, username, password, email, firstname='', lastname='', active=True, admin=False, extern_type=None, extern_name=None, cur_user=None): """ Creates a new instance if not found, or updates current one :param username: :param password: :param email: :param active: :param firstname: :param lastname: :param active: :param admin: :param extern_name: :param extern_type: :param cur_user: """ if not cur_user: cur_user = getattr(get_current_authuser(), 'username', None) from kallithea.lib.auth import get_crypt_password, check_password from kallithea.lib.hooks import log_create_user, \ check_allowed_create_user user_data = { 'username': username, 'password': password, 'email': email, 'firstname': firstname, 'lastname': lastname, 'active': active, 'admin': admin } # raises UserCreationError if it's not allowed check_allowed_create_user(user_data, cur_user) log.debug('Checking for %s account in Kallithea database', username) user = User.get_by_username(username, case_insensitive=True) if user is None: log.debug('creating new user %s', username) new_user = User() edit = False else: log.debug('updating user %s', username) new_user = user edit = True try: new_user.username = username new_user.admin = admin new_user.email = email new_user.active = active new_user.extern_name = safe_unicode(extern_name) \ if extern_name else None new_user.extern_type = safe_unicode(extern_type) \ if extern_type else None new_user.name = firstname new_user.lastname = lastname if not edit: new_user.api_key = generate_api_key() # set password only if creating an user or password is changed password_change = new_user.password and \ not check_password(password, new_user.password) if not edit or password_change: reason = 'new password' if edit else 'new user' log.debug('Updating password reason=>%s', reason) new_user.password = get_crypt_password(password) \ if password else None self.sa.add(new_user) if not edit: log_create_user(new_user.get_dict(), cur_user) return new_user except (DatabaseError,): log.error(traceback.format_exc()) raise def create_registration(self, form_data): from kallithea.model.notification import NotificationModel import kallithea.lib.helpers as h form_data['admin'] = False form_data['extern_name'] = EXTERN_TYPE_INTERNAL form_data['extern_type'] = EXTERN_TYPE_INTERNAL new_user = self.create(form_data) self.sa.add(new_user) self.sa.flush() # notification to admins subject = _('New user registration') body = ( 'New user registration\n' '---------------------\n' '- Username: {user.username}\n' '- Full Name: {user.full_name}\n' '- Email: {user.email}\n' ).format(user=new_user) edit_url = h.canonical_url('edit_user', id=new_user.user_id) email_kwargs = { 'registered_user_url': edit_url, 'new_username': new_user.username} NotificationModel().create(created_by=new_user, subject=subject, body=body, recipients=None, type_=Notification.TYPE_REGISTRATION, email_kwargs=email_kwargs) def update(self, user_id, form_data, skip_attrs=[]): from kallithea.lib.auth import get_crypt_password user = self.get(user_id, cache=False) if user.username == User.DEFAULT_USER: raise DefaultUserException( _("You can't edit this user since it's " "crucial for entire application")) for k, v in form_data.items(): if k in skip_attrs: continue if k == 'new_password' and v: user.password = get_crypt_password(v) else: # old legacy thing orm models store firstname as name, # need proper refactor to username if k == 'firstname': k = 'name' setattr(user, k, v) self.sa.add(user) def update_user(self, user, **kwargs): from kallithea.lib.auth import get_crypt_password user = self._get_user(user) if user.username == User.DEFAULT_USER: raise DefaultUserException( _("You can't edit this user since it's" " crucial for entire application") ) for k, v in kwargs.items(): if k == 'password' and v: v = get_crypt_password(v) setattr(user, k, v) self.sa.add(user) return user def delete(self, user, cur_user=None): if cur_user is None: cur_user = getattr(get_current_authuser(), 'username', None) user = self._get_user(user) if user.username == User.DEFAULT_USER: raise DefaultUserException( _("You can't remove this user since it is" " crucial for the entire application")) if user.repositories: repos = [x.repo_name for x in user.repositories] raise UserOwnsReposException( _('User "%s" still owns %s repositories and cannot be ' 'removed. Switch owners or remove those repositories: %s') % (user.username, len(repos), ', '.join(repos))) if user.repo_groups: repogroups = [x.group_name for x in user.repo_groups] raise UserOwnsReposException(_( 'User "%s" still owns %s repository groups and cannot be ' 'removed. Switch owners or remove those repository groups: %s') % (user.username, len(repogroups), ', '.join(repogroups))) if user.user_groups: usergroups = [x.users_group_name for x in user.user_groups] raise UserOwnsReposException( _('User "%s" still owns %s user groups and cannot be ' 'removed. Switch owners or remove those user groups: %s') % (user.username, len(usergroups), ', '.join(usergroups))) self.sa.delete(user) from kallithea.lib.hooks import log_delete_user log_delete_user(user.get_dict(), cur_user) def reset_password_link(self, data): from kallithea.lib.celerylib import tasks, run_task from kallithea.model.notification import EmailNotificationModel import kallithea.lib.helpers as h user_email = data['email'] user = User.get_by_email(user_email) if user is not None: log.debug('password reset user found %s', user) link = h.canonical_url('reset_password_confirmation', key=user.api_key) reg_type = EmailNotificationModel.TYPE_PASSWORD_RESET body = EmailNotificationModel().get_email_tmpl( reg_type, 'txt', user=user.short_contact, reset_url=link) html_body = EmailNotificationModel().get_email_tmpl( reg_type, 'html', user=user.short_contact, reset_url=link) log.debug('sending email') run_task(tasks.send_email, [user_email], _("Password reset link"), body, html_body) log.info('send new password mail to %s', user_email) else: log.debug("password reset email %s not found", user_email) return True def reset_password(self, data): from kallithea.lib.celerylib import tasks, run_task from kallithea.lib import auth user_email = data['email'] user = User.get_by_email(user_email) new_passwd = auth.PasswordGenerator().gen_password( 8, auth.PasswordGenerator.ALPHABETS_BIG_SMALL) if user is not None: user.password = auth.get_crypt_password(new_passwd) Session().add(user) Session().commit() log.info('change password for %s', user_email) if new_passwd is None: raise Exception('unable to generate new password') run_task(tasks.send_email, [user_email], _('Your new password'), _('Your new Kallithea password:%s') % (new_passwd,)) log.info('send new password mail to %s', user_email) return True def has_perm(self, user, perm): perm = self._get_perm(perm) user = self._get_user(user) return UserToPerm.query().filter(UserToPerm.user == user)\ .filter(UserToPerm.permission == perm).scalar() is not None def grant_perm(self, user, perm): """ Grant user global permissions :param user: :param perm: """ user = self._get_user(user) perm = self._get_perm(perm) # if this permission is already granted skip it _perm = UserToPerm.query()\ .filter(UserToPerm.user == user)\ .filter(UserToPerm.permission == perm)\ .scalar() if _perm: return new = UserToPerm() new.user = user new.permission = perm self.sa.add(new) return new def revoke_perm(self, user, perm): """ Revoke users global permissions :param user: :param perm: """ user = self._get_user(user) perm = self._get_perm(perm) UserToPerm.query().filter( UserToPerm.user == user, UserToPerm.permission == perm, ).delete() def add_extra_email(self, user, email): """ Adds email address to UserEmailMap :param user: :param email: """ from kallithea.model import forms form = forms.UserExtraEmailForm()() data = form.to_python(dict(email=email)) user = self._get_user(user) obj = UserEmailMap() obj.user = user obj.email = data['email'] self.sa.add(obj) return obj def delete_extra_email(self, user, email_id): """ Removes email address from UserEmailMap :param user: :param email_id: """ user = self._get_user(user) obj = UserEmailMap.query().get(email_id) if obj is not None: self.sa.delete(obj) def add_extra_ip(self, user, ip): """ Adds IP address to UserIpMap :param user: :param ip: """ from kallithea.model import forms form = forms.UserExtraIpForm()() data = form.to_python(dict(ip=ip)) user = self._get_user(user) obj = UserIpMap() obj.user = user obj.ip_addr = data['ip'] self.sa.add(obj) return obj def delete_extra_ip(self, user, ip_id): """ Removes IP address from UserIpMap :param user: :param ip_id: """ user = self._get_user(user) obj = UserIpMap.query().get(ip_id) if obj: self.sa.delete(obj)