Mercurial > kallithea
view kallithea/controllers/admin/users.py @ 8746:259213d96dca
lib: move action_logger to new userlog model
author | Mads Kiilerich <mads@kiilerich.com> |
---|---|
date | Mon, 02 Nov 2020 13:57:26 +0100 |
parents | 6a9e5841cc51 |
children | 4f0de9468da3 |
line wrap: on
line source
# -*- coding: utf-8 -*- # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. """ kallithea.controllers.admin.users ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Users crud controller This file was forked by the Kallithea project in July 2014. Original author and date, and relevant copyright and licensing information is below: :created_on: Apr 4, 2010 :author: marcink :copyright: (c) 2013 RhodeCode GmbH, and others. :license: GPLv3, see LICENSE.md for more details. """ import logging import traceback import formencode from formencode import htmlfill from sqlalchemy.sql.expression import func from tg import app_globals, request from tg import tmpl_context as c from tg.i18n import ugettext as _ from webob.exc import HTTPFound, HTTPNotFound import kallithea import kallithea.lib.helpers as h from kallithea.lib import auth_modules, webutils from kallithea.lib.auth import AuthUser, HasPermissionAnyDecorator, LoginRequired from kallithea.lib.base import BaseController, IfSshEnabled, render from kallithea.lib.exceptions import DefaultUserException, UserCreationError, UserOwnsReposException from kallithea.lib.utils2 import datetime_to_time, fmt_date, generate_api_key, safe_int from kallithea.lib.webutils import url from kallithea.model import db, meta, userlog from kallithea.model.api_key import ApiKeyModel from kallithea.model.forms import CustomDefaultPermissionsForm, UserForm from kallithea.model.ssh_key import SshKeyModel, SshKeyModelException from kallithea.model.user import UserModel log = logging.getLogger(__name__) class UsersController(BaseController): @LoginRequired() @HasPermissionAnyDecorator('hg.admin') def _before(self, *args, **kwargs): super(UsersController, self)._before(*args, **kwargs) def index(self, format='html'): c.users_list = db.User.query().order_by(db.User.username) \ .filter_by(is_default_user=False) \ .order_by(func.lower(db.User.username)) \ .all() users_data = [] _tmpl_lookup = app_globals.mako_lookup template = _tmpl_lookup.get_template('data_table/_dt_elements.html') grav_tmpl = '<div class="gravatar">%s</div>' def username(user_id, username): return template.get_def("user_name") \ .render_unicode(user_id, username, _=_, h=h, c=c) def user_actions(user_id, username): return template.get_def("user_actions") \ .render_unicode(user_id, username, _=_, h=h, c=c) for user in c.users_list: users_data.append({ "gravatar": grav_tmpl % h.gravatar(user.email, size=20), "raw_name": user.username, "username": username(user.user_id, user.username), "firstname": webutils.escape(user.name), "lastname": webutils.escape(user.lastname), "last_login": fmt_date(user.last_login), "last_login_raw": datetime_to_time(user.last_login), "active": h.boolicon(user.active), "admin": h.boolicon(user.admin), "extern_type": user.extern_type, "extern_name": user.extern_name, "action": user_actions(user.user_id, user.username), }) c.data = { "sort": None, "dir": "asc", "records": users_data } return render('admin/users/users.html') def create(self): c.default_extern_type = db.User.DEFAULT_AUTH_TYPE c.default_extern_name = '' user_model = UserModel() user_form = UserForm()() try: form_result = user_form.to_python(dict(request.POST)) user = user_model.create(form_result) userlog.action_logger(request.authuser, 'admin_created_user:%s' % user.username, None, request.ip_addr) webutils.flash(_('Created user %s') % user.username, category='success') meta.Session().commit() except formencode.Invalid as errors: return htmlfill.render( render('admin/users/user_add.html'), defaults=errors.value, errors=errors.error_dict or {}, prefix_error=False, encoding="UTF-8", force_defaults=False) except UserCreationError as e: webutils.flash(e, 'error') except Exception: log.error(traceback.format_exc()) webutils.flash(_('Error occurred during creation of user %s') % request.POST.get('username'), category='error') raise HTTPFound(location=url('edit_user', id=user.user_id)) def new(self, format='html'): c.default_extern_type = db.User.DEFAULT_AUTH_TYPE c.default_extern_name = '' return render('admin/users/user_add.html') def update(self, id): user_model = UserModel() user = user_model.get(id) _form = UserForm(edit=True, old_data={'user_id': id, 'email': user.email})() form_result = {} try: form_result = _form.to_python(dict(request.POST)) skip_attrs = ['extern_type', 'extern_name', ] + auth_modules.get_managed_fields(user) user_model.update(id, form_result, skip_attrs=skip_attrs) usr = form_result['username'] userlog.action_logger(request.authuser, 'admin_updated_user:%s' % usr, None, request.ip_addr) webutils.flash(_('User updated successfully'), category='success') meta.Session().commit() except formencode.Invalid as errors: defaults = errors.value e = errors.error_dict or {} defaults.update({ 'create_repo_perm': user_model.has_perm(id, 'hg.create.repository'), 'fork_repo_perm': user_model.has_perm(id, 'hg.fork.repository'), }) return htmlfill.render( self._render_edit_profile(user), defaults=defaults, errors=e, prefix_error=False, encoding="UTF-8", force_defaults=False) except Exception: log.error(traceback.format_exc()) webutils.flash(_('Error occurred during update of user %s') % form_result.get('username'), category='error') raise HTTPFound(location=url('edit_user', id=id)) def delete(self, id): usr = db.User.get_or_404(id) has_ssh_keys = bool(usr.ssh_keys) try: UserModel().delete(usr) meta.Session().commit() webutils.flash(_('Successfully deleted user'), category='success') except (UserOwnsReposException, DefaultUserException) as e: webutils.flash(e, category='warning') except Exception: log.error(traceback.format_exc()) webutils.flash(_('An error occurred during deletion of user'), category='error') else: if has_ssh_keys: SshKeyModel().write_authorized_keys() raise HTTPFound(location=url('users')) def _get_user_or_raise_if_default(self, id): try: return db.User.get_or_404(id, allow_default=False) except DefaultUserException: webutils.flash(_("The default user cannot be edited"), category='warning') raise HTTPNotFound def _render_edit_profile(self, user): c.user = user c.active = 'profile' c.perm_user = AuthUser(dbuser=user) managed_fields = auth_modules.get_managed_fields(user) c.readonly = lambda n: 'readonly' if n in managed_fields else None return render('admin/users/user_edit.html') def edit(self, id, format='html'): user = self._get_user_or_raise_if_default(id) defaults = user.get_dict() return htmlfill.render( self._render_edit_profile(user), defaults=defaults, encoding="UTF-8", force_defaults=False) def edit_advanced(self, id): c.user = self._get_user_or_raise_if_default(id) c.active = 'advanced' c.perm_user = AuthUser(dbuser=c.user) umodel = UserModel() defaults = c.user.get_dict() defaults.update({ 'create_repo_perm': umodel.has_perm(c.user, 'hg.create.repository'), 'create_user_group_perm': umodel.has_perm(c.user, 'hg.usergroup.create.true'), 'fork_repo_perm': umodel.has_perm(c.user, 'hg.fork.repository'), }) return htmlfill.render( render('admin/users/user_edit.html'), defaults=defaults, encoding="UTF-8", force_defaults=False) def edit_api_keys(self, id): c.user = self._get_user_or_raise_if_default(id) c.active = 'api_keys' show_expired = True c.lifetime_values = [ (str(-1), _('Forever')), (str(5), _('5 minutes')), (str(60), _('1 hour')), (str(60 * 24), _('1 day')), (str(60 * 24 * 30), _('1 month')), ] c.lifetime_options = [(c.lifetime_values, _("Lifetime"))] c.user_api_keys = ApiKeyModel().get_api_keys(c.user.user_id, show_expired=show_expired) defaults = c.user.get_dict() return htmlfill.render( render('admin/users/user_edit.html'), defaults=defaults, encoding="UTF-8", force_defaults=False) def add_api_key(self, id): c.user = self._get_user_or_raise_if_default(id) lifetime = safe_int(request.POST.get('lifetime'), -1) description = request.POST.get('description') ApiKeyModel().create(c.user.user_id, description, lifetime) meta.Session().commit() webutils.flash(_("API key successfully created"), category='success') raise HTTPFound(location=url('edit_user_api_keys', id=c.user.user_id)) def delete_api_key(self, id): c.user = self._get_user_or_raise_if_default(id) api_key = request.POST.get('del_api_key') if request.POST.get('del_api_key_builtin'): c.user.api_key = generate_api_key() meta.Session().commit() webutils.flash(_("API key successfully reset"), category='success') elif api_key: ApiKeyModel().delete(api_key, c.user.user_id) meta.Session().commit() webutils.flash(_("API key successfully deleted"), category='success') raise HTTPFound(location=url('edit_user_api_keys', id=c.user.user_id)) def update_account(self, id): pass def edit_perms(self, id): c.user = self._get_user_or_raise_if_default(id) c.active = 'perms' c.perm_user = AuthUser(dbuser=c.user) umodel = UserModel() defaults = c.user.get_dict() defaults.update({ 'create_repo_perm': umodel.has_perm(c.user, 'hg.create.repository'), 'create_user_group_perm': umodel.has_perm(c.user, 'hg.usergroup.create.true'), 'fork_repo_perm': umodel.has_perm(c.user, 'hg.fork.repository'), }) return htmlfill.render( render('admin/users/user_edit.html'), defaults=defaults, encoding="UTF-8", force_defaults=False) def update_perms(self, id): user = self._get_user_or_raise_if_default(id) try: form = CustomDefaultPermissionsForm()() form_result = form.to_python(request.POST) user_model = UserModel() defs = db.UserToPerm.query() \ .filter(db.UserToPerm.user == user) \ .all() for ug in defs: meta.Session().delete(ug) if form_result['create_repo_perm']: user_model.grant_perm(id, 'hg.create.repository') else: user_model.grant_perm(id, 'hg.create.none') if form_result['create_user_group_perm']: user_model.grant_perm(id, 'hg.usergroup.create.true') else: user_model.grant_perm(id, 'hg.usergroup.create.false') if form_result['fork_repo_perm']: user_model.grant_perm(id, 'hg.fork.repository') else: user_model.grant_perm(id, 'hg.fork.none') webutils.flash(_("Updated permissions"), category='success') meta.Session().commit() except Exception: log.error(traceback.format_exc()) webutils.flash(_('An error occurred during permissions saving'), category='error') raise HTTPFound(location=url('edit_user_perms', id=id)) def edit_emails(self, id): c.user = self._get_user_or_raise_if_default(id) c.active = 'emails' c.user_email_map = db.UserEmailMap.query() \ .filter(db.UserEmailMap.user == c.user).all() defaults = c.user.get_dict() return htmlfill.render( render('admin/users/user_edit.html'), defaults=defaults, encoding="UTF-8", force_defaults=False) def add_email(self, id): user = self._get_user_or_raise_if_default(id) email = request.POST.get('new_email') user_model = UserModel() try: user_model.add_extra_email(id, email) meta.Session().commit() webutils.flash(_("Added email %s to user") % email, category='success') except formencode.Invalid as error: msg = error.error_dict['email'] webutils.flash(msg, category='error') except Exception: log.error(traceback.format_exc()) webutils.flash(_('An error occurred during email saving'), category='error') raise HTTPFound(location=url('edit_user_emails', id=id)) def delete_email(self, id): user = self._get_user_or_raise_if_default(id) email_id = request.POST.get('del_email_id') user_model = UserModel() user_model.delete_extra_email(id, email_id) meta.Session().commit() webutils.flash(_("Removed email from user"), category='success') raise HTTPFound(location=url('edit_user_emails', id=id)) def edit_ips(self, id): c.user = self._get_user_or_raise_if_default(id) c.active = 'ips' c.user_ip_map = db.UserIpMap.query() \ .filter(db.UserIpMap.user == c.user).all() c.default_user_ip_map = db.UserIpMap.query() \ .filter(db.UserIpMap.user_id == kallithea.DEFAULT_USER_ID).all() defaults = c.user.get_dict() return htmlfill.render( render('admin/users/user_edit.html'), defaults=defaults, encoding="UTF-8", force_defaults=False) def add_ip(self, id): ip = request.POST.get('new_ip') user_model = UserModel() try: user_model.add_extra_ip(id, ip) meta.Session().commit() webutils.flash(_("Added IP address %s to user whitelist") % ip, category='success') except formencode.Invalid as error: msg = error.error_dict['ip'] webutils.flash(msg, category='error') except Exception: log.error(traceback.format_exc()) webutils.flash(_('An error occurred while adding IP address'), category='error') if 'default_user' in request.POST: raise HTTPFound(location=url('admin_permissions_ips')) raise HTTPFound(location=url('edit_user_ips', id=id)) def delete_ip(self, id): ip_id = request.POST.get('del_ip_id') user_model = UserModel() user_model.delete_extra_ip(id, ip_id) meta.Session().commit() webutils.flash(_("Removed IP address from user whitelist"), category='success') if 'default_user' in request.POST: raise HTTPFound(location=url('admin_permissions_ips')) raise HTTPFound(location=url('edit_user_ips', id=id)) @IfSshEnabled def edit_ssh_keys(self, id): c.user = self._get_user_or_raise_if_default(id) c.active = 'ssh_keys' c.user_ssh_keys = SshKeyModel().get_ssh_keys(c.user.user_id) defaults = c.user.get_dict() return htmlfill.render( render('admin/users/user_edit.html'), defaults=defaults, encoding="UTF-8", force_defaults=False) @IfSshEnabled def ssh_keys_add(self, id): c.user = self._get_user_or_raise_if_default(id) description = request.POST.get('description') public_key = request.POST.get('public_key') try: new_ssh_key = SshKeyModel().create(c.user.user_id, description, public_key) meta.Session().commit() SshKeyModel().write_authorized_keys() webutils.flash(_("SSH key %s successfully added") % new_ssh_key.fingerprint, category='success') except SshKeyModelException as e: webutils.flash(e.args[0], category='error') raise HTTPFound(location=url('edit_user_ssh_keys', id=c.user.user_id)) @IfSshEnabled def ssh_keys_delete(self, id): c.user = self._get_user_or_raise_if_default(id) fingerprint = request.POST.get('del_public_key_fingerprint') try: SshKeyModel().delete(fingerprint, c.user.user_id) meta.Session().commit() SshKeyModel().write_authorized_keys() webutils.flash(_("SSH key successfully deleted"), category='success') except SshKeyModelException as e: webutils.flash(e.args[0], category='error') raise HTTPFound(location=url('edit_user_ssh_keys', id=c.user.user_id))