Mercurial > kallithea
diff rhodecode/tests/fixture.py @ 3647:8a86836fad64 beta
more usage of fixture tools
in tests
author | Marcin Kuzminski <marcin@python-works.com> |
---|---|
date | Mon, 01 Apr 2013 23:45:25 +0200 |
parents | 749dfd6b6e95 |
children | 7e3d89d9d3a2 |
line wrap: on
line diff
--- a/rhodecode/tests/fixture.py Sun Mar 31 21:44:27 2013 +0200 +++ b/rhodecode/tests/fixture.py Mon Apr 01 23:45:25 2013 +0200 @@ -1,12 +1,11 @@ """ Helpers for fixture generation """ -import os -import unittest from rhodecode.tests import * -from rhodecode.model.db import Repository, User +from rhodecode.model.db import Repository, User, RepoGroup from rhodecode.model.meta import Session from rhodecode.model.repo import RepoModel +from rhodecode.model.repos_group import ReposGroupModel class Fixture(object): @@ -14,30 +13,90 @@ def __init__(self): pass + def _get_repo_create_params(self, **custom): + defs = dict( + repo_name=None, + repo_type='hg', + clone_uri='', + repo_group='', + repo_description='DESC', + repo_private=False, + repo_landing_rev='tip' + ) + defs.update(custom) + if 'repo_name_full' not in custom: + defs.update({'repo_name_full': defs['repo_name']}) + + return defs + + def _get_group_create_params(self, **custom): + defs = dict( + group_name=None, + group_description='DESC', + group_parent_id=None, + perms_updates=[], + perms_new=[], + enable_locking=False, + recursive=False + ) + defs.update(custom) + + return defs + def create_repo(self, name, **kwargs): - form_data = _get_repo_create_params(repo_name=name, **kwargs) - cur_user = User.get_by_username(TEST_USER_ADMIN_LOGIN) + if 'skip_if_exists' in kwargs: + del kwargs['skip_if_exists'] + r = Repository.get_by_repo_name(name) + if r: + return r + + if isinstance(kwargs.get('repos_group'), RepoGroup): + #TODO: rename the repos_group ! + kwargs['repo_group'] = kwargs['repos_group'].group_id + del kwargs['repos_group'] + + form_data = self._get_repo_create_params(repo_name=name, **kwargs) + cur_user = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN) RepoModel().create(form_data, cur_user) + Session().commit() return Repository.get_by_repo_name(name) def create_fork(self, repo_to_fork, fork_name, **kwargs): repo_to_fork = Repository.get_by_repo_name(repo_to_fork) - vcs_type = repo_to_fork.repo_type + + form_data = self._get_repo_create_params(repo_name=fork_name, + fork_parent_id=repo_to_fork, + repo_type=repo_to_fork.repo_type, + **kwargs) + form_data['update_after_clone'] = False + + #TODO: fix it !! + form_data['description'] = form_data['repo_description'] + form_data['private'] = form_data['repo_private'] + form_data['landing_rev'] = form_data['repo_landing_rev'] + + owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN) + RepoModel().create_fork(form_data, cur_user=owner) + Session().commit() + r = Repository.get_by_repo_name(fork_name) + assert r + return r - form_data = dict( - repo_name=fork_name, - repo_name_full=fork_name, - repo_group=None, - repo_type=vcs_type, - description='', - private=False, - copy_permissions=False, - landing_rev='tip', - update_after_clone=False, - fork_parent_id=repo_to_fork, - ) - cur_user = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN) - RepoModel().create_fork(form_data, cur_user=cur_user) + def destroy_repo(self, repo_name): + RepoModel().delete(repo_name) + Session().commit() + def create_group(self, name, **kwargs): + if 'skip_if_exists' in kwargs: + del kwargs['skip_if_exists'] + gr = RepoGroup.get_by_group_name(group_name=name) + if gr: + return gr + form_data = self._get_group_create_params(group_name=name, **kwargs) + owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN) + gr = ReposGroupModel().create(group_name=form_data['group_name'], + group_description=form_data['group_name'], + owner=owner, parent=form_data['group_parent_id']) Session().commit() - return Repository.get_by_repo_name(fork_name) + gr = RepoGroup.get_by_group_name(gr.group_name) + return gr