diff rhodecode/tests/fixture.py @ 3647:8a86836fad64 beta

more usage of fixture tools in tests
author Marcin Kuzminski <marcin@python-works.com>
date Mon, 01 Apr 2013 23:45:25 +0200
parents 749dfd6b6e95
children 7e3d89d9d3a2
line wrap: on
line diff
--- a/rhodecode/tests/fixture.py	Sun Mar 31 21:44:27 2013 +0200
+++ b/rhodecode/tests/fixture.py	Mon Apr 01 23:45:25 2013 +0200
@@ -1,12 +1,11 @@
 """
 Helpers for fixture generation
 """
-import os
-import unittest
 from rhodecode.tests import *
-from rhodecode.model.db import Repository, User
+from rhodecode.model.db import Repository, User, RepoGroup
 from rhodecode.model.meta import Session
 from rhodecode.model.repo import RepoModel
+from rhodecode.model.repos_group import ReposGroupModel
 
 
 class Fixture(object):
@@ -14,30 +13,90 @@
     def __init__(self):
         pass
 
+    def _get_repo_create_params(self, **custom):
+        defs = dict(
+            repo_name=None,
+            repo_type='hg',
+            clone_uri='',
+            repo_group='',
+            repo_description='DESC',
+            repo_private=False,
+            repo_landing_rev='tip'
+        )
+        defs.update(custom)
+        if 'repo_name_full' not in custom:
+            defs.update({'repo_name_full': defs['repo_name']})
+
+        return defs
+
+    def _get_group_create_params(self, **custom):
+        defs = dict(
+            group_name=None,
+            group_description='DESC',
+            group_parent_id=None,
+            perms_updates=[],
+            perms_new=[],
+            enable_locking=False,
+            recursive=False
+        )
+        defs.update(custom)
+
+        return defs
+
     def create_repo(self, name, **kwargs):
-        form_data = _get_repo_create_params(repo_name=name, **kwargs)
-        cur_user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
+        if 'skip_if_exists' in kwargs:
+            del kwargs['skip_if_exists']
+            r = Repository.get_by_repo_name(name)
+            if r:
+                return r
+
+        if isinstance(kwargs.get('repos_group'), RepoGroup):
+            #TODO: rename the repos_group !
+            kwargs['repo_group'] = kwargs['repos_group'].group_id
+            del kwargs['repos_group']
+
+        form_data = self._get_repo_create_params(repo_name=name, **kwargs)
+        cur_user = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
         RepoModel().create(form_data, cur_user)
+        Session().commit()
         return Repository.get_by_repo_name(name)
 
     def create_fork(self, repo_to_fork, fork_name, **kwargs):
         repo_to_fork = Repository.get_by_repo_name(repo_to_fork)
-        vcs_type = repo_to_fork.repo_type
+
+        form_data = self._get_repo_create_params(repo_name=fork_name,
+                                            fork_parent_id=repo_to_fork,
+                                            repo_type=repo_to_fork.repo_type,
+                                            **kwargs)
+        form_data['update_after_clone'] = False
+
+        #TODO: fix it !!
+        form_data['description'] = form_data['repo_description']
+        form_data['private'] = form_data['repo_private']
+        form_data['landing_rev'] = form_data['repo_landing_rev']
+
+        owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
+        RepoModel().create_fork(form_data, cur_user=owner)
+        Session().commit()
+        r = Repository.get_by_repo_name(fork_name)
+        assert r
+        return r
 
-        form_data = dict(
-            repo_name=fork_name,
-            repo_name_full=fork_name,
-            repo_group=None,
-            repo_type=vcs_type,
-            description='',
-            private=False,
-            copy_permissions=False,
-            landing_rev='tip',
-            update_after_clone=False,
-            fork_parent_id=repo_to_fork,
-        )
-        cur_user = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
-        RepoModel().create_fork(form_data, cur_user=cur_user)
+    def destroy_repo(self, repo_name):
+        RepoModel().delete(repo_name)
+        Session().commit()
 
+    def create_group(self, name, **kwargs):
+        if 'skip_if_exists' in kwargs:
+            del kwargs['skip_if_exists']
+            gr = RepoGroup.get_by_group_name(group_name=name)
+            if gr:
+                return gr
+        form_data = self._get_group_create_params(group_name=name, **kwargs)
+        owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
+        gr = ReposGroupModel().create(group_name=form_data['group_name'],
+                                 group_description=form_data['group_name'],
+                                 owner=owner, parent=form_data['group_parent_id'])
         Session().commit()
-        return Repository.get_by_repo_name(fork_name)
+        gr = RepoGroup.get_by_group_name(gr.group_name)
+        return gr