view kallithea/model/user.py @ 4208:ad38f9f93b3b kallithea-2.2.5-rebrand

Correct licensing information in individual files. The top-level license file is now LICENSE.md. Also, in various places where there should have been joint copyright holders listed, a single copyright holder was listed. It does not appear easy to add a link to a large list of copyright holders in these places, so it simply refers to the fact that various authors hold copyright. In future, if an easy method is discovered to link to a list from those places, we should do so. Finally, text is added to LICENSE.md to point to where the full list of copyright holders is, and that Kallithea as a whole is GPLv3'd.
author Bradley M. Kuhn <bkuhn@sfconservancy.org>
date Wed, 21 May 2014 16:59:37 -0400
parents d208416c84c6
children de26de99ac5b
line wrap: on
line source

# -*- coding: utf-8 -*-
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""
kallithea.model.user
~~~~~~~~~~~~~~~~~~~~

users model for RhodeCode

:created_on: Apr 9, 2010
:author: marcink
:copyright: (c) 2013 RhodeCode GmbH.
:license: GPLv3, see LICENSE.md for more details.
"""


import logging
import traceback
from pylons import url
from pylons.i18n.translation import _

from sqlalchemy.exc import DatabaseError


from kallithea.lib.utils2 import safe_unicode, generate_api_key, get_current_authuser
from kallithea.lib.caching_query import FromCache
from kallithea.model import BaseModel
from kallithea.model.db import User, UserToPerm, Notification, \
    UserEmailMap, UserIpMap
from kallithea.lib.exceptions import DefaultUserException, \
    UserOwnsReposException
from kallithea.model.meta import Session


log = logging.getLogger(__name__)


class UserModel(BaseModel):
    cls = User

    def get(self, user_id, cache=False):
        user = self.sa.query(User)
        if cache:
            user = user.options(FromCache("sql_cache_short",
                                          "get_user_%s" % user_id))
        return user.get(user_id)

    def get_user(self, user):
        return self._get_user(user)

    def get_by_username(self, username, cache=False, case_insensitive=False):

        if case_insensitive:
            user = self.sa.query(User).filter(User.username.ilike(username))
        else:
            user = self.sa.query(User)\
                .filter(User.username == username)
        if cache:
            user = user.options(FromCache("sql_cache_short",
                                          "get_user_%s" % username))
        return user.scalar()

    def get_by_email(self, email, cache=False, case_insensitive=False):
        return User.get_by_email(email, case_insensitive, cache)

    def get_by_api_key(self, api_key, cache=False):
        return User.get_by_api_key(api_key, cache)

    def create(self, form_data, cur_user=None):
        if not cur_user:
            cur_user = getattr(get_current_authuser(), 'username', None)

        from kallithea.lib.hooks import log_create_user, check_allowed_create_user
        _fd = form_data
        user_data = {
            'username': _fd['username'], 'password': _fd['password'],
            'email': _fd['email'], 'firstname': _fd['firstname'], 'lastname': _fd['lastname'],
            'active': _fd['active'], 'admin': False
        }
        # raises UserCreationError if it's not allowed
        check_allowed_create_user(user_data, cur_user)
        from kallithea.lib.auth import get_crypt_password
        try:
            new_user = User()
            for k, v in form_data.items():
                if k == 'password':
                    v = get_crypt_password(v)
                if k == 'firstname':
                    k = 'name'
                setattr(new_user, k, v)

            new_user.api_key = generate_api_key(form_data['username'])
            self.sa.add(new_user)

            log_create_user(new_user.get_dict(), cur_user)
            return new_user
        except Exception:
            log.error(traceback.format_exc())
            raise

    def create_or_update(self, username, password, email, firstname='',
                         lastname='', active=True, admin=False,
                         extern_type=None, extern_name=None, cur_user=None):
        """
        Creates a new instance if not found, or updates current one

        :param username:
        :param password:
        :param email:
        :param active:
        :param firstname:
        :param lastname:
        :param active:
        :param admin:
        :param extern_name:
        :param extern_type:
        :param cur_user:
        """
        if not cur_user:
            cur_user = getattr(get_current_authuser(), 'username', None)

        from kallithea.lib.auth import get_crypt_password, check_password
        from kallithea.lib.hooks import log_create_user, check_allowed_create_user
        user_data = {
            'username': username, 'password': password,
            'email': email, 'firstname': firstname, 'lastname': lastname,
            'active': active, 'admin': admin
        }
        # raises UserCreationError if it's not allowed
        check_allowed_create_user(user_data, cur_user)

        log.debug('Checking for %s account in RhodeCode database' % username)
        user = User.get_by_username(username, case_insensitive=True)
        if user is None:
            log.debug('creating new user %s' % username)
            new_user = User()
            edit = False
        else:
            log.debug('updating user %s' % username)
            new_user = user
            edit = True

        try:
            new_user.username = username
            new_user.admin = admin
            new_user.email = email
            new_user.active = active
            new_user.extern_name = safe_unicode(extern_name) if extern_name else None
            new_user.extern_type = safe_unicode(extern_type) if extern_type else None
            new_user.name = firstname
            new_user.lastname = lastname

            if not edit:
                new_user.api_key = generate_api_key(username)

            # set password only if creating an user or password is changed
            password_change = new_user.password and not check_password(password,
                                                            new_user.password)
            if not edit or password_change:
                reason = 'new password' if edit else 'new user'
                log.debug('Updating password reason=>%s' % (reason,))
                new_user.password = get_crypt_password(password) if password else None

            self.sa.add(new_user)

            if not edit:
                log_create_user(new_user.get_dict(), cur_user)
            return new_user
        except (DatabaseError,):
            log.error(traceback.format_exc())
            raise

    def create_registration(self, form_data):
        from kallithea.model.notification import NotificationModel

        try:
            form_data['admin'] = False
            form_data['extern_name'] = 'rhodecode'
            form_data['extern_type'] = 'rhodecode'
            new_user = self.create(form_data)

            self.sa.add(new_user)
            self.sa.flush()

            # notification to admins
            subject = _('New user registration')
            body = ('New user registration\n'
                    '---------------------\n'
                    '- Username: %s\n'
                    '- Full Name: %s\n'
                    '- Email: %s\n')
            body = body % (new_user.username, new_user.full_name, new_user.email)
            edit_url = url('edit_user', id=new_user.user_id, qualified=True)
            kw = {'registered_user_url': edit_url}
            NotificationModel().create(created_by=new_user, subject=subject,
                                       body=body, recipients=None,
                                       type_=Notification.TYPE_REGISTRATION,
                                       email_kwargs=kw)

        except Exception:
            log.error(traceback.format_exc())
            raise

    def update(self, user_id, form_data, skip_attrs=[]):
        from kallithea.lib.auth import get_crypt_password
        try:
            user = self.get(user_id, cache=False)
            if user.username == User.DEFAULT_USER:
                raise DefaultUserException(
                                _("You can't Edit this user since it's "
                                  "crucial for entire application"))

            for k, v in form_data.items():
                if k in skip_attrs:
                    continue
                if k == 'new_password' and v:
                    user.password = get_crypt_password(v)
                else:
                    # old legacy thing orm models store firstname as name,
                    # need proper refactor to username
                    if k == 'firstname':
                        k = 'name'
                    setattr(user, k, v)
            self.sa.add(user)
        except Exception:
            log.error(traceback.format_exc())
            raise

    def update_user(self, user, **kwargs):
        from kallithea.lib.auth import get_crypt_password
        try:
            user = self._get_user(user)
            if user.username == User.DEFAULT_USER:
                raise DefaultUserException(
                    _("You can't Edit this user since it's"
                      " crucial for entire application")
                )

            for k, v in kwargs.items():
                if k == 'password' and v:
                    v = get_crypt_password(v)

                setattr(user, k, v)
            self.sa.add(user)
            return user
        except Exception:
            log.error(traceback.format_exc())
            raise

    def delete(self, user, cur_user=None):
        if not cur_user:
            cur_user = getattr(get_current_authuser(), 'username', None)
        user = self._get_user(user)

        try:
            if user.username == User.DEFAULT_USER:
                raise DefaultUserException(
                    _(u"You can't remove this user since it's"
                      " crucial for entire application")
                )
            if user.repositories:
                repos = [x.repo_name for x in user.repositories]
                raise UserOwnsReposException(
                    _(u'user "%s" still owns %s repositories and cannot be '
                      'removed. Switch owners or remove those repositories. %s')
                    % (user.username, len(repos), ', '.join(repos))
                )
            self.sa.delete(user)

            from kallithea.lib.hooks import log_delete_user
            log_delete_user(user.get_dict(), cur_user)
        except Exception:
            log.error(traceback.format_exc())
            raise

    def reset_password_link(self, data):
        from kallithea.lib.celerylib import tasks, run_task
        from kallithea.model.notification import EmailNotificationModel
        user_email = data['email']
        try:
            user = User.get_by_email(user_email)
            if user:
                log.debug('password reset user found %s' % user)
                link = url('reset_password_confirmation', key=user.api_key,
                           qualified=True)
                reg_type = EmailNotificationModel.TYPE_PASSWORD_RESET
                body = EmailNotificationModel().get_email_tmpl(reg_type,
                                                    **{'user': user.short_contact,
                                                       'reset_url': link})
                log.debug('sending email')
                run_task(tasks.send_email, user_email,
                         _("Password reset link"), body, body)
                log.info('send new password mail to %s' % user_email)
            else:
                log.debug("password reset email %s not found" % user_email)
        except Exception:
            log.error(traceback.format_exc())
            return False

        return True

    def reset_password(self, data):
        from kallithea.lib.celerylib import tasks, run_task
        from kallithea.lib import auth
        user_email = data['email']
        pre_db = True
        try:
            user = User.get_by_email(user_email)
            new_passwd = auth.PasswordGenerator().gen_password(8,
                            auth.PasswordGenerator.ALPHABETS_BIG_SMALL)
            if user:
                user.password = auth.get_crypt_password(new_passwd)
                Session().add(user)
                Session().commit()
                log.info('change password for %s' % user_email)
            if new_passwd is None:
                raise Exception('unable to generate new password')

            pre_db = False
            run_task(tasks.send_email, user_email,
                     _('Your new password'),
                     _('Your new RhodeCode password:%s') % (new_passwd,))
            log.info('send new password mail to %s' % user_email)

        except Exception:
            log.error('Failed to update user password')
            log.error(traceback.format_exc())
            if pre_db:
                # we rollback only if local db stuff fails. If it goes into
                # run_task, we're pass rollback state this wouldn't work then
                Session().rollback()

        return True

    def fill_data(self, auth_user, user_id=None, api_key=None, username=None):
        """
        Fetches auth_user by user_id,or api_key if present.
        Fills auth_user attributes with those taken from database.
        Additionally set's is_authenitated if lookup fails
        present in database

        :param auth_user: instance of user to set attributes
        :param user_id: user id to fetch by
        :param api_key: api key to fetch by
        :param username: username to fetch by
        """
        if user_id is None and api_key is None and username is None:
            raise Exception('You need to pass user_id, api_key or username')

        try:
            dbuser = None
            if user_id:
                dbuser = self.get(user_id)
            elif api_key:
                dbuser = self.get_by_api_key(api_key)
            elif username:
                dbuser = self.get_by_username(username)

            if dbuser is not None and dbuser.active:
                log.debug('filling %s data' % dbuser)
                for k, v in dbuser.get_dict().iteritems():
                    if k not in ['api_keys', 'permissions']:
                        setattr(auth_user, k, v)
            else:
                return False

        except Exception:
            log.error(traceback.format_exc())
            auth_user.is_authenticated = False
            return False

        return True

    def has_perm(self, user, perm):
        perm = self._get_perm(perm)
        user = self._get_user(user)

        return UserToPerm.query().filter(UserToPerm.user == user)\
            .filter(UserToPerm.permission == perm).scalar() is not None

    def grant_perm(self, user, perm):
        """
        Grant user global permissions

        :param user:
        :param perm:
        """
        user = self._get_user(user)
        perm = self._get_perm(perm)
        # if this permission is already granted skip it
        _perm = UserToPerm.query()\
            .filter(UserToPerm.user == user)\
            .filter(UserToPerm.permission == perm)\
            .scalar()
        if _perm:
            return
        new = UserToPerm()
        new.user = user
        new.permission = perm
        self.sa.add(new)
        return new

    def revoke_perm(self, user, perm):
        """
        Revoke users global permissions

        :param user:
        :param perm:
        """
        user = self._get_user(user)
        perm = self._get_perm(perm)

        obj = UserToPerm.query()\
                .filter(UserToPerm.user == user)\
                .filter(UserToPerm.permission == perm)\
                .scalar()
        if obj:
            self.sa.delete(obj)

    def add_extra_email(self, user, email):
        """
        Adds email address to UserEmailMap

        :param user:
        :param email:
        """
        from kallithea.model import forms
        form = forms.UserExtraEmailForm()()
        data = form.to_python(dict(email=email))
        user = self._get_user(user)

        obj = UserEmailMap()
        obj.user = user
        obj.email = data['email']
        self.sa.add(obj)
        return obj

    def delete_extra_email(self, user, email_id):
        """
        Removes email address from UserEmailMap

        :param user:
        :param email_id:
        """
        user = self._get_user(user)
        obj = UserEmailMap.query().get(email_id)
        if obj:
            self.sa.delete(obj)

    def add_extra_ip(self, user, ip):
        """
        Adds ip address to UserIpMap

        :param user:
        :param ip:
        """
        from kallithea.model import forms
        form = forms.UserExtraIpForm()()
        data = form.to_python(dict(ip=ip))
        user = self._get_user(user)

        obj = UserIpMap()
        obj.user = user
        obj.ip_addr = data['ip']
        self.sa.add(obj)
        return obj

    def delete_extra_ip(self, user, ip_id):
        """
        Removes ip address from UserIpMap

        :param user:
        :param ip_id:
        """
        user = self._get_user(user)
        obj = UserIpMap.query().get(ip_id)
        if obj:
            self.sa.delete(obj)