view rhodecode/controllers/api/api.py @ 2165:dc2584ba5fbc

merged beta into default branch
author Marcin Kuzminski <marcin@python-works.com>
date Wed, 28 Mar 2012 19:54:16 +0200
parents 82a88013a3fd 12ceeda33339
children 63e58ef80ef1
line wrap: on
line source

# -*- coding: utf-8 -*-
"""
    rhodecode.controllers.api
    ~~~~~~~~~~~~~~~~~~~~~~~~~

    API controller for RhodeCode

    :created_on: Aug 20, 2011
    :author: marcink
    :copyright: (C) 2011-2012 Marcin Kuzminski <marcin@python-works.com>
    :license: GPLv3, see COPYING for more details.
"""
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; version 2
# of the License or (at your opinion) any later version of the license.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA  02110-1301, USA.

import traceback
import logging

from rhodecode.controllers.api import JSONRPCController, JSONRPCError
from rhodecode.lib.auth import HasPermissionAllDecorator, \
    HasPermissionAnyDecorator, PasswordGenerator, AuthUser

from rhodecode.model.meta import Session
from rhodecode.model.scm import ScmModel
from rhodecode.model.db import User, UsersGroup, Repository
from rhodecode.model.repo import RepoModel
from rhodecode.model.user import UserModel
from rhodecode.model.users_group import UsersGroupModel
from rhodecode.lib.utils import map_groups

log = logging.getLogger(__name__)


class ApiController(JSONRPCController):
    """
    API Controller


    Each method needs to have USER as argument this is then based on given
    API_KEY propagated as instance of user object

    Preferably this should be first argument also


    Each function should also **raise** JSONRPCError for any
    errors that happens

    """

    @HasPermissionAllDecorator('hg.admin')
    def pull(self, apiuser, repo_name):
        """
        Dispatch pull action on given repo


        :param user:
        :param repo_name:
        """

        if Repository.is_valid(repo_name) is False:
            raise JSONRPCError('Unknown repo "%s"' % repo_name)

        try:
            ScmModel().pull_changes(repo_name, self.rhodecode_user.username)
            return 'Pulled from %s' % repo_name
        except Exception:
            raise JSONRPCError('Unable to pull changes from "%s"' % repo_name)

    @HasPermissionAllDecorator('hg.admin')
    def get_user(self, apiuser, userid):
        """"
        Get a user by username

        :param apiuser:
        :param username:
        """

        user = UserModel().get_user(userid)
        if user is None:
            return user

        return dict(
            id=user.user_id,
            username=user.username,
            firstname=user.name,
            lastname=user.lastname,
            email=user.email,
            active=user.active,
            admin=user.admin,
            ldap_dn=user.ldap_dn,
            last_login=user.last_login,
            permissions=AuthUser(user_id=user.user_id).permissions
        )

    @HasPermissionAllDecorator('hg.admin')
    def get_users(self, apiuser):
        """"
        Get all users

        :param apiuser:
        """

        result = []
        for user in User.getAll():
            result.append(
                dict(
                    id=user.user_id,
                    username=user.username,
                    firstname=user.name,
                    lastname=user.lastname,
                    email=user.email,
                    active=user.active,
                    admin=user.admin,
                    ldap_dn=user.ldap_dn,
                    last_login=user.last_login,
                )
            )
        return result

    @HasPermissionAllDecorator('hg.admin')
    def create_user(self, apiuser, username, email, password, firstname=None,
                    lastname=None, active=True, admin=False, ldap_dn=None):
        """
        Create new user

        :param apiuser:
        :param username:
        :param password:
        :param email:
        :param name:
        :param lastname:
        :param active:
        :param admin:
        :param ldap_dn:
        """
        if User.get_by_username(username):
            raise JSONRPCError("user %s already exist" % username)

        if User.get_by_email(email, case_insensitive=True):
            raise JSONRPCError("email %s already exist" % email)

        if ldap_dn:
            # generate temporary password if ldap_dn
            password = PasswordGenerator().gen_password(length=8)

        try:
            usr = UserModel().create_or_update(
                username, password, email, firstname,
                lastname, active, admin, ldap_dn
            )
            Session.commit()
            return dict(
                id=usr.user_id,
                msg='created new user %s' % username
            )
        except Exception:
            log.error(traceback.format_exc())
            raise JSONRPCError('failed to create user %s' % username)

    @HasPermissionAllDecorator('hg.admin')
    def update_user(self, apiuser, userid, username, password, email,
                    firstname, lastname, active, admin, ldap_dn):
        """
        Updates given user

        :param apiuser:
        :param username:
        :param password:
        :param email:
        :param name:
        :param lastname:
        :param active:
        :param admin:
        :param ldap_dn:
        """
        if not UserModel().get_user(userid):
            raise JSONRPCError("user %s does not exist" % username)

        try:
            usr = UserModel().create_or_update(
                username, password, email, firstname,
                lastname, active, admin, ldap_dn
            )
            Session.commit()
            return dict(
                id=usr.user_id,
                msg='updated user %s' % username
            )
        except Exception:
            log.error(traceback.format_exc())
            raise JSONRPCError('failed to update user %s' % username)

    @HasPermissionAllDecorator('hg.admin')
    def get_users_group(self, apiuser, group_name):
        """"
        Get users group by name

        :param apiuser:
        :param group_name:
        """

        users_group = UsersGroup.get_by_group_name(group_name)
        if not users_group:
            return None

        members = []
        for user in users_group.members:
            user = user.user
            members.append(dict(id=user.user_id,
                            username=user.username,
                            firstname=user.name,
                            lastname=user.lastname,
                            email=user.email,
                            active=user.active,
                            admin=user.admin,
                            ldap=user.ldap_dn))

        return dict(id=users_group.users_group_id,
                    group_name=users_group.users_group_name,
                    active=users_group.users_group_active,
                    members=members)

    @HasPermissionAllDecorator('hg.admin')
    def get_users_groups(self, apiuser):
        """"
        Get all users groups

        :param apiuser:
        """

        result = []
        for users_group in UsersGroup.getAll():
            members = []
            for user in users_group.members:
                user = user.user
                members.append(dict(id=user.user_id,
                                username=user.username,
                                firstname=user.name,
                                lastname=user.lastname,
                                email=user.email,
                                active=user.active,
                                admin=user.admin,
                                ldap=user.ldap_dn))

            result.append(dict(id=users_group.users_group_id,
                                group_name=users_group.users_group_name,
                                active=users_group.users_group_active,
                                members=members))
        return result

    @HasPermissionAllDecorator('hg.admin')
    def create_users_group(self, apiuser, group_name, active=True):
        """
        Creates an new usergroup

        :param group_name:
        :param active:
        """

        if self.get_users_group(apiuser, group_name):
            raise JSONRPCError("users group %s already exist" % group_name)

        try:
            ug = UsersGroupModel().create(name=group_name, active=active)
            Session.commit()
            return dict(id=ug.users_group_id,
                        msg='created new users group %s' % group_name)
        except Exception:
            log.error(traceback.format_exc())
            raise JSONRPCError('failed to create group %s' % group_name)

    @HasPermissionAllDecorator('hg.admin')
    def add_user_to_users_group(self, apiuser, group_name, username):
        """"
        Add a user to a users group

        :param apiuser:
        :param group_name:
        :param username:
        """

        try:
            users_group = UsersGroup.get_by_group_name(group_name)
            if not users_group:
                raise JSONRPCError('unknown users group %s' % group_name)

            user = User.get_by_username(username)
            if user is None:
                raise JSONRPCError('unknown user %s' % username)

            ugm = UsersGroupModel().add_user_to_group(users_group, user)
            success = True if ugm != True else False
            msg = 'added member %s to users group %s' % (username, group_name)
            msg = msg if success else 'User is already in that group'
            Session.commit()

            return dict(
                id=ugm.users_group_member_id if ugm != True else None,
                success=success,
                msg=msg
            )
        except Exception:
            log.error(traceback.format_exc())
            raise JSONRPCError('failed to add users group member')

    @HasPermissionAllDecorator('hg.admin')
    def remove_user_from_users_group(self, apiuser, group_name, username):
        """
        Remove user from a group

        :param apiuser
        :param group_name
        :param username
        """

        try:
            users_group = UsersGroup.get_by_group_name(group_name)
            if not users_group:
                raise JSONRPCError('unknown users group %s' % group_name)

            user = User.get_by_username(username)
            if user is None:
                raise JSONRPCError('unknown user %s' % username)

            success = UsersGroupModel().remove_user_from_group(users_group, user)
            msg = 'removed member %s from users group %s' % (username, group_name)
            msg = msg if success else "User wasn't in group"
            Session.commit()
            return dict(success=success, msg=msg)
        except Exception:
            log.error(traceback.format_exc())
            raise JSONRPCError('failed to remove user from group')

    @HasPermissionAnyDecorator('hg.admin')
    def get_repo(self, apiuser, repoid):
        """"
        Get repository by name

        :param apiuser:
        :param repo_name:
        """

        repo = RepoModel().get_repo(repoid)
        if repo is None:
            raise JSONRPCError('unknown repository %s' % repo)

        members = []
        for user in repo.repo_to_perm:
            perm = user.permission.permission_name
            user = user.user
            members.append(
                dict(
                    type="user",
                    id=user.user_id,
                    username=user.username,
                    firstname=user.name,
                    lastname=user.lastname,
                    email=user.email,
                    active=user.active,
                    admin=user.admin,
                    ldap=user.ldap_dn,
                    permission=perm
                )
            )
        for users_group in repo.users_group_to_perm:
            perm = users_group.permission.permission_name
            users_group = users_group.users_group
            members.append(
                dict(
                    type="users_group",
                    id=users_group.users_group_id,
                    name=users_group.users_group_name,
                    active=users_group.users_group_active,
                    permission=perm
                )
            )

        return dict(
            id=repo.repo_id,
            repo_name=repo.repo_name,
            type=repo.repo_type,
            description=repo.description,
            members=members
        )

    @HasPermissionAnyDecorator('hg.admin')
    def get_repos(self, apiuser):
        """"
        Get all repositories

        :param apiuser:
        """

        result = []
        for repository in Repository.getAll():
            result.append(
                dict(
                    id=repository.repo_id,
                    repo_name=repository.repo_name,
                    type=repository.repo_type,
                    description=repository.description
                )
            )
        return result

    @HasPermissionAnyDecorator('hg.admin')
    def get_repo_nodes(self, apiuser, repo_name, revision, root_path,
                       ret_type='all'):
        """
        returns a list of nodes and it's children
        for a given path at given revision. It's possible to specify ret_type
        to show only files or dirs

        :param apiuser:
        :param repo_name: name of repository
        :param revision: revision for which listing should be done
        :param root_path: path from which start displaying
        :param ret_type: return type 'all|files|dirs' nodes
        """
        try:
            _d, _f = ScmModel().get_nodes(repo_name, revision, root_path,
                                          flat=False)
            _map = {
                'all': _d + _f,
                'files': _f,
                'dirs': _d,
            }
            return _map[ret_type]
        except KeyError:
            raise JSONRPCError('ret_type must be one of %s' % _map.keys())
        except Exception, e:
            raise JSONRPCError(e)

    @HasPermissionAnyDecorator('hg.admin', 'hg.create.repository')
    def create_repo(self, apiuser, repo_name, owner_name, description='',
                    repo_type='hg', private=False, clone_uri=None):
        """
        Create repository, if clone_url is given it makes a remote clone

        :param apiuser:
        :param repo_name:
        :param owner_name:
        :param description:
        :param repo_type:
        :param private:
        :param clone_uri:
        """

        try:
            owner = User.get_by_username(owner_name)
            if owner is None:
                raise JSONRPCError('unknown user %s' % owner_name)

            if Repository.get_by_repo_name(repo_name):
                raise JSONRPCError("repo %s already exist" % repo_name)

            groups = repo_name.split(Repository.url_sep())
            real_name = groups[-1]
            # create structure of groups
            group = map_groups(repo_name)

            repo = RepoModel().create(
                dict(
                    repo_name=real_name,
                    repo_name_full=repo_name,
                    description=description,
                    private=private,
                    repo_type=repo_type,
                    repo_group=group.group_id if group else None,
                    clone_uri=clone_uri
                ),
                owner
            )
            Session.commit()

            return dict(
                id=repo.repo_id,
                msg="Created new repository %s" % repo.repo_name
            )

        except Exception:
            log.error(traceback.format_exc())
            raise JSONRPCError('failed to create repository %s' % repo_name)

    @HasPermissionAnyDecorator('hg.admin')
    def delete_repo(self, apiuser, repo_name):
        """
        Deletes a given repository

        :param repo_name:
        """
        if not Repository.get_by_repo_name(repo_name):
            raise JSONRPCError("repo %s does not exist" % repo_name)
        try:
            RepoModel().delete(repo_name)
            Session.commit()
            return dict(
                msg='Deleted repository %s' % repo_name
            )
        except Exception:
            log.error(traceback.format_exc())
            raise JSONRPCError('failed to delete repository %s' % repo_name)

    @HasPermissionAnyDecorator('hg.admin')
    def grant_user_permission(self, apiuser, repo_name, username, perm):
        """
        Grant permission for user on given repository, or update existing one
        if found

        :param repo_name:
        :param username:
        :param perm:
        """

        try:
            repo = Repository.get_by_repo_name(repo_name)
            if repo is None:
                raise JSONRPCError('unknown repository %s' % repo)

            user = User.get_by_username(username)
            if user is None:
                raise JSONRPCError('unknown user %s' % username)

            RepoModel().grant_user_permission(repo=repo, user=user, perm=perm)

            Session.commit()
            return dict(
                msg='Granted perm: %s for user: %s in repo: %s' % (
                    perm, username, repo_name
                )
            )
        except Exception:
            log.error(traceback.format_exc())
            raise JSONRPCError(
                'failed to edit permission %(repo)s for %(user)s' % dict(
                    user=username, repo=repo_name
                )
            )

    @HasPermissionAnyDecorator('hg.admin')
    def revoke_user_permission(self, apiuser, repo_name, username):
        """
        Revoke permission for user on given repository

        :param repo_name:
        :param username:
        """

        try:
            repo = Repository.get_by_repo_name(repo_name)
            if repo is None:
                raise JSONRPCError('unknown repository %s' % repo)

            user = User.get_by_username(username)
            if user is None:
                raise JSONRPCError('unknown user %s' % username)

            RepoModel().revoke_user_permission(repo=repo_name, user=username)

            Session.commit()
            return dict(
                msg='Revoked perm for user: %s in repo: %s' % (
                    username, repo_name
                )
            )
        except Exception:
            log.error(traceback.format_exc())
            raise JSONRPCError(
                'failed to edit permission %(repo)s for %(user)s' % dict(
                    user=username, repo=repo_name
                )
            )

    @HasPermissionAnyDecorator('hg.admin')
    def grant_users_group_permission(self, apiuser, repo_name, group_name, perm):
        """
        Grant permission for users group on given repository, or update
        existing one if found

        :param repo_name:
        :param group_name:
        :param perm:
        """

        try:
            repo = Repository.get_by_repo_name(repo_name)
            if repo is None:
                raise JSONRPCError('unknown repository %s' % repo)

            user_group = UsersGroup.get_by_group_name(group_name)
            if user_group is None:
                raise JSONRPCError('unknown users group %s' % user_group)

            RepoModel().grant_users_group_permission(repo=repo_name,
                                                     group_name=group_name,
                                                     perm=perm)

            Session.commit()
            return dict(
                msg='Granted perm: %s for group: %s in repo: %s' % (
                    perm, group_name, repo_name
                )
            )
        except Exception:
            log.error(traceback.format_exc())
            raise JSONRPCError(
                'failed to edit permission %(repo)s for %(usersgr)s' % dict(
                    usersgr=group_name, repo=repo_name
                )
            )

    @HasPermissionAnyDecorator('hg.admin')
    def revoke_users_group_permission(self, apiuser, repo_name, group_name):
        """
        Revoke permission for users group on given repository

        :param repo_name:
        :param group_name:
        """

        try:
            repo = Repository.get_by_repo_name(repo_name)
            if repo is None:
                raise JSONRPCError('unknown repository %s' % repo)

            user_group = UsersGroup.get_by_group_name(group_name)
            if user_group is None:
                raise JSONRPCError('unknown users group %s' % user_group)

            RepoModel().revoke_users_group_permission(repo=repo_name,
                                                      group_name=group_name)

            Session.commit()
            return dict(
                msg='Revoked perm for group: %s in repo: %s' % (
                    group_name, repo_name
                )
            )
        except Exception:
            log.error(traceback.format_exc())
            raise JSONRPCError(
                'failed to edit permission %(repo)s for %(usersgr)s' % dict(
                    usersgr=group_name, repo=repo_name
                )
            )