Mercurial > kallithea
view rhodecode/tests/functional/test_admin_users.py @ 4116:ffd45b185016 rhodecode-2.2.5-gpl
Imported some of the GPLv3'd changes from RhodeCode v2.2.5.
This imports changes between changesets 21af6c4eab3d and 6177597791c2 in
RhodeCode's original repository, including only changes to Python files and HTML.
RhodeCode clearly licensed its changes to these files under GPLv3
in their /LICENSE file, which states the following:
The Python code and integrated HTML are licensed under the GPLv3 license.
(See:
https://code.rhodecode.com/rhodecode/files/v2.2.5/LICENSE
or
http://web.archive.org/web/20140512193334/https://code.rhodecode.com/rhodecode/files/f3b123159901f15426d18e3dc395e8369f70ebe0/LICENSE
for an online copy of that LICENSE file)
Conservancy reviewed these changes and confirmed that they can be licensed as
a whole to the Kallithea project under GPLv3-only.
While some of the contents committed herein are clearly licensed
GPLv3-or-later, on the whole we must assume the are GPLv3-only, since the
statement above from RhodeCode indicates that they intend GPLv3-only as their
license, per GPLv3ยง14 and other relevant sections of GPLv3.
author | Bradley M. Kuhn <bkuhn@sfconservancy.org> |
---|---|
date | Wed, 02 Jul 2014 19:03:13 -0400 |
parents | d7488551578e |
children | da3c57422ee6 |
line wrap: on
line source
# -*- coding: utf-8 -*- # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from sqlalchemy.orm.exc import NoResultFound from rhodecode.tests import * from rhodecode.tests.fixture import Fixture from rhodecode.model.db import User, Permission, UserIpMap, UserApiKeys from rhodecode.lib.auth import check_password from rhodecode.model.user import UserModel from rhodecode.model import validators from rhodecode.lib import helpers as h from rhodecode.model.meta import Session fixture = Fixture() class TestAdminUsersController(TestController): test_user_1 = 'testme' @classmethod def teardown_class(cls): if User.get_by_username(cls.test_user_1): UserModel().delete(cls.test_user_1) Session().commit() def test_index(self): self.log_user() response = self.app.get(url('users')) # Test response... def test_create(self): self.log_user() username = 'newtestuser' password = 'test12' password_confirmation = password name = 'name' lastname = 'lastname' email = 'mail@mail.com' response = self.app.post(url('users'), {'username': username, 'password': password, 'password_confirmation': password_confirmation, 'firstname': name, 'active': True, 'lastname': lastname, 'extern_name': 'rhodecode', 'extern_type': 'rhodecode', 'email': email}) self.checkSessionFlash(response, '''Created user %s''' % (username)) new_user = Session().query(User).\ filter(User.username == username).one() self.assertEqual(new_user.username, username) self.assertEqual(check_password(password, new_user.password), True) self.assertEqual(new_user.name, name) self.assertEqual(new_user.lastname, lastname) self.assertEqual(new_user.email, email) response.follow() response = response.follow() response.mustcontain("""newtestuser""") def test_create_err(self): self.log_user() username = 'new_user' password = '' name = 'name' lastname = 'lastname' email = 'errmail.com' response = self.app.post(url('users'), {'username': username, 'password': password, 'name': name, 'active': False, 'lastname': lastname, 'email': email}) msg = validators.ValidUsername(False, {})._messages['system_invalid_username'] msg = h.html_escape(msg % {'username': 'new_user'}) response.mustcontain("""<span class="error-message">%s</span>""" % msg) response.mustcontain("""<span class="error-message">Please enter a value</span>""") response.mustcontain("""<span class="error-message">An email address must contain a single @</span>""") def get_user(): Session().query(User).filter(User.username == username).one() self.assertRaises(NoResultFound, get_user), 'found user in database' def test_new(self): self.log_user() response = self.app.get(url('new_user')) @parameterized.expand( [('firstname', {'firstname': 'new_username'}), ('lastname', {'lastname': 'new_username'}), ('admin', {'admin': True}), ('admin', {'admin': False}), ('extern_type', {'extern_type': 'ldap'}), ('extern_type', {'extern_type': None}), ('extern_name', {'extern_name': 'test'}), ('extern_name', {'extern_name': None}), ('active', {'active': False}), ('active', {'active': True}), ('email', {'email': 'some@email.com'}), # ('new_password', {'new_password': 'foobar123', # 'password_confirmation': 'foobar123'}) ]) def test_update(self, name, attrs): self.log_user() usr = fixture.create_user(self.test_user_1, password='qweqwe', email='testme@rhodecode.org', extern_type='rhodecode', extern_name=self.test_user_1, skip_if_exists=True) Session().commit() params = usr.get_api_data() params.update({'password_confirmation': ''}) params.update({'new_password': ''}) params.update(attrs) if name == 'email': params['emails'] = [attrs['email']] if name == 'extern_type': #cannot update this via form, expected value is original one params['extern_type'] = "rhodecode" if name == 'extern_name': #cannot update this via form, expected value is original one params['extern_name'] = self.test_user_1 # special case since this user is not # logged in yet his data is not filled # so we use creation data response = self.app.put(url('user', id=usr.user_id), params) self.checkSessionFlash(response, 'User updated successfully') updated_user = User.get_by_username(self.test_user_1) updated_params = updated_user.get_api_data() updated_params.update({'password_confirmation': ''}) updated_params.update({'new_password': ''}) self.assertEqual(params, updated_params) def test_delete(self): self.log_user() username = 'newtestuserdeleteme' fixture.create_user(name=username) new_user = Session().query(User)\ .filter(User.username == username).one() response = self.app.delete(url('user', id=new_user.user_id)) self.checkSessionFlash(response, 'Successfully deleted user') def test_show(self): response = self.app.get(url('user', id=1)) def test_edit(self): self.log_user() user = User.get_by_username(TEST_USER_ADMIN_LOGIN) response = self.app.get(url('edit_user', id=user.user_id)) def test_add_perm_create_repo(self): self.log_user() perm_none = Permission.get_by_key('hg.create.none') perm_create = Permission.get_by_key('hg.create.repository') user = UserModel().create_or_update(username='dummy', password='qwe', email='dummy', firstname='a', lastname='b') Session().commit() uid = user.user_id try: #User should have None permission on creation repository self.assertEqual(UserModel().has_perm(user, perm_none), False) self.assertEqual(UserModel().has_perm(user, perm_create), False) response = self.app.post(url('edit_user_perms', id=uid), params=dict(_method='put', create_repo_perm=True)) perm_none = Permission.get_by_key('hg.create.none') perm_create = Permission.get_by_key('hg.create.repository') #User should have None permission on creation repository self.assertEqual(UserModel().has_perm(uid, perm_none), False) self.assertEqual(UserModel().has_perm(uid, perm_create), True) finally: UserModel().delete(uid) Session().commit() def test_revoke_perm_create_repo(self): self.log_user() perm_none = Permission.get_by_key('hg.create.none') perm_create = Permission.get_by_key('hg.create.repository') user = UserModel().create_or_update(username='dummy', password='qwe', email='dummy', firstname='a', lastname='b') Session().commit() uid = user.user_id try: #User should have None permission on creation repository self.assertEqual(UserModel().has_perm(user, perm_none), False) self.assertEqual(UserModel().has_perm(user, perm_create), False) response = self.app.post(url('edit_user_perms', id=uid), params=dict(_method='put')) perm_none = Permission.get_by_key('hg.create.none') perm_create = Permission.get_by_key('hg.create.repository') #User should have None permission on creation repository self.assertEqual(UserModel().has_perm(uid, perm_none), True) self.assertEqual(UserModel().has_perm(uid, perm_create), False) finally: UserModel().delete(uid) Session().commit() def test_add_perm_fork_repo(self): self.log_user() perm_none = Permission.get_by_key('hg.fork.none') perm_fork = Permission.get_by_key('hg.fork.repository') user = UserModel().create_or_update(username='dummy', password='qwe', email='dummy', firstname='a', lastname='b') Session().commit() uid = user.user_id try: #User should have None permission on creation repository self.assertEqual(UserModel().has_perm(user, perm_none), False) self.assertEqual(UserModel().has_perm(user, perm_fork), False) response = self.app.post(url('edit_user_perms', id=uid), params=dict(_method='put', create_repo_perm=True)) perm_none = Permission.get_by_key('hg.create.none') perm_create = Permission.get_by_key('hg.create.repository') #User should have None permission on creation repository self.assertEqual(UserModel().has_perm(uid, perm_none), False) self.assertEqual(UserModel().has_perm(uid, perm_create), True) finally: UserModel().delete(uid) Session().commit() def test_revoke_perm_fork_repo(self): self.log_user() perm_none = Permission.get_by_key('hg.fork.none') perm_fork = Permission.get_by_key('hg.fork.repository') user = UserModel().create_or_update(username='dummy', password='qwe', email='dummy', firstname='a', lastname='b') Session().commit() uid = user.user_id try: #User should have None permission on creation repository self.assertEqual(UserModel().has_perm(user, perm_none), False) self.assertEqual(UserModel().has_perm(user, perm_fork), False) response = self.app.post(url('edit_user_perms', id=uid), params=dict(_method='put')) perm_none = Permission.get_by_key('hg.create.none') perm_create = Permission.get_by_key('hg.create.repository') #User should have None permission on creation repository self.assertEqual(UserModel().has_perm(uid, perm_none), True) self.assertEqual(UserModel().has_perm(uid, perm_create), False) finally: UserModel().delete(uid) Session().commit() def test_ips(self): self.log_user() user = User.get_by_username(TEST_USER_REGULAR_LOGIN) response = self.app.get(url('edit_user_ips', id=user.user_id)) response.mustcontain('All IP addresses are allowed') @parameterized.expand([ ('127/24', '127.0.0.1/24', '127.0.0.0 - 127.0.0.255', False), ('10/32', '10.0.0.10/32', '10.0.0.10 - 10.0.0.10', False), ('0/16', '0.0.0.0/16', '0.0.0.0 - 0.0.255.255', False), ('0/8', '0.0.0.0/8', '0.0.0.0 - 0.255.255.255', False), ('127_bad_mask', '127.0.0.1/99', '127.0.0.1 - 127.0.0.1', True), ('127_bad_ip', 'foobar', 'foobar', True), ]) def test_add_ip(self, test_name, ip, ip_range, failure): self.log_user() user = User.get_by_username(TEST_USER_REGULAR_LOGIN) user_id = user.user_id response = self.app.put(url('edit_user_ips', id=user_id), params=dict(new_ip=ip)) if failure: self.checkSessionFlash(response, 'Please enter a valid IPv4 or IpV6 address') response = self.app.get(url('edit_user_ips', id=user_id)) response.mustcontain(no=[ip]) response.mustcontain(no=[ip_range]) else: response = self.app.get(url('edit_user_ips', id=user_id)) response.mustcontain(ip) response.mustcontain(ip_range) ## cleanup for del_ip in UserIpMap.query().filter(UserIpMap.user_id == user_id).all(): Session().delete(del_ip) Session().commit() def test_delete_ip(self): self.log_user() user = User.get_by_username(TEST_USER_REGULAR_LOGIN) user_id = user.user_id ip = '127.0.0.1/32' ip_range = '127.0.0.1 - 127.0.0.1' new_ip = UserModel().add_extra_ip(user_id, ip) Session().commit() new_ip_id = new_ip.ip_id response = self.app.get(url('edit_user_ips', id=user_id)) response.mustcontain(ip) response.mustcontain(ip_range) self.app.post(url('edit_user_ips', id=user_id), params=dict(_method='delete', del_ip_id=new_ip_id)) response = self.app.get(url('edit_user_ips', id=user_id)) response.mustcontain('All IP addresses are allowed') response.mustcontain(no=[ip]) response.mustcontain(no=[ip_range]) def test_api_keys(self): self.log_user() user = User.get_by_username(TEST_USER_REGULAR_LOGIN) response = self.app.get(url('edit_user_api_keys', id=user.user_id)) response.mustcontain(user.api_key) response.mustcontain('expires: never') @parameterized.expand([ ('forever', -1), ('5mins', 60*5), ('30days', 60*60*24*30), ]) def test_add_api_keys(self, desc, lifetime): self.log_user() user = User.get_by_username(TEST_USER_REGULAR_LOGIN) user_id = user.user_id response = self.app.post(url('edit_user_api_keys', id=user_id), {'_method': 'put', 'description': desc, 'lifetime': lifetime}) self.checkSessionFlash(response, 'Api key successfully created') try: response = response.follow() user = User.get(user_id) for api_key in user.api_keys: response.mustcontain(api_key) finally: for api_key in UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all(): Session().delete(api_key) Session().commit() def test_remove_api_key(self): self.log_user() user = User.get_by_username(TEST_USER_REGULAR_LOGIN) user_id = user.user_id response = self.app.post(url('edit_user_api_keys', id=user_id), {'_method': 'put', 'description': 'desc', 'lifetime': -1}) self.checkSessionFlash(response, 'Api key successfully created') response = response.follow() #now delete our key keys = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all() self.assertEqual(1, len(keys)) response = self.app.post(url('edit_user_api_keys', id=user_id), {'_method': 'delete', 'del_api_key': keys[0].api_key}) self.checkSessionFlash(response, 'Api key successfully deleted') keys = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all() self.assertEqual(0, len(keys)) def test_reset_main_api_key(self): self.log_user() user = User.get_by_username(TEST_USER_REGULAR_LOGIN) user_id = user.user_id api_key = user.api_key response = self.app.get(url('edit_user_api_keys', id=user_id)) response.mustcontain(api_key) response.mustcontain('expires: never') response = self.app.post(url('edit_user_api_keys', id=user_id), {'_method': 'delete', 'del_api_key_builtin': api_key}) self.checkSessionFlash(response, 'Api key successfully reset') response = response.follow() response.mustcontain(no=[api_key])