Mercurial > kallithea
view kallithea/tests/api/api_base.py @ 8940:1d1fe8c2ef57 stable
tests: update test_api_create_repo with better coverage of create_repo api
Based on test_api_update_repo.
This shows that some of the API is broken. Failing test cases are disabled and
will be enabled when the problems are fixed.
author | Mads Kiilerich <mads@kiilerich.com> |
---|---|
date | Mon, 12 Dec 2022 00:38:59 +0100 |
parents | 9de126d62202 |
children | f2dc57c123cf |
line wrap: on
line source
# -*- coding: utf-8 -*- # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. """ Tests for the JSON-RPC web api. """ import datetime import os import random import re import string from typing import Sized import mock import pytest from webtest import TestApp from kallithea.lib import ext_json from kallithea.lib.auth import AuthUser from kallithea.lib.utils2 import ascii_bytes from kallithea.model import db, meta from kallithea.model.changeset_status import ChangesetStatusModel from kallithea.model.gist import GistModel from kallithea.model.pull_request import PullRequestModel from kallithea.model.repo import RepoModel from kallithea.model.repo_group import RepoGroupModel from kallithea.model.scm import ScmModel from kallithea.model.user import UserModel from kallithea.model.user_group import UserGroupModel from kallithea.tests import base from kallithea.tests.fixture import Fixture, raise_exception API_URL = '/_admin/api' TEST_USER_GROUP = 'test_user_group' TEST_REPO_GROUP = 'test_repo_group' fixture = Fixture() def _build_data(apikey, method, **kw): """ Builds API data with given random ID For convenience, the json is returned as str """ random_id = random.randrange(1, 9999) return random_id, ext_json.dumps({ "id": random_id, "api_key": apikey, "method": method, "args": kw }) jsonify = lambda obj: ext_json.loads(ext_json.dumps(obj)) def api_call(test_obj, params): response = test_obj.app.post(API_URL, content_type='application/json', params=params) return response ## helpers def make_user_group(name=TEST_USER_GROUP): gr = fixture.create_user_group(name, cur_user=base.TEST_USER_ADMIN_LOGIN) UserGroupModel().add_user_to_group(user_group=gr, user=base.TEST_USER_ADMIN_LOGIN) meta.Session().commit() return gr def make_repo_group(name=TEST_REPO_GROUP): gr = fixture.create_repo_group(name, cur_user=base.TEST_USER_ADMIN_LOGIN) meta.Session().commit() return gr class _BaseTestApi(object): app: TestApp # assigned by app_fixture in subclass TestController mixin # assigned in subclass: REPO: str REPO_TYPE: str TEST_REVISION: str TEST_PR_SRC: str TEST_PR_DST: str TEST_PR_REVISIONS: Sized @classmethod def setup_class(cls): cls.usr = db.User.get_by_username(base.TEST_USER_ADMIN_LOGIN) cls.apikey = cls.usr.api_key cls.test_user = UserModel().create_or_update( username='test-api', password='test', email='test@example.com', firstname='first', lastname='last' ) meta.Session().commit() cls.TEST_USER_LOGIN = cls.test_user.username cls.apikey_regular = cls.test_user.api_key @classmethod def teardown_class(cls): pass def setup_method(self, method): make_user_group() make_repo_group() def teardown_method(self, method): fixture.destroy_user_group(TEST_USER_GROUP) fixture.destroy_gists() fixture.destroy_repo_group(TEST_REPO_GROUP) def _compare_ok(self, id_, expected, given): expected = jsonify({ 'id': id_, 'error': None, 'result': expected }) given = ext_json.loads(given) assert expected == given, (expected, given) def _compare_error(self, id_, expected, given): expected = jsonify({ 'id': id_, 'error': expected, 'result': None }) given = ext_json.loads(given) assert expected == given, (expected, given) def test_api_wrong_key(self): id_, params = _build_data('trololo', 'get_user') response = api_call(self, params) expected = 'Invalid API key' self._compare_error(id_, expected, given=response.body) def test_api_missing_non_optional_param(self): id_, params = _build_data(self.apikey, 'get_repo') response = api_call(self, params) expected = 'Missing non optional `repoid` arg in JSON DATA' self._compare_error(id_, expected, given=response.body) def test_api_missing_non_optional_param_args_null(self): id_, params = _build_data(self.apikey, 'get_repo') params = params.replace('"args": {}', '"args": null') response = api_call(self, params) expected = 'Missing non optional `repoid` arg in JSON DATA' self._compare_error(id_, expected, given=response.body) def test_api_missing_non_optional_param_args_bad(self): id_, params = _build_data(self.apikey, 'get_repo') params = params.replace('"args": {}', '"args": 1') response = api_call(self, params) expected = 'Missing non optional `repoid` arg in JSON DATA' self._compare_error(id_, expected, given=response.body) def test_api_args_is_null(self): id_, params = _build_data(self.apikey, 'get_users', ) params = params.replace('"args": {}', '"args": null') response = api_call(self, params) assert response.status == '200 OK' def test_api_args_is_bad(self): id_, params = _build_data(self.apikey, 'get_users', ) params = params.replace('"args": {}', '"args": 1') response = api_call(self, params) assert response.status == '200 OK' def test_api_args_different_args(self): expected = { 'ascii_letters': string.ascii_letters, 'ws': string.whitespace, 'printables': string.printable } id_, params = _build_data(self.apikey, 'test', args=expected) response = api_call(self, params) assert response.status == '200 OK' self._compare_ok(id_, expected, response.body) def test_api_get_users(self): id_, params = _build_data(self.apikey, 'get_users', ) response = api_call(self, params) ret_all = [] _users = db.User.query().filter_by(is_default_user=False) \ .order_by(db.User.username).all() for usr in _users: ret = usr.get_api_data() ret_all.append(jsonify(ret)) expected = ret_all self._compare_ok(id_, expected, given=response.body) def test_api_get_user(self): id_, params = _build_data(self.apikey, 'get_user', userid=base.TEST_USER_ADMIN_LOGIN) response = api_call(self, params) usr = db.User.get_by_username(base.TEST_USER_ADMIN_LOGIN) ret = usr.get_api_data() ret['permissions'] = AuthUser(dbuser=usr).permissions expected = ret self._compare_ok(id_, expected, given=response.body) def test_api_get_user_that_does_not_exist(self): id_, params = _build_data(self.apikey, 'get_user', userid='trololo') response = api_call(self, params) expected = "user `%s` does not exist" % 'trololo' self._compare_error(id_, expected, given=response.body) def test_api_get_user_without_giving_userid(self): id_, params = _build_data(self.apikey, 'get_user') response = api_call(self, params) usr = db.User.get_by_username(base.TEST_USER_ADMIN_LOGIN) ret = usr.get_api_data() ret['permissions'] = AuthUser(dbuser=usr).permissions expected = ret self._compare_ok(id_, expected, given=response.body) def test_api_get_user_without_giving_userid_non_admin(self): id_, params = _build_data(self.apikey_regular, 'get_user') response = api_call(self, params) usr = db.User.get_by_username(self.TEST_USER_LOGIN) ret = usr.get_api_data() ret['permissions'] = AuthUser(dbuser=usr).permissions expected = ret self._compare_ok(id_, expected, given=response.body) def test_api_get_user_with_giving_userid_non_admin(self): id_, params = _build_data(self.apikey_regular, 'get_user', userid=self.TEST_USER_LOGIN) response = api_call(self, params) expected = 'userid is not the same as your user' self._compare_error(id_, expected, given=response.body) def test_api_pull_remote(self): # Note: pulling from local repos is a misfeature - it will bypass access control # ... but ok, if the path already has been set in the database repo_name = 'test_pull' r = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE) # hack around that clone_uri can't be set to to a local path # (as shown by test_api_create_repo_clone_uri_local) r.clone_uri = os.path.join(db.Ui.get_by_key('paths', '/').ui_value, self.REPO) meta.Session().commit() pre_cached_tip = [repo.get_api_data()['last_changeset']['short_id'] for repo in db.Repository.query().filter(db.Repository.repo_name == repo_name)] id_, params = _build_data(self.apikey, 'pull', repoid=repo_name,) response = api_call(self, params) expected = {'msg': 'Pulled from `%s`' % repo_name, 'repository': repo_name} self._compare_ok(id_, expected, given=response.body) post_cached_tip = [repo.get_api_data()['last_changeset']['short_id'] for repo in db.Repository.query().filter(db.Repository.repo_name == repo_name)] fixture.destroy_repo(repo_name) assert pre_cached_tip != post_cached_tip def test_api_pull_fork(self): fork_name = 'fork' fixture.create_fork(self.REPO, fork_name) id_, params = _build_data(self.apikey, 'pull', repoid=fork_name,) response = api_call(self, params) expected = {'msg': 'Pulled from `%s`' % fork_name, 'repository': fork_name} self._compare_ok(id_, expected, given=response.body) fixture.destroy_repo(fork_name) def test_api_pull_error_no_remote_no_fork(self): # should fail because no clone_uri is set id_, params = _build_data(self.apikey, 'pull', repoid=self.REPO, ) response = api_call(self, params) expected = 'Unable to pull changes from `%s`' % self.REPO self._compare_error(id_, expected, given=response.body) def test_api_pull_custom_remote(self): repo_name = 'test_pull_custom_remote' fixture.create_repo(repo_name, repo_type=self.REPO_TYPE) custom_remote_path = os.path.join(db.Ui.get_by_key('paths', '/').ui_value, self.REPO) id_, params = _build_data(self.apikey, 'pull', repoid=repo_name, clone_uri=custom_remote_path) response = api_call(self, params) expected = {'msg': 'Pulled from `%s`' % repo_name, 'repository': repo_name} self._compare_ok(id_, expected, given=response.body) fixture.destroy_repo(repo_name) def test_api_rescan_repos(self): id_, params = _build_data(self.apikey, 'rescan_repos') response = api_call(self, params) expected = {'added': [], 'removed': []} self._compare_ok(id_, expected, given=response.body) @mock.patch.object(ScmModel, 'repo_scan', raise_exception) def test_api_rescann_error(self): id_, params = _build_data(self.apikey, 'rescan_repos', ) response = api_call(self, params) expected = 'Error occurred during rescan repositories action' self._compare_error(id_, expected, given=response.body) def test_api_create_existing_user(self): id_, params = _build_data(self.apikey, 'create_user', username=base.TEST_USER_ADMIN_LOGIN, email='test@example.com', password='trololo') response = api_call(self, params) expected = "user `%s` already exist" % base.TEST_USER_ADMIN_LOGIN self._compare_error(id_, expected, given=response.body) def test_api_create_user_with_existing_email(self): id_, params = _build_data(self.apikey, 'create_user', username=base.TEST_USER_ADMIN_LOGIN + 'new', email=base.TEST_USER_REGULAR_EMAIL, password='trololo') response = api_call(self, params) expected = "email `%s` already exist" % base.TEST_USER_REGULAR_EMAIL self._compare_error(id_, expected, given=response.body) def test_api_create_user(self): username = 'test_new_api_user' email = username + "@example.com" id_, params = _build_data(self.apikey, 'create_user', username=username, email=email, password='trololo') response = api_call(self, params) usr = db.User.get_by_username(username) ret = dict( msg='created new user `%s`' % username, user=jsonify(usr.get_api_data()) ) try: expected = ret self._compare_ok(id_, expected, given=response.body) finally: fixture.destroy_user(usr.user_id) def test_api_create_user_without_password(self): username = 'test_new_api_user_passwordless' email = username + "@example.com" id_, params = _build_data(self.apikey, 'create_user', username=username, email=email) response = api_call(self, params) usr = db.User.get_by_username(username) ret = dict( msg='created new user `%s`' % username, user=jsonify(usr.get_api_data()) ) try: expected = ret self._compare_ok(id_, expected, given=response.body) finally: fixture.destroy_user(usr.user_id) def test_api_create_user_with_extern_name(self): username = 'test_new_api_user_passwordless' email = username + "@example.com" id_, params = _build_data(self.apikey, 'create_user', username=username, email=email, extern_name='internal') response = api_call(self, params) usr = db.User.get_by_username(username) ret = dict( msg='created new user `%s`' % username, user=jsonify(usr.get_api_data()) ) try: expected = ret self._compare_ok(id_, expected, given=response.body) finally: fixture.destroy_user(usr.user_id) @mock.patch.object(UserModel, 'create_or_update', raise_exception) def test_api_create_user_when_exception_happened(self): username = 'test_new_api_user' email = username + "@example.com" id_, params = _build_data(self.apikey, 'create_user', username=username, email=email, password='trololo') response = api_call(self, params) expected = 'failed to create user `%s`' % username self._compare_error(id_, expected, given=response.body) def test_api_delete_user(self): usr = UserModel().create_or_update(username='test_user', password='qweqwe', email='u232@example.com', firstname='u1', lastname='u1') meta.Session().commit() username = usr.username email = usr.email usr_id = usr.user_id ## DELETE THIS USER NOW id_, params = _build_data(self.apikey, 'delete_user', userid=username, ) response = api_call(self, params) ret = {'msg': 'deleted user ID:%s %s' % (usr_id, username), 'user': None} expected = ret self._compare_ok(id_, expected, given=response.body) @mock.patch.object(UserModel, 'delete', raise_exception) def test_api_delete_user_when_exception_happened(self): usr = UserModel().create_or_update(username='test_user', password='qweqwe', email='u232@example.com', firstname='u1', lastname='u1') meta.Session().commit() username = usr.username id_, params = _build_data(self.apikey, 'delete_user', userid=username, ) response = api_call(self, params) ret = 'failed to delete user ID:%s %s' % (usr.user_id, usr.username) expected = ret self._compare_error(id_, expected, given=response.body) @base.parametrize('name,expected', [ ('firstname', 'new_username'), ('lastname', 'new_username'), ('email', 'new_username'), ('admin', True), ('admin', False), ('extern_type', 'ldap'), ('extern_type', None), ('extern_name', 'test'), ('extern_name', None), ('active', False), ('active', True), ('password', 'newpass'), ]) def test_api_update_user(self, name, expected): usr = db.User.get_by_username(self.TEST_USER_LOGIN) kw = {name: expected, 'userid': usr.user_id} id_, params = _build_data(self.apikey, 'update_user', **kw) response = api_call(self, params) ret = { 'msg': 'updated user ID:%s %s' % ( usr.user_id, self.TEST_USER_LOGIN), 'user': jsonify(db.User \ .get_by_username(self.TEST_USER_LOGIN) \ .get_api_data()) } expected = ret self._compare_ok(id_, expected, given=response.body) def test_api_update_user_no_changed_params(self): usr = db.User.get_by_username(base.TEST_USER_ADMIN_LOGIN) ret = jsonify(usr.get_api_data()) id_, params = _build_data(self.apikey, 'update_user', userid=base.TEST_USER_ADMIN_LOGIN) response = api_call(self, params) ret = { 'msg': 'updated user ID:%s %s' % ( usr.user_id, base.TEST_USER_ADMIN_LOGIN), 'user': ret } expected = ret self._compare_ok(id_, expected, given=response.body) def test_api_update_user_by_user_id(self): usr = db.User.get_by_username(base.TEST_USER_ADMIN_LOGIN) ret = jsonify(usr.get_api_data()) id_, params = _build_data(self.apikey, 'update_user', userid=usr.user_id) response = api_call(self, params) ret = { 'msg': 'updated user ID:%s %s' % ( usr.user_id, base.TEST_USER_ADMIN_LOGIN), 'user': ret } expected = ret self._compare_ok(id_, expected, given=response.body) def test_api_update_user_default_user(self): usr = db.User.get_default_user() id_, params = _build_data(self.apikey, 'update_user', userid=usr.user_id) response = api_call(self, params) expected = 'editing default user is forbidden' self._compare_error(id_, expected, given=response.body) @mock.patch.object(UserModel, 'update_user', raise_exception) def test_api_update_user_when_exception_happens(self): usr = db.User.get_by_username(base.TEST_USER_ADMIN_LOGIN) ret = jsonify(usr.get_api_data()) id_, params = _build_data(self.apikey, 'update_user', userid=usr.user_id) response = api_call(self, params) ret = 'failed to update user `%s`' % usr.user_id expected = ret self._compare_error(id_, expected, given=response.body) def test_api_get_repo(self): new_group = 'some_new_group' make_user_group(new_group) RepoModel().grant_user_group_permission(repo=self.REPO, group_name=new_group, perm='repository.read') meta.Session().commit() id_, params = _build_data(self.apikey, 'get_repo', repoid=self.REPO) response = api_call(self, params) assert "tags" not in response.json['result'] assert 'pull_requests' not in response.json['result'] repo = RepoModel().get_by_repo_name(self.REPO) ret = repo.get_api_data() members = [] followers = [] for user in repo.repo_to_perm: perm = user.permission.permission_name user = user.user user_data = {'name': user.username, 'type': "user", 'permission': perm} members.append(user_data) for user_group in repo.users_group_to_perm: perm = user_group.permission.permission_name user_group = user_group.users_group user_group_data = {'name': user_group.users_group_name, 'type': "user_group", 'permission': perm} members.append(user_group_data) for user in repo.followers: followers.append(user.user.get_api_data()) ret['members'] = members ret['followers'] = followers expected = ret self._compare_ok(id_, expected, given=response.body) fixture.destroy_user_group(new_group) id_, params = _build_data(self.apikey, 'get_repo', repoid=self.REPO, with_revision_names=True, with_pullrequests=True) response = api_call(self, params) assert "v0.2.0" in response.json['result']['tags'] assert 'pull_requests' in response.json['result'] @base.parametrize('grant_perm', [ ('repository.admin'), ('repository.write'), ('repository.read'), ]) def test_api_get_repo_by_non_admin(self, grant_perm): RepoModel().grant_user_permission(repo=self.REPO, user=self.TEST_USER_LOGIN, perm=grant_perm) meta.Session().commit() id_, params = _build_data(self.apikey_regular, 'get_repo', repoid=self.REPO) response = api_call(self, params) repo = RepoModel().get_by_repo_name(self.REPO) assert len(repo.repo_to_perm) >= 2 # make sure we actually are testing something - probably the default 2 permissions, possibly more expected = repo.get_api_data() members = [] for user in repo.repo_to_perm: perm = user.permission.permission_name user_obj = user.user user_data = {'name': user_obj.username, 'type': "user", 'permission': perm} members.append(user_data) for user_group in repo.users_group_to_perm: perm = user_group.permission.permission_name user_group_obj = user_group.users_group user_group_data = {'name': user_group_obj.users_group_name, 'type': "user_group", 'permission': perm} members.append(user_group_data) expected['members'] = members followers = [] for user in repo.followers: followers.append(user.user.get_api_data()) expected['followers'] = followers try: self._compare_ok(id_, expected, given=response.body) finally: RepoModel().revoke_user_permission(self.REPO, self.TEST_USER_LOGIN) def test_api_get_repo_by_non_admin_no_permission_to_repo(self): RepoModel().grant_user_permission(repo=self.REPO, user=db.User.DEFAULT_USER_NAME, perm='repository.none') try: RepoModel().grant_user_permission(repo=self.REPO, user=self.TEST_USER_LOGIN, perm='repository.none') id_, params = _build_data(self.apikey_regular, 'get_repo', repoid=self.REPO) response = api_call(self, params) expected = 'repository `%s` does not exist' % (self.REPO) self._compare_error(id_, expected, given=response.body) finally: RepoModel().grant_user_permission(repo=self.REPO, user=db.User.DEFAULT_USER_NAME, perm='repository.read') def test_api_get_repo_that_doesn_not_exist(self): id_, params = _build_data(self.apikey, 'get_repo', repoid='no-such-repo') response = api_call(self, params) ret = 'repository `%s` does not exist' % 'no-such-repo' expected = ret self._compare_error(id_, expected, given=response.body) def test_api_get_repos(self): id_, params = _build_data(self.apikey, 'get_repos') response = api_call(self, params) expected = jsonify([ repo.get_api_data() for repo in db.Repository.query() ]) self._compare_ok(id_, expected, given=response.body) def test_api_get_repos_non_admin(self): id_, params = _build_data(self.apikey_regular, 'get_repos') response = api_call(self, params) expected = jsonify([ repo.get_api_data() for repo in AuthUser(dbuser=db.User.get_by_username(self.TEST_USER_LOGIN)).get_all_user_repos() ]) self._compare_ok(id_, expected, given=response.body) @base.parametrize('name,ret_type', [ ('all', 'all'), ('dirs', 'dirs'), ('files', 'files'), ]) def test_api_get_repo_nodes(self, name, ret_type): rev = 'tip' path = '/' id_, params = _build_data(self.apikey, 'get_repo_nodes', repoid=self.REPO, revision=rev, root_path=path, ret_type=ret_type) response = api_call(self, params) # we don't the actual return types here since it's tested somewhere # else expected = response.json['result'] self._compare_ok(id_, expected, given=response.body) def test_api_get_repo_nodes_bad_revisions(self): rev = 'i-dont-exist' path = '/' id_, params = _build_data(self.apikey, 'get_repo_nodes', repoid=self.REPO, revision=rev, root_path=path, ) response = api_call(self, params) expected = 'failed to get repo: `%s` nodes' % self.REPO self._compare_error(id_, expected, given=response.body) def test_api_get_repo_nodes_bad_path(self): rev = 'tip' path = '/idontexits' id_, params = _build_data(self.apikey, 'get_repo_nodes', repoid=self.REPO, revision=rev, root_path=path, ) response = api_call(self, params) expected = 'failed to get repo: `%s` nodes' % self.REPO self._compare_error(id_, expected, given=response.body) def test_api_get_repo_nodes_bad_ret_type(self): rev = 'tip' path = '/' ret_type = 'error' id_, params = _build_data(self.apikey, 'get_repo_nodes', repoid=self.REPO, revision=rev, root_path=path, ret_type=ret_type) response = api_call(self, params) expected = ('ret_type must be one of %s' % (','.join(sorted(['files', 'dirs', 'all'])))) self._compare_error(id_, expected, given=response.body) @base.parametrize('name,ret_type,grant_perm', [ ('all', 'all', 'repository.write'), ('dirs', 'dirs', 'repository.admin'), ('files', 'files', 'repository.read'), ]) def test_api_get_repo_nodes_by_regular_user(self, name, ret_type, grant_perm): RepoModel().grant_user_permission(repo=self.REPO, user=self.TEST_USER_LOGIN, perm=grant_perm) meta.Session().commit() rev = 'tip' path = '/' id_, params = _build_data(self.apikey_regular, 'get_repo_nodes', repoid=self.REPO, revision=rev, root_path=path, ret_type=ret_type) response = api_call(self, params) # we don't the actual return types here since it's tested somewhere # else expected = response.json['result'] try: self._compare_ok(id_, expected, given=response.body) finally: RepoModel().revoke_user_permission(self.REPO, self.TEST_USER_LOGIN) @base.parametrize('changing_attr,updates', [ ('owner', {'owner': base.TEST_USER_REGULAR_LOGIN}), ('description', {'description': 'new description'}), ('clone_uri', {'clone_uri': 'http://example.com/repo'}), # will fail - pulling from non-existing repo should fail ('clone_uri', {'clone_uri': '/repo'}), # will fail - pulling from local repo was a misfeature - it would bypass access control ('clone_uri', {'clone_uri': None}), ('landing_rev', {'landing_rev': 'branch:master'}), ('private', {'private': True}), #('enable_statistics', {'enable_statistics': True}), # currently broken #('enable_downloads', {'enable_downloads': True}), # currently broken ('repo_group', {'group': 'test_group_for_update'}), ]) def test_api_create_repo(self, changing_attr, updates): repo_name = repo_name_full = 'new_repo' if changing_attr == 'repo_group': group_name = updates['group'] fixture.create_repo_group(group_name) repo_name_full = '/'.join([group_name, repo_name]) updates = {} id_, params = _build_data(self.apikey, 'create_repo', repo_type=self.REPO_TYPE, repo_name=repo_name_full, **updates) response = api_call(self, params) try: expected = { 'msg': 'Created new repository `%s`' % repo_name_full, 'success': True} if changing_attr == 'clone_uri' and updates['clone_uri']: expected = 'failed to create repository `%s`' % repo_name self._compare_error(id_, expected, given=response.body) return else: self._compare_ok(id_, expected, given=response.body) repo = db.Repository.get_by_repo_name(repo_name_full) assert repo is not None expected_data = { 'clone_uri': None, 'created_on': repo.created_on, 'description': repo_name, 'enable_downloads': False, 'enable_statistics': False, 'fork_of': None, 'landing_rev': ['rev', 'tip'], 'last_changeset': {'author': '', 'date': datetime.datetime(1970, 1, 1, 0, 0), 'message': '', 'raw_id': '0000000000000000000000000000000000000000', 'revision': -1, 'short_id': '000000000000'}, 'owner': 'test_admin', 'private': False, 'repo_id': repo.repo_id, 'repo_name': repo_name_full, 'repo_type': self.REPO_TYPE, } expected_data.update(updates) if changing_attr == 'landing_rev': expected_data['landing_rev'] = expected_data['landing_rev'].split(':', 1) assert repo.get_api_data() == expected_data finally: fixture.destroy_repo(repo_name_full) if changing_attr == 'repo_group': fixture.destroy_repo_group(group_name) @base.parametrize('repo_name', [ '', '.', '..', ':', '/', '<test>', ]) def test_api_create_repo_bad_names(self, repo_name): id_, params = _build_data(self.apikey, 'create_repo', repo_name=repo_name, owner=base.TEST_USER_ADMIN_LOGIN, repo_type=self.REPO_TYPE, ) response = api_call(self, params) if repo_name == '/': expected = "repo group `` not found" self._compare_error(id_, expected, given=response.body) else: expected = "failed to create repository `%s`" % repo_name self._compare_error(id_, expected, given=response.body) fixture.destroy_repo(repo_name) def test_api_create_repo_clone_uri_local(self): # cloning from local repos was a misfeature - it would bypass access control # TODO: introduce other test coverage of actual remote cloning clone_uri = os.path.join(base.TESTS_TMP_PATH, self.REPO) repo_name = 'api-repo' id_, params = _build_data(self.apikey, 'create_repo', repo_name=repo_name, owner=base.TEST_USER_ADMIN_LOGIN, repo_type=self.REPO_TYPE, clone_uri=clone_uri, ) response = api_call(self, params) expected = "failed to create repository `%s`" % repo_name self._compare_error(id_, expected, given=response.body) fixture.destroy_repo(repo_name) def test_api_create_repo_and_repo_group(self): repo_group_name = 'my_gr' repo_name = '%s/api-repo' % repo_group_name # repo creation can no longer also create repo group id_, params = _build_data(self.apikey, 'create_repo', repo_name=repo_name, owner=base.TEST_USER_ADMIN_LOGIN, repo_type=self.REPO_TYPE,) response = api_call(self, params) expected = 'repo group `%s` not found' % repo_group_name self._compare_error(id_, expected, given=response.body) assert RepoModel().get_by_repo_name(repo_name) is None # create group before creating repo rg = fixture.create_repo_group(repo_group_name) meta.Session().commit() id_, params = _build_data(self.apikey, 'create_repo', repo_name=repo_name, owner=base.TEST_USER_ADMIN_LOGIN, repo_type=self.REPO_TYPE,) response = api_call(self, params) expected = { 'msg': 'Created new repository `%s`' % repo_name, 'success': True, } self._compare_ok(id_, expected, given=response.body) repo = RepoModel().get_by_repo_name(repo_name) assert repo is not None fixture.destroy_repo(repo_name) fixture.destroy_repo_group(repo_group_name) def test_api_create_repo_in_repo_group_without_permission(self): repo_group_basename = 'api-repo-repo' repo_group_name = '%s/%s' % (TEST_REPO_GROUP, repo_group_basename) repo_name = '%s/api-repo' % repo_group_name top_group = db.RepoGroup.get_by_group_name(TEST_REPO_GROUP) assert top_group rg = fixture.create_repo_group(repo_group_basename, parent_group_id=top_group) meta.Session().commit() RepoGroupModel().grant_user_permission(repo_group_name, self.TEST_USER_LOGIN, 'group.none') meta.Session().commit() id_, params = _build_data(self.apikey_regular, 'create_repo', repo_name=repo_name, repo_type=self.REPO_TYPE, ) response = api_call(self, params) # API access control match Web access control: expected = 'no permission to create repo in test_repo_group/api-repo-repo' self._compare_error(id_, expected, given=response.body) fixture.destroy_repo(repo_name) fixture.destroy_repo_group(repo_group_name) def test_api_create_repo_unknown_owner(self): repo_name = 'api-repo' owner = 'i-dont-exist' id_, params = _build_data(self.apikey, 'create_repo', repo_name=repo_name, owner=owner, repo_type=self.REPO_TYPE, ) response = api_call(self, params) expected = 'user `%s` does not exist' % owner self._compare_error(id_, expected, given=response.body) def test_api_create_repo_dont_specify_owner(self): repo_name = 'api-repo' owner = 'i-dont-exist' id_, params = _build_data(self.apikey, 'create_repo', repo_name=repo_name, repo_type=self.REPO_TYPE, ) response = api_call(self, params) repo = RepoModel().get_by_repo_name(repo_name) assert repo is not None ret = { 'msg': 'Created new repository `%s`' % repo_name, 'success': True, } expected = ret self._compare_ok(id_, expected, given=response.body) fixture.destroy_repo(repo_name) def test_api_create_repo_by_non_admin(self): repo_name = 'api-repo' owner = 'i-dont-exist' id_, params = _build_data(self.apikey_regular, 'create_repo', repo_name=repo_name, repo_type=self.REPO_TYPE, ) response = api_call(self, params) repo = RepoModel().get_by_repo_name(repo_name) assert repo is not None ret = { 'msg': 'Created new repository `%s`' % repo_name, 'success': True, } expected = ret self._compare_ok(id_, expected, given=response.body) fixture.destroy_repo(repo_name) def test_api_create_repo_by_non_admin_specify_owner(self): repo_name = 'api-repo' owner = 'i-dont-exist' id_, params = _build_data(self.apikey_regular, 'create_repo', repo_name=repo_name, repo_type=self.REPO_TYPE, owner=owner) response = api_call(self, params) expected = 'Only Kallithea admin can specify `owner` param' self._compare_error(id_, expected, given=response.body) fixture.destroy_repo(repo_name) def test_api_create_repo_exists(self): repo_name = self.REPO id_, params = _build_data(self.apikey, 'create_repo', repo_name=repo_name, owner=base.TEST_USER_ADMIN_LOGIN, repo_type=self.REPO_TYPE,) response = api_call(self, params) expected = "repo `%s` already exist" % repo_name self._compare_error(id_, expected, given=response.body) def test_api_create_repo_dot_dot(self): # it is only possible to create repositories in existing repo groups - and '..' can't be used group_name = '%s/..' % TEST_REPO_GROUP repo_name = '%s/%s' % (group_name, 'could-be-outside') id_, params = _build_data(self.apikey, 'create_repo', repo_name=repo_name, owner=base.TEST_USER_ADMIN_LOGIN, repo_type=self.REPO_TYPE,) response = api_call(self, params) expected = 'repo group `%s` not found' % group_name self._compare_error(id_, expected, given=response.body) fixture.destroy_repo(repo_name) @mock.patch.object(RepoModel, 'create', raise_exception) def test_api_create_repo_exception_occurred(self): repo_name = 'api-repo' id_, params = _build_data(self.apikey, 'create_repo', repo_name=repo_name, owner=base.TEST_USER_ADMIN_LOGIN, repo_type=self.REPO_TYPE,) response = api_call(self, params) expected = 'failed to create repository `%s`' % repo_name self._compare_error(id_, expected, given=response.body) @base.parametrize('changing_attr,updates', [ ('owner', {'owner': base.TEST_USER_REGULAR_LOGIN}), ('description', {'description': 'new description'}), ('clone_uri', {'clone_uri': 'http://example.com/repo'}), # will fail - pulling from non-existing repo should fail ('clone_uri', {'clone_uri': '/repo'}), # will fail - pulling from local repo was a misfeature - it would bypass access control ('clone_uri', {'clone_uri': None}), ('landing_rev', {'landing_rev': 'branch:master'}), ('private', {'private': True}), ('enable_statistics', {'enable_statistics': True}), ('enable_downloads', {'enable_downloads': True}), ('name', {'name': 'new_repo_name'}), ('repo_group', {'group': 'test_group_for_update'}), ]) def test_api_update_repo(self, changing_attr, updates): repo_name = 'api_update_me' repo = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE) if changing_attr == 'repo_group': fixture.create_repo_group(updates['group']) id_, params = _build_data(self.apikey, 'update_repo', repoid=repo_name, **updates) if changing_attr == 'name': repo_name = updates['name'] if changing_attr == 'repo_group': repo_name = '/'.join([updates['group'], repo_name]) expected = { 'msg': 'updated repo ID:%s %s' % (repo.repo_id, repo_name), 'repository': repo.get_api_data() } expected['repository'].update(updates) if changing_attr == 'clone_uri' and updates['clone_uri'] is None: expected['repository']['clone_uri'] = '' if changing_attr == 'landing_rev': expected['repository']['landing_rev'] = expected['repository']['landing_rev'].split(':', 1) if changing_attr == 'name': expected['repository']['repo_name'] = expected['repository'].pop('name') if changing_attr == 'repo_group': expected['repository']['repo_name'] = expected['repository'].pop('group') + '/' + repo.repo_name response = api_call(self, params) try: if changing_attr == 'clone_uri' and updates['clone_uri']: expected = 'failed to update repo `%s`' % repo_name self._compare_error(id_, expected, given=response.body) else: self._compare_ok(id_, expected, given=response.body) finally: fixture.destroy_repo(repo_name) if changing_attr == 'repo_group': fixture.destroy_repo_group(updates['group']) @base.parametrize('changing_attr,updates', [ ('owner', {'owner': base.TEST_USER_REGULAR_LOGIN}), ('description', {'description': 'new description'}), ('clone_uri', {'clone_uri': 'http://example.com/repo'}), # will fail - pulling from non-existing repo should fail ('clone_uri', {'clone_uri': '/repo'}), # will fail - pulling from local repo was a misfeature - it would bypass access control ('clone_uri', {'clone_uri': None}), ('landing_rev', {'landing_rev': 'branch:master'}), ('enable_statistics', {'enable_statistics': True}), ('enable_downloads', {'enable_downloads': True}), ('name', {'name': 'new_repo_name'}), ('repo_group', {'group': 'test_group_for_update'}), ]) def test_api_update_group_repo(self, changing_attr, updates): group_name = 'lololo' fixture.create_repo_group(group_name) repo_name = '%s/api_update_me' % group_name repo = fixture.create_repo(repo_name, repo_group=group_name, repo_type=self.REPO_TYPE) if changing_attr == 'repo_group': fixture.create_repo_group(updates['group']) id_, params = _build_data(self.apikey, 'update_repo', repoid=repo_name, **updates) response = api_call(self, params) if changing_attr == 'name': repo_name = '%s/%s' % (group_name, updates['name']) if changing_attr == 'repo_group': repo_name = '/'.join([updates['group'], repo_name.rsplit('/', 1)[-1]]) try: if changing_attr == 'clone_uri' and updates['clone_uri']: expected = 'failed to update repo `%s`' % repo_name self._compare_error(id_, expected, given=response.body) else: expected = { 'msg': 'updated repo ID:%s %s' % (repo.repo_id, repo_name), 'repository': repo.get_api_data() } self._compare_ok(id_, expected, given=response.body) finally: fixture.destroy_repo(repo_name) if changing_attr == 'repo_group': fixture.destroy_repo_group(updates['group']) fixture.destroy_repo_group(group_name) def test_api_update_repo_repo_group_does_not_exist(self): repo_name = 'admin_owned' fixture.create_repo(repo_name) updates = {'group': 'test_group_for_update'} id_, params = _build_data(self.apikey, 'update_repo', repoid=repo_name, **updates) response = api_call(self, params) try: expected = 'repository group `%s` does not exist' % updates['group'] self._compare_error(id_, expected, given=response.body) finally: fixture.destroy_repo(repo_name) def test_api_update_repo_regular_user_not_allowed(self): repo_name = 'admin_owned' fixture.create_repo(repo_name) updates = {'description': 'something else'} id_, params = _build_data(self.apikey_regular, 'update_repo', repoid=repo_name, **updates) response = api_call(self, params) try: expected = 'repository `%s` does not exist' % repo_name self._compare_error(id_, expected, given=response.body) finally: fixture.destroy_repo(repo_name) @mock.patch.object(RepoModel, 'update', raise_exception) def test_api_update_repo_exception_occurred(self): repo_name = 'api_update_me' fixture.create_repo(repo_name, repo_type=self.REPO_TYPE) id_, params = _build_data(self.apikey, 'update_repo', repoid=repo_name, owner=base.TEST_USER_ADMIN_LOGIN,) response = api_call(self, params) try: expected = 'failed to update repo `%s`' % repo_name self._compare_error(id_, expected, given=response.body) finally: fixture.destroy_repo(repo_name) def test_api_update_repo_regular_user_change_top_level_repo_name(self): repo_name = 'admin_owned' new_repo_name = 'new_repo_name' fixture.create_repo(repo_name, repo_type=self.REPO_TYPE) RepoModel().grant_user_permission(repo=repo_name, user=self.TEST_USER_LOGIN, perm='repository.admin') UserModel().revoke_perm('default', 'hg.create.repository') UserModel().grant_perm('default', 'hg.create.none') updates = {'name': new_repo_name} id_, params = _build_data(self.apikey_regular, 'update_repo', repoid=repo_name, **updates) response = api_call(self, params) try: expected = 'no permission to create (or move) top level repositories' self._compare_error(id_, expected, given=response.body) finally: fixture.destroy_repo(repo_name) fixture.destroy_repo(new_repo_name) def test_api_update_repo_regular_user_change_repo_name_allowed(self): repo_name = 'admin_owned' new_repo_name = 'new_repo_name' repo = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE) RepoModel().grant_user_permission(repo=repo_name, user=self.TEST_USER_LOGIN, perm='repository.admin') UserModel().revoke_perm('default', 'hg.create.none') UserModel().grant_perm('default', 'hg.create.repository') updates = {'name': new_repo_name} id_, params = _build_data(self.apikey_regular, 'update_repo', repoid=repo_name, **updates) response = api_call(self, params) try: expected = { 'msg': 'updated repo ID:%s %s' % (repo.repo_id, new_repo_name), 'repository': repo.get_api_data() } self._compare_ok(id_, expected, given=response.body) finally: fixture.destroy_repo(repo_name) fixture.destroy_repo(new_repo_name) def test_api_update_repo_regular_user_change_owner(self): repo_name = 'admin_owned' fixture.create_repo(repo_name, repo_type=self.REPO_TYPE) RepoModel().grant_user_permission(repo=repo_name, user=self.TEST_USER_LOGIN, perm='repository.admin') updates = {'owner': base.TEST_USER_ADMIN_LOGIN} id_, params = _build_data(self.apikey_regular, 'update_repo', repoid=repo_name, **updates) response = api_call(self, params) try: expected = 'Only Kallithea admin can specify `owner` param' self._compare_error(id_, expected, given=response.body) finally: fixture.destroy_repo(repo_name) def test_api_delete_repo(self): repo_name = 'api_delete_me' fixture.create_repo(repo_name, repo_type=self.REPO_TYPE) id_, params = _build_data(self.apikey, 'delete_repo', repoid=repo_name, ) response = api_call(self, params) ret = { 'msg': 'Deleted repository `%s`' % repo_name, 'success': True } try: expected = ret self._compare_ok(id_, expected, given=response.body) finally: fixture.destroy_repo(repo_name) def test_api_delete_repo_by_non_admin(self): repo_name = 'api_delete_me' fixture.create_repo(repo_name, repo_type=self.REPO_TYPE, cur_user=self.TEST_USER_LOGIN) id_, params = _build_data(self.apikey_regular, 'delete_repo', repoid=repo_name, ) response = api_call(self, params) ret = { 'msg': 'Deleted repository `%s`' % repo_name, 'success': True } try: expected = ret self._compare_ok(id_, expected, given=response.body) finally: fixture.destroy_repo(repo_name) def test_api_delete_repo_by_non_admin_no_permission(self): repo_name = 'api_delete_me' fixture.create_repo(repo_name, repo_type=self.REPO_TYPE) try: id_, params = _build_data(self.apikey_regular, 'delete_repo', repoid=repo_name, ) response = api_call(self, params) expected = 'repository `%s` does not exist' % (repo_name) self._compare_error(id_, expected, given=response.body) finally: fixture.destroy_repo(repo_name) def test_api_delete_repo_exception_occurred(self): repo_name = 'api_delete_me' fixture.create_repo(repo_name, repo_type=self.REPO_TYPE) try: with mock.patch.object(RepoModel, 'delete', raise_exception): id_, params = _build_data(self.apikey, 'delete_repo', repoid=repo_name, ) response = api_call(self, params) expected = 'failed to delete repository `%s`' % repo_name self._compare_error(id_, expected, given=response.body) finally: fixture.destroy_repo(repo_name) def test_api_fork_repo(self): fork_name = 'api-repo-fork' id_, params = _build_data(self.apikey, 'fork_repo', repoid=self.REPO, fork_name=fork_name, owner=base.TEST_USER_ADMIN_LOGIN, ) response = api_call(self, params) ret = { 'msg': 'Created fork of `%s` as `%s`' % (self.REPO, fork_name), 'success': True, } expected = ret self._compare_ok(id_, expected, given=response.body) fixture.destroy_repo(fork_name) @base.parametrize('fork_name', [ 'api-repo-fork', '%s/api-repo-fork' % TEST_REPO_GROUP, ]) def test_api_fork_repo_non_admin(self, fork_name): RepoGroupModel().grant_user_permission(TEST_REPO_GROUP, self.TEST_USER_LOGIN, 'group.write') id_, params = _build_data(self.apikey_regular, 'fork_repo', repoid=self.REPO, fork_name=fork_name, ) response = api_call(self, params) ret = { 'msg': 'Created fork of `%s` as `%s`' % (self.REPO, fork_name), 'success': True, } expected = ret self._compare_ok(id_, expected, given=response.body) fixture.destroy_repo(fork_name) def test_api_fork_repo_non_admin_specify_owner(self): fork_name = 'api-repo-fork' id_, params = _build_data(self.apikey_regular, 'fork_repo', repoid=self.REPO, fork_name=fork_name, owner=base.TEST_USER_ADMIN_LOGIN, ) response = api_call(self, params) expected = 'Only Kallithea admin can specify `owner` param' self._compare_error(id_, expected, given=response.body) fixture.destroy_repo(fork_name) def test_api_fork_repo_non_admin_no_permission_to_fork(self): RepoModel().grant_user_permission(repo=self.REPO, user=db.User.DEFAULT_USER_NAME, perm='repository.none') fork_name = 'api-repo-fork' try: id_, params = _build_data(self.apikey_regular, 'fork_repo', repoid=self.REPO, fork_name=fork_name, ) response = api_call(self, params) expected = 'repository `%s` does not exist' % (self.REPO) self._compare_error(id_, expected, given=response.body) finally: RepoModel().grant_user_permission(repo=self.REPO, user=db.User.DEFAULT_USER_NAME, perm='repository.read') fixture.destroy_repo(fork_name) @base.parametrize('name,perm', [ ('read', 'repository.read'), ('write', 'repository.write'), ('admin', 'repository.admin'), ]) def test_api_fork_repo_non_admin_no_create_repo_permission(self, name, perm): fork_name = 'api-repo-fork' # regardless of base repository permission, forking is disallowed # when repository creation is disabled RepoModel().grant_user_permission(repo=self.REPO, user=self.TEST_USER_LOGIN, perm=perm) UserModel().revoke_perm('default', 'hg.create.repository') UserModel().grant_perm('default', 'hg.create.none') id_, params = _build_data(self.apikey_regular, 'fork_repo', repoid=self.REPO, fork_name=fork_name, ) response = api_call(self, params) expected = 'no permission to create top level repo' self._compare_error(id_, expected, given=response.body) fixture.destroy_repo(fork_name) def test_api_fork_repo_unknown_owner(self): fork_name = 'api-repo-fork' owner = 'i-dont-exist' id_, params = _build_data(self.apikey, 'fork_repo', repoid=self.REPO, fork_name=fork_name, owner=owner, ) response = api_call(self, params) expected = 'user `%s` does not exist' % owner self._compare_error(id_, expected, given=response.body) def test_api_fork_repo_fork_exists(self): fork_name = 'api-repo-fork' fixture.create_fork(self.REPO, fork_name) try: fork_name = 'api-repo-fork' id_, params = _build_data(self.apikey, 'fork_repo', repoid=self.REPO, fork_name=fork_name, owner=base.TEST_USER_ADMIN_LOGIN, ) response = api_call(self, params) expected = "fork `%s` already exist" % fork_name self._compare_error(id_, expected, given=response.body) finally: fixture.destroy_repo(fork_name) def test_api_fork_repo_repo_exists(self): fork_name = self.REPO id_, params = _build_data(self.apikey, 'fork_repo', repoid=self.REPO, fork_name=fork_name, owner=base.TEST_USER_ADMIN_LOGIN, ) response = api_call(self, params) expected = "repo `%s` already exist" % fork_name self._compare_error(id_, expected, given=response.body) @mock.patch.object(RepoModel, 'create_fork', raise_exception) def test_api_fork_repo_exception_occurred(self): fork_name = 'api-repo-fork' id_, params = _build_data(self.apikey, 'fork_repo', repoid=self.REPO, fork_name=fork_name, owner=base.TEST_USER_ADMIN_LOGIN, ) response = api_call(self, params) expected = 'failed to fork repository `%s` as `%s`' % (self.REPO, fork_name) self._compare_error(id_, expected, given=response.body) def test_api_get_user_group(self): id_, params = _build_data(self.apikey, 'get_user_group', usergroupid=TEST_USER_GROUP) response = api_call(self, params) user_group = UserGroupModel().get_group(TEST_USER_GROUP) members = [] for user in user_group.members: user = user.user members.append(user.get_api_data()) ret = user_group.get_api_data() ret['members'] = members expected = ret self._compare_ok(id_, expected, given=response.body) def test_api_get_user_groups(self): gr_name = 'test_user_group2' make_user_group(gr_name) try: id_, params = _build_data(self.apikey, 'get_user_groups', ) response = api_call(self, params) expected = [] for gr_name in [TEST_USER_GROUP, 'test_user_group2']: user_group = UserGroupModel().get_group(gr_name) ret = user_group.get_api_data() expected.append(ret) self._compare_ok(id_, expected, given=response.body) finally: fixture.destroy_user_group(gr_name) def test_api_create_user_group(self): group_name = 'some_new_group' id_, params = _build_data(self.apikey, 'create_user_group', group_name=group_name) response = api_call(self, params) ret = { 'msg': 'created new user group `%s`' % group_name, 'user_group': jsonify(UserGroupModel() \ .get_by_name(group_name) \ .get_api_data()) } expected = ret self._compare_ok(id_, expected, given=response.body) fixture.destroy_user_group(group_name) def test_api_get_user_group_that_exist(self): id_, params = _build_data(self.apikey, 'create_user_group', group_name=TEST_USER_GROUP) response = api_call(self, params) expected = "user group `%s` already exist" % TEST_USER_GROUP self._compare_error(id_, expected, given=response.body) @mock.patch.object(UserGroupModel, 'create', raise_exception) def test_api_get_user_group_exception_occurred(self): group_name = 'exception_happens' id_, params = _build_data(self.apikey, 'create_user_group', group_name=group_name) response = api_call(self, params) expected = 'failed to create group `%s`' % group_name self._compare_error(id_, expected, given=response.body) @base.parametrize('changing_attr,updates', [ ('group_name', {'group_name': 'new_group_name'}), ('group_name', {'group_name': 'test_group_for_update'}), ('owner', {'owner': base.TEST_USER_REGULAR_LOGIN}), ('active', {'active': False}), ('active', {'active': True}), ]) def test_api_update_user_group(self, changing_attr, updates): gr_name = 'test_group_for_update' user_group = fixture.create_user_group(gr_name) try: id_, params = _build_data(self.apikey, 'update_user_group', usergroupid=gr_name, **updates) response = api_call(self, params) expected = { 'msg': 'updated user group ID:%s %s' % (user_group.users_group_id, user_group.users_group_name), 'user_group': user_group.get_api_data() } self._compare_ok(id_, expected, given=response.body) finally: if changing_attr == 'group_name': # switch to updated name for proper cleanup gr_name = updates['group_name'] fixture.destroy_user_group(gr_name) @mock.patch.object(UserGroupModel, 'update', raise_exception) def test_api_update_user_group_exception_occurred(self): gr_name = 'test_group' fixture.create_user_group(gr_name) try: id_, params = _build_data(self.apikey, 'update_user_group', usergroupid=gr_name) response = api_call(self, params) expected = 'failed to update user group `%s`' % gr_name self._compare_error(id_, expected, given=response.body) finally: fixture.destroy_user_group(gr_name) def test_api_add_user_to_user_group(self): gr_name = 'test_group' fixture.create_user_group(gr_name) try: id_, params = _build_data(self.apikey, 'add_user_to_user_group', usergroupid=gr_name, userid=base.TEST_USER_ADMIN_LOGIN) response = api_call(self, params) expected = { 'msg': 'added member `%s` to user group `%s`' % ( base.TEST_USER_ADMIN_LOGIN, gr_name), 'success': True } self._compare_ok(id_, expected, given=response.body) finally: fixture.destroy_user_group(gr_name) def test_api_add_user_to_user_group_that_doesnt_exist(self): id_, params = _build_data(self.apikey, 'add_user_to_user_group', usergroupid='false-group', userid=base.TEST_USER_ADMIN_LOGIN) response = api_call(self, params) expected = 'user group `%s` does not exist' % 'false-group' self._compare_error(id_, expected, given=response.body) @mock.patch.object(UserGroupModel, 'add_user_to_group', raise_exception) def test_api_add_user_to_user_group_exception_occurred(self): gr_name = 'test_group' fixture.create_user_group(gr_name) try: id_, params = _build_data(self.apikey, 'add_user_to_user_group', usergroupid=gr_name, userid=base.TEST_USER_ADMIN_LOGIN) response = api_call(self, params) expected = 'failed to add member to user group `%s`' % gr_name self._compare_error(id_, expected, given=response.body) finally: fixture.destroy_user_group(gr_name) def test_api_remove_user_from_user_group(self): gr_name = 'test_group_3' gr = fixture.create_user_group(gr_name) UserGroupModel().add_user_to_group(gr, user=base.TEST_USER_ADMIN_LOGIN) meta.Session().commit() try: id_, params = _build_data(self.apikey, 'remove_user_from_user_group', usergroupid=gr_name, userid=base.TEST_USER_ADMIN_LOGIN) response = api_call(self, params) expected = { 'msg': 'removed member `%s` from user group `%s`' % ( base.TEST_USER_ADMIN_LOGIN, gr_name ), 'success': True} self._compare_ok(id_, expected, given=response.body) finally: fixture.destroy_user_group(gr_name) @mock.patch.object(UserGroupModel, 'remove_user_from_group', raise_exception) def test_api_remove_user_from_user_group_exception_occurred(self): gr_name = 'test_group_3' gr = fixture.create_user_group(gr_name) UserGroupModel().add_user_to_group(gr, user=base.TEST_USER_ADMIN_LOGIN) meta.Session().commit() try: id_, params = _build_data(self.apikey, 'remove_user_from_user_group', usergroupid=gr_name, userid=base.TEST_USER_ADMIN_LOGIN) response = api_call(self, params) expected = 'failed to remove member from user group `%s`' % gr_name self._compare_error(id_, expected, given=response.body) finally: fixture.destroy_user_group(gr_name) def test_api_delete_user_group(self): gr_name = 'test_group' ugroup = fixture.create_user_group(gr_name) gr_id = ugroup.users_group_id try: id_, params = _build_data(self.apikey, 'delete_user_group', usergroupid=gr_name) response = api_call(self, params) expected = { 'user_group': None, 'msg': 'deleted user group ID:%s %s' % (gr_id, gr_name) } self._compare_ok(id_, expected, given=response.body) finally: if UserGroupModel().get_by_name(gr_name): fixture.destroy_user_group(gr_name) def test_api_delete_user_group_that_is_assigned(self): gr_name = 'test_group' ugroup = fixture.create_user_group(gr_name) gr_id = ugroup.users_group_id ugr_to_perm = RepoModel().grant_user_group_permission(self.REPO, gr_name, 'repository.write') msg = 'User Group assigned to %s' % ugr_to_perm.repository.repo_name try: id_, params = _build_data(self.apikey, 'delete_user_group', usergroupid=gr_name) response = api_call(self, params) expected = msg self._compare_error(id_, expected, given=response.body) finally: if UserGroupModel().get_by_name(gr_name): fixture.destroy_user_group(gr_name) def test_api_delete_user_group_exception_occurred(self): gr_name = 'test_group' ugroup = fixture.create_user_group(gr_name) gr_id = ugroup.users_group_id id_, params = _build_data(self.apikey, 'delete_user_group', usergroupid=gr_name) try: with mock.patch.object(UserGroupModel, 'delete', raise_exception): response = api_call(self, params) expected = 'failed to delete user group ID:%s %s' % (gr_id, gr_name) self._compare_error(id_, expected, given=response.body) finally: fixture.destroy_user_group(gr_name) @base.parametrize('name,perm', [ ('none', 'repository.none'), ('read', 'repository.read'), ('write', 'repository.write'), ('admin', 'repository.admin'), ]) def test_api_grant_user_permission(self, name, perm): id_, params = _build_data(self.apikey, 'grant_user_permission', repoid=self.REPO, userid=base.TEST_USER_ADMIN_LOGIN, perm=perm) response = api_call(self, params) ret = { 'msg': 'Granted perm: `%s` for user: `%s` in repo: `%s`' % ( perm, base.TEST_USER_ADMIN_LOGIN, self.REPO ), 'success': True } expected = ret self._compare_ok(id_, expected, given=response.body) def test_api_grant_user_permission_wrong_permission(self): perm = 'haha.no.permission' id_, params = _build_data(self.apikey, 'grant_user_permission', repoid=self.REPO, userid=base.TEST_USER_ADMIN_LOGIN, perm=perm) response = api_call(self, params) expected = 'permission `%s` does not exist' % perm self._compare_error(id_, expected, given=response.body) @mock.patch.object(RepoModel, 'grant_user_permission', raise_exception) def test_api_grant_user_permission_exception_when_adding(self): perm = 'repository.read' id_, params = _build_data(self.apikey, 'grant_user_permission', repoid=self.REPO, userid=base.TEST_USER_ADMIN_LOGIN, perm=perm) response = api_call(self, params) expected = 'failed to edit permission for user: `%s` in repo: `%s`' % ( base.TEST_USER_ADMIN_LOGIN, self.REPO ) self._compare_error(id_, expected, given=response.body) def test_api_revoke_user_permission(self): id_, params = _build_data(self.apikey, 'revoke_user_permission', repoid=self.REPO, userid=base.TEST_USER_ADMIN_LOGIN, ) response = api_call(self, params) expected = { 'msg': 'Revoked perm for user: `%s` in repo: `%s`' % ( base.TEST_USER_ADMIN_LOGIN, self.REPO ), 'success': True } self._compare_ok(id_, expected, given=response.body) @mock.patch.object(RepoModel, 'revoke_user_permission', raise_exception) def test_api_revoke_user_permission_exception_when_adding(self): id_, params = _build_data(self.apikey, 'revoke_user_permission', repoid=self.REPO, userid=base.TEST_USER_ADMIN_LOGIN, ) response = api_call(self, params) expected = 'failed to edit permission for user: `%s` in repo: `%s`' % ( base.TEST_USER_ADMIN_LOGIN, self.REPO ) self._compare_error(id_, expected, given=response.body) @base.parametrize('name,perm', [ ('none', 'repository.none'), ('read', 'repository.read'), ('write', 'repository.write'), ('admin', 'repository.admin'), ]) def test_api_grant_user_group_permission(self, name, perm): id_, params = _build_data(self.apikey, 'grant_user_group_permission', repoid=self.REPO, usergroupid=TEST_USER_GROUP, perm=perm) response = api_call(self, params) ret = { 'msg': 'Granted perm: `%s` for user group: `%s` in repo: `%s`' % ( perm, TEST_USER_GROUP, self.REPO ), 'success': True } expected = ret self._compare_ok(id_, expected, given=response.body) def test_api_grant_user_group_permission_wrong_permission(self): perm = 'haha.no.permission' id_, params = _build_data(self.apikey, 'grant_user_group_permission', repoid=self.REPO, usergroupid=TEST_USER_GROUP, perm=perm) response = api_call(self, params) expected = 'permission `%s` does not exist' % perm self._compare_error(id_, expected, given=response.body) @mock.patch.object(RepoModel, 'grant_user_group_permission', raise_exception) def test_api_grant_user_group_permission_exception_when_adding(self): perm = 'repository.read' id_, params = _build_data(self.apikey, 'grant_user_group_permission', repoid=self.REPO, usergroupid=TEST_USER_GROUP, perm=perm) response = api_call(self, params) expected = 'failed to edit permission for user group: `%s` in repo: `%s`' % ( TEST_USER_GROUP, self.REPO ) self._compare_error(id_, expected, given=response.body) def test_api_revoke_user_group_permission(self): RepoModel().grant_user_group_permission(repo=self.REPO, group_name=TEST_USER_GROUP, perm='repository.read') meta.Session().commit() id_, params = _build_data(self.apikey, 'revoke_user_group_permission', repoid=self.REPO, usergroupid=TEST_USER_GROUP, ) response = api_call(self, params) expected = { 'msg': 'Revoked perm for user group: `%s` in repo: `%s`' % ( TEST_USER_GROUP, self.REPO ), 'success': True } self._compare_ok(id_, expected, given=response.body) @mock.patch.object(RepoModel, 'revoke_user_group_permission', raise_exception) def test_api_revoke_user_group_permission_exception_when_adding(self): id_, params = _build_data(self.apikey, 'revoke_user_group_permission', repoid=self.REPO, usergroupid=TEST_USER_GROUP, ) response = api_call(self, params) expected = 'failed to edit permission for user group: `%s` in repo: `%s`' % ( TEST_USER_GROUP, self.REPO ) self._compare_error(id_, expected, given=response.body) @base.parametrize('name,perm,apply_to_children', [ ('none', 'group.none', 'none'), ('read', 'group.read', 'none'), ('write', 'group.write', 'none'), ('admin', 'group.admin', 'none'), ('none', 'group.none', 'all'), ('read', 'group.read', 'all'), ('write', 'group.write', 'all'), ('admin', 'group.admin', 'all'), ('none', 'group.none', 'repos'), ('read', 'group.read', 'repos'), ('write', 'group.write', 'repos'), ('admin', 'group.admin', 'repos'), ('none', 'group.none', 'groups'), ('read', 'group.read', 'groups'), ('write', 'group.write', 'groups'), ('admin', 'group.admin', 'groups'), ]) def test_api_grant_user_permission_to_repo_group(self, name, perm, apply_to_children): id_, params = _build_data(self.apikey, 'grant_user_permission_to_repo_group', repogroupid=TEST_REPO_GROUP, userid=base.TEST_USER_ADMIN_LOGIN, perm=perm, apply_to_children=apply_to_children) response = api_call(self, params) ret = { 'msg': 'Granted perm: `%s` (recursive:%s) for user: `%s` in repo group: `%s`' % ( perm, apply_to_children, base.TEST_USER_ADMIN_LOGIN, TEST_REPO_GROUP ), 'success': True } expected = ret self._compare_ok(id_, expected, given=response.body) @base.parametrize('name,perm,apply_to_children,grant_admin,access_ok', [ ('none_fails', 'group.none', 'none', False, False), ('read_fails', 'group.read', 'none', False, False), ('write_fails', 'group.write', 'none', False, False), ('admin_fails', 'group.admin', 'none', False, False), # with granted perms ('none_ok', 'group.none', 'none', True, True), ('read_ok', 'group.read', 'none', True, True), ('write_ok', 'group.write', 'none', True, True), ('admin_ok', 'group.admin', 'none', True, True), ]) def test_api_grant_user_permission_to_repo_group_by_regular_user( self, name, perm, apply_to_children, grant_admin, access_ok): if grant_admin: RepoGroupModel().grant_user_permission(TEST_REPO_GROUP, self.TEST_USER_LOGIN, 'group.admin') meta.Session().commit() id_, params = _build_data(self.apikey_regular, 'grant_user_permission_to_repo_group', repogroupid=TEST_REPO_GROUP, userid=base.TEST_USER_ADMIN_LOGIN, perm=perm, apply_to_children=apply_to_children) response = api_call(self, params) if access_ok: ret = { 'msg': 'Granted perm: `%s` (recursive:%s) for user: `%s` in repo group: `%s`' % ( perm, apply_to_children, base.TEST_USER_ADMIN_LOGIN, TEST_REPO_GROUP ), 'success': True } expected = ret self._compare_ok(id_, expected, given=response.body) else: expected = 'repository group `%s` does not exist' % TEST_REPO_GROUP self._compare_error(id_, expected, given=response.body) def test_api_grant_user_permission_to_repo_group_wrong_permission(self): perm = 'haha.no.permission' id_, params = _build_data(self.apikey, 'grant_user_permission_to_repo_group', repogroupid=TEST_REPO_GROUP, userid=base.TEST_USER_ADMIN_LOGIN, perm=perm) response = api_call(self, params) expected = 'permission `%s` does not exist' % perm self._compare_error(id_, expected, given=response.body) @mock.patch.object(RepoGroupModel, 'grant_user_permission', raise_exception) def test_api_grant_user_permission_to_repo_group_exception_when_adding(self): perm = 'group.read' id_, params = _build_data(self.apikey, 'grant_user_permission_to_repo_group', repogroupid=TEST_REPO_GROUP, userid=base.TEST_USER_ADMIN_LOGIN, perm=perm) response = api_call(self, params) expected = 'failed to edit permission for user: `%s` in repo group: `%s`' % ( base.TEST_USER_ADMIN_LOGIN, TEST_REPO_GROUP ) self._compare_error(id_, expected, given=response.body) @base.parametrize('name,apply_to_children', [ ('none', 'none'), ('all', 'all'), ('repos', 'repos'), ('groups', 'groups'), ]) def test_api_revoke_user_permission_from_repo_group(self, name, apply_to_children): RepoGroupModel().grant_user_permission(repo_group=TEST_REPO_GROUP, user=base.TEST_USER_ADMIN_LOGIN, perm='group.read',) meta.Session().commit() id_, params = _build_data(self.apikey, 'revoke_user_permission_from_repo_group', repogroupid=TEST_REPO_GROUP, userid=base.TEST_USER_ADMIN_LOGIN, apply_to_children=apply_to_children,) response = api_call(self, params) expected = { 'msg': 'Revoked perm (recursive:%s) for user: `%s` in repo group: `%s`' % ( apply_to_children, base.TEST_USER_ADMIN_LOGIN, TEST_REPO_GROUP ), 'success': True } self._compare_ok(id_, expected, given=response.body) @base.parametrize('name,apply_to_children,grant_admin,access_ok', [ ('none', 'none', False, False), ('all', 'all', False, False), ('repos', 'repos', False, False), ('groups', 'groups', False, False), # after granting admin rights ('none', 'none', False, False), ('all', 'all', False, False), ('repos', 'repos', False, False), ('groups', 'groups', False, False), ]) def test_api_revoke_user_permission_from_repo_group_by_regular_user( self, name, apply_to_children, grant_admin, access_ok): RepoGroupModel().grant_user_permission(repo_group=TEST_REPO_GROUP, user=base.TEST_USER_ADMIN_LOGIN, perm='group.read',) meta.Session().commit() if grant_admin: RepoGroupModel().grant_user_permission(TEST_REPO_GROUP, self.TEST_USER_LOGIN, 'group.admin') meta.Session().commit() id_, params = _build_data(self.apikey_regular, 'revoke_user_permission_from_repo_group', repogroupid=TEST_REPO_GROUP, userid=base.TEST_USER_ADMIN_LOGIN, apply_to_children=apply_to_children,) response = api_call(self, params) if access_ok: expected = { 'msg': 'Revoked perm (recursive:%s) for user: `%s` in repo group: `%s`' % ( apply_to_children, base.TEST_USER_ADMIN_LOGIN, TEST_REPO_GROUP ), 'success': True } self._compare_ok(id_, expected, given=response.body) else: expected = 'repository group `%s` does not exist' % TEST_REPO_GROUP self._compare_error(id_, expected, given=response.body) @mock.patch.object(RepoGroupModel, 'revoke_user_permission', raise_exception) def test_api_revoke_user_permission_from_repo_group_exception_when_adding(self): id_, params = _build_data(self.apikey, 'revoke_user_permission_from_repo_group', repogroupid=TEST_REPO_GROUP, userid=base.TEST_USER_ADMIN_LOGIN, ) response = api_call(self, params) expected = 'failed to edit permission for user: `%s` in repo group: `%s`' % ( base.TEST_USER_ADMIN_LOGIN, TEST_REPO_GROUP ) self._compare_error(id_, expected, given=response.body) @base.parametrize('name,perm,apply_to_children', [ ('none', 'group.none', 'none'), ('read', 'group.read', 'none'), ('write', 'group.write', 'none'), ('admin', 'group.admin', 'none'), ('none', 'group.none', 'all'), ('read', 'group.read', 'all'), ('write', 'group.write', 'all'), ('admin', 'group.admin', 'all'), ('none', 'group.none', 'repos'), ('read', 'group.read', 'repos'), ('write', 'group.write', 'repos'), ('admin', 'group.admin', 'repos'), ('none', 'group.none', 'groups'), ('read', 'group.read', 'groups'), ('write', 'group.write', 'groups'), ('admin', 'group.admin', 'groups'), ]) def test_api_grant_user_group_permission_to_repo_group(self, name, perm, apply_to_children): id_, params = _build_data(self.apikey, 'grant_user_group_permission_to_repo_group', repogroupid=TEST_REPO_GROUP, usergroupid=TEST_USER_GROUP, perm=perm, apply_to_children=apply_to_children,) response = api_call(self, params) ret = { 'msg': 'Granted perm: `%s` (recursive:%s) for user group: `%s` in repo group: `%s`' % ( perm, apply_to_children, TEST_USER_GROUP, TEST_REPO_GROUP ), 'success': True } expected = ret self._compare_ok(id_, expected, given=response.body) @base.parametrize('name,perm,apply_to_children,grant_admin,access_ok', [ ('none_fails', 'group.none', 'none', False, False), ('read_fails', 'group.read', 'none', False, False), ('write_fails', 'group.write', 'none', False, False), ('admin_fails', 'group.admin', 'none', False, False), # with granted perms ('none_ok', 'group.none', 'none', True, True), ('read_ok', 'group.read', 'none', True, True), ('write_ok', 'group.write', 'none', True, True), ('admin_ok', 'group.admin', 'none', True, True), ]) def test_api_grant_user_group_permission_to_repo_group_by_regular_user( self, name, perm, apply_to_children, grant_admin, access_ok): if grant_admin: RepoGroupModel().grant_user_permission(TEST_REPO_GROUP, self.TEST_USER_LOGIN, 'group.admin') meta.Session().commit() id_, params = _build_data(self.apikey_regular, 'grant_user_group_permission_to_repo_group', repogroupid=TEST_REPO_GROUP, usergroupid=TEST_USER_GROUP, perm=perm, apply_to_children=apply_to_children,) response = api_call(self, params) if access_ok: ret = { 'msg': 'Granted perm: `%s` (recursive:%s) for user group: `%s` in repo group: `%s`' % ( perm, apply_to_children, TEST_USER_GROUP, TEST_REPO_GROUP ), 'success': True } expected = ret self._compare_ok(id_, expected, given=response.body) else: expected = 'repository group `%s` does not exist' % TEST_REPO_GROUP self._compare_error(id_, expected, given=response.body) def test_api_grant_user_group_permission_to_repo_group_wrong_permission(self): perm = 'haha.no.permission' id_, params = _build_data(self.apikey, 'grant_user_group_permission_to_repo_group', repogroupid=TEST_REPO_GROUP, usergroupid=TEST_USER_GROUP, perm=perm) response = api_call(self, params) expected = 'permission `%s` does not exist' % perm self._compare_error(id_, expected, given=response.body) @mock.patch.object(RepoGroupModel, 'grant_user_group_permission', raise_exception) def test_api_grant_user_group_permission_exception_when_adding_to_repo_group(self): perm = 'group.read' id_, params = _build_data(self.apikey, 'grant_user_group_permission_to_repo_group', repogroupid=TEST_REPO_GROUP, usergroupid=TEST_USER_GROUP, perm=perm) response = api_call(self, params) expected = 'failed to edit permission for user group: `%s` in repo group: `%s`' % ( TEST_USER_GROUP, TEST_REPO_GROUP ) self._compare_error(id_, expected, given=response.body) @base.parametrize('name,apply_to_children', [ ('none', 'none'), ('all', 'all'), ('repos', 'repos'), ('groups', 'groups'), ]) def test_api_revoke_user_group_permission_from_repo_group(self, name, apply_to_children): RepoGroupModel().grant_user_group_permission(repo_group=TEST_REPO_GROUP, group_name=TEST_USER_GROUP, perm='group.read',) meta.Session().commit() id_, params = _build_data(self.apikey, 'revoke_user_group_permission_from_repo_group', repogroupid=TEST_REPO_GROUP, usergroupid=TEST_USER_GROUP, apply_to_children=apply_to_children,) response = api_call(self, params) expected = { 'msg': 'Revoked perm (recursive:%s) for user group: `%s` in repo group: `%s`' % ( apply_to_children, TEST_USER_GROUP, TEST_REPO_GROUP ), 'success': True } self._compare_ok(id_, expected, given=response.body) @base.parametrize('name,apply_to_children,grant_admin,access_ok', [ ('none', 'none', False, False), ('all', 'all', False, False), ('repos', 'repos', False, False), ('groups', 'groups', False, False), # after granting admin rights ('none', 'none', False, False), ('all', 'all', False, False), ('repos', 'repos', False, False), ('groups', 'groups', False, False), ]) def test_api_revoke_user_group_permission_from_repo_group_by_regular_user( self, name, apply_to_children, grant_admin, access_ok): RepoGroupModel().grant_user_permission(repo_group=TEST_REPO_GROUP, user=base.TEST_USER_ADMIN_LOGIN, perm='group.read',) meta.Session().commit() if grant_admin: RepoGroupModel().grant_user_permission(TEST_REPO_GROUP, self.TEST_USER_LOGIN, 'group.admin') meta.Session().commit() id_, params = _build_data(self.apikey_regular, 'revoke_user_group_permission_from_repo_group', repogroupid=TEST_REPO_GROUP, usergroupid=TEST_USER_GROUP, apply_to_children=apply_to_children,) response = api_call(self, params) if access_ok: expected = { 'msg': 'Revoked perm (recursive:%s) for user group: `%s` in repo group: `%s`' % ( apply_to_children, base.TEST_USER_ADMIN_LOGIN, TEST_REPO_GROUP ), 'success': True } self._compare_ok(id_, expected, given=response.body) else: expected = 'repository group `%s` does not exist' % TEST_REPO_GROUP self._compare_error(id_, expected, given=response.body) @mock.patch.object(RepoGroupModel, 'revoke_user_group_permission', raise_exception) def test_api_revoke_user_group_permission_from_repo_group_exception_when_adding(self): id_, params = _build_data(self.apikey, 'revoke_user_group_permission_from_repo_group', repogroupid=TEST_REPO_GROUP, usergroupid=TEST_USER_GROUP,) response = api_call(self, params) expected = 'failed to edit permission for user group: `%s` in repo group: `%s`' % ( TEST_USER_GROUP, TEST_REPO_GROUP ) self._compare_error(id_, expected, given=response.body) def test_api_get_gist(self): gist = fixture.create_gist() gist_id = gist.gist_access_id gist_created_on = gist.created_on id_, params = _build_data(self.apikey, 'get_gist', gistid=gist_id, ) response = api_call(self, params) expected = { 'access_id': gist_id, 'created_on': gist_created_on, 'description': 'new-gist', 'expires': -1.0, 'gist_id': int(gist_id), 'type': 'public', 'url': 'http://localhost:80/_admin/gists/%s' % gist_id } self._compare_ok(id_, expected, given=response.body) def test_api_get_gist_that_does_not_exist(self): id_, params = _build_data(self.apikey_regular, 'get_gist', gistid='12345', ) response = api_call(self, params) expected = 'gist `%s` does not exist' % ('12345',) self._compare_error(id_, expected, given=response.body) def test_api_get_gist_private_gist_without_permission(self): gist = fixture.create_gist() gist_id = gist.gist_access_id gist_created_on = gist.created_on id_, params = _build_data(self.apikey_regular, 'get_gist', gistid=gist_id, ) response = api_call(self, params) expected = 'gist `%s` does not exist' % gist_id self._compare_error(id_, expected, given=response.body) def test_api_get_gists(self): fixture.create_gist() fixture.create_gist() id_, params = _build_data(self.apikey, 'get_gists') response = api_call(self, params) expected = response.json assert len(response.json['result']) == 2 #self._compare_ok(id_, expected, given=response.body) def test_api_get_gists_regular_user(self): # by admin fixture.create_gist() fixture.create_gist() # by reg user fixture.create_gist(owner=self.TEST_USER_LOGIN) fixture.create_gist(owner=self.TEST_USER_LOGIN) fixture.create_gist(owner=self.TEST_USER_LOGIN) id_, params = _build_data(self.apikey_regular, 'get_gists') response = api_call(self, params) expected = response.json assert len(response.json['result']) == 3 #self._compare_ok(id_, expected, given=response.body) def test_api_get_gists_only_for_regular_user(self): # by admin fixture.create_gist() fixture.create_gist() # by reg user fixture.create_gist(owner=self.TEST_USER_LOGIN) fixture.create_gist(owner=self.TEST_USER_LOGIN) fixture.create_gist(owner=self.TEST_USER_LOGIN) id_, params = _build_data(self.apikey, 'get_gists', userid=self.TEST_USER_LOGIN) response = api_call(self, params) expected = response.json assert len(response.json['result']) == 3 #self._compare_ok(id_, expected, given=response.body) def test_api_get_gists_regular_user_with_different_userid(self): id_, params = _build_data(self.apikey_regular, 'get_gists', userid=base.TEST_USER_ADMIN_LOGIN) response = api_call(self, params) expected = 'userid is not the same as your user' self._compare_error(id_, expected, given=response.body) def test_api_create_gist(self): id_, params = _build_data(self.apikey_regular, 'create_gist', lifetime=10, description='foobar-gist', gist_type='public', files={'foobar': {'content': 'foo'}}) response = api_call(self, params) expected = { 'gist': { 'access_id': response.json['result']['gist']['access_id'], 'created_on': response.json['result']['gist']['created_on'], 'description': 'foobar-gist', 'expires': response.json['result']['gist']['expires'], 'gist_id': response.json['result']['gist']['gist_id'], 'type': 'public', 'url': response.json['result']['gist']['url'] }, 'msg': 'created new gist' } self._compare_ok(id_, expected, given=response.body) @mock.patch.object(GistModel, 'create', raise_exception) def test_api_create_gist_exception_occurred(self): id_, params = _build_data(self.apikey_regular, 'create_gist', files={}) response = api_call(self, params) expected = 'failed to create gist' self._compare_error(id_, expected, given=response.body) def test_api_delete_gist(self): gist_id = fixture.create_gist().gist_access_id id_, params = _build_data(self.apikey, 'delete_gist', gistid=gist_id) response = api_call(self, params) expected = {'gist': None, 'msg': 'deleted gist ID:%s' % gist_id} self._compare_ok(id_, expected, given=response.body) def test_api_delete_gist_regular_user(self): gist_id = fixture.create_gist(owner=self.TEST_USER_LOGIN).gist_access_id id_, params = _build_data(self.apikey_regular, 'delete_gist', gistid=gist_id) response = api_call(self, params) expected = {'gist': None, 'msg': 'deleted gist ID:%s' % gist_id} self._compare_ok(id_, expected, given=response.body) def test_api_delete_gist_regular_user_no_permission(self): gist_id = fixture.create_gist().gist_access_id id_, params = _build_data(self.apikey_regular, 'delete_gist', gistid=gist_id) response = api_call(self, params) expected = 'gist `%s` does not exist' % (gist_id,) self._compare_error(id_, expected, given=response.body) @mock.patch.object(GistModel, 'delete', raise_exception) def test_api_delete_gist_exception_occurred(self): gist_id = fixture.create_gist().gist_access_id id_, params = _build_data(self.apikey, 'delete_gist', gistid=gist_id) response = api_call(self, params) expected = 'failed to delete gist ID:%s' % (gist_id,) self._compare_error(id_, expected, given=response.body) def test_api_get_ip(self): id_, params = _build_data(self.apikey, 'get_ip') response = api_call(self, params) expected = { 'server_ip_addr': '0.0.0.0', 'user_ips': [] } self._compare_ok(id_, expected, given=response.body) def test_api_get_server_info(self): id_, params = _build_data(self.apikey, 'get_server_info') response = api_call(self, params) expected = db.Setting.get_server_info() self._compare_ok(id_, expected, given=response.body) def test_api_get_changesets(self): id_, params = _build_data(self.apikey, 'get_changesets', repoid=self.REPO, start=0, end=2) response = api_call(self, params) result = ext_json.loads(response.body)["result"] assert len(result) == 3 assert 'message' in result[0] assert 'added' not in result[0] def test_api_get_changesets_with_max_revisions(self): id_, params = _build_data(self.apikey, 'get_changesets', repoid=self.REPO, start_date="2011-02-24T00:00:00", max_revisions=10) response = api_call(self, params) result = ext_json.loads(response.body)["result"] assert len(result) == 10 assert 'message' in result[0] assert 'added' not in result[0] def test_api_get_changesets_with_branch(self): if self.REPO == 'vcs_test_hg': branch = 'stable' else: pytest.skip("skipping due to missing branches in git test repo") id_, params = _build_data(self.apikey, 'get_changesets', repoid=self.REPO, branch_name=branch, start_date="2011-02-24T00:00:00") response = api_call(self, params) result = ext_json.loads(response.body)["result"] assert len(result) == 5 assert 'message' in result[0] assert 'added' not in result[0] def test_api_get_changesets_with_file_list(self): id_, params = _build_data(self.apikey, 'get_changesets', repoid=self.REPO, start_date="2010-04-07T23:30:30", end_date="2010-04-08T00:31:14", with_file_list=True) response = api_call(self, params) result = ext_json.loads(response.body)["result"] assert len(result) == 3 assert 'message' in result[0] assert 'added' in result[0] def test_api_get_changeset(self): review = fixture.review_changeset(self.REPO, self.TEST_REVISION, "approved") id_, params = _build_data(self.apikey, 'get_changeset', repoid=self.REPO, raw_id=self.TEST_REVISION) response = api_call(self, params) result = ext_json.loads(response.body)["result"] assert result["raw_id"] == self.TEST_REVISION assert "reviews" not in result def test_api_get_changeset_with_reviews(self): reviewobjs = fixture.review_changeset(self.REPO, self.TEST_REVISION, "approved") id_, params = _build_data(self.apikey, 'get_changeset', repoid=self.REPO, raw_id=self.TEST_REVISION, with_reviews=True) response = api_call(self, params) result = ext_json.loads(response.body)["result"] assert result["raw_id"] == self.TEST_REVISION assert "reviews" in result assert len(result["reviews"]) == 1 review = result["reviews"][0] expected = { 'status': 'approved', 'modified_at': reviewobjs[0].modified_at.replace(microsecond=0).isoformat(), 'reviewer': 'test_admin', } assert review == expected def test_api_get_changeset_that_does_not_exist(self): """ Fetch changeset status for non-existant changeset. revision id is the above git hash used in the test above with the last 3 nibbles replaced with 0xf. Should not exist for git _or_ hg. """ id_, params = _build_data(self.apikey, 'get_changeset', repoid=self.REPO, raw_id = '7ab37bc680b4aa72c34d07b230c866c28e9fcfff') response = api_call(self, params) expected = 'Changeset %s does not exist' % ('7ab37bc680b4aa72c34d07b230c866c28e9fcfff',) self._compare_error(id_, expected, given=response.body) def test_api_get_changeset_without_permission(self): review = fixture.review_changeset(self.REPO, self.TEST_REVISION, "approved") RepoModel().revoke_user_permission(repo=self.REPO, user=self.TEST_USER_LOGIN) RepoModel().revoke_user_permission(repo=self.REPO, user="default") id_, params = _build_data(self.apikey_regular, 'get_changeset', repoid=self.REPO, raw_id=self.TEST_REVISION) response = api_call(self, params) expected = 'Access denied to repo %s' % self.REPO self._compare_error(id_, expected, given=response.body) def test_api_get_pullrequest(self): pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'get test') random_id = random.randrange(1, 9999) params = ascii_bytes(ext_json.dumps({ "id": random_id, "api_key": self.apikey, "method": 'get_pullrequest', "args": {"pullrequest_id": pull_request_id}, })) response = api_call(self, params) pullrequest = db.PullRequest().get(pull_request_id) expected = { "status": "new", "pull_request_id": pull_request_id, "description": "No description", "url": "/%s/pull-request/%s/_/%s" % (self.REPO, pull_request_id, "stable"), "reviewers": [{"username": "test_regular"}], "org_repo_url": "http://localhost:80/%s" % self.REPO, "org_ref_parts": ["branch", "stable", self.TEST_PR_SRC], "other_ref_parts": ["branch", "default", self.TEST_PR_DST], "comments": [{"username": base.TEST_USER_ADMIN_LOGIN, "text": "", "comment_id": pullrequest.comments[0].comment_id}], "owner": base.TEST_USER_ADMIN_LOGIN, "statuses": [{"status": "under_review", "reviewer": base.TEST_USER_ADMIN_LOGIN, "modified_at": "2000-01-01T00:00:00"} for i in range(0, len(self.TEST_PR_REVISIONS))], "title": "get test", "revisions": self.TEST_PR_REVISIONS, "created_on": "2000-01-01T00:00:00", "updated_on": "2000-01-01T00:00:00", } self._compare_ok(random_id, expected, given=re.sub(br"\d\d\d\d\-\d\d\-\d\dT\d\d\:\d\d\:\d\d", b"2000-01-01T00:00:00", response.body)) def test_api_close_pullrequest(self): pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'close test') random_id = random.randrange(1, 9999) params = ascii_bytes(ext_json.dumps({ "id": random_id, "api_key": self.apikey, "method": "comment_pullrequest", "args": {"pull_request_id": pull_request_id, "close_pr": True}, })) response = api_call(self, params) self._compare_ok(random_id, True, given=response.body) pullrequest = db.PullRequest().get(pull_request_id) assert pullrequest.comments[-1].text == '' assert pullrequest.status == db.PullRequest.STATUS_CLOSED assert pullrequest.is_closed() == True def test_api_status_pullrequest(self): pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, "status test") random_id = random.randrange(1, 9999) params = ascii_bytes(ext_json.dumps({ "id": random_id, "api_key": db.User.get_by_username(base.TEST_USER_REGULAR2_LOGIN).api_key, "method": "comment_pullrequest", "args": {"pull_request_id": pull_request_id, "status": db.ChangesetStatus.STATUS_APPROVED}, })) response = api_call(self, params) pullrequest = db.PullRequest().get(pull_request_id) self._compare_error(random_id, "No permission to change pull request status. User needs to be admin, owner or reviewer.", given=response.body) assert db.ChangesetStatus.STATUS_UNDER_REVIEW == ChangesetStatusModel().calculate_pull_request_result(pullrequest)[2] params = ascii_bytes(ext_json.dumps({ "id": random_id, "api_key": db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN).api_key, "method": "comment_pullrequest", "args": {"pull_request_id": pull_request_id, "status": db.ChangesetStatus.STATUS_APPROVED}, })) response = api_call(self, params) self._compare_ok(random_id, True, given=response.body) pullrequest = db.PullRequest().get(pull_request_id) assert db.ChangesetStatus.STATUS_APPROVED == ChangesetStatusModel().calculate_pull_request_result(pullrequest)[2] def test_api_comment_pullrequest(self): pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, "comment test") random_id = random.randrange(1, 9999) params = ascii_bytes(ext_json.dumps({ "id": random_id, "api_key": self.apikey, "method": "comment_pullrequest", "args": {"pull_request_id": pull_request_id, "comment_msg": "Looks good to me"}, })) response = api_call(self, params) self._compare_ok(random_id, True, given=response.body) pullrequest = db.PullRequest().get(pull_request_id) assert pullrequest.comments[-1].text == 'Looks good to me' def test_api_edit_reviewers_add_single(self): pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test') pullrequest = db.PullRequest().get(pull_request_id) pullrequest.owner = self.test_user random_id = random.randrange(1, 9999) params = ascii_bytes(ext_json.dumps({ "id": random_id, "api_key": self.apikey_regular, "method": "edit_reviewers", "args": {"pull_request_id": pull_request_id, "add": base.TEST_USER_REGULAR2_LOGIN}, })) response = api_call(self, params) expected = { 'added': [base.TEST_USER_REGULAR2_LOGIN], 'already_present': [], 'removed': [] } self._compare_ok(random_id, expected, given=response.body) assert db.User.get_by_username(base.TEST_USER_REGULAR2_LOGIN) in pullrequest.get_reviewer_users() def test_api_edit_reviewers_add_nonexistent(self): pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test') pullrequest = db.PullRequest().get(pull_request_id) pullrequest.owner = self.test_user random_id = random.randrange(1, 9999) params = ascii_bytes(ext_json.dumps({ "id": random_id, "api_key": self.apikey_regular, "method": "edit_reviewers", "args": {"pull_request_id": pull_request_id, "add": 999}, })) response = api_call(self, params) self._compare_error(random_id, "user `999` does not exist", given=response.body) def test_api_edit_reviewers_add_multiple(self): pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test') pullrequest = db.PullRequest().get(pull_request_id) pullrequest.owner = self.test_user random_id = random.randrange(1, 9999) params = ascii_bytes(ext_json.dumps({ "id": random_id, "api_key": self.apikey_regular, "method": "edit_reviewers", "args": { "pull_request_id": pull_request_id, "add": [ self.TEST_USER_LOGIN, base.TEST_USER_REGULAR2_LOGIN ] }, })) response = api_call(self, params) # list order depends on python sorting hash, which is randomized assert set(ext_json.loads(response.body)['result']['added']) == set([base.TEST_USER_REGULAR2_LOGIN, self.TEST_USER_LOGIN]) assert set(ext_json.loads(response.body)['result']['already_present']) == set() assert set(ext_json.loads(response.body)['result']['removed']) == set() assert db.User.get_by_username(base.TEST_USER_REGULAR2_LOGIN) in pullrequest.get_reviewer_users() assert db.User.get_by_username(self.TEST_USER_LOGIN) in pullrequest.get_reviewer_users() def test_api_edit_reviewers_add_already_present(self): pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test') pullrequest = db.PullRequest().get(pull_request_id) pullrequest.owner = self.test_user random_id = random.randrange(1, 9999) params = ascii_bytes(ext_json.dumps({ "id": random_id, "api_key": self.apikey_regular, "method": "edit_reviewers", "args": { "pull_request_id": pull_request_id, "add": [ base.TEST_USER_REGULAR_LOGIN, base.TEST_USER_REGULAR2_LOGIN ] }, })) response = api_call(self, params) expected = { 'added': [base.TEST_USER_REGULAR2_LOGIN], 'already_present': [base.TEST_USER_REGULAR_LOGIN], 'removed': [], } self._compare_ok(random_id, expected, given=response.body) assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users() assert db.User.get_by_username(base.TEST_USER_REGULAR2_LOGIN) in pullrequest.get_reviewer_users() def test_api_edit_reviewers_add_closed(self): pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test') pullrequest = db.PullRequest().get(pull_request_id) pullrequest.owner = self.test_user PullRequestModel().close_pull_request(pull_request_id) random_id = random.randrange(1, 9999) params = ascii_bytes(ext_json.dumps({ "id": random_id, "api_key": self.apikey_regular, "method": "edit_reviewers", "args": {"pull_request_id": pull_request_id, "add": base.TEST_USER_REGULAR2_LOGIN}, })) response = api_call(self, params) self._compare_error(random_id, "Cannot edit reviewers of a closed pull request.", given=response.body) def test_api_edit_reviewers_add_not_owner(self): pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test') pullrequest = db.PullRequest().get(pull_request_id) pullrequest.owner = db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) random_id = random.randrange(1, 9999) params = ascii_bytes(ext_json.dumps({ "id": random_id, "api_key": self.apikey_regular, "method": "edit_reviewers", "args": {"pull_request_id": pull_request_id, "add": base.TEST_USER_REGULAR2_LOGIN}, })) response = api_call(self, params) self._compare_error(random_id, "No permission to edit reviewers of this pull request. User needs to be admin or pull request owner.", given=response.body) def test_api_edit_reviewers_remove_single(self): pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test') pullrequest = db.PullRequest().get(pull_request_id) assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users() pullrequest.owner = self.test_user random_id = random.randrange(1, 9999) params = ascii_bytes(ext_json.dumps({ "id": random_id, "api_key": self.apikey_regular, "method": "edit_reviewers", "args": {"pull_request_id": pull_request_id, "remove": base.TEST_USER_REGULAR_LOGIN}, })) response = api_call(self, params) expected = { 'added': [], 'already_present': [], 'removed': [base.TEST_USER_REGULAR_LOGIN], } self._compare_ok(random_id, expected, given=response.body) assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) not in pullrequest.get_reviewer_users() def test_api_edit_reviewers_remove_nonexistent(self): pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test') pullrequest = db.PullRequest().get(pull_request_id) assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users() pullrequest.owner = self.test_user random_id = random.randrange(1, 9999) params = ascii_bytes(ext_json.dumps({ "id": random_id, "api_key": self.apikey_regular, "method": "edit_reviewers", "args": {"pull_request_id": pull_request_id, "remove": 999}, })) response = api_call(self, params) self._compare_error(random_id, "user `999` does not exist", given=response.body) assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users() def test_api_edit_reviewers_remove_nonpresent(self): pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test') pullrequest = db.PullRequest().get(pull_request_id) assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users() assert db.User.get_by_username(base.TEST_USER_REGULAR2_LOGIN) not in pullrequest.get_reviewer_users() pullrequest.owner = self.test_user random_id = random.randrange(1, 9999) params = ascii_bytes(ext_json.dumps({ "id": random_id, "api_key": self.apikey_regular, "method": "edit_reviewers", "args": {"pull_request_id": pull_request_id, "remove": base.TEST_USER_REGULAR2_LOGIN}, })) response = api_call(self, params) # NOTE: no explicit indication that removed user was not even a reviewer expected = { 'added': [], 'already_present': [], 'removed': [base.TEST_USER_REGULAR2_LOGIN], } self._compare_ok(random_id, expected, given=response.body) assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users() assert db.User.get_by_username(base.TEST_USER_REGULAR2_LOGIN) not in pullrequest.get_reviewer_users() def test_api_edit_reviewers_remove_multiple(self): pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test') pullrequest = db.PullRequest().get(pull_request_id) prr = db.PullRequestReviewer(db.User.get_by_username(base.TEST_USER_REGULAR2_LOGIN), pullrequest) meta.Session().add(prr) meta.Session().commit() assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users() assert db.User.get_by_username(base.TEST_USER_REGULAR2_LOGIN) in pullrequest.get_reviewer_users() pullrequest.owner = self.test_user random_id = random.randrange(1, 9999) params = ascii_bytes(ext_json.dumps({ "id": random_id, "api_key": self.apikey_regular, "method": "edit_reviewers", "args": {"pull_request_id": pull_request_id, "remove": [ base.TEST_USER_REGULAR_LOGIN, base.TEST_USER_REGULAR2_LOGIN ] }, })) response = api_call(self, params) # list order depends on python sorting hash, which is randomized assert set(ext_json.loads(response.body)['result']['added']) == set() assert set(ext_json.loads(response.body)['result']['already_present']) == set() assert set(ext_json.loads(response.body)['result']['removed']) == set([base.TEST_USER_REGULAR_LOGIN, base.TEST_USER_REGULAR2_LOGIN]) assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) not in pullrequest.get_reviewer_users() assert db.User.get_by_username(base.TEST_USER_REGULAR2_LOGIN) not in pullrequest.get_reviewer_users() def test_api_edit_reviewers_remove_closed(self): pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test') pullrequest = db.PullRequest().get(pull_request_id) assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users() PullRequestModel().close_pull_request(pull_request_id) pullrequest.owner = self.test_user random_id = random.randrange(1, 9999) params = ascii_bytes(ext_json.dumps({ "id": random_id, "api_key": self.apikey_regular, "method": "edit_reviewers", "args": {"pull_request_id": pull_request_id, "remove": base.TEST_USER_REGULAR_LOGIN}, })) response = api_call(self, params) self._compare_error(random_id, "Cannot edit reviewers of a closed pull request.", given=response.body) assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users() def test_api_edit_reviewers_remove_not_owner(self): pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test') pullrequest = db.PullRequest().get(pull_request_id) assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users() pullrequest.owner = db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) random_id = random.randrange(1, 9999) params = ascii_bytes(ext_json.dumps({ "id": random_id, "api_key": self.apikey_regular, "method": "edit_reviewers", "args": {"pull_request_id": pull_request_id, "remove": base.TEST_USER_REGULAR_LOGIN}, })) response = api_call(self, params) self._compare_error(random_id, "No permission to edit reviewers of this pull request. User needs to be admin or pull request owner.", given=response.body) assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users() def test_api_edit_reviewers_add_remove_single(self): pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test') pullrequest = db.PullRequest().get(pull_request_id) assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users() assert db.User.get_by_username(base.TEST_USER_REGULAR2_LOGIN) not in pullrequest.get_reviewer_users() pullrequest.owner = self.test_user random_id = random.randrange(1, 9999) params = ascii_bytes(ext_json.dumps({ "id": random_id, "api_key": self.apikey_regular, "method": "edit_reviewers", "args": {"pull_request_id": pull_request_id, "add": base.TEST_USER_REGULAR2_LOGIN, "remove": base.TEST_USER_REGULAR_LOGIN }, })) response = api_call(self, params) expected = { 'added': [base.TEST_USER_REGULAR2_LOGIN], 'already_present': [], 'removed': [base.TEST_USER_REGULAR_LOGIN], } self._compare_ok(random_id, expected, given=response.body) assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) not in pullrequest.get_reviewer_users() assert db.User.get_by_username(base.TEST_USER_REGULAR2_LOGIN) in pullrequest.get_reviewer_users() def test_api_edit_reviewers_add_remove_multiple(self): pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test') pullrequest = db.PullRequest().get(pull_request_id) prr = db.PullRequestReviewer(db.User.get_by_username(base.TEST_USER_ADMIN_LOGIN), pullrequest) meta.Session().add(prr) meta.Session().commit() assert db.User.get_by_username(base.TEST_USER_ADMIN_LOGIN) in pullrequest.get_reviewer_users() assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users() assert db.User.get_by_username(base.TEST_USER_REGULAR2_LOGIN) not in pullrequest.get_reviewer_users() pullrequest.owner = self.test_user random_id = random.randrange(1, 9999) params = ascii_bytes(ext_json.dumps({ "id": random_id, "api_key": self.apikey_regular, "method": "edit_reviewers", "args": {"pull_request_id": pull_request_id, "add": [ base.TEST_USER_REGULAR2_LOGIN ], "remove": [ base.TEST_USER_REGULAR_LOGIN, base.TEST_USER_ADMIN_LOGIN ], }, })) response = api_call(self, params) # list order depends on python sorting hash, which is randomized assert set(ext_json.loads(response.body)['result']['added']) == set([base.TEST_USER_REGULAR2_LOGIN]) assert set(ext_json.loads(response.body)['result']['already_present']) == set() assert set(ext_json.loads(response.body)['result']['removed']) == set([base.TEST_USER_REGULAR_LOGIN, base.TEST_USER_ADMIN_LOGIN]) assert db.User.get_by_username(base.TEST_USER_ADMIN_LOGIN) not in pullrequest.get_reviewer_users() assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) not in pullrequest.get_reviewer_users() assert db.User.get_by_username(base.TEST_USER_REGULAR2_LOGIN) in pullrequest.get_reviewer_users() def test_api_edit_reviewers_invalid_params(self): pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test') pullrequest = db.PullRequest().get(pull_request_id) assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users() pullrequest.owner = db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) random_id = random.randrange(1, 9999) params = ascii_bytes(ext_json.dumps({ "id": random_id, "api_key": self.apikey_regular, "method": "edit_reviewers", "args": {"pull_request_id": pull_request_id}, })) response = api_call(self, params) self._compare_error(random_id, "Invalid request. Neither 'add' nor 'remove' is specified.", given=response.body) assert ext_json.loads(response.body)['result'] is None