view rhodecode/tests/functional/test_admin_repos.py @ 3222:b4daef4cc26d beta

Group management delegation: this allows users to become an admin of groups a group admin can manage a group, change it's name or permissions. A group admin can create child group only within a group he is an admin off. Top-level group can be only created by super admins
author Marcin Kuzminski <marcin@python-works.com>
date Fri, 25 Jan 2013 01:40:09 +0100
parents 42d7ca49d073
children 7abbb77b02d0
line wrap: on
line source

# -*- coding: utf-8 -*-

import os
from rhodecode.lib import vcs

from rhodecode.model.db import Repository, RepoGroup
from rhodecode.tests import *
from rhodecode.model.repos_group import ReposGroupModel
from rhodecode.model.repo import RepoModel


class TestAdminReposController(TestController):

    def __make_repo(self):
        pass

    def test_index(self):
        self.log_user()
        response = self.app.get(url('repos'))
        # Test response...

    def test_index_as_xml(self):
        response = self.app.get(url('formatted_repos', format='xml'))

    def test_create_hg(self):
        self.log_user()
        repo_name = NEW_HG_REPO
        description = 'description for newly created repo'
        response = self.app.post(url('repos'),
                        _get_repo_create_params(repo_private=False,
                                                repo_name=repo_name,
                                                repo_description=description))
        self.checkSessionFlash(response,
                               'created repository %s' % (repo_name))

        #test if the repo was created in the database
        new_repo = self.Session().query(Repository)\
            .filter(Repository.repo_name == repo_name).one()

        self.assertEqual(new_repo.repo_name, repo_name)
        self.assertEqual(new_repo.description, description)

        #test if repository is visible in the list ?
        response = response.follow()

        response.mustcontain(repo_name)

        #test if repository was created on filesystem
        try:
            vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
        except:
            self.fail('no repo %s in filesystem' % repo_name)

    def test_create_hg_non_ascii(self):
        self.log_user()
        non_ascii = "ąęł"
        repo_name = "%s%s" % (NEW_HG_REPO, non_ascii)
        repo_name_unicode = repo_name.decode('utf8')
        description = 'description for newly created repo' + non_ascii
        description_unicode = description.decode('utf8')
        private = False
        response = self.app.post(url('repos'),
                        _get_repo_create_params(repo_private=False,
                                                repo_name=repo_name,
                                                repo_description=description))
        self.checkSessionFlash(response,
                               'created repository %s' % (repo_name_unicode))

        #test if the repo was created in the database
        new_repo = self.Session().query(Repository)\
            .filter(Repository.repo_name == repo_name_unicode).one()

        self.assertEqual(new_repo.repo_name, repo_name_unicode)
        self.assertEqual(new_repo.description, description_unicode)

        #test if repository is visible in the list ?
        response = response.follow()

        response.mustcontain(repo_name)

        #test if repository was created on filesystem
        try:
            vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
        except:
            self.fail('no repo %s in filesystem' % repo_name)

    def test_create_hg_in_group(self):
        self.log_user()

        ## create GROUP
        group_name = 'sometest'
        gr = ReposGroupModel().create(group_name=group_name,
                                      group_description='test',
                                      owner=TEST_USER_ADMIN_LOGIN)
        self.Session().commit()

        repo_name = 'ingroup'
        repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
        description = 'description for newly created repo'
        response = self.app.post(url('repos'),
                        _get_repo_create_params(repo_private=False,
                                                repo_name=repo_name,
                                                repo_description=description,
                                                repo_group=gr.group_id,))

        self.checkSessionFlash(response,
                               'created repository %s' % (repo_name))

        #test if the repo was created in the database
        new_repo = self.Session().query(Repository)\
            .filter(Repository.repo_name == repo_name_full).one()

        self.assertEqual(new_repo.repo_name, repo_name_full)
        self.assertEqual(new_repo.description, description)

        #test if repository is visible in the list ?
        response = response.follow()

        response.mustcontain(repo_name_full)

        #test if repository was created on filesystem
        try:
            vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name_full))
        except:
            ReposGroupModel().delete(group_name)
            self.Session().commit()
            self.fail('no repo %s in filesystem' % repo_name)

        RepoModel().delete(repo_name_full)
        ReposGroupModel().delete(group_name)
        self.Session().commit()

    def test_create_git(self):
        self.log_user()
        repo_name = NEW_GIT_REPO
        description = 'description for newly created repo'

        response = self.app.post(url('repos'),
                        _get_repo_create_params(repo_private=False,
                                                repo_type='git',
                                                repo_name=repo_name,
                                                repo_description=description))
        self.checkSessionFlash(response,
                               'created repository %s' % (repo_name))

        #test if the repo was created in the database
        new_repo = self.Session().query(Repository)\
            .filter(Repository.repo_name == repo_name).one()

        self.assertEqual(new_repo.repo_name, repo_name)
        self.assertEqual(new_repo.description, description)

        #test if repository is visible in the list ?
        response = response.follow()

        response.mustcontain(repo_name)

        #test if repository was created on filesystem
        try:
            vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
        except:
            self.fail('no repo %s in filesystem' % repo_name)

    def test_create_git_non_ascii(self):
        self.log_user()
        non_ascii = "ąęł"
        repo_name = "%s%s" % (NEW_GIT_REPO, non_ascii)
        repo_name_unicode = repo_name.decode('utf8')
        description = 'description for newly created repo' + non_ascii
        description_unicode = description.decode('utf8')
        private = False
        response = self.app.post(url('repos'),
                        _get_repo_create_params(repo_private=False,
                                                repo_type='git',
                                                repo_name=repo_name,
                                                repo_description=description))

        self.checkSessionFlash(response,
                               'created repository %s' % (repo_name_unicode))

        #test if the repo was created in the database
        new_repo = self.Session().query(Repository)\
            .filter(Repository.repo_name == repo_name_unicode).one()

        self.assertEqual(new_repo.repo_name, repo_name_unicode)
        self.assertEqual(new_repo.description, description_unicode)

        #test if repository is visible in the list ?
        response = response.follow()

        response.mustcontain(repo_name)

        #test if repository was created on filesystem
        try:
            vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
        except:
            self.fail('no repo %s in filesystem' % repo_name)

    def test_new(self):
        self.log_user()
        response = self.app.get(url('new_repo'))

    def test_new_as_xml(self):
        response = self.app.get(url('formatted_new_repo', format='xml'))

    def test_update(self):
        response = self.app.put(url('repo', repo_name=HG_REPO))

    def test_update_browser_fakeout(self):
        response = self.app.post(url('repo', repo_name=HG_REPO),
                                 params=dict(_method='put'))

    def test_delete_hg(self):
        self.log_user()
        repo_name = 'vcs_test_new_to_delete'
        description = 'description for newly created repo'
        response = self.app.post(url('repos'),
                        _get_repo_create_params(repo_private=False,
                                                repo_type='hg',
                                                repo_name=repo_name,
                                                repo_description=description))

        self.checkSessionFlash(response,
                               'created repository %s' % (repo_name))

        #test if the repo was created in the database
        new_repo = self.Session().query(Repository)\
            .filter(Repository.repo_name == repo_name).one()

        self.assertEqual(new_repo.repo_name, repo_name)
        self.assertEqual(new_repo.description, description)

        #test if repository is visible in the list ?
        response = response.follow()

        response.mustcontain(repo_name)

        #test if repository was created on filesystem
        try:
            vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
        except:
            self.fail('no repo %s in filesystem' % repo_name)

        response = self.app.delete(url('repo', repo_name=repo_name))

        self.assertTrue('''deleted repository %s''' % (repo_name) in
                        response.session['flash'][0])

        response.follow()

        #check if repo was deleted from db
        deleted_repo = self.Session().query(Repository)\
            .filter(Repository.repo_name == repo_name).scalar()

        self.assertEqual(deleted_repo, None)

        self.assertEqual(os.path.isdir(os.path.join(TESTS_TMP_PATH, repo_name)),
                                  False)

    def test_delete_git(self):
        self.log_user()
        repo_name = 'vcs_test_new_to_delete'
        description = 'description for newly created repo'
        private = False
        response = self.app.post(url('repos'),
                        _get_repo_create_params(repo_private=False,
                                                repo_type='git',
                                                repo_name=repo_name,
                                                repo_description=description))

        self.checkSessionFlash(response,
                               'created repository %s' % (repo_name))

        #test if the repo was created in the database
        new_repo = self.Session().query(Repository)\
            .filter(Repository.repo_name == repo_name).one()

        self.assertEqual(new_repo.repo_name, repo_name)
        self.assertEqual(new_repo.description, description)

        #test if repository is visible in the list ?
        response = response.follow()

        response.mustcontain(repo_name)

        #test if repository was created on filesystem
        try:
            vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
        except:
            self.fail('no repo %s in filesystem' % repo_name)

        response = self.app.delete(url('repo', repo_name=repo_name))

        self.assertTrue('''deleted repository %s''' % (repo_name) in
                        response.session['flash'][0])

        response.follow()

        #check if repo was deleted from db
        deleted_repo = self.Session().query(Repository)\
            .filter(Repository.repo_name == repo_name).scalar()

        self.assertEqual(deleted_repo, None)

        self.assertEqual(os.path.isdir(os.path.join(TESTS_TMP_PATH, repo_name)),
                                  False)

    def test_delete_repo_with_group(self):
        #TODO:
        pass

    def test_delete_browser_fakeout(self):
        response = self.app.post(url('repo', repo_name=HG_REPO),
                                 params=dict(_method='delete'))

    def test_show_hg(self):
        self.log_user()
        response = self.app.get(url('repo', repo_name=HG_REPO))

    def test_show_git(self):
        self.log_user()
        response = self.app.get(url('repo', repo_name=GIT_REPO))


    def test_edit(self):
        response = self.app.get(url('edit_repo', repo_name=HG_REPO))