Mercurial > kallithea
changeset 2526:473794943022 beta
Refactored API
- added ~100% line coverage tests for api
- test against hg and git
author | Marcin Kuzminski <marcin@python-works.com> |
---|---|
date | Sun, 01 Jul 2012 16:22:38 +0200 |
parents | c35980ae7958 |
children | 95624ce4465f |
files | rhodecode/controllers/api/api.py rhodecode/tests/api/__init__.py rhodecode/tests/api/api_base.py rhodecode/tests/api/test_api_git.py rhodecode/tests/api/test_api_hg.py |
diffstat | 4 files changed, 1298 insertions(+), 370 deletions(-) [+] |
line wrap: on
line diff
--- a/rhodecode/controllers/api/api.py Sun Jul 01 16:11:38 2012 +0200 +++ b/rhodecode/controllers/api/api.py Sun Jul 01 16:22:38 2012 +0200 @@ -31,18 +31,99 @@ from rhodecode.controllers.api import JSONRPCController, JSONRPCError from rhodecode.lib.auth import HasPermissionAllDecorator, \ HasPermissionAnyDecorator, PasswordGenerator, AuthUser - +from rhodecode.lib.utils import map_groups from rhodecode.model.meta import Session from rhodecode.model.scm import ScmModel -from rhodecode.model.db import User, UsersGroup, Repository from rhodecode.model.repo import RepoModel from rhodecode.model.user import UserModel from rhodecode.model.users_group import UsersGroupModel -from rhodecode.lib.utils import map_groups +from rhodecode.model.permission import PermissionModel log = logging.getLogger(__name__) +class Optional(object): + """ + Defines an optional parameter:: + + param = param.getval() if isinstance(param, Optional) else param + param = param() if isinstance(param, Optional) else param + + is equivalent of:: + + param = Optional.extract(param) + + """ + def __init__(self, type_): + self.type_ = type_ + + def __repr__(self): + return '<Optional:%s>' % self.type_.__repr__() + + def __call__(self): + return self.getval() + + def getval(self): + """ + returns value from this Optional instance + """ + return self.type_ + + @classmethod + def extract(cls, val): + if isinstance(val, cls): + return val.getval() + return val + + +def get_user_or_error(userid): + """ + Get user by id or name or return JsonRPCError if not found + + :param userid: + """ + user = UserModel().get_user(userid) + if user is None: + raise JSONRPCError("user `%s` does not exist" % userid) + return user + + +def get_repo_or_error(repoid): + """ + Get repo by id or name or return JsonRPCError if not found + + :param userid: + """ + repo = RepoModel().get_repo(repoid) + if repo is None: + raise JSONRPCError('repository `%s` does not exist' % (repoid)) + return repo + + +def get_users_group_or_error(usersgroupid): + """ + Get users group by id or name or return JsonRPCError if not found + + :param userid: + """ + users_group = UsersGroupModel().get_group(usersgroupid) + if users_group is None: + raise JSONRPCError('users group `%s` does not exist' % usersgroupid) + return users_group + + +def get_perm_or_error(permid): + """ + Get permission by id or name or return JsonRPCError if not found + + :param userid: + """ + perm = PermissionModel().get_permission_by_name(permid) + if perm is None: + raise JSONRPCError('permission `%s` does not exist' % (permid)) + return perm + + class ApiController(JSONRPCController): """ API Controller @@ -60,23 +141,25 @@ """ @HasPermissionAllDecorator('hg.admin') - def pull(self, apiuser, repo_name): + def pull(self, apiuser, repoid): """ Dispatch pull action on given repo - - :param user: - :param repo_name: + :param apiuser: + :param repoid: """ - if Repository.is_valid(repo_name) is False: - raise JSONRPCError('Unknown repo "%s"' % repo_name) + repo = get_repo_or_error(repoid) try: - ScmModel().pull_changes(repo_name, self.rhodecode_user.username) - return 'Pulled from %s' % repo_name + ScmModel().pull_changes(repo.repo_name, + self.rhodecode_user.username) + return 'Pulled from `%s`' % repo.repo_name except Exception: - raise JSONRPCError('Unable to pull changes from "%s"' % repo_name) + log.error(traceback.format_exc()) + raise JSONRPCError( + 'Unable to pull changes from `%s`' % repo.repo_name + ) @HasPermissionAllDecorator('hg.admin') def get_user(self, apiuser, userid): @@ -84,26 +167,13 @@ Get a user by username :param apiuser: - :param username: + :param userid: """ - user = UserModel().get_user(userid) - if user is None: - return user - - return dict( - id=user.user_id, - username=user.username, - firstname=user.name, - lastname=user.lastname, - email=user.email, - emails=user.emails, - active=user.active, - admin=user.admin, - ldap_dn=user.ldap_dn, - last_login=user.last_login, - permissions=AuthUser(user_id=user.user_id).permissions - ) + user = get_user_or_error(userid) + data = user.get_api_data() + data['permissions'] = AuthUser(user_id=user.user_id).permissions + return data @HasPermissionAllDecorator('hg.admin') def get_users(self, apiuser): @@ -114,43 +184,34 @@ """ result = [] - for user in User.getAll(): - result.append( - dict( - id=user.user_id, - username=user.username, - firstname=user.name, - lastname=user.lastname, - email=user.email, - active=user.active, - admin=user.admin, - ldap_dn=user.ldap_dn, - last_login=user.last_login, - ) - ) + for user in UserModel().get_all(): + result.append(user.get_api_data()) return result @HasPermissionAllDecorator('hg.admin') - def create_user(self, apiuser, username, email, password, firstname=None, - lastname=None, active=True, admin=False, ldap_dn=None): + def create_user(self, apiuser, username, email, password, + firstname=Optional(None), lastname=Optional(None), + active=Optional(True), admin=Optional(False), + ldap_dn=Optional(None)): """ Create new user :param apiuser: :param username: + :param email: :param password: - :param email: - :param name: + :param firstname: :param lastname: :param active: :param admin: :param ldap_dn: """ - if User.get_by_username(username): - raise JSONRPCError("user %s already exist" % username) - if User.get_by_email(email, case_insensitive=True): - raise JSONRPCError("email %s already exist" % email) + if UserModel().get_by_username(username): + raise JSONRPCError("user `%s` already exist" % username) + + if UserModel().get_by_email(email, case_insensitive=True): + raise JSONRPCError("email `%s` already exist" % email) if ldap_dn: # generate temporary password if ldap_dn @@ -158,73 +219,73 @@ try: user = UserModel().create_or_update( - username, password, email, firstname, - lastname, active, admin, ldap_dn + username=Optional.extract(username), + password=Optional.extract(password), + email=Optional.extract(email), + firstname=Optional.extract(firstname), + lastname=Optional.extract(lastname), + active=Optional.extract(active), + admin=Optional.extract(admin), + ldap_dn=Optional.extract(ldap_dn) ) - Session.commit() + Session().commit() return dict( - id=user.user_id, - msg='created new user %s' % username, - user=dict( - id=user.user_id, - username=user.username, - firstname=user.name, - lastname=user.lastname, - email=user.email, - active=user.active, - admin=user.admin, - ldap_dn=user.ldap_dn, - last_login=user.last_login, - ) + msg='created new user `%s`' % username, + user=user.get_api_data() ) except Exception: log.error(traceback.format_exc()) - raise JSONRPCError('failed to create user %s' % username) + raise JSONRPCError('failed to create user `%s`' % username) @HasPermissionAllDecorator('hg.admin') - def update_user(self, apiuser, userid, username, password, email, - firstname, lastname, active, admin, ldap_dn): + def update_user(self, apiuser, userid, username=Optional(None), + email=Optional(None), firstname=Optional(None), + lastname=Optional(None), active=Optional(None), + admin=Optional(None), ldap_dn=Optional(None), + password=Optional(None)): """ Updates given user :param apiuser: + :param userid: :param username: - :param password: :param email: - :param name: + :param firstname: :param lastname: :param active: :param admin: :param ldap_dn: + :param password: """ - usr = UserModel().get_user(userid) - if not usr: - raise JSONRPCError("user ID:%s does not exist" % userid) + + user = get_user_or_error(userid) + + #return old attribute if Optional is passed. We don't change parameter + # so user doesn't get updated parameters + get = lambda attr, name: ( + getattr(user, name) if isinstance(attr, Optional) else attr + ) try: + user = UserModel().create_or_update( - username, password, email, firstname, - lastname, active, admin, ldap_dn + username=get(username, 'username'), + password=get(password, 'password'), + email=get(email, 'email'), + firstname=get(firstname, 'name'), + lastname=get(lastname, 'lastname'), + active=get(active, 'active'), + admin=get(admin, 'admin'), + ldap_dn=get(ldap_dn, 'ldap_dn') ) - Session.commit() + Session().commit() return dict( - id=usr.user_id, msg='updated user ID:%s %s' % (user.user_id, user.username), - user=dict( - id=user.user_id, - username=user.username, - firstname=user.name, - lastname=user.lastname, - email=user.email, - active=user.active, - admin=user.admin, - ldap_dn=user.ldap_dn, - last_login=user.last_login, - ) + user=user.get_api_data() ) except Exception: log.error(traceback.format_exc()) - raise JSONRPCError('failed to update user %s' % userid) + raise JSONRPCError('failed to update user `%s`' % userid) @HasPermissionAllDecorator('hg.admin') def delete_user(self, apiuser, userid): @@ -232,52 +293,40 @@ Deletes an user :param apiuser: + :param userid: """ - usr = UserModel().get_user(userid) - if not usr: - raise JSONRPCError("user ID:%s does not exist" % userid) + user = get_user_or_error(userid) try: UserModel().delete(userid) - Session.commit() + Session().commit() return dict( - id=usr.user_id, - msg='deleted user ID:%s %s' % (usr.user_id, usr.username) + msg='deleted user ID:%s %s' % (user.user_id, user.username), + user=None ) except Exception: log.error(traceback.format_exc()) - raise JSONRPCError('failed to delete ID:%s %s' % (usr.user_id, - usr.username)) + raise JSONRPCError('failed to delete ID:%s %s' % (user.user_id, + user.username)) @HasPermissionAllDecorator('hg.admin') - def get_users_group(self, apiuser, group_name): + def get_users_group(self, apiuser, usersgroupid): """" - Get users group by name + Get users group by name or id :param apiuser: - :param group_name: + :param usersgroupid: """ + users_group = get_users_group_or_error(usersgroupid) - users_group = UsersGroup.get_by_group_name(group_name) - if not users_group: - return None + data = users_group.get_api_data() members = [] for user in users_group.members: user = user.user - members.append(dict(id=user.user_id, - username=user.username, - firstname=user.name, - lastname=user.lastname, - email=user.email, - active=user.active, - admin=user.admin, - ldap=user.ldap_dn)) - - return dict(id=users_group.users_group_id, - group_name=users_group.users_group_name, - active=users_group.users_group_active, - members=members) + members.append(user.get_api_data()) + data['members'] = members + return data @HasPermissionAllDecorator('hg.admin') def get_users_groups(self, apiuser): @@ -288,107 +337,96 @@ """ result = [] - for users_group in UsersGroup.getAll(): - members = [] - for user in users_group.members: - user = user.user - members.append(dict(id=user.user_id, - username=user.username, - firstname=user.name, - lastname=user.lastname, - email=user.email, - active=user.active, - admin=user.admin, - ldap=user.ldap_dn)) - - result.append(dict(id=users_group.users_group_id, - group_name=users_group.users_group_name, - active=users_group.users_group_active, - members=members)) + for users_group in UsersGroupModel().get_all(): + result.append(users_group.get_api_data()) return result @HasPermissionAllDecorator('hg.admin') - def create_users_group(self, apiuser, group_name, active=True): + def create_users_group(self, apiuser, group_name, active=Optional(True)): """ Creates an new usergroup + :param apiuser: :param group_name: :param active: """ - if self.get_users_group(apiuser, group_name): - raise JSONRPCError("users group %s already exist" % group_name) + if UsersGroupModel().get_by_name(group_name): + raise JSONRPCError("users group `%s` already exist" % group_name) try: + active = Optional.extract(active) ug = UsersGroupModel().create(name=group_name, active=active) - Session.commit() - return dict(id=ug.users_group_id, - msg='created new users group %s' % group_name) + Session().commit() + return dict( + msg='created new users group `%s`' % group_name, + users_group=ug.get_api_data() + ) except Exception: log.error(traceback.format_exc()) - raise JSONRPCError('failed to create group %s' % group_name) + raise JSONRPCError('failed to create group `%s`' % group_name) @HasPermissionAllDecorator('hg.admin') - def add_user_to_users_group(self, apiuser, group_name, username): + def add_user_to_users_group(self, apiuser, usersgroupid, userid): """" Add a user to a users group :param apiuser: - :param group_name: - :param username: + :param usersgroupid: + :param userid: """ + user = get_user_or_error(userid) + users_group = get_users_group_or_error(usersgroupid) try: - users_group = UsersGroup.get_by_group_name(group_name) - if not users_group: - raise JSONRPCError('unknown users group %s' % group_name) - - user = User.get_by_username(username) - if user is None: - raise JSONRPCError('unknown user %s' % username) - ugm = UsersGroupModel().add_user_to_group(users_group, user) success = True if ugm != True else False - msg = 'added member %s to users group %s' % (username, group_name) + msg = 'added member `%s` to users group `%s`' % ( + user.username, users_group.users_group_name + ) msg = msg if success else 'User is already in that group' - Session.commit() + Session().commit() return dict( - id=ugm.users_group_member_id if ugm != True else None, success=success, msg=msg ) except Exception: log.error(traceback.format_exc()) - raise JSONRPCError('failed to add users group member') + raise JSONRPCError( + 'failed to add member to users group `%s`' % ( + users_group.users_group_name + ) + ) @HasPermissionAllDecorator('hg.admin') - def remove_user_from_users_group(self, apiuser, group_name, username): + def remove_user_from_users_group(self, apiuser, usersgroupid, userid): """ Remove user from a group - :param apiuser - :param group_name - :param username + :param apiuser: + :param usersgroupid: + :param userid: """ + user = get_user_or_error(userid) + users_group = get_users_group_or_error(usersgroupid) try: - users_group = UsersGroup.get_by_group_name(group_name) - if not users_group: - raise JSONRPCError('unknown users group %s' % group_name) - - user = User.get_by_username(username) - if user is None: - raise JSONRPCError('unknown user %s' % username) - - success = UsersGroupModel().remove_user_from_group(users_group, user) - msg = 'removed member %s from users group %s' % (username, group_name) + success = UsersGroupModel().remove_user_from_group(users_group, + user) + msg = 'removed member `%s` from users group `%s`' % ( + user.username, users_group.users_group_name + ) msg = msg if success else "User wasn't in group" - Session.commit() + Session().commit() return dict(success=success, msg=msg) except Exception: log.error(traceback.format_exc()) - raise JSONRPCError('failed to remove user from group') + raise JSONRPCError( + 'failed to remove member from users group `%s`' % ( + users_group.users_group_name + ) + ) @HasPermissionAnyDecorator('hg.admin') def get_repo(self, apiuser, repoid): @@ -396,54 +434,30 @@ Get repository by name :param apiuser: - :param repo_name: + :param repoid: """ - - repo = RepoModel().get_repo(repoid) - if repo is None: - raise JSONRPCError('unknown repository "%s"' % (repo or repoid)) + repo = get_repo_or_error(repoid) members = [] for user in repo.repo_to_perm: perm = user.permission.permission_name user = user.user - members.append( - dict( - type="user", - id=user.user_id, - username=user.username, - firstname=user.name, - lastname=user.lastname, - email=user.email, - active=user.active, - admin=user.admin, - ldap=user.ldap_dn, - permission=perm - ) - ) + user_data = user.get_api_data() + user_data['type'] = "user" + user_data['permission'] = perm + members.append(user_data) + for users_group in repo.users_group_to_perm: perm = users_group.permission.permission_name users_group = users_group.users_group - members.append( - dict( - type="users_group", - id=users_group.users_group_id, - name=users_group.users_group_name, - active=users_group.users_group_active, - permission=perm - ) - ) + users_group_data = users_group.get_api_data() + users_group_data['type'] = "users_group" + users_group_data['permission'] = perm + members.append(users_group_data) - return dict( - id=repo.repo_id, - repo_name=repo.repo_name, - type=repo.repo_type, - clone_uri=repo.clone_uri, - private=repo.private, - created_on=repo.created_on, - description=repo.description, - members=members - ) + data = repo.get_api_data() + data['members'] = members + return data @HasPermissionAnyDecorator('hg.admin') def get_repos(self, apiuser): @@ -454,22 +468,12 @@ """ result = [] - for repo in Repository.getAll(): - result.append( - dict( - id=repo.repo_id, - repo_name=repo.repo_name, - type=repo.repo_type, - clone_uri=repo.clone_uri, - private=repo.private, - created_on=repo.created_on, - description=repo.description, - ) - ) + for repo in RepoModel().get_all(): + result.append(repo.get_api_data()) return result @HasPermissionAnyDecorator('hg.admin') - def get_repo_nodes(self, apiuser, repo_name, revision, root_path, + def get_repo_nodes(self, apiuser, repoid, revision, root_path, ret_type='all'): """ returns a list of nodes and it's children @@ -477,13 +481,14 @@ to show only files or dirs :param apiuser: - :param repo_name: name of repository + :param repoid: name or id of repository :param revision: revision for which listing should be done :param root_path: path from which start displaying :param ret_type: return type 'all|files|dirs' nodes """ + repo = get_repo_or_error(repoid) try: - _d, _f = ScmModel().get_nodes(repo_name, revision, root_path, + _d, _f = ScmModel().get_nodes(repo, revision, root_path, flat=False) _map = { 'all': _d + _f, @@ -493,235 +498,242 @@ return _map[ret_type] except KeyError: raise JSONRPCError('ret_type must be one of %s' % _map.keys()) - except Exception, e: - raise JSONRPCError(e) + except Exception: + log.error(traceback.format_exc()) + raise JSONRPCError( + 'failed to get repo: `%s` nodes' % repo.repo_name + ) @HasPermissionAnyDecorator('hg.admin', 'hg.create.repository') - def create_repo(self, apiuser, repo_name, owner_name, description='', - repo_type='hg', private=False, clone_uri=None): + def create_repo(self, apiuser, repo_name, owner, repo_type, + description=Optional(''), private=Optional(False), + clone_uri=Optional(None), landing_rev=Optional('tip')): """ Create repository, if clone_url is given it makes a remote clone + if repo_name is withina group name the groups will be created + automatically if they aren't present :param apiuser: :param repo_name: - :param owner_name: + :param onwer: + :param repo_type: :param description: - :param repo_type: :param private: :param clone_uri: + :param landing_rev: """ + owner = get_user_or_error(owner) + + if RepoModel().get_by_repo_name(repo_name): + raise JSONRPCError("repo `%s` already exist" % repo_name) + + private = Optional.extract(private) + clone_uri = Optional.extract(clone_uri) + description = Optional.extract(description) + landing_rev = Optional.extract(landing_rev) try: - owner = User.get_by_username(owner_name) - if owner is None: - raise JSONRPCError('unknown user %s' % owner_name) - - if Repository.get_by_repo_name(repo_name): - raise JSONRPCError("repo %s already exist" % repo_name) - - groups = repo_name.split(Repository.url_sep()) - real_name = groups[-1] - # create structure of groups + # create structure of groups and return the last group group = map_groups(repo_name) - repo = RepoModel().create( - dict( - repo_name=real_name, - repo_name_full=repo_name, - description=description, - private=private, - repo_type=repo_type, - repo_group=group.group_id if group else None, - clone_uri=clone_uri - ) + repo = RepoModel().create_repo( + repo_name=repo_name, + repo_type=repo_type, + description=description, + owner=owner, + private=private, + clone_uri=clone_uri, + repos_group=group, + landing_rev=landing_rev, ) - Session.commit() + + Session().commit() return dict( - id=repo.repo_id, - msg="Created new repository %s" % (repo.repo_name), - repo=dict( - id=repo.repo_id, - repo_name=repo.repo_name, - type=repo.repo_type, - clone_uri=repo.clone_uri, - private=repo.private, - created_on=repo.created_on, - description=repo.description, - ) + msg="Created new repository `%s`" % (repo.repo_name), + repo=repo.get_api_data() ) except Exception: log.error(traceback.format_exc()) - raise JSONRPCError('failed to create repository %s' % repo_name) + raise JSONRPCError('failed to create repository `%s`' % repo_name) + +# @HasPermissionAnyDecorator('hg.admin') +# def fork_repo(self, apiuser, repoid, fork_name): +# repo = get_repo_or_error(repoid) +# +# try: +# form_data = dict( +# +# ) +# RepoModel().create_fork(form_data, cur_user=apiuser) +# return dict( +# msg='Created fork of `%s` as `%s`' % (repo.repo_name, +# fork_name), +# success=True +# ) +# except Exception: +# log.error(traceback.format_exc()) +# raise JSONRPCError( +# 'failed to fork repository `%s` as `%s`' % (repo.repo_name, +# fork_name) +# ) @HasPermissionAnyDecorator('hg.admin') - def fork_repo(self, apiuser, repoid): - repo = RepoModel().get_repo(repoid) - if repo is None: - raise JSONRPCError('unknown repository "%s"' % (repo or repoid)) - - RepoModel().create_fork(form_data, cur_user) - - - @HasPermissionAnyDecorator('hg.admin') - def delete_repo(self, apiuser, repo_name): + def delete_repo(self, apiuser, repoid): """ Deletes a given repository - :param repo_name: + :param apiuser: + :param repoid: """ - if not Repository.get_by_repo_name(repo_name): - raise JSONRPCError("repo %s does not exist" % repo_name) + repo = get_repo_or_error(repoid) + try: - RepoModel().delete(repo_name) - Session.commit() + RepoModel().delete(repo) + Session().commit() return dict( - msg='Deleted repository %s' % repo_name + msg='Deleted repository `%s`' % repo.repo_name, + success=True ) except Exception: log.error(traceback.format_exc()) - raise JSONRPCError('failed to delete repository %s' % repo_name) + raise JSONRPCError( + 'failed to delete repository `%s`' % repo.repo_name + ) @HasPermissionAnyDecorator('hg.admin') - def grant_user_permission(self, apiuser, repo_name, username, perm): + def grant_user_permission(self, apiuser, repoid, userid, perm): """ Grant permission for user on given repository, or update existing one if found - :param repo_name: - :param username: + :param repoid: + :param userid: :param perm: """ + repo = get_repo_or_error(repoid) + user = get_user_or_error(userid) + perm = get_perm_or_error(perm) try: - repo = Repository.get_by_repo_name(repo_name) - if repo is None: - raise JSONRPCError('unknown repository %s' % repo) - - user = User.get_by_username(username) - if user is None: - raise JSONRPCError('unknown user %s' % username) RepoModel().grant_user_permission(repo=repo, user=user, perm=perm) - Session.commit() + Session().commit() return dict( - msg='Granted perm: %s for user: %s in repo: %s' % ( - perm, username, repo_name - ) + msg='Granted perm: `%s` for user: `%s` in repo: `%s`' % ( + perm.permission_name, user.username, repo.repo_name + ), + success=True ) except Exception: log.error(traceback.format_exc()) raise JSONRPCError( - 'failed to edit permission %(repo)s for %(user)s' % dict( - user=username, repo=repo_name + 'failed to edit permission for user: `%s` in repo: `%s`' % ( + userid, repoid ) ) @HasPermissionAnyDecorator('hg.admin') - def revoke_user_permission(self, apiuser, repo_name, username): + def revoke_user_permission(self, apiuser, repoid, userid): """ Revoke permission for user on given repository - :param repo_name: - :param username: + :param apiuser: + :param repoid: + :param userid: """ + repo = get_repo_or_error(repoid) + user = get_user_or_error(userid) try: - repo = Repository.get_by_repo_name(repo_name) - if repo is None: - raise JSONRPCError('unknown repository %s' % repo) + + RepoModel().revoke_user_permission(repo=repo, user=user) - user = User.get_by_username(username) - if user is None: - raise JSONRPCError('unknown user %s' % username) - - RepoModel().revoke_user_permission(repo=repo_name, user=username) - - Session.commit() + Session().commit() return dict( - msg='Revoked perm for user: %s in repo: %s' % ( - username, repo_name - ) + msg='Revoked perm for user: `%s` in repo: `%s`' % ( + user.username, repo.repo_name + ), + success=True ) except Exception: log.error(traceback.format_exc()) raise JSONRPCError( - 'failed to edit permission %(repo)s for %(user)s' % dict( - user=username, repo=repo_name + 'failed to edit permission for user: `%s` in repo: `%s`' % ( + userid, repoid ) ) @HasPermissionAnyDecorator('hg.admin') - def grant_users_group_permission(self, apiuser, repo_name, group_name, perm): + def grant_users_group_permission(self, apiuser, repoid, usersgroupid, + perm): """ Grant permission for users group on given repository, or update existing one if found - :param repo_name: - :param group_name: + :param apiuser: + :param repoid: + :param usersgroupid: :param perm: """ + repo = get_repo_or_error(repoid) + perm = get_perm_or_error(perm) + users_group = get_users_group_or_error(usersgroupid) try: - repo = Repository.get_by_repo_name(repo_name) - if repo is None: - raise JSONRPCError('unknown repository %s' % repo) - - user_group = UsersGroup.get_by_group_name(group_name) - if user_group is None: - raise JSONRPCError('unknown users group %s' % user_group) - - RepoModel().grant_users_group_permission(repo=repo_name, - group_name=group_name, + RepoModel().grant_users_group_permission(repo=repo, + group_name=users_group, perm=perm) - Session.commit() + Session().commit() return dict( - msg='Granted perm: %s for group: %s in repo: %s' % ( - perm, group_name, repo_name + msg='Granted perm: `%s` for users group: `%s` in ' + 'repo: `%s`' % ( + perm.permission_name, users_group.users_group_name, + repo.repo_name + ), + success=True + ) + except Exception: + print traceback.format_exc() + log.error(traceback.format_exc()) + raise JSONRPCError( + 'failed to edit permission for users group: `%s` in ' + 'repo: `%s`' % ( + usersgroupid, repo.repo_name ) ) + + @HasPermissionAnyDecorator('hg.admin') + def revoke_users_group_permission(self, apiuser, repoid, usersgroupid): + """ + Revoke permission for users group on given repository + + :param apiuser: + :param repoid: + :param usersgroupid: + """ + repo = get_repo_or_error(repoid) + users_group = get_users_group_or_error(usersgroupid) + + try: + RepoModel().revoke_users_group_permission(repo=repo, + group_name=users_group) + + Session().commit() + return dict( + msg='Revoked perm for users group: `%s` in repo: `%s`' % ( + users_group.users_group_name, repo.repo_name + ), + success=True + ) except Exception: log.error(traceback.format_exc()) raise JSONRPCError( - 'failed to edit permission %(repo)s for %(usersgr)s' % dict( - usersgr=group_name, repo=repo_name + 'failed to edit permission for users group: `%s` in ' + 'repo: `%s`' % ( + users_group.users_group_name, repo.repo_name ) ) - - @HasPermissionAnyDecorator('hg.admin') - def revoke_users_group_permission(self, apiuser, repo_name, group_name): - """ - Revoke permission for users group on given repository - - :param repo_name: - :param group_name: - """ - - try: - repo = Repository.get_by_repo_name(repo_name) - if repo is None: - raise JSONRPCError('unknown repository %s' % repo) - - user_group = UsersGroup.get_by_group_name(group_name) - if user_group is None: - raise JSONRPCError('unknown users group %s' % user_group) - - RepoModel().revoke_users_group_permission(repo=repo_name, - group_name=group_name) - - Session.commit() - return dict( - msg='Revoked perm for group: %s in repo: %s' % ( - group_name, repo_name - ) - ) - except Exception: - log.error(traceback.format_exc()) - raise JSONRPCError( - 'failed to edit permission %(repo)s for %(usersgr)s' % dict( - usersgr=group_name, repo=repo_name - ) - )
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rhodecode/tests/api/api_base.py Sun Jul 01 16:22:38 2012 +0200 @@ -0,0 +1,902 @@ +import random +import mock + +from rhodecode.tests import * +from rhodecode.model.meta import Session +from rhodecode.lib.compat import json +from rhodecode.lib.auth import AuthUser +from rhodecode.model.user import UserModel +from rhodecode.model.users_group import UsersGroupModel +from rhodecode.model.repo import RepoModel +from nose.tools import with_setup + +API_URL = '/_admin/api' + + +def _build_data(apikey, method, **kw): + """ + Builds API data with given random ID + + :param random_id: + :type random_id: + """ + random_id = random.randrange(1, 9999) + return random_id, json.dumps({ + "id": random_id, + "api_key": apikey, + "method": method, + "args": kw + }) + +jsonify = lambda obj: json.loads(json.dumps(obj)) + + +def crash(*args, **kwargs): + raise Exception('Total Crash !') + + +TEST_USERS_GROUP = 'test_users_group' + + +def make_users_group(name=TEST_USERS_GROUP): + gr = UsersGroupModel().create(name=name) + UsersGroupModel().add_user_to_group(users_group=gr, + user=TEST_USER_ADMIN_LOGIN) + Session().commit() + return gr + + +def destroy_users_group(name=TEST_USERS_GROUP): + UsersGroupModel().delete(users_group=name, force=True) + Session().commit() + + +def create_repo(repo_name, repo_type): + # create new repo + form_data = dict(repo_name=repo_name, + repo_name_full=repo_name, + fork_name=None, + description='description %s' % repo_name, + repo_group=None, + private=False, + repo_type=repo_type, + clone_uri=None, + landing_rev='tip') + cur_user = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN) + r = RepoModel().create(form_data, cur_user) + Session().commit() + return r + + +def destroy_repo(repo_name): + RepoModel().delete(repo_name) + Session().commit() + + +class BaseTestApi(object): + REPO = None + REPO_TYPE = None + + @classmethod + def setUpClass(self): + self.usr = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN) + self.apikey = self.usr.api_key + self.TEST_USER = UserModel().create_or_update( + username='test-api', + password='test', + email='test@api.rhodecode.org', + firstname='first', + lastname='last' + ) + Session().commit() + self.TEST_USER_LOGIN = self.TEST_USER.username + + @classmethod + def teardownClass(self): + pass + + def setUp(self): + self.maxDiff = None + make_users_group() + + def tearDown(self): + destroy_users_group() + + def _compare_ok(self, id_, expected, given): + expected = jsonify({ + 'id': id_, + 'error': None, + 'result': expected + }) + given = json.loads(given) + self.assertEqual(expected, given) + + def _compare_error(self, id_, expected, given): + expected = jsonify({ + 'id': id_, + 'error': expected, + 'result': None + }) + given = json.loads(given) + self.assertEqual(expected, given) + +# def test_Optional(self): +# from rhodecode.controllers.api.api import Optional +# option1 = Optional(None) +# self.assertEqual('<Optional:%s>' % None, repr(option1)) +# +# self.assertEqual(1, Optional.extract(Optional(1))) +# self.assertEqual('trololo', Optional.extract('trololo')) + + def test_api_wrong_key(self): + id_, params = _build_data('trololo', 'get_user') + response = self.app.post(API_URL, content_type='application/json', + params=params) + + expected = 'Invalid API KEY' + self._compare_error(id_, expected, given=response.body) + + def test_api_missing_non_optional_param(self): + id_, params = _build_data(self.apikey, 'get_user') + response = self.app.post(API_URL, content_type='application/json', + params=params) + + expected = 'Missing non optional `userid` arg in JSON DATA' + self._compare_error(id_, expected, given=response.body) + + def test_api_get_users(self): + id_, params = _build_data(self.apikey, 'get_users',) + response = self.app.post(API_URL, content_type='application/json', + params=params) + ret_all = [] + for usr in UserModel().get_all(): + ret = usr.get_api_data() + ret_all.append(jsonify(ret)) + expected = ret_all + self._compare_ok(id_, expected, given=response.body) + + def test_api_get_user(self): + id_, params = _build_data(self.apikey, 'get_user', + userid=TEST_USER_ADMIN_LOGIN) + response = self.app.post(API_URL, content_type='application/json', + params=params) + + usr = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN) + ret = usr.get_api_data() + ret['permissions'] = AuthUser(usr.user_id).permissions + + expected = ret + self._compare_ok(id_, expected, given=response.body) + + def test_api_get_user_that_does_not_exist(self): + id_, params = _build_data(self.apikey, 'get_user', + userid='trololo') + response = self.app.post(API_URL, content_type='application/json', + params=params) + + expected = "user `%s` does not exist" % 'trololo' + self._compare_error(id_, expected, given=response.body) + + def test_api_pull(self): + #TODO: issues with rhodecode_extras here.. not sure why ! + pass + +# repo_name = 'test_pull' +# r = create_repo(repo_name, self.REPO_TYPE) +# r.clone_uri = TEST_self.REPO +# Session.add(r) +# Session.commit() +# +# id_, params = _build_data(self.apikey, 'pull', +# repoid=repo_name,) +# response = self.app.post(API_URL, content_type='application/json', +# params=params) +# +# expected = 'Pulled from `%s`' % repo_name +# self._compare_ok(id_, expected, given=response.body) +# +# destroy_repo(repo_name) + + def test_api_pull_error(self): + id_, params = _build_data(self.apikey, 'pull', + repoid=self.REPO,) + response = self.app.post(API_URL, content_type='application/json', + params=params) + + expected = 'Unable to pull changes from `%s`' % self.REPO + self._compare_error(id_, expected, given=response.body) + + def test_api_create_existing_user(self): + id_, params = _build_data(self.apikey, 'create_user', + username=TEST_USER_ADMIN_LOGIN, + email='test@foo.com', + password='trololo') + response = self.app.post(API_URL, content_type='application/json', + params=params) + + expected = "user `%s` already exist" % TEST_USER_ADMIN_LOGIN + self._compare_error(id_, expected, given=response.body) + + def test_api_create_user_with_existing_email(self): + id_, params = _build_data(self.apikey, 'create_user', + username=TEST_USER_ADMIN_LOGIN + 'new', + email=TEST_USER_REGULAR_EMAIL, + password='trololo') + response = self.app.post(API_URL, content_type='application/json', + params=params) + + expected = "email `%s` already exist" % TEST_USER_REGULAR_EMAIL + self._compare_error(id_, expected, given=response.body) + + def test_api_create_user(self): + username = 'test_new_api_user' + email = username + "@foo.com" + + id_, params = _build_data(self.apikey, 'create_user', + username=username, + email=email, + password='trololo') + response = self.app.post(API_URL, content_type='application/json', + params=params) + + usr = UserModel().get_by_username(username) + ret = dict( + msg='created new user `%s`' % username, + user=jsonify(usr.get_api_data()) + ) + + expected = ret + self._compare_ok(id_, expected, given=response.body) + + UserModel().delete(usr.user_id) + Session().commit() + + @mock.patch.object(UserModel, 'create_or_update', crash) + def test_api_create_user_when_exception_happened(self): + + username = 'test_new_api_user' + email = username + "@foo.com" + + id_, params = _build_data(self.apikey, 'create_user', + username=username, + email=email, + password='trololo') + response = self.app.post(API_URL, content_type='application/json', + params=params) + expected = 'failed to create user `%s`' % username + self._compare_error(id_, expected, given=response.body) + + def test_api_delete_user(self): + usr = UserModel().create_or_update(username=u'test_user', + password=u'qweqwe', + email=u'u232@rhodecode.org', + firstname=u'u1', lastname=u'u1') + Session().commit() + username = usr.username + email = usr.email + usr_id = usr.user_id + ## DELETE THIS USER NOW + + id_, params = _build_data(self.apikey, 'delete_user', + userid=username,) + response = self.app.post(API_URL, content_type='application/json', + params=params) + + ret = {'msg': 'deleted user ID:%s %s' % (usr_id, username), + 'user': None} + expected = ret + self._compare_ok(id_, expected, given=response.body) + + @mock.patch.object(UserModel, 'delete', crash) + def test_api_delete_user_when_exception_happened(self): + usr = UserModel().create_or_update(username=u'test_user', + password=u'qweqwe', + email=u'u232@rhodecode.org', + firstname=u'u1', lastname=u'u1') + Session().commit() + username = usr.username + + id_, params = _build_data(self.apikey, 'delete_user', + userid=username,) + response = self.app.post(API_URL, content_type='application/json', + params=params) + ret = 'failed to delete ID:%s %s' % (usr.user_id, + usr.username) + expected = ret + self._compare_error(id_, expected, given=response.body) + + @parameterized.expand([('firstname', 'new_username'), + ('lastname', 'new_username'), + ('email', 'new_username'), + ('admin', True), + ('admin', False), + ('ldap_dn', 'test'), + ('ldap_dn', None), + ('active', False), + ('active', True), + ('password', 'newpass') + ]) + def test_api_update_user(self, name, expected): + usr = UserModel().get_by_username(self.TEST_USER_LOGIN) + kw = {name: expected, + 'userid': usr.user_id} + id_, params = _build_data(self.apikey, 'update_user', **kw) + response = self.app.post(API_URL, content_type='application/json', + params=params) + + ret = { + 'msg': 'updated user ID:%s %s' % (usr.user_id, self.TEST_USER_LOGIN), + 'user': jsonify(UserModel()\ + .get_by_username(self.TEST_USER_LOGIN)\ + .get_api_data()) + } + + expected = ret + self._compare_ok(id_, expected, given=response.body) + + def test_api_update_user_no_changed_params(self): + usr = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN) + ret = jsonify(usr.get_api_data()) + id_, params = _build_data(self.apikey, 'update_user', + userid=TEST_USER_ADMIN_LOGIN) + + response = self.app.post(API_URL, content_type='application/json', + params=params) + ret = { + 'msg': 'updated user ID:%s %s' % (usr.user_id, TEST_USER_ADMIN_LOGIN), + 'user': ret + } + expected = ret + self._compare_ok(id_, expected, given=response.body) + + def test_api_update_user_by_user_id(self): + usr = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN) + ret = jsonify(usr.get_api_data()) + id_, params = _build_data(self.apikey, 'update_user', + userid=usr.user_id) + + response = self.app.post(API_URL, content_type='application/json', + params=params) + ret = { + 'msg': 'updated user ID:%s %s' % (usr.user_id, TEST_USER_ADMIN_LOGIN), + 'user': ret + } + expected = ret + self._compare_ok(id_, expected, given=response.body) + + @mock.patch.object(UserModel, 'create_or_update', crash) + def test_api_update_user_when_exception_happens(self): + usr = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN) + ret = jsonify(usr.get_api_data()) + id_, params = _build_data(self.apikey, 'update_user', + userid=usr.user_id) + + response = self.app.post(API_URL, content_type='application/json', + params=params) + ret = 'failed to update user `%s`' % usr.user_id + + expected = ret + self._compare_error(id_, expected, given=response.body) + + def test_api_get_repo(self): + new_group = 'some_new_group' + make_users_group(new_group) + RepoModel().grant_users_group_permission(repo=self.REPO, + group_name=new_group, + perm='repository.read') + Session().commit() + id_, params = _build_data(self.apikey, 'get_repo', + repoid=self.REPO) + response = self.app.post(API_URL, content_type='application/json', + params=params) + + repo = RepoModel().get_by_repo_name(self.REPO) + ret = repo.get_api_data() + + members = [] + for user in repo.repo_to_perm: + perm = user.permission.permission_name + user = user.user + user_data = user.get_api_data() + user_data['type'] = "user" + user_data['permission'] = perm + members.append(user_data) + + for users_group in repo.users_group_to_perm: + perm = users_group.permission.permission_name + users_group = users_group.users_group + users_group_data = users_group.get_api_data() + users_group_data['type'] = "users_group" + users_group_data['permission'] = perm + members.append(users_group_data) + + ret['members'] = members + + expected = ret + self._compare_ok(id_, expected, given=response.body) + destroy_users_group(new_group) + + def test_api_get_repo_that_doesn_not_exist(self): + id_, params = _build_data(self.apikey, 'get_repo', + repoid='no-such-repo') + response = self.app.post(API_URL, content_type='application/json', + params=params) + + ret = 'repository `%s` does not exist' % 'no-such-repo' + expected = ret + self._compare_error(id_, expected, given=response.body) + + def test_api_get_repos(self): + id_, params = _build_data(self.apikey, 'get_repos') + response = self.app.post(API_URL, content_type='application/json', + params=params) + + result = [] + for repo in RepoModel().get_all(): + result.append(repo.get_api_data()) + ret = jsonify(result) + + expected = ret + self._compare_ok(id_, expected, given=response.body) + + @parameterized.expand([('all', 'all'), + ('dirs', 'dirs'), + ('files', 'files'), ]) + def test_api_get_repo_nodes(self, name, ret_type): + rev = 'tip' + path = '/' + id_, params = _build_data(self.apikey, 'get_repo_nodes', + repoid=self.REPO, revision=rev, + root_path=path, + ret_type=ret_type) + response = self.app.post(API_URL, content_type='application/json', + params=params) + + # we don't the actual return types here since it's tested somewhere + # else + expected = json.loads(response.body)['result'] + self._compare_ok(id_, expected, given=response.body) + + def test_api_get_repo_nodes_bad_revisions(self): + rev = 'i-dont-exist' + path = '/' + id_, params = _build_data(self.apikey, 'get_repo_nodes', + repoid=self.REPO, revision=rev, + root_path=path,) + response = self.app.post(API_URL, content_type='application/json', + params=params) + + expected = 'failed to get repo: `%s` nodes' % self.REPO + self._compare_error(id_, expected, given=response.body) + + def test_api_get_repo_nodes_bad_path(self): + rev = 'tip' + path = '/idontexits' + id_, params = _build_data(self.apikey, 'get_repo_nodes', + repoid=self.REPO, revision=rev, + root_path=path,) + response = self.app.post(API_URL, content_type='application/json', + params=params) + + expected = 'failed to get repo: `%s` nodes' % self.REPO + self._compare_error(id_, expected, given=response.body) + + def test_api_get_repo_nodes_bad_ret_type(self): + rev = 'tip' + path = '/' + ret_type = 'error' + id_, params = _build_data(self.apikey, 'get_repo_nodes', + repoid=self.REPO, revision=rev, + root_path=path, + ret_type=ret_type) + response = self.app.post(API_URL, content_type='application/json', + params=params) + + expected = 'ret_type must be one of %s' % (['files', 'dirs', 'all']) + self._compare_error(id_, expected, given=response.body) + + def test_api_create_repo(self): + repo_name = 'api-repo' + id_, params = _build_data(self.apikey, 'create_repo', + repo_name=repo_name, + owner=TEST_USER_ADMIN_LOGIN, + repo_type='hg', + ) + response = self.app.post(API_URL, content_type='application/json', + params=params) + + repo = RepoModel().get_by_repo_name(repo_name) + ret = { + 'msg': 'Created new repository `%s`' % repo_name, + 'repo': jsonify(repo.get_api_data()) + } + expected = ret + self._compare_ok(id_, expected, given=response.body) + destroy_repo(repo_name) + + def test_api_create_repo_unknown_owner(self): + repo_name = 'api-repo' + owner = 'i-dont-exist' + id_, params = _build_data(self.apikey, 'create_repo', + repo_name=repo_name, + owner=owner, + repo_type='hg', + ) + response = self.app.post(API_URL, content_type='application/json', + params=params) + expected = 'user `%s` does not exist' % owner + self._compare_error(id_, expected, given=response.body) + + def test_api_create_repo_exists(self): + repo_name = self.REPO + id_, params = _build_data(self.apikey, 'create_repo', + repo_name=repo_name, + owner=TEST_USER_ADMIN_LOGIN, + repo_type='hg', + ) + response = self.app.post(API_URL, content_type='application/json', + params=params) + expected = "repo `%s` already exist" % repo_name + self._compare_error(id_, expected, given=response.body) + + @mock.patch.object(RepoModel, 'create_repo', crash) + def test_api_create_repo_exception_occurred(self): + repo_name = 'api-repo' + id_, params = _build_data(self.apikey, 'create_repo', + repo_name=repo_name, + owner=TEST_USER_ADMIN_LOGIN, + repo_type='hg', + ) + response = self.app.post(API_URL, content_type='application/json', + params=params) + expected = 'failed to create repository `%s`' % repo_name + self._compare_error(id_, expected, given=response.body) + + def test_api_delete_repo(self): + repo_name = 'api_delete_me' + create_repo(repo_name, self.REPO_TYPE) + + id_, params = _build_data(self.apikey, 'delete_repo', + repoid=repo_name,) + response = self.app.post(API_URL, content_type='application/json', + params=params) + + ret = { + 'msg': 'Deleted repository `%s`' % repo_name, + 'success': True + } + expected = ret + self._compare_ok(id_, expected, given=response.body) + + def test_api_delete_repo_exception_occurred(self): + repo_name = 'api_delete_me' + create_repo(repo_name, self.REPO_TYPE) + with mock.patch.object(RepoModel, 'delete', crash): + id_, params = _build_data(self.apikey, 'delete_repo', + repoid=repo_name,) + response = self.app.post(API_URL, content_type='application/json', + params=params) + + expected = 'failed to delete repository `%s`' % repo_name + self._compare_error(id_, expected, given=response.body) + destroy_repo(repo_name) + + def test_api_fork_repo(self): + self.failIf(False, 'TODO:') + + def test_api_get_users_group(self): + id_, params = _build_data(self.apikey, 'get_users_group', + usersgroupid=TEST_USERS_GROUP) + response = self.app.post(API_URL, content_type='application/json', + params=params) + + users_group = UsersGroupModel().get_group(TEST_USERS_GROUP) + members = [] + for user in users_group.members: + user = user.user + members.append(user.get_api_data()) + + ret = users_group.get_api_data() + ret['members'] = members + expected = ret + self._compare_ok(id_, expected, given=response.body) + + def test_api_get_users_groups(self): + + make_users_group('test_users_group2') + + id_, params = _build_data(self.apikey, 'get_users_groups',) + response = self.app.post(API_URL, content_type='application/json', + params=params) + + expected = [] + for gr_name in [TEST_USERS_GROUP, 'test_users_group2']: + users_group = UsersGroupModel().get_group(gr_name) + ret = users_group.get_api_data() + expected.append(ret) + self._compare_ok(id_, expected, given=response.body) + + UsersGroupModel().delete(users_group='test_users_group2') + Session().commit() + + def test_api_create_users_group(self): + group_name = 'some_new_group' + id_, params = _build_data(self.apikey, 'create_users_group', + group_name=group_name) + response = self.app.post(API_URL, content_type='application/json', + params=params) + + ret = { + 'msg': 'created new users group `%s`' % group_name, + 'users_group': jsonify(UsersGroupModel()\ + .get_by_name(group_name)\ + .get_api_data()) + } + expected = ret + self._compare_ok(id_, expected, given=response.body) + + destroy_users_group(group_name) + + def test_api_get_users_group_that_exist(self): + id_, params = _build_data(self.apikey, 'create_users_group', + group_name=TEST_USERS_GROUP) + response = self.app.post(API_URL, content_type='application/json', + params=params) + + expected = "users group `%s` already exist" % TEST_USERS_GROUP + self._compare_error(id_, expected, given=response.body) + + @mock.patch.object(UsersGroupModel, 'create', crash) + def test_api_get_users_group_exception_occurred(self): + group_name = 'exception_happens' + id_, params = _build_data(self.apikey, 'create_users_group', + group_name=group_name) + response = self.app.post(API_URL, content_type='application/json', + params=params) + + expected = 'failed to create group `%s`' % group_name + self._compare_error(id_, expected, given=response.body) + + def test_api_add_user_to_users_group(self): + gr_name = 'test_group' + UsersGroupModel().create(gr_name) + Session().commit() + id_, params = _build_data(self.apikey, 'add_user_to_users_group', + usersgroupid=gr_name, + userid=TEST_USER_ADMIN_LOGIN) + response = self.app.post(API_URL, content_type='application/json', + params=params) + + expected = { + 'msg': 'added member `%s` to users group `%s`' % ( + TEST_USER_ADMIN_LOGIN, gr_name + ), + 'success': True} + self._compare_ok(id_, expected, given=response.body) + + UsersGroupModel().delete(users_group=gr_name) + Session().commit() + + def test_api_add_user_to_users_group_that_doesnt_exist(self): + id_, params = _build_data(self.apikey, 'add_user_to_users_group', + usersgroupid='false-group', + userid=TEST_USER_ADMIN_LOGIN) + response = self.app.post(API_URL, content_type='application/json', + params=params) + + expected = 'users group `%s` does not exist' % 'false-group' + self._compare_error(id_, expected, given=response.body) + + @mock.patch.object(UsersGroupModel, 'add_user_to_group', crash) + def test_api_add_user_to_users_group_exception_occurred(self): + gr_name = 'test_group' + UsersGroupModel().create(gr_name) + Session().commit() + id_, params = _build_data(self.apikey, 'add_user_to_users_group', + usersgroupid=gr_name, + userid=TEST_USER_ADMIN_LOGIN) + response = self.app.post(API_URL, content_type='application/json', + params=params) + + expected = 'failed to add member to users group `%s`' % gr_name + self._compare_error(id_, expected, given=response.body) + + UsersGroupModel().delete(users_group=gr_name) + Session().commit() + + def test_api_remove_user_from_users_group(self): + gr_name = 'test_group_3' + gr = UsersGroupModel().create(gr_name) + UsersGroupModel().add_user_to_group(gr, user=TEST_USER_ADMIN_LOGIN) + Session().commit() + id_, params = _build_data(self.apikey, 'remove_user_from_users_group', + usersgroupid=gr_name, + userid=TEST_USER_ADMIN_LOGIN) + response = self.app.post(API_URL, content_type='application/json', + params=params) + + expected = { + 'msg': 'removed member `%s` from users group `%s`' % ( + TEST_USER_ADMIN_LOGIN, gr_name + ), + 'success': True} + self._compare_ok(id_, expected, given=response.body) + + UsersGroupModel().delete(users_group=gr_name) + Session().commit() + + @mock.patch.object(UsersGroupModel, 'remove_user_from_group', crash) + def test_api_remove_user_from_users_group_exception_occurred(self): + gr_name = 'test_group_3' + gr = UsersGroupModel().create(gr_name) + UsersGroupModel().add_user_to_group(gr, user=TEST_USER_ADMIN_LOGIN) + Session().commit() + id_, params = _build_data(self.apikey, 'remove_user_from_users_group', + usersgroupid=gr_name, + userid=TEST_USER_ADMIN_LOGIN) + response = self.app.post(API_URL, content_type='application/json', + params=params) + + expected = 'failed to remove member from users group `%s`' % gr_name + self._compare_error(id_, expected, given=response.body) + + UsersGroupModel().delete(users_group=gr_name) + Session().commit() + + @parameterized.expand([('none', 'repository.none'), + ('read', 'repository.read'), + ('write', 'repository.write'), + ('admin', 'repository.admin')]) + def test_api_grant_user_permission(self, name, perm): + id_, params = _build_data(self.apikey, 'grant_user_permission', + repoid=self.REPO, + userid=TEST_USER_ADMIN_LOGIN, + perm=perm) + response = self.app.post(API_URL, content_type='application/json', + params=params) + + ret = { + 'msg': 'Granted perm: `%s` for user: `%s` in repo: `%s`' % ( + perm, TEST_USER_ADMIN_LOGIN, self.REPO + ), + 'success': True + } + expected = ret + self._compare_ok(id_, expected, given=response.body) + + def test_api_grant_user_permission_wrong_permission(self): + perm = 'haha.no.permission' + id_, params = _build_data(self.apikey, 'grant_user_permission', + repoid=self.REPO, + userid=TEST_USER_ADMIN_LOGIN, + perm=perm) + response = self.app.post(API_URL, content_type='application/json', + params=params) + + expected = 'permission `%s` does not exist' % perm + self._compare_error(id_, expected, given=response.body) + + @mock.patch.object(RepoModel, 'grant_user_permission', crash) + def test_api_grant_user_permission_exception_when_adding(self): + perm = 'repository.read' + id_, params = _build_data(self.apikey, 'grant_user_permission', + repoid=self.REPO, + userid=TEST_USER_ADMIN_LOGIN, + perm=perm) + response = self.app.post(API_URL, content_type='application/json', + params=params) + + expected = 'failed to edit permission for user: `%s` in repo: `%s`' % ( + TEST_USER_ADMIN_LOGIN, self.REPO + ) + self._compare_error(id_, expected, given=response.body) + + def test_api_revoke_user_permission(self): + id_, params = _build_data(self.apikey, 'revoke_user_permission', + repoid=self.REPO, + userid=TEST_USER_ADMIN_LOGIN,) + response = self.app.post(API_URL, content_type='application/json', + params=params) + + expected = { + 'msg': 'Revoked perm for user: `%s` in repo: `%s`' % ( + TEST_USER_ADMIN_LOGIN, self.REPO + ), + 'success': True + } + self._compare_ok(id_, expected, given=response.body) + + @mock.patch.object(RepoModel, 'revoke_user_permission', crash) + def test_api_revoke_user_permission_exception_when_adding(self): + id_, params = _build_data(self.apikey, 'revoke_user_permission', + repoid=self.REPO, + userid=TEST_USER_ADMIN_LOGIN,) + response = self.app.post(API_URL, content_type='application/json', + params=params) + + expected = 'failed to edit permission for user: `%s` in repo: `%s`' % ( + TEST_USER_ADMIN_LOGIN, self.REPO + ) + self._compare_error(id_, expected, given=response.body) + + @parameterized.expand([('none', 'repository.none'), + ('read', 'repository.read'), + ('write', 'repository.write'), + ('admin', 'repository.admin')]) + def test_api_grant_users_group_permission(self, name, perm): + id_, params = _build_data(self.apikey, 'grant_users_group_permission', + repoid=self.REPO, + usersgroupid=TEST_USERS_GROUP, + perm=perm) + response = self.app.post(API_URL, content_type='application/json', + params=params) + + ret = { + 'msg': 'Granted perm: `%s` for users group: `%s` in repo: `%s`' % ( + perm, TEST_USERS_GROUP, self.REPO + ), + 'success': True + } + expected = ret + self._compare_ok(id_, expected, given=response.body) + + def test_api_grant_users_group_permission_wrong_permission(self): + perm = 'haha.no.permission' + id_, params = _build_data(self.apikey, 'grant_users_group_permission', + repoid=self.REPO, + usersgroupid=TEST_USERS_GROUP, + perm=perm) + response = self.app.post(API_URL, content_type='application/json', + params=params) + + expected = 'permission `%s` does not exist' % perm + self._compare_error(id_, expected, given=response.body) + + @mock.patch.object(RepoModel, 'grant_users_group_permission', crash) + def test_api_grant_users_group_permission_exception_when_adding(self): + perm = 'repository.read' + id_, params = _build_data(self.apikey, 'grant_users_group_permission', + repoid=self.REPO, + usersgroupid=TEST_USERS_GROUP, + perm=perm) + response = self.app.post(API_URL, content_type='application/json', + params=params) + + expected = 'failed to edit permission for users group: `%s` in repo: `%s`' % ( + TEST_USERS_GROUP, self.REPO + ) + self._compare_error(id_, expected, given=response.body) + + def test_api_revoke_users_group_permission(self): + RepoModel().grant_users_group_permission(repo=self.REPO, + group_name=TEST_USERS_GROUP, + perm='repository.read') + Session().commit() + id_, params = _build_data(self.apikey, 'revoke_users_group_permission', + repoid=self.REPO, + usersgroupid=TEST_USERS_GROUP,) + response = self.app.post(API_URL, content_type='application/json', + params=params) + + expected = { + 'msg': 'Revoked perm for users group: `%s` in repo: `%s`' % ( + TEST_USERS_GROUP, self.REPO + ), + 'success': True + } + self._compare_ok(id_, expected, given=response.body) + + @mock.patch.object(RepoModel, 'revoke_users_group_permission', crash) + def test_api_revoke_users_group_permission_exception_when_adding(self): + + id_, params = _build_data(self.apikey, 'revoke_users_group_permission', + repoid=self.REPO, + usersgroupid=TEST_USERS_GROUP,) + response = self.app.post(API_URL, content_type='application/json', + params=params) + + expected = 'failed to edit permission for users group: `%s` in repo: `%s`' % ( + TEST_USERS_GROUP, self.REPO + ) + self._compare_error(id_, expected, given=response.body) +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rhodecode/tests/api/test_api_git.py Sun Jul 01 16:22:38 2012 +0200 @@ -0,0 +1,7 @@ +from rhodecode.tests import * +from rhodecode.tests.api.api_base import BaseTestApi + + +class TestGitApi(BaseTestApi, TestController): + REPO = GIT_REPO + REPO_TYPE = 'git'
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rhodecode/tests/api/test_api_hg.py Sun Jul 01 16:22:38 2012 +0200 @@ -0,0 +1,7 @@ +from rhodecode.tests import * +from rhodecode.tests.api.api_base import BaseTestApi + + +class TestHgApi(BaseTestApi, TestController): + REPO = HG_REPO + REPO_TYPE = 'hg' \ No newline at end of file