Mercurial > kallithea
view kallithea/tests/functional/test_admin_users.py @ 5914:be1d366f461c
pytest migration: functional: switch to standard assert statements
Use unittest2pytest to replace unittest-style assert statements (e.g.
assertEqual) with standard Python assert statements to benefit from pytest's
improved reporting on assert failures.
The conversion by unittest2pytest was correct, except for line wrapping
problems.
author | Thomas De Schampheleire <thomas.de.schampheleire@gmail.com> |
---|---|
date | Sat, 14 May 2016 21:04:26 +0200 |
parents | e82963996ae8 |
children | 7f2aa3ec2931 |
line wrap: on
line source
# -*- coding: utf-8 -*- # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from sqlalchemy.orm.exc import NoResultFound import pytest from kallithea.tests import * from kallithea.tests.fixture import Fixture from kallithea.controllers.admin.users import UsersController from kallithea.model.db import User, Permission, UserIpMap, UserApiKeys from kallithea.lib.auth import check_password from kallithea.model.user import UserModel from kallithea.model import validators from kallithea.lib import helpers as h from kallithea.model.meta import Session from webob.exc import HTTPNotFound fixture = Fixture() class TestAdminUsersController(TestControllerPytest): test_user_1 = 'testme' @classmethod def teardown_class(cls): if User.get_by_username(cls.test_user_1): UserModel().delete(cls.test_user_1) Session().commit() def test_index(self): self.log_user() response = self.app.get(url('users')) # Test response... def test_create(self): self.log_user() username = 'newtestuser' password = 'test12' password_confirmation = password name = u'name' lastname = u'lastname' email = 'mail@example.com' response = self.app.post(url('users'), {'username': username, 'password': password, 'password_confirmation': password_confirmation, 'firstname': name, 'active': True, 'lastname': lastname, 'extern_name': 'internal', 'extern_type': 'internal', 'email': email, '_authentication_token': self.authentication_token()}) self.checkSessionFlash(response, '''Created user <a href="/_admin/users/''') self.checkSessionFlash(response, '''/edit">%s</a>''' % (username)) new_user = Session().query(User). \ filter(User.username == username).one() assert new_user.username == username assert check_password(password, new_user.password) == True assert new_user.name == name assert new_user.lastname == lastname assert new_user.email == email response.follow() response = response.follow() response.mustcontain("""newtestuser""") def test_create_err(self): self.log_user() username = 'new_user' password = '' name = u'name' lastname = u'lastname' email = 'errmail.example.com' response = self.app.post(url('users'), {'username': username, 'password': password, 'name': name, 'active': False, 'lastname': lastname, 'email': email, '_authentication_token': self.authentication_token()}) msg = validators.ValidUsername(False, {})._messages['system_invalid_username'] msg = h.html_escape(msg % {'username': 'new_user'}) response.mustcontain("""<span class="error-message">%s</span>""" % msg) response.mustcontain("""<span class="error-message">Please enter a value</span>""") response.mustcontain("""<span class="error-message">An email address must contain a single @</span>""") def get_user(): Session().query(User).filter(User.username == username).one() with pytest.raises(NoResultFound): get_user(), 'found user in database' def test_new(self): self.log_user() response = self.app.get(url('new_user')) @parametrize('name,attrs', [('firstname', {'firstname': 'new_username'}), ('lastname', {'lastname': 'new_username'}), ('admin', {'admin': True}), ('admin', {'admin': False}), ('extern_type', {'extern_type': 'ldap'}), ('extern_type', {'extern_type': None}), ('extern_name', {'extern_name': 'test'}), ('extern_name', {'extern_name': None}), ('active', {'active': False}), ('active', {'active': True}), ('email', {'email': 'someemail@example.com'}), # ('new_password', {'new_password': 'foobar123', # 'password_confirmation': 'foobar123'}) ]) def test_update(self, name, attrs): self.log_user() usr = fixture.create_user(self.test_user_1, password='qweqwe', email='testme@example.com', extern_type='internal', extern_name=self.test_user_1, skip_if_exists=True) Session().commit() params = usr.get_api_data(True) params.update({'password_confirmation': ''}) params.update({'new_password': ''}) params.update(attrs) if name == 'email': params['emails'] = [attrs['email']] if name == 'extern_type': #cannot update this via form, expected value is original one params['extern_type'] = "internal" if name == 'extern_name': #cannot update this via form, expected value is original one params['extern_name'] = self.test_user_1 # special case since this user is not # logged in yet his data is not filled # so we use creation data params.update({'_authentication_token': self.authentication_token()}) response = self.app.put(url('user', id=usr.user_id), params) self.checkSessionFlash(response, 'User updated successfully') params.pop('_authentication_token') updated_user = User.get_by_username(self.test_user_1) updated_params = updated_user.get_api_data(True) updated_params.update({'password_confirmation': ''}) updated_params.update({'new_password': ''}) assert params == updated_params def test_delete(self): self.log_user() username = 'newtestuserdeleteme' fixture.create_user(name=username) new_user = Session().query(User) \ .filter(User.username == username).one() response = self.app.post(url('user', id=new_user.user_id), params={'_method': 'delete', '_authentication_token': self.authentication_token()}) self.checkSessionFlash(response, 'Successfully deleted user') def test_delete_repo_err(self): self.log_user() username = 'repoerr' reponame = u'repoerr_fail' fixture.create_user(name=username) fixture.create_repo(name=reponame, cur_user=username) new_user = Session().query(User) \ .filter(User.username == username).one() response = self.app.post(url('user', id=new_user.user_id), params={'_method': 'delete', '_authentication_token': self.authentication_token()}) self.checkSessionFlash(response, 'User "%s" still ' 'owns 1 repositories and cannot be removed. ' 'Switch owners or remove those repositories: ' '%s' % (username, reponame)) response = self.app.post(url('delete_repo', repo_name=reponame), params={'_method': 'delete', '_authentication_token': self.authentication_token()}) self.checkSessionFlash(response, 'Deleted repository %s' % reponame) response = self.app.post(url('user', id=new_user.user_id), params={'_method': 'delete', '_authentication_token': self.authentication_token()}) self.checkSessionFlash(response, 'Successfully deleted user') def test_delete_repo_group_err(self): self.log_user() username = 'repogrouperr' groupname = u'repogroup_fail' fixture.create_user(name=username) fixture.create_repo_group(name=groupname, cur_user=username) new_user = Session().query(User) \ .filter(User.username == username).one() response = self.app.post(url('user', id=new_user.user_id), params={'_method': 'delete', '_authentication_token': self.authentication_token()}) self.checkSessionFlash(response, 'User "%s" still ' 'owns 1 repository groups and cannot be removed. ' 'Switch owners or remove those repository groups: ' '%s' % (username, groupname)) # Relevant _if_ the user deletion succeeded to make sure we can render groups without owner # rg = RepoGroup.get_by_group_name(group_name=groupname) # response = self.app.get(url('repos_groups', id=rg.group_id)) response = self.app.post(url('delete_repo_group', group_name=groupname), params={'_method': 'delete', '_authentication_token': self.authentication_token()}) self.checkSessionFlash(response, 'Removed repository group %s' % groupname) response = self.app.post(url('user', id=new_user.user_id), params={'_method': 'delete', '_authentication_token': self.authentication_token()}) self.checkSessionFlash(response, 'Successfully deleted user') def test_delete_user_group_err(self): self.log_user() username = 'usergrouperr' groupname = u'usergroup_fail' fixture.create_user(name=username) ug = fixture.create_user_group(name=groupname, cur_user=username) new_user = Session().query(User) \ .filter(User.username == username).one() response = self.app.post(url('user', id=new_user.user_id), params={'_method': 'delete', '_authentication_token': self.authentication_token()}) self.checkSessionFlash(response, 'User "%s" still ' 'owns 1 user groups and cannot be removed. ' 'Switch owners or remove those user groups: ' '%s' % (username, groupname)) # TODO: why do this fail? #response = self.app.delete(url('delete_users_group', id=groupname)) #self.checkSessionFlash(response, 'Removed user group %s' % groupname) fixture.destroy_user_group(ug.users_group_id) response = self.app.post(url('user', id=new_user.user_id), params={'_method': 'delete', '_authentication_token': self.authentication_token()}) self.checkSessionFlash(response, 'Successfully deleted user') def test_show(self): response = self.app.get(url('user', id=1)) def test_edit(self): self.log_user() user = User.get_by_username(TEST_USER_ADMIN_LOGIN) response = self.app.get(url('edit_user', id=user.user_id)) def test_add_perm_create_repo(self): self.log_user() perm_none = Permission.get_by_key('hg.create.none') perm_create = Permission.get_by_key('hg.create.repository') user = UserModel().create_or_update(username='dummy', password='qwe', email='dummy', firstname=u'a', lastname=u'b') Session().commit() uid = user.user_id try: #User should have None permission on creation repository assert UserModel().has_perm(user, perm_none) == False assert UserModel().has_perm(user, perm_create) == False response = self.app.post(url('edit_user_perms', id=uid), params=dict(_method='put', create_repo_perm=True, _authentication_token=self.authentication_token())) perm_none = Permission.get_by_key('hg.create.none') perm_create = Permission.get_by_key('hg.create.repository') #User should have None permission on creation repository assert UserModel().has_perm(uid, perm_none) == False assert UserModel().has_perm(uid, perm_create) == True finally: UserModel().delete(uid) Session().commit() def test_revoke_perm_create_repo(self): self.log_user() perm_none = Permission.get_by_key('hg.create.none') perm_create = Permission.get_by_key('hg.create.repository') user = UserModel().create_or_update(username='dummy', password='qwe', email='dummy', firstname=u'a', lastname=u'b') Session().commit() uid = user.user_id try: #User should have None permission on creation repository assert UserModel().has_perm(user, perm_none) == False assert UserModel().has_perm(user, perm_create) == False response = self.app.post(url('edit_user_perms', id=uid), params=dict(_method='put', _authentication_token=self.authentication_token())) perm_none = Permission.get_by_key('hg.create.none') perm_create = Permission.get_by_key('hg.create.repository') #User should have None permission on creation repository assert UserModel().has_perm(uid, perm_none) == True assert UserModel().has_perm(uid, perm_create) == False finally: UserModel().delete(uid) Session().commit() def test_add_perm_fork_repo(self): self.log_user() perm_none = Permission.get_by_key('hg.fork.none') perm_fork = Permission.get_by_key('hg.fork.repository') user = UserModel().create_or_update(username='dummy', password='qwe', email='dummy', firstname=u'a', lastname=u'b') Session().commit() uid = user.user_id try: #User should have None permission on creation repository assert UserModel().has_perm(user, perm_none) == False assert UserModel().has_perm(user, perm_fork) == False response = self.app.post(url('edit_user_perms', id=uid), params=dict(_method='put', create_repo_perm=True, _authentication_token=self.authentication_token())) perm_none = Permission.get_by_key('hg.create.none') perm_create = Permission.get_by_key('hg.create.repository') #User should have None permission on creation repository assert UserModel().has_perm(uid, perm_none) == False assert UserModel().has_perm(uid, perm_create) == True finally: UserModel().delete(uid) Session().commit() def test_revoke_perm_fork_repo(self): self.log_user() perm_none = Permission.get_by_key('hg.fork.none') perm_fork = Permission.get_by_key('hg.fork.repository') user = UserModel().create_or_update(username='dummy', password='qwe', email='dummy', firstname=u'a', lastname=u'b') Session().commit() uid = user.user_id try: #User should have None permission on creation repository assert UserModel().has_perm(user, perm_none) == False assert UserModel().has_perm(user, perm_fork) == False response = self.app.post(url('edit_user_perms', id=uid), params=dict(_method='put', _authentication_token=self.authentication_token())) perm_none = Permission.get_by_key('hg.create.none') perm_create = Permission.get_by_key('hg.create.repository') #User should have None permission on creation repository assert UserModel().has_perm(uid, perm_none) == True assert UserModel().has_perm(uid, perm_create) == False finally: UserModel().delete(uid) Session().commit() def test_ips(self): self.log_user() user = User.get_by_username(TEST_USER_REGULAR_LOGIN) response = self.app.get(url('edit_user_ips', id=user.user_id)) response.mustcontain('All IP addresses are allowed') @parametrize('test_name,ip,ip_range,failure', [ ('127/24', '127.0.0.1/24', '127.0.0.0 - 127.0.0.255', False), ('10/32', '10.0.0.10/32', '10.0.0.10 - 10.0.0.10', False), ('0/16', '0.0.0.0/16', '0.0.0.0 - 0.0.255.255', False), ('0/8', '0.0.0.0/8', '0.0.0.0 - 0.255.255.255', False), ('127_bad_mask', '127.0.0.1/99', '127.0.0.1 - 127.0.0.1', True), ('127_bad_ip', 'foobar', 'foobar', True), ]) def test_add_ip(self, test_name, ip, ip_range, failure): self.log_user() user = User.get_by_username(TEST_USER_REGULAR_LOGIN) user_id = user.user_id response = self.app.put(url('edit_user_ips', id=user_id), params=dict(new_ip=ip, _authentication_token=self.authentication_token())) if failure: self.checkSessionFlash(response, 'Please enter a valid IPv4 or IPv6 address') response = self.app.get(url('edit_user_ips', id=user_id)) response.mustcontain(no=[ip]) response.mustcontain(no=[ip_range]) else: response = self.app.get(url('edit_user_ips', id=user_id)) response.mustcontain(ip) response.mustcontain(ip_range) ## cleanup for del_ip in UserIpMap.query().filter(UserIpMap.user_id == user_id).all(): Session().delete(del_ip) Session().commit() def test_delete_ip(self): self.log_user() user = User.get_by_username(TEST_USER_REGULAR_LOGIN) user_id = user.user_id ip = '127.0.0.1/32' ip_range = '127.0.0.1 - 127.0.0.1' new_ip = UserModel().add_extra_ip(user_id, ip) Session().commit() new_ip_id = new_ip.ip_id response = self.app.get(url('edit_user_ips', id=user_id)) response.mustcontain(ip) response.mustcontain(ip_range) self.app.post(url('edit_user_ips', id=user_id), params=dict(_method='delete', del_ip_id=new_ip_id, _authentication_token=self.authentication_token())) response = self.app.get(url('edit_user_ips', id=user_id)) response.mustcontain('All IP addresses are allowed') response.mustcontain(no=[ip]) response.mustcontain(no=[ip_range]) def test_api_keys(self): self.log_user() user = User.get_by_username(TEST_USER_REGULAR_LOGIN) response = self.app.get(url('edit_user_api_keys', id=user.user_id)) response.mustcontain(user.api_key) response.mustcontain('Expires: Never') @parametrize('desc,lifetime', [ ('forever', -1), ('5mins', 60*5), ('30days', 60*60*24*30), ]) def test_add_api_keys(self, desc, lifetime): self.log_user() user = User.get_by_username(TEST_USER_REGULAR_LOGIN) user_id = user.user_id response = self.app.post(url('edit_user_api_keys', id=user_id), {'description': desc, 'lifetime': lifetime, '_authentication_token': self.authentication_token()}) self.checkSessionFlash(response, 'API key successfully created') try: response = response.follow() user = User.get(user_id) for api_key in user.api_keys: response.mustcontain(api_key) finally: for api_key in UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all(): Session().delete(api_key) Session().commit() def test_remove_api_key(self): self.log_user() user = User.get_by_username(TEST_USER_REGULAR_LOGIN) user_id = user.user_id response = self.app.post(url('edit_user_api_keys', id=user_id), {'description': 'desc', 'lifetime': -1, '_authentication_token': self.authentication_token()}) self.checkSessionFlash(response, 'API key successfully created') response = response.follow() #now delete our key keys = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all() assert 1 == len(keys) response = self.app.post(url('edit_user_api_keys', id=user_id), {'_method': 'delete', 'del_api_key': keys[0].api_key, '_authentication_token': self.authentication_token()}) self.checkSessionFlash(response, 'API key successfully deleted') keys = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all() assert 0 == len(keys) def test_reset_main_api_key(self): self.log_user() user = User.get_by_username(TEST_USER_REGULAR_LOGIN) user_id = user.user_id api_key = user.api_key response = self.app.get(url('edit_user_api_keys', id=user_id)) response.mustcontain(api_key) response.mustcontain('Expires: Never') response = self.app.post(url('edit_user_api_keys', id=user_id), {'_method': 'delete', 'del_api_key_builtin': api_key, '_authentication_token': self.authentication_token()}) self.checkSessionFlash(response, 'API key successfully reset') response = response.follow() response.mustcontain(no=[api_key]) class TestAdminUsersController_unittest(object): """ Unit tests for the users controller """ def test_get_user_or_raise_if_default(self, monkeypatch): # flash complains about an non-existing session def flash_mock(*args, **kwargs): pass monkeypatch.setattr(h, 'flash', flash_mock) u = UsersController() # a regular user should work correctly user = User.get_by_username(TEST_USER_REGULAR_LOGIN) assert u._get_user_or_raise_if_default(user.user_id) == user # the default user should raise with pytest.raises(HTTPNotFound): u._get_user_or_raise_if_default(User.get_default_user().user_id) class TestAdminUsersControllerForDefaultUser(TestControllerPytest): """ Edit actions on the default user are not allowed. Validate that they throw a 404 exception. """ def test_edit_default_user(self): self.log_user() user = User.get_default_user() response = self.app.get(url('edit_user', id=user.user_id), status=404) def test_edit_advanced_default_user(self): self.log_user() user = User.get_default_user() response = self.app.get(url('edit_user_advanced', id=user.user_id), status=404) # API keys def test_edit_api_keys_default_user(self): self.log_user() user = User.get_default_user() response = self.app.get(url('edit_user_api_keys', id=user.user_id), status=404) def test_add_api_keys_default_user(self): self.log_user() user = User.get_default_user() response = self.app.post(url('edit_user_api_keys', id=user.user_id), {'_method': 'put', '_authentication_token': self.authentication_token()}, status=404) def test_delete_api_keys_default_user(self): self.log_user() user = User.get_default_user() response = self.app.post(url('edit_user_api_keys', id=user.user_id), {'_method': 'delete', '_authentication_token': self.authentication_token()}, status=404) # Permissions def test_edit_perms_default_user(self): self.log_user() user = User.get_default_user() response = self.app.get(url('edit_user_perms', id=user.user_id), status=404) def test_update_perms_default_user(self): self.log_user() user = User.get_default_user() response = self.app.post(url('edit_user_perms', id=user.user_id), {'_method': 'put', '_authentication_token': self.authentication_token()}, status=404) # Emails def test_edit_emails_default_user(self): self.log_user() user = User.get_default_user() response = self.app.get(url('edit_user_emails', id=user.user_id), status=404) def test_add_emails_default_user(self): self.log_user() user = User.get_default_user() response = self.app.post(url('edit_user_emails', id=user.user_id), {'_method': 'put', '_authentication_token': self.authentication_token()}, status=404) def test_delete_emails_default_user(self): self.log_user() user = User.get_default_user() response = self.app.post(url('edit_user_emails', id=user.user_id), {'_method': 'delete', '_authentication_token': self.authentication_token()}, status=404) # IP addresses # Add/delete of IP addresses for the default user is used to maintain # the global IP whitelist and thus allowed. Only 'edit' is forbidden. def test_edit_ip_default_user(self): self.log_user() user = User.get_default_user() response = self.app.get(url('edit_user_ips', id=user.user_id), status=404)