# HG changeset patch # User Thomas De Schampheleire # Date 1433359538 -7200 # Node ID 8b35ec0874641081469b6929d4a0227dc4d7dfff # Parent 4c5c59b96adc029dad7378cdb3a9091a4306accb admin: users: factorize check for default user Note that one specific unittest has been commented because it relies on pytest features (monkeypatch). When pytest is the default test runner, the test should be uncommented. diff -r 4c5c59b96adc -r 8b35ec087464 kallithea/controllers/admin/users.py --- a/kallithea/controllers/admin/users.py Tue May 19 21:50:35 2015 +0200 +++ b/kallithea/controllers/admin/users.py Wed Jun 03 21:25:38 2015 +0200 @@ -34,6 +34,7 @@ from pylons.controllers.util import redirect from pylons.i18n.translation import _ from sqlalchemy.sql.expression import func +from webob.exc import HTTPNotFound import kallithea from kallithea.lib.exceptions import DefaultUserException, \ @@ -233,14 +234,17 @@ # url('user', id=ID) User.get_or_404(-1) + def _get_user_or_raise_if_default(self, id): + try: + return User.get_or_404(id, allow_default=False) + except DefaultUserException: + h.flash(_("The default user cannot be edited"), category='warning') + raise HTTPNotFound + def edit(self, id, format='html'): """GET /users/id/edit: Form to edit an existing item""" # url('edit_user', id=ID) - c.user = User.get_or_404(id) - if c.user.username == User.DEFAULT_USER: - h.flash(_("You can't edit this user"), category='warning') - return redirect(url('users')) - + c.user = self._get_user_or_raise_if_default(id) c.active = 'profile' c.extern_type = c.user.extern_type c.extern_name = c.user.extern_name @@ -254,11 +258,7 @@ force_defaults=False) def edit_advanced(self, id): - c.user = User.get_or_404(id) - if c.user.username == User.DEFAULT_USER: - h.flash(_("You can't edit this user"), category='warning') - return redirect(url('users')) - + c.user = self._get_user_or_raise_if_default(id) c.active = 'advanced' c.perm_user = AuthUser(user_id=id, ip_addr=self.ip_addr) @@ -277,11 +277,7 @@ force_defaults=False) def edit_api_keys(self, id): - c.user = User.get_or_404(id) - if c.user.username == User.DEFAULT_USER: - h.flash(_("You can't edit this user"), category='warning') - return redirect(url('users')) - + c.user = self._get_user_or_raise_if_default(id) c.active = 'api_keys' show_expired = True c.lifetime_values = [ @@ -302,10 +298,7 @@ force_defaults=False) def add_api_key(self, id): - c.user = User.get_or_404(id) - if c.user.username == User.DEFAULT_USER: - h.flash(_("You can't edit this user"), category='warning') - return redirect(url('users')) + c.user = self._get_user_or_raise_if_default(id) lifetime = safe_int(request.POST.get('lifetime'), -1) description = request.POST.get('description') @@ -315,10 +308,7 @@ return redirect(url('edit_user_api_keys', id=c.user.user_id)) def delete_api_key(self, id): - c.user = User.get_or_404(id) - if c.user.username == User.DEFAULT_USER: - h.flash(_("You can't edit this user"), category='warning') - return redirect(url('users')) + c.user = self._get_user_or_raise_if_default(id) api_key = request.POST.get('del_api_key') if request.POST.get('del_api_key_builtin'): @@ -339,11 +329,7 @@ pass def edit_perms(self, id): - c.user = User.get_or_404(id) - if c.user.username == User.DEFAULT_USER: - h.flash(_("You can't edit this user"), category='warning') - return redirect(url('users')) - + c.user = self._get_user_or_raise_if_default(id) c.active = 'perms' c.perm_user = AuthUser(user_id=id, ip_addr=self.ip_addr) @@ -402,11 +388,7 @@ return redirect(url('edit_user_perms', id=id)) def edit_emails(self, id): - c.user = User.get_or_404(id) - if c.user.username == User.DEFAULT_USER: - h.flash(_("You can't edit this user"), category='warning') - return redirect(url('users')) - + c.user = self._get_user_or_raise_if_default(id) c.active = 'emails' c.user_email_map = UserEmailMap.query()\ .filter(UserEmailMap.user == c.user).all() @@ -449,11 +431,7 @@ return redirect(url('edit_user_emails', id=id)) def edit_ips(self, id): - c.user = User.get_or_404(id) - if c.user.username == User.DEFAULT_USER: - h.flash(_("You can't edit this user"), category='warning') - return redirect(url('users')) - + c.user = self._get_user_or_raise_if_default(id) c.active = 'ips' c.user_ip_map = UserIpMap.query()\ .filter(UserIpMap.user == c.user).all() diff -r 4c5c59b96adc -r 8b35ec087464 kallithea/lib/exceptions.py --- a/kallithea/lib/exceptions.py Tue May 19 21:50:35 2015 +0200 +++ b/kallithea/lib/exceptions.py Wed Jun 03 21:25:38 2015 +0200 @@ -45,6 +45,7 @@ class DefaultUserException(Exception): + """An invalid action was attempted on the default user""" pass diff -r 4c5c59b96adc -r 8b35ec087464 kallithea/model/db.py --- a/kallithea/model/db.py Tue May 19 21:50:35 2015 +0200 +++ b/kallithea/model/db.py Wed Jun 03 21:25:38 2015 +0200 @@ -43,6 +43,7 @@ from pylons.i18n.translation import lazy_ugettext as _ from kallithea import DB_PREFIX +from kallithea.lib.exceptions import DefaultUserException from kallithea.lib.vcs import get_backend from kallithea.lib.vcs.utils.helpers import get_scm from kallithea.lib.vcs.exceptions import VCSError @@ -526,6 +527,18 @@ self.user_id, self.username) @classmethod + def get_or_404(cls, id_, allow_default=True): + ''' + Overridden version of BaseModel.get_or_404, with an extra check on + the default user. + ''' + user = super(User, cls).get_or_404(id_) + if allow_default == False: + if user.username == User.DEFAULT_USER: + raise DefaultUserException + return user + + @classmethod def get_by_username(cls, username, case_insensitive=False, cache=False): if case_insensitive: q = cls.query().filter(cls.username.ilike(username)) diff -r 4c5c59b96adc -r 8b35ec087464 kallithea/tests/functional/test_admin_users.py --- a/kallithea/tests/functional/test_admin_users.py Tue May 19 21:50:35 2015 +0200 +++ b/kallithea/tests/functional/test_admin_users.py Wed Jun 03 21:25:38 2015 +0200 @@ -13,6 +13,7 @@ # along with this program. If not, see . from sqlalchemy.orm.exc import NoResultFound +from webob.exc import HTTPNotFound from kallithea.tests import * from kallithea.tests.fixture import Fixture @@ -497,3 +498,81 @@ self.checkSessionFlash(response, 'API key successfully reset') response = response.follow() response.mustcontain(no=[api_key]) + +# TODO To be uncommented when pytest is the test runner +#import pytest +#from kallithea.controllers.admin.users import UsersController +#class TestAdminUsersController_unittest(object): +# """ +# Unit tests for the users controller +# These are in a separate class, not deriving from TestController (and thus +# unittest.TestCase), to be able to benefit from pytest features like +# monkeypatch. +# """ +# def test_get_user_or_raise_if_default(self, monkeypatch): +# # flash complains about an unexisting session +# def flash_mock(*args, **kwargs): +# pass +# monkeypatch.setattr(h, 'flash', flash_mock) +# +# u = UsersController() +# # a regular user should work correctly +# user = User.get_by_username(TEST_USER_REGULAR_LOGIN) +# assert u._get_user_or_raise_if_default(user.user_id) == user +# # the default user should raise +# with pytest.raises(HTTPNotFound): +# u._get_user_or_raise_if_default(User.get_default_user().user_id) + + +class TestAdminUsersControllerForDefaultUser(TestController): + """ + Edit actions on the default user are not allowed. + Validate that they throw a 404 exception. + """ + def test_edit_default_user(self): + self.log_user() + user = User.get_default_user() + response = self.app.get(url('edit_user', id=user.user_id), status=404) + + def test_edit_advanced_default_user(self): + self.log_user() + user = User.get_default_user() + response = self.app.get(url('edit_user_advanced', id=user.user_id), status=404) + + # API keys + def test_edit_api_keys_default_user(self): + self.log_user() + user = User.get_default_user() + response = self.app.get(url('edit_user_api_keys', id=user.user_id), status=404) + + def test_add_api_keys_default_user(self): + self.log_user() + user = User.get_default_user() + response = self.app.post(url('edit_user_api_keys', id=user.user_id), + {'_method': 'put', '_authentication_token': self.authentication_token()}, status=404) + + def test_delete_api_keys_default_user(self): + self.log_user() + user = User.get_default_user() + response = self.app.post(url('edit_user_api_keys', id=user.user_id), + {'_method': 'delete', '_authentication_token': self.authentication_token()}, status=404) + + # Permissions + def test_edit_perms_default_user(self): + self.log_user() + user = User.get_default_user() + response = self.app.get(url('edit_user_perms', id=user.user_id), status=404) + + # E-mails + def test_edit_emails_default_user(self): + self.log_user() + user = User.get_default_user() + response = self.app.get(url('edit_user_emails', id=user.user_id), status=404) + + # IP addresses + # Add/delete of IP addresses for the default user is used to maintain + # the global IP whitelist and thus allowed. Only 'edit' is forbidden. + def test_edit_ip_default_user(self): + self.log_user() + user = User.get_default_user() + response = self.app.get(url('edit_user_ips', id=user.user_id), status=404)