Mercurial > kallithea
view kallithea/controllers/admin/my_account.py @ 7483:2e3e1dacdbb7
auth: drop confusing and layering-violating User.AuthUser property
Keep it simple and just explicitly create an AuthUser if "needed". That can
perhaps be simplified further later on.
author | Mads Kiilerich <mads@kiilerich.com> |
---|---|
date | Wed, 26 Dec 2018 02:11:55 +0100 |
parents | af938280e76a |
children | 66c208bf56fe |
line wrap: on
line source
# -*- coding: utf-8 -*-
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
kallithea.controllers.admin.my_account
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

my account controller for Kallithea admin

This file was forked by the Kallithea project in July 2014.
Original author and date, and relevant copyright and licensing information is below:
:created_on: August 20, 2013
:author: marcink
:copyright: (c) 2013 RhodeCode GmbH, and others.
:license: GPLv3, see LICENSE.md for more details.
"""

import logging
import traceback
import formencode

from sqlalchemy import func
from formencode import htmlfill
from tg import request, tmpl_context as c
from tg.i18n import ugettext as _
from webob.exc import HTTPFound

from kallithea.config.routing import url
from kallithea.lib import helpers as h
from kallithea.lib import auth_modules
from kallithea.lib.auth import LoginRequired, AuthUser
from kallithea.lib.base import BaseController, render
from kallithea.lib.utils2 import generate_api_key, safe_int
from kallithea.model.db import Repository, UserEmailMap, User, UserFollowing
from kallithea.model.forms import UserForm, PasswordChangeForm
from kallithea.model.user import UserModel
from kallithea.model.repo import RepoModel
from kallithea.model.api_key import ApiKeyModel
from kallithea.model.meta import Session

log = logging.getLogger(__name__)


class MyAccountController(BaseController):
    """Controller for the "my account" pages of the current user.

    Every action operates on ``request.authuser``: profile, password, owned
    and watched repositories, permissions, extra emails and API keys.  The
    page-rendering actions all render ``admin/my_account/my_account.html``
    and set ``c.active`` to the name of the requested section first.
    """

    @LoginRequired()
    def _before(self, *args, **kwargs):
        """Run the base controller setup, wrapped by ``LoginRequired``."""
        super(MyAccountController, self)._before(*args, **kwargs)

    def __load_data(self):
        """Load the current user into ``c.user``.

        Raises ``HTTPFound`` (redirect to ``url('users')``) with a warning
        flash when the current user is the default user.
        """
        c.user = User.get(request.authuser.user_id)
        if c.user.is_default_user:
            h.flash(_("You can't edit this user since it's"
                      " crucial for entire application"), category='warning')
            raise HTTPFound(location=url('users'))

    def _load_my_repos_data(self, watched=False):
        """Return the current user's repositories via ``get_repos_as_dict``.

        With ``watched`` true, the repositories joined through
        ``UserFollowing`` for the current user are listed (``admin=False``);
        otherwise the repositories owned by the user (``admin=True``).
        """
        if watched:
            admin = False
            repos_list = Session().query(Repository) \
                .join(UserFollowing) \
                .filter(UserFollowing.user_id ==
                        request.authuser.user_id).all()
        else:
            admin = True
            repos_list = Session().query(Repository) \
                .filter(Repository.owner_id ==
                        request.authuser.user_id).all()

        return RepoModel().get_repos_as_dict(repos_list, admin=admin)

    def my_account(self):
        """Render the profile section and, on POST, update the profile.

        Fields reported by ``auth_modules.get_managed_fields`` are never
        updated; when the default user's global permissions contain
        ``hg.register.none``, username, first/last name and email are
        treated as managed too.  Password fields are blanked before
        validation and skipped on update.

        Returns the rendered page (with validation errors filled in when the
        form is invalid); raises ``HTTPFound`` after a successful update.
        """
        c.active = 'profile'
        self.__load_data()
        c.perm_user = AuthUser(user_id=request.authuser.user_id)
        managed_fields = auth_modules.get_managed_fields(c.user)
        def_user_perms = AuthUser(
            dbuser=User.get_default_user()).permissions['global']
        if 'hg.register.none' in def_user_perms:
            managed_fields.extend(
                ['username', 'firstname', 'lastname', 'email'])

        c.readonly = lambda n: 'readonly' if n in managed_fields else None

        defaults = c.user.get_dict()
        update = False
        if request.POST:
            _form = UserForm(edit=True,
                             old_data={'user_id': request.authuser.user_id,
                                       'email': request.authuser.email})()
            form_result = {}
            try:
                post_data = dict(request.POST)
                # passwords are changed through my_account_password only
                post_data['new_password'] = ''
                post_data['password_confirmation'] = ''
                form_result = _form.to_python(post_data)
                # skip updating those attrs for my account
                skip_attrs = ['admin', 'active', 'extern_type', 'extern_name',
                              'new_password', 'password_confirmation',
                              ] + managed_fields

                UserModel().update(request.authuser.user_id, form_result,
                                   skip_attrs=skip_attrs)
                # Commit before announcing success, so a failing commit only
                # produces the error flash below.
                Session().commit()
                h.flash(_('Your account was updated successfully'),
                        category='success')
                update = True

            except formencode.Invalid as errors:
                return htmlfill.render(
                    render('admin/my_account/my_account.html'),
                    defaults=errors.value,
                    errors=errors.error_dict or {},
                    prefix_error=False,
                    encoding="UTF-8",
                    force_defaults=False)
            except Exception:
                log.error(traceback.format_exc())
                # form_result may still be empty if the failure happened
                # before/while validating; fall back to the current username.
                h.flash(_('Error occurred during update of user %s')
                        % form_result.get('username',
                                          request.authuser.username),
                        category='error')
        if update:
            # relative location, unlike the url(...) redirects used by the
            # other actions of this controller
            raise HTTPFound(location='my_account')
        return htmlfill.render(
            render('admin/my_account/my_account.html'),
            defaults=defaults,
            encoding="UTF-8",
            force_defaults=False)

    def my_account_password(self):
        """Render the password section and, on POST, change the password.

        The change is only attempted when ``'password'`` is not among the
        user's managed fields (exposed as ``c.can_change_password``).
        """
        c.active = 'password'
        self.__load_data()

        managed_fields = auth_modules.get_managed_fields(c.user)
        c.can_change_password = 'password' not in managed_fields

        if request.POST and c.can_change_password:
            _form = PasswordChangeForm(request.authuser.username)()
            try:
                form_result = _form.to_python(request.POST)
                UserModel().update(request.authuser.user_id, form_result)
                Session().commit()
                h.flash(_("Successfully updated password"),
                        category='success')
            except formencode.Invalid as errors:
                return htmlfill.render(
                    render('admin/my_account/my_account.html'),
                    defaults=errors.value,
                    errors=errors.error_dict or {},
                    prefix_error=False,
                    encoding="UTF-8",
                    force_defaults=False)
            except Exception:
                log.error(traceback.format_exc())
                h.flash(_('Error occurred during update of user password'),
                        category='error')
        return render('admin/my_account/my_account.html')

    def my_account_repos(self):
        """Render the section listing repositories owned by the user."""
        c.active = 'repos'
        self.__load_data()

        # data used to render the grid
        c.data = self._load_my_repos_data()
        return render('admin/my_account/my_account.html')

    def my_account_watched(self):
        """Render the section listing repositories the user follows."""
        c.active = 'watched'
        self.__load_data()

        # data used to render the grid
        c.data = self._load_my_repos_data(watched=True)
        return render('admin/my_account/my_account.html')

    def my_account_perms(self):
        """Render the permissions section for the current user."""
        c.active = 'perms'
        self.__load_data()
        c.perm_user = AuthUser(user_id=request.authuser.user_id)

        return render('admin/my_account/my_account.html')

    def my_account_emails(self):
        """Render the section listing the user's ``UserEmailMap`` entries."""
        c.active = 'emails'
        self.__load_data()

        c.user_email_map = UserEmailMap.query() \
            .filter(UserEmailMap.user == c.user).all()
        return render('admin/my_account/my_account.html')

    def my_account_emails_add(self):
        """Add the POSTed ``new_email`` as an extra email, then redirect.

        Always raises ``HTTPFound`` back to the emails section; the outcome
        is reported through a flash message.
        """
        email = request.POST.get('new_email')

        try:
            UserModel().add_extra_email(request.authuser.user_id, email)
            Session().commit()
            h.flash(_("Added email %s to user") % email, category='success')
        except formencode.Invalid as error:
            # error_dict may be None or lack an 'email' entry; an exception
            # raised here would escape the sibling handler below, so fall
            # back to the top-level validation message.
            msg = (error.error_dict or {}).get('email') or error.msg
            h.flash(msg, category='error')
        except Exception:
            log.error(traceback.format_exc())
            h.flash(_('An error occurred during email saving'),
                    category='error')
        raise HTTPFound(location=url('my_account_emails'))

    def my_account_emails_delete(self):
        """Delete the extra email named by ``del_email_id``, then redirect."""
        email_id = request.POST.get('del_email_id')
        user_model = UserModel()
        user_model.delete_extra_email(request.authuser.user_id, email_id)
        Session().commit()
        h.flash(_("Removed email from user"), category='success')
        raise HTTPFound(location=url('my_account_emails'))

    def my_account_api_keys(self):
        """Render the API keys section, listing keys including expired ones.

        ``c.lifetime_values`` holds the selectable lifetimes as
        ``(value, label)`` pairs; going by the labels the values are minutes,
        with ``-1`` labelled "Forever".
        """
        c.active = 'api_keys'
        self.__load_data()
        c.lifetime_values = [
            (str(-1), _('Forever')),
            (str(5), _('5 minutes')),
            (str(60), _('1 hour')),
            (str(60 * 24), _('1 day')),
            (str(60 * 24 * 30), _('1 month')),
        ]
        c.lifetime_options = [(c.lifetime_values, _("Lifetime"))]
        c.user_api_keys = ApiKeyModel().get_api_keys(
            request.authuser.user_id, show_expired=True)
        return render('admin/my_account/my_account.html')

    def my_account_api_keys_add(self):
        """Create an API key from the POSTed form, then redirect.

        ``lifetime`` is passed through ``safe_int`` with ``-1`` as its
        second argument (presumably the fallback for missing or non-numeric
        input -- confirm against ``safe_int``).
        """
        lifetime = safe_int(request.POST.get('lifetime'), -1)
        description = request.POST.get('description')
        ApiKeyModel().create(request.authuser.user_id, description, lifetime)
        Session().commit()
        h.flash(_("API key successfully created"), category='success')

        raise HTTPFound(location=url('my_account_api_keys'))

    def my_account_api_keys_delete(self):
        """Reset the builtin API key or delete an extra one, then redirect.

        When ``del_api_key_builtin`` is POSTed, the user's own ``api_key`` is
        replaced by a newly generated one; otherwise the key named by
        ``del_api_key`` (if any) is deleted.
        """
        api_key = request.POST.get('del_api_key')
        if request.POST.get('del_api_key_builtin'):
            user = User.get(request.authuser.user_id)
            user.api_key = generate_api_key()
            Session().commit()
            h.flash(_("API key successfully reset"), category='success')
        elif api_key:
            ApiKeyModel().delete(api_key, request.authuser.user_id)
            Session().commit()
            h.flash(_("API key successfully deleted"), category='success')

        raise HTTPFound(location=url('my_account_api_keys'))