Mercurial > kallithea
comparison rhodecode/tests/fixture.py @ 3647:8a86836fad64 beta
more usage of fixture tools
in tests
author | Marcin Kuzminski <marcin@python-works.com> |
---|---|
date | Mon, 01 Apr 2013 23:45:25 +0200 |
parents | 749dfd6b6e95 |
children | 7e3d89d9d3a2 |
comparison
equal
deleted
inserted
replaced
3646:63e49418a4cc | 3647:8a86836fad64 |
---|---|
1 """ | 1 """ |
2 Helpers for fixture generation | 2 Helpers for fixture generation |
3 """ | 3 """ |
4 import os | |
5 import unittest | |
6 from rhodecode.tests import * | 4 from rhodecode.tests import * |
7 from rhodecode.model.db import Repository, User | 5 from rhodecode.model.db import Repository, User, RepoGroup |
8 from rhodecode.model.meta import Session | 6 from rhodecode.model.meta import Session |
9 from rhodecode.model.repo import RepoModel | 7 from rhodecode.model.repo import RepoModel |
8 from rhodecode.model.repos_group import ReposGroupModel | |
10 | 9 |
11 | 10 |
12 class Fixture(object): | 11 class Fixture(object): |
13 | 12 |
14 def __init__(self): | 13 def __init__(self): |
15 pass | 14 pass |
16 | 15 |
16 def _get_repo_create_params(self, **custom): | |
17 defs = dict( | |
18 repo_name=None, | |
19 repo_type='hg', | |
20 clone_uri='', | |
21 repo_group='', | |
22 repo_description='DESC', | |
23 repo_private=False, | |
24 repo_landing_rev='tip' | |
25 ) | |
26 defs.update(custom) | |
27 if 'repo_name_full' not in custom: | |
28 defs.update({'repo_name_full': defs['repo_name']}) | |
29 | |
30 return defs | |
31 | |
32 def _get_group_create_params(self, **custom): | |
33 defs = dict( | |
34 group_name=None, | |
35 group_description='DESC', | |
36 group_parent_id=None, | |
37 perms_updates=[], | |
38 perms_new=[], | |
39 enable_locking=False, | |
40 recursive=False | |
41 ) | |
42 defs.update(custom) | |
43 | |
44 return defs | |
45 | |
17 def create_repo(self, name, **kwargs): | 46 def create_repo(self, name, **kwargs): |
18 form_data = _get_repo_create_params(repo_name=name, **kwargs) | 47 if 'skip_if_exists' in kwargs: |
19 cur_user = User.get_by_username(TEST_USER_ADMIN_LOGIN) | 48 del kwargs['skip_if_exists'] |
49 r = Repository.get_by_repo_name(name) | |
50 if r: | |
51 return r | |
52 | |
53 if isinstance(kwargs.get('repos_group'), RepoGroup): | |
54 #TODO: rename the repos_group ! | |
55 kwargs['repo_group'] = kwargs['repos_group'].group_id | |
56 del kwargs['repos_group'] | |
57 | |
58 form_data = self._get_repo_create_params(repo_name=name, **kwargs) | |
59 cur_user = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN) | |
20 RepoModel().create(form_data, cur_user) | 60 RepoModel().create(form_data, cur_user) |
61 Session().commit() | |
21 return Repository.get_by_repo_name(name) | 62 return Repository.get_by_repo_name(name) |
22 | 63 |
23 def create_fork(self, repo_to_fork, fork_name, **kwargs): | 64 def create_fork(self, repo_to_fork, fork_name, **kwargs): |
24 repo_to_fork = Repository.get_by_repo_name(repo_to_fork) | 65 repo_to_fork = Repository.get_by_repo_name(repo_to_fork) |
25 vcs_type = repo_to_fork.repo_type | |
26 | 66 |
27 form_data = dict( | 67 form_data = self._get_repo_create_params(repo_name=fork_name, |
28 repo_name=fork_name, | 68 fork_parent_id=repo_to_fork, |
29 repo_name_full=fork_name, | 69 repo_type=repo_to_fork.repo_type, |
30 repo_group=None, | 70 **kwargs) |
31 repo_type=vcs_type, | 71 form_data['update_after_clone'] = False |
32 description='', | |
33 private=False, | |
34 copy_permissions=False, | |
35 landing_rev='tip', | |
36 update_after_clone=False, | |
37 fork_parent_id=repo_to_fork, | |
38 ) | |
39 cur_user = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN) | |
40 RepoModel().create_fork(form_data, cur_user=cur_user) | |
41 | 72 |
73 #TODO: fix it !! | |
74 form_data['description'] = form_data['repo_description'] | |
75 form_data['private'] = form_data['repo_private'] | |
76 form_data['landing_rev'] = form_data['repo_landing_rev'] | |
77 | |
78 owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN) | |
79 RepoModel().create_fork(form_data, cur_user=owner) | |
42 Session().commit() | 80 Session().commit() |
43 return Repository.get_by_repo_name(fork_name) | 81 r = Repository.get_by_repo_name(fork_name) |
82 assert r | |
83 return r | |
84 | |
85 def destroy_repo(self, repo_name): | |
86 RepoModel().delete(repo_name) | |
87 Session().commit() | |
88 | |
89 def create_group(self, name, **kwargs): | |
90 if 'skip_if_exists' in kwargs: | |
91 del kwargs['skip_if_exists'] | |
92 gr = RepoGroup.get_by_group_name(group_name=name) | |
93 if gr: | |
94 return gr | |
95 form_data = self._get_group_create_params(group_name=name, **kwargs) | |
96 owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN) | |
97 gr = ReposGroupModel().create(group_name=form_data['group_name'], | |
98 group_description=form_data['group_name'], | |
99 owner=owner, parent=form_data['group_parent_id']) | |
100 Session().commit() | |
101 gr = RepoGroup.get_by_group_name(gr.group_name) | |
102 return gr |