changeset 3824:a5746b83123f beta

moved top-level tests to rhodecode/tests/other. - There are no toplevel tests - moved proper files to fixtures where they should be in the first place
author Marcin Kuzminski <marcin@python-works.com>
date Wed, 08 May 2013 18:06:22 +0200
parents 972ad33c5610
children 3306bdd95f11
files rhodecode/lib/utils.py rhodecode/tests/fixtures/vcs_test_git.tar.gz rhodecode/tests/fixtures/vcs_test_hg.tar.gz rhodecode/tests/other/__init__.py rhodecode/tests/other/test_libs.py rhodecode/tests/other/test_validators.py rhodecode/tests/other/test_vcs_operations.py rhodecode/tests/scripts/test_vcs_operations.py rhodecode/tests/test_libs.py rhodecode/tests/test_validators.py rhodecode/tests/vcs_test_git.tar.gz rhodecode/tests/vcs_test_hg.tar.gz
diffstat 11 files changed, 1058 insertions(+), 1059 deletions(-) [+]
line wrap: on
line diff
--- a/rhodecode/lib/utils.py	Wed May 08 17:43:59 2013 +0200
+++ b/rhodecode/lib/utils.py	Wed May 08 18:06:22 2013 +0200
@@ -657,12 +657,12 @@
 
     #CREATE DEFAULT TEST REPOS
     cur_dir = dn(dn(abspath(__file__)))
-    tar = tarfile.open(jn(cur_dir, 'tests', "vcs_test_hg.tar.gz"))
+    tar = tarfile.open(jn(cur_dir, 'tests', 'fixtures', "vcs_test_hg.tar.gz"))
     tar.extractall(jn(TESTS_TMP_PATH, HG_REPO))
     tar.close()
 
     cur_dir = dn(dn(abspath(__file__)))
-    tar = tarfile.open(jn(cur_dir, 'tests', "vcs_test_git.tar.gz"))
+    tar = tarfile.open(jn(cur_dir, 'tests', 'fixtures', "vcs_test_git.tar.gz"))
     tar.extractall(jn(TESTS_TMP_PATH, GIT_REPO))
     tar.close()
 
Binary file rhodecode/tests/fixtures/vcs_test_git.tar.gz has changed
Binary file rhodecode/tests/fixtures/vcs_test_hg.tar.gz has changed
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rhodecode/tests/other/test_libs.py	Wed May 08 18:06:22 2013 +0200
@@ -0,0 +1,295 @@
+# -*- coding: utf-8 -*-
+"""
+    rhodecode.tests.test_libs
+    ~~~~~~~~~~~~~~~~~~~~~~~~~
+
+
+    Package for testing various lib/helper functions in rhodecode
+
+    :created_on: Jun 9, 2011
+    :copyright: (C) 2011-2012 Marcin Kuzminski <marcin@python-works.com>
+    :license: GPLv3, see COPYING for more details.
+"""
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+from __future__ import with_statement
+import unittest
+import datetime
+import hashlib
+import mock
+from rhodecode.tests import *
+
+proto = 'http'
+TEST_URLS = [
+    ('%s://127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
+     '%s://127.0.0.1' % proto),
+    ('%s://marcink@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
+     '%s://127.0.0.1' % proto),
+    ('%s://marcink:pass@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
+     '%s://127.0.0.1' % proto),
+    ('%s://127.0.0.1:8080' % proto, ['%s://' % proto, '127.0.0.1', '8080'],
+     '%s://127.0.0.1:8080' % proto),
+    ('%s://domain.org' % proto, ['%s://' % proto, 'domain.org'],
+     '%s://domain.org' % proto),
+    ('%s://user:pass@domain.org:8080' % proto, ['%s://' % proto, 'domain.org',
+                                                '8080'],
+     '%s://domain.org:8080' % proto),
+]
+
+proto = 'https'
+TEST_URLS += [
+    ('%s://127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
+     '%s://127.0.0.1' % proto),
+    ('%s://marcink@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
+     '%s://127.0.0.1' % proto),
+    ('%s://marcink:pass@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
+     '%s://127.0.0.1' % proto),
+    ('%s://127.0.0.1:8080' % proto, ['%s://' % proto, '127.0.0.1', '8080'],
+     '%s://127.0.0.1:8080' % proto),
+    ('%s://domain.org' % proto, ['%s://' % proto, 'domain.org'],
+     '%s://domain.org' % proto),
+    ('%s://user:pass@domain.org:8080' % proto, ['%s://' % proto, 'domain.org',
+                                                '8080'],
+     '%s://domain.org:8080' % proto),
+]
+
+
+class TestLibs(unittest.TestCase):
+
+    @parameterized.expand(TEST_URLS)
+    def test_uri_filter(self, test_url, expected, expected_creds):
+        from rhodecode.lib.utils2 import uri_filter
+        self.assertEqual(uri_filter(test_url), expected)
+
+    @parameterized.expand(TEST_URLS)
+    def test_credentials_filter(self, test_url, expected, expected_creds):
+        from rhodecode.lib.utils2 import credentials_filter
+        self.assertEqual(credentials_filter(test_url), expected_creds)
+
+    @parameterized.expand([('t', True),
+                           ('true', True),
+                           ('y', True),
+                           ('yes', True),
+                           ('on', True),
+                           ('1', True),
+                           ('Y', True),
+                           ('yeS', True),
+                           ('Y', True),
+                           ('TRUE', True),
+                           ('T', True),
+                           ('False', False),
+                           ('F', False),
+                           ('FALSE', False),
+                           ('0', False),
+                           ('-1', False),
+                           ('', False)
+    ])
+    def test_str2bool(self, str_bool, expected):
+        from rhodecode.lib.utils2 import str2bool
+        self.assertEqual(str2bool(str_bool), expected)
+
+    def test_mention_extractor(self):
+        from rhodecode.lib.utils2 import extract_mentioned_users
+        sample = (
+            "@first hi there @marcink here's my email marcin@email.com "
+            "@lukaszb check @one_more22 it pls @ ttwelve @D[] @one@two@three "
+            "@MARCIN    @maRCiN @2one_more22 @john please see this http://org.pl "
+            "@marian.user just do it @marco-polo and next extract @marco_polo "
+            "user.dot  hej ! not-needed maril@domain.org"
+        )
+
+        s = sorted([
+        'first', 'marcink', 'lukaszb', 'one_more22', 'MARCIN', 'maRCiN', 'john',
+        'marian.user', 'marco-polo', 'marco_polo'
+        ], key=lambda k: k.lower())
+        self.assertEqual(s, extract_mentioned_users(sample))
+
+    @parameterized.expand([
+        (dict(), u'just now'),
+        (dict(seconds= -1), u'1 second ago'),
+        (dict(seconds= -60 * 2), u'2 minutes ago'),
+        (dict(hours= -1), u'1 hour ago'),
+        (dict(hours= -24), u'1 day ago'),
+        (dict(hours= -24 * 5), u'5 days ago'),
+        (dict(months= -1), u'1 month ago'),
+        (dict(months= -1, days= -2), u'1 month and 2 days ago'),
+        (dict(years= -1, months= -1), u'1 year and 1 month ago'),
+    ])
+    def test_age(self, age_args, expected):
+        from rhodecode.lib.utils2 import age
+        from dateutil import relativedelta
+        n = datetime.datetime(year=2012, month=5, day=17)
+        delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs)
+        self.assertEqual(age(n + delt(**age_args), now=n), expected)
+
+    @parameterized.expand([
+
+        (dict(), u'just now'),
+        (dict(seconds=1), u'in 1 second'),
+        (dict(seconds=60 * 2), u'in 2 minutes'),
+        (dict(hours=1), u'in 1 hour'),
+        (dict(hours=24), u'in 1 day'),
+        (dict(hours=24 * 5), u'in 5 days'),
+        (dict(months=1), u'in 1 month'),
+        (dict(months=1, days=1), u'in 1 month and 1 day'),
+        (dict(years=1, months=1), u'in 1 year and 1 month')
+    ])
+    def test_age_in_future(self, age_args, expected):
+        from rhodecode.lib.utils2 import age
+        from dateutil import relativedelta
+        n = datetime.datetime(year=2012, month=5, day=17)
+        delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs)
+        self.assertEqual(age(n + delt(**age_args), now=n), expected)
+
+    def test_tag_exctrator(self):
+        sample = (
+            "hello pta[tag] gog [[]] [[] sda ero[or]d [me =>>< sa]"
+            "[requires] [stale] [see<>=>] [see => http://url.com]"
+            "[requires => url] [lang => python] [just a tag]"
+            "[,d] [ => ULR ] [obsolete] [desc]]"
+        )
+        from rhodecode.lib.helpers import desc_stylize
+        res = desc_stylize(sample)
+        self.assertTrue('<div class="metatag" tag="tag">tag</div>' in res)
+        self.assertTrue('<div class="metatag" tag="obsolete">obsolete</div>' in res)
+        self.assertTrue('<div class="metatag" tag="stale">stale</div>' in res)
+        self.assertTrue('<div class="metatag" tag="lang">python</div>' in res)
+        self.assertTrue('<div class="metatag" tag="requires">requires =&gt; <a href="/url">url</a></div>' in res)
+        self.assertTrue('<div class="metatag" tag="tag">tag</div>' in res)
+
+    def test_alternative_gravatar(self):
+        from rhodecode.lib.helpers import gravatar_url
+        _md5 = lambda s: hashlib.md5(s).hexdigest()
+
+        def fake_conf(**kwargs):
+            from pylons import config
+            config['app_conf'] = {}
+            config['app_conf']['use_gravatar'] = True
+            config['app_conf'].update(kwargs)
+            return config
+
+        class fake_url():
+            @classmethod
+            def current(cls, *args, **kwargs):
+                return 'https://server.com'
+
+        with mock.patch('pylons.url', fake_url):
+            fake = fake_conf(alternative_gravatar_url='http://test.com/{email}')
+            with mock.patch('pylons.config', fake):
+                    from pylons import url
+                    assert url.current() == 'https://server.com'
+                    grav = gravatar_url(email_address='test@foo.com', size=24)
+                    assert grav == 'http://test.com/test@foo.com'
+
+            fake = fake_conf(alternative_gravatar_url='http://test.com/{email}')
+            with mock.patch('pylons.config', fake):
+                grav = gravatar_url(email_address='test@foo.com', size=24)
+                assert grav == 'http://test.com/test@foo.com'
+
+            fake = fake_conf(alternative_gravatar_url='http://test.com/{md5email}')
+            with mock.patch('pylons.config', fake):
+                em = 'test@foo.com'
+                grav = gravatar_url(email_address=em, size=24)
+                assert grav == 'http://test.com/%s' % (_md5(em))
+
+            fake = fake_conf(alternative_gravatar_url='http://test.com/{md5email}/{size}')
+            with mock.patch('pylons.config', fake):
+                em = 'test@foo.com'
+                grav = gravatar_url(email_address=em, size=24)
+                assert grav == 'http://test.com/%s/%s' % (_md5(em), 24)
+
+            fake = fake_conf(alternative_gravatar_url='{scheme}://{netloc}/{md5email}/{size}')
+            with mock.patch('pylons.config', fake):
+                em = 'test@foo.com'
+                grav = gravatar_url(email_address=em, size=24)
+                assert grav == 'https://server.com/%s/%s' % (_md5(em), 24)
+
+    def _quick_url(self, text, tmpl="""<a class="revision-link" href="%s">%s</a>""", url_=None):
+        """
+        Changes `some text url[foo]` => `some text <a href="/">foo</a>
+
+        :param text:
+        """
+        import re
+        # quickly change expected url[] into a link
+        URL_PAT = re.compile(r'(?:url\[)(.+?)(?:\])')
+
+        def url_func(match_obj):
+            _url = match_obj.groups()[0]
+            return tmpl % (url_ or '/some-url', _url)
+        return URL_PAT.sub(url_func, text)
+
+    @parameterized.expand([
+      ("",
+       ""),
+      ("git-svn-id: https://svn.apache.org/repos/asf/libcloud/trunk@1441655 13f79535-47bb-0310-9956-ffa450edef68",
+       "git-svn-id: https://svn.apache.org/repos/asf/libcloud/trunk@1441655 13f79535-47bb-0310-9956-ffa450edef68"),
+      ("from rev 000000000000",
+       "from rev url[000000000000]"),
+      ("from rev 000000000000123123 also rev 000000000000",
+       "from rev url[000000000000123123] also rev url[000000000000]"),
+      ("this should-000 00",
+       "this should-000 00"),
+      ("longtextffffffffff rev 123123123123",
+       "longtextffffffffff rev url[123123123123]"),
+      ("rev ffffffffffffffffffffffffffffffffffffffffffffffffff",
+       "rev ffffffffffffffffffffffffffffffffffffffffffffffffff"),
+      ("ffffffffffff some text traalaa",
+       "url[ffffffffffff] some text traalaa"),
+       ("""Multi line
+       123123123123
+       some text 123123123123
+       sometimes !
+       """,
+       """Multi line
+       url[123123123123]
+       some text url[123123123123]
+       sometimes !
+       """)
+    ])
+    def test_urlify_changesets(self, sample, expected):
+        def fake_url(self, *args, **kwargs):
+            return '/some-url'
+
+        expected = self._quick_url(expected)
+
+        with mock.patch('pylons.url', fake_url):
+            from rhodecode.lib.helpers import urlify_changesets
+            self.assertEqual(urlify_changesets(sample, 'repo_name'), expected)
+
+    @parameterized.expand([
+      ("",
+       "",
+       ""),
+      ("https://svn.apache.org/repos",
+       "url[https://svn.apache.org/repos]",
+       "https://svn.apache.org/repos"),
+      ("http://svn.apache.org/repos",
+       "url[http://svn.apache.org/repos]",
+       "http://svn.apache.org/repos"),
+      ("from rev a also rev http://google.com",
+       "from rev a also rev url[http://google.com]",
+       "http://google.com"),
+       ("""Multi line
+       https://foo.bar.com
+       some text lalala""",
+       """Multi line
+       url[https://foo.bar.com]
+       some text lalala""",
+       "https://foo.bar.com")
+    ])
+    def test_urlify_test(self, sample, expected, url_):
+        from rhodecode.lib.helpers import urlify_text
+        expected = self._quick_url(expected,
+                                   tmpl="""<a href="%s">%s</a>""", url_=url_)
+        self.assertEqual(urlify_text(sample), expected)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rhodecode/tests/other/test_validators.py	Wed May 08 18:06:22 2013 +0200
@@ -0,0 +1,254 @@
+# -*- coding: utf-8 -*-
+import unittest
+import formencode
+
+from rhodecode.tests import *
+
+from rhodecode.model import validators as v
+from rhodecode.model.users_group import UserGroupModel
+
+from rhodecode.model.meta import Session
+from rhodecode.model.repos_group import ReposGroupModel
+from rhodecode.model.db import ChangesetStatus, Repository
+from rhodecode.model.changeset_status import ChangesetStatusModel
+from rhodecode.tests.fixture import Fixture
+
+fixture = Fixture()
+
+
+class TestReposGroups(unittest.TestCase):
+
+    def setUp(self):
+        pass
+
+    def tearDown(self):
+        Session.remove()
+
+    def test_Message_extractor(self):
+        validator = v.ValidUsername()
+        self.assertRaises(formencode.Invalid, validator.to_python, 'default')
+
+        class StateObj(object):
+            pass
+
+        self.assertRaises(formencode.Invalid,
+                          validator.to_python, 'default', StateObj)
+
+    def test_ValidUsername(self):
+        validator = v.ValidUsername()
+
+        self.assertRaises(formencode.Invalid, validator.to_python, 'default')
+        self.assertRaises(formencode.Invalid, validator.to_python, 'new_user')
+        self.assertRaises(formencode.Invalid, validator.to_python, '.,')
+        self.assertRaises(formencode.Invalid, validator.to_python,
+                          TEST_USER_ADMIN_LOGIN)
+        self.assertEqual('test', validator.to_python('test'))
+
+        validator = v.ValidUsername(edit=True, old_data={'user_id': 1})
+
+    def test_ValidRepoUser(self):
+        validator = v.ValidRepoUser()
+        self.assertRaises(formencode.Invalid, validator.to_python, 'nouser')
+        self.assertEqual(TEST_USER_ADMIN_LOGIN,
+                         validator.to_python(TEST_USER_ADMIN_LOGIN))
+
+    def test_ValidUserGroup(self):
+        validator = v.ValidUserGroup()
+        self.assertRaises(formencode.Invalid, validator.to_python, 'default')
+        self.assertRaises(formencode.Invalid, validator.to_python, '.,')
+
+        gr = fixture.create_user_group('test')
+        gr2 = fixture.create_user_group('tes2')
+        Session().commit()
+        self.assertRaises(formencode.Invalid, validator.to_python, 'test')
+        assert gr.users_group_id != None
+        validator = v.ValidUserGroup(edit=True,
+                                    old_data={'users_group_id':
+                                              gr2.users_group_id})
+
+        self.assertRaises(formencode.Invalid, validator.to_python, 'test')
+        self.assertRaises(formencode.Invalid, validator.to_python, 'TesT')
+        self.assertRaises(formencode.Invalid, validator.to_python, 'TEST')
+        UserGroupModel().delete(gr)
+        UserGroupModel().delete(gr2)
+        Session().commit()
+
+    def test_ValidReposGroup(self):
+        validator = v.ValidReposGroup()
+        model = ReposGroupModel()
+        self.assertRaises(formencode.Invalid, validator.to_python,
+                          {'group_name': HG_REPO, })
+        gr = model.create(group_name='test_gr', group_description='desc',
+                          parent=None,
+                          just_db=True,
+                          owner=TEST_USER_ADMIN_LOGIN)
+        self.assertRaises(formencode.Invalid,
+                          validator.to_python, {'group_name': gr.group_name, })
+
+        validator = v.ValidReposGroup(edit=True,
+                                      old_data={'group_id':  gr.group_id})
+        self.assertRaises(formencode.Invalid,
+                          validator.to_python, {
+                                        'group_name': gr.group_name + 'n',
+                                        'group_parent_id': gr.group_id
+                                        })
+        model.delete(gr)
+
+    def test_ValidPassword(self):
+        validator = v.ValidPassword()
+        self.assertEqual('lol', validator.to_python('lol'))
+        self.assertEqual(None, validator.to_python(None))
+        self.assertRaises(formencode.Invalid, validator.to_python, 'ąćżź')
+
+    def test_ValidPasswordsMatch(self):
+        validator = v.ValidPasswordsMatch()
+        self.assertRaises(formencode.Invalid,
+                    validator.to_python, {'password': 'pass',
+                                          'password_confirmation': 'pass2'})
+
+        self.assertRaises(formencode.Invalid,
+                    validator.to_python, {'new_password': 'pass',
+                                          'password_confirmation': 'pass2'})
+
+        self.assertEqual({'new_password': 'pass',
+                          'password_confirmation': 'pass'},
+                    validator.to_python({'new_password': 'pass',
+                                         'password_confirmation': 'pass'}))
+
+        self.assertEqual({'password': 'pass',
+                          'password_confirmation': 'pass'},
+                    validator.to_python({'password': 'pass',
+                                         'password_confirmation': 'pass'}))
+
+    def test_ValidAuth(self):
+        validator = v.ValidAuth()
+        valid_creds = {
+            'username': TEST_USER_REGULAR2_LOGIN,
+            'password': TEST_USER_REGULAR2_PASS,
+        }
+        invalid_creds = {
+            'username': 'err',
+            'password': 'err',
+        }
+        self.assertEqual(valid_creds, validator.to_python(valid_creds))
+        self.assertRaises(formencode.Invalid,
+                          validator.to_python, invalid_creds)
+
+    def test_ValidAuthToken(self):
+        validator = v.ValidAuthToken()
+        # this is untestable without a threadlocal
+#        self.assertRaises(formencode.Invalid,
+#                          validator.to_python, 'BadToken')
+        validator
+
+    def test_ValidRepoName(self):
+        validator = v.ValidRepoName()
+
+        self.assertRaises(formencode.Invalid,
+                          validator.to_python, {'repo_name': ''})
+
+        self.assertRaises(formencode.Invalid,
+                          validator.to_python, {'repo_name': HG_REPO})
+
+        gr = ReposGroupModel().create(group_name='group_test',
+                                      group_description='desc',
+                                      parent=None,
+                                      owner=TEST_USER_ADMIN_LOGIN)
+        self.assertRaises(formencode.Invalid,
+                          validator.to_python, {'repo_name': gr.group_name})
+
+        #TODO: write an error case for that ie. create a repo withinh a group
+#        self.assertRaises(formencode.Invalid,
+#                          validator.to_python, {'repo_name': 'some',
+#                                                'repo_group': gr.group_id})
+
+    def test_ValidForkName(self):
+        # this uses ValidRepoName validator
+        assert True
+
+    @parameterized.expand([
+        ('test', 'test'), ('lolz!', 'lolz'), ('  aavv', 'aavv'),
+        ('ala ma kota', 'ala-ma-kota'), ('@nooo', 'nooo'),
+        ('$!haha lolz !', 'haha-lolz'), ('$$$$$', ''), ('{}OK!', 'OK'),
+        ('/]re po', 're-po')])
+    def test_SlugifyName(self, name, expected):
+        validator = v.SlugifyName()
+        self.assertEqual(expected, validator.to_python(name))
+
+    def test_ValidCloneUri(self):
+            #TODO: write this one
+            pass
+
+    def test_ValidForkType(self):
+            validator = v.ValidForkType(old_data={'repo_type': 'hg'})
+            self.assertEqual('hg', validator.to_python('hg'))
+            self.assertRaises(formencode.Invalid, validator.to_python, 'git')
+
+    def test_ValidPerms(self):
+            #TODO: write this one
+            pass
+
+    def test_ValidSettings(self):
+        validator = v.ValidSettings()
+        self.assertEqual({'pass': 'pass'},
+                         validator.to_python(value={'user': 'test',
+                                                    'pass': 'pass'}))
+
+        self.assertEqual({'user2': 'test', 'pass': 'pass'},
+                         validator.to_python(value={'user2': 'test',
+                                                    'pass': 'pass'}))
+
+    def test_ValidPath(self):
+            validator = v.ValidPath()
+            self.assertEqual(TESTS_TMP_PATH,
+                             validator.to_python(TESTS_TMP_PATH))
+            self.assertRaises(formencode.Invalid, validator.to_python,
+                              '/no_such_dir')
+
+    def test_UniqSystemEmail(self):
+        validator = v.UniqSystemEmail(old_data={})
+
+        self.assertEqual('mail@python.org',
+                         validator.to_python('MaiL@Python.org'))
+
+        email = TEST_USER_REGULAR2_EMAIL
+        self.assertRaises(formencode.Invalid, validator.to_python, email)
+
+    def test_ValidSystemEmail(self):
+        validator = v.ValidSystemEmail()
+        email = TEST_USER_REGULAR2_EMAIL
+
+        self.assertEqual(email, validator.to_python(email))
+        self.assertRaises(formencode.Invalid, validator.to_python, 'err')
+
+    def test_LdapLibValidator(self):
+        if ldap_lib_installed:
+            validator = v.LdapLibValidator()
+            self.assertEqual("DN", validator.to_python('DN'))
+        else:
+            validator = v.LdapLibValidator()
+            self.assertRaises(v.LdapImportError, validator.to_python, 'err')
+
+    def test_AttrLoginValidator(self):
+        validator = v.AttrLoginValidator()
+        self.assertEqual('DN_attr', validator.to_python('DN_attr'))
+
+    def test_NotReviewedRevisions(self):
+        repo_id = Repository.get_by_repo_name(HG_REPO).repo_id
+        validator = v.NotReviewedRevisions(repo_id)
+        rev = '0' * 40
+        # add status for a rev, that should throw an error because it is already
+        # reviewed
+        new_status = ChangesetStatus()
+        new_status.author = ChangesetStatusModel()._get_user(TEST_USER_ADMIN_LOGIN)
+        new_status.repo = ChangesetStatusModel()._get_repo(HG_REPO)
+        new_status.status = ChangesetStatus.STATUS_APPROVED
+        new_status.comment = None
+        new_status.revision = rev
+        Session().add(new_status)
+        Session().commit()
+        try:
+            self.assertRaises(formencode.Invalid, validator.to_python, [rev])
+        finally:
+            Session().delete(new_status)
+            Session().commit()
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rhodecode/tests/other/test_vcs_operations.py	Wed May 08 18:06:22 2013 +0200
@@ -0,0 +1,507 @@
+# -*- coding: utf-8 -*-
+"""
+    rhodecode.tests.test_scm_operations
+    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+    Test suite for making push/pull operations.
+    Run using after doing paster serve test.ini::
+     RC_WHOOSH_TEST_DISABLE=1 RC_NO_TMP_PATH=1 nosetests rhodecode/tests/scripts/test_vcs_operations.py
+
+    You must have git > 1.8.1 for tests to work fine
+
+    :created_on: Dec 30, 2010
+    :author: marcink
+    :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
+    :license: GPLv3, see COPYING for more details.
+"""
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+import os
+import tempfile
+import unittest
+import time
+from os.path import join as jn
+from os.path import dirname as dn
+
+from tempfile import _RandomNameSequence
+from subprocess import Popen, PIPE
+
+from rhodecode.tests import *
+from rhodecode.model.db import User, Repository, UserLog, UserIpMap,\
+    CacheInvalidation
+from rhodecode.model.meta import Session
+from rhodecode.model.repo import RepoModel
+from rhodecode.model.user import UserModel
+
+DEBUG = True
+HOST = '127.0.0.1:5000'  # test host
+
+
+class Command(object):
+
+    def __init__(self, cwd):
+        self.cwd = cwd
+
+    def execute(self, cmd, *args):
+        """
+        Runs command on the system with given ``args``.
+        """
+
+        command = cmd + ' ' + ' '.join(args)
+        if DEBUG:
+            print '*** CMD %s ***' % command
+        p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd)
+        stdout, stderr = p.communicate()
+        if DEBUG:
+            print stdout, stderr
+        return stdout, stderr
+
+
+def _get_tmp_dir():
+    return tempfile.mkdtemp(prefix='rc_integration_test')
+
+
+def _construct_url(repo, dest=None, **kwargs):
+    if dest is None:
+        #make temp clone
+        dest = _get_tmp_dir()
+    params = {
+        'user': TEST_USER_ADMIN_LOGIN,
+        'passwd': TEST_USER_ADMIN_PASS,
+        'host': HOST,
+        'cloned_repo': repo,
+        'dest': dest
+    }
+    params.update(**kwargs)
+    if params['user'] and params['passwd']:
+        _url = 'http://%(user)s:%(passwd)s@%(host)s/%(cloned_repo)s %(dest)s' % params
+    else:
+        _url = 'http://(host)s/%(cloned_repo)s %(dest)s' % params
+    return _url
+
+
+def _add_files_and_push(vcs, DEST, **kwargs):
+    """
+    Generate some files, add it to DEST repo and push back
+    vcs is git or hg and defines what VCS we want to make those files for
+
+    :param vcs:
+    :param DEST:
+    """
+    # commit some stuff into this repo
+    cwd = path = jn(DEST)
+    #added_file = jn(path, '%ssetupążźć.py' % _RandomNameSequence().next())
+    added_file = jn(path, '%ssetup.py' % _RandomNameSequence().next())
+    Command(cwd).execute('touch %s' % added_file)
+    Command(cwd).execute('%s add %s' % (vcs, added_file))
+
+    for i in xrange(kwargs.get('files_no', 3)):
+        cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
+        Command(cwd).execute(cmd)
+        author_str = 'Marcin Kuźminski <me@email.com>'
+        if vcs == 'hg':
+            cmd = """hg commit -m 'commited new %s' -u '%s' %s """ % (
+                i, author_str, added_file
+            )
+        elif vcs == 'git':
+            cmd = """git commit -m 'commited new %s' --author '%s' %s """ % (
+                i, author_str, added_file
+            )
+        Command(cwd).execute(cmd)
+    # PUSH it back
+    if vcs == 'hg':
+        _REPO = HG_REPO
+    elif vcs == 'git':
+        _REPO = GIT_REPO
+
+    kwargs['dest'] = ''
+    clone_url = _construct_url(_REPO, **kwargs)
+    if 'clone_url' in kwargs:
+        clone_url = kwargs['clone_url']
+    if vcs == 'hg':
+        stdout, stderr = Command(cwd).execute('hg push --verbose', clone_url)
+    elif vcs == 'git':
+        stdout, stderr = Command(cwd).execute('git push --verbose', clone_url + " master")
+
+    return stdout, stderr
+
+
+def set_anonymous_access(enable=True):
+    user = User.get_by_username(User.DEFAULT_USER)
+    user.active = enable
+    Session().add(user)
+    Session().commit()
+    print '\tanonymous access is now:', enable
+    if enable != User.get_by_username(User.DEFAULT_USER).active:
+        raise Exception('Cannot set anonymous access')
+
+
+#==============================================================================
+# TESTS
+#==============================================================================
+
+class TestVCSOperations(unittest.TestCase):
+
+    @classmethod
+    def setup_class(cls):
+        #DISABLE ANONYMOUS ACCESS
+        set_anonymous_access(False)
+
+    def setUp(self):
+        r = Repository.get_by_repo_name(GIT_REPO)
+        Repository.unlock(r)
+        r.enable_locking = False
+        Session().add(r)
+        Session().commit()
+
+        r = Repository.get_by_repo_name(HG_REPO)
+        Repository.unlock(r)
+        r.enable_locking = False
+        Session().add(r)
+        Session().commit()
+
+    def test_clone_hg_repo_by_admin(self):
+        clone_url = _construct_url(HG_REPO)
+        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
+
+        assert 'requesting all changes' in stdout
+        assert 'adding changesets' in stdout
+        assert 'adding manifests' in stdout
+        assert 'adding file changes' in stdout
+
+        assert stderr == ''
+
+    def test_clone_git_repo_by_admin(self):
+        clone_url = _construct_url(GIT_REPO)
+        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
+
+        assert 'Cloning into' in stdout
+        assert stderr == ''
+
+    def test_clone_wrong_credentials_hg(self):
+        clone_url = _construct_url(HG_REPO, passwd='bad!')
+        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
+        assert 'abort: authorization failed' in stderr
+
+    def test_clone_wrong_credentials_git(self):
+        clone_url = _construct_url(GIT_REPO, passwd='bad!')
+        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
+        assert 'fatal: Authentication failed' in stderr
+
+    def test_clone_git_dir_as_hg(self):
+        clone_url = _construct_url(GIT_REPO)
+        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
+        assert 'HTTP Error 404: Not Found' in stderr
+
+    def test_clone_hg_repo_as_git(self):
+        clone_url = _construct_url(HG_REPO)
+        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
+        assert 'not found:' in stderr
+
+    def test_clone_non_existing_path_hg(self):
+        clone_url = _construct_url('trololo')
+        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
+        assert 'HTTP Error 404: Not Found' in stderr
+
+    def test_clone_non_existing_path_git(self):
+        clone_url = _construct_url('trololo')
+        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
+        assert 'not found:' in stderr
+
+    def test_push_new_file_hg(self):
+        DEST = _get_tmp_dir()
+        clone_url = _construct_url(HG_REPO, dest=DEST)
+        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
+
+        stdout, stderr = _add_files_and_push('hg', DEST)
+
+        assert 'pushing to' in stdout
+        assert 'Repository size' in stdout
+        assert 'Last revision is now' in stdout
+
+    def test_push_new_file_git(self):
+        DEST = _get_tmp_dir()
+        clone_url = _construct_url(GIT_REPO, dest=DEST)
+        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
+
+        # commit some stuff into this repo
+        stdout, stderr = _add_files_and_push('git', DEST)
+
+        #WTF git stderr ?!
+        assert 'master -> master' in stderr
+
+    def test_push_invalidates_cache_hg(self):
+        key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
+                                               ==HG_REPO).one()
+        key.cache_active = True
+        Session().add(key)
+        Session().commit()
+
+        DEST = _get_tmp_dir()
+        clone_url = _construct_url(HG_REPO, dest=DEST)
+        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
+
+        stdout, stderr = _add_files_and_push('hg', DEST, files_no=1)
+        key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
+                                               ==HG_REPO).one()
+        self.assertEqual(key.cache_active, False)
+
+    def test_push_invalidates_cache_git(self):
+        key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
+                                               ==GIT_REPO).one()
+        key.cache_active = True
+        Session().add(key)
+        Session().commit()
+
+        DEST = _get_tmp_dir()
+        clone_url = _construct_url(GIT_REPO, dest=DEST)
+        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
+
+        # commit some stuff into this repo
+        stdout, stderr = _add_files_and_push('git', DEST, files_no=1)
+
+        key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
+                                               ==GIT_REPO).one()
+        self.assertEqual(key.cache_active, False)
+
+    def test_push_wrong_credentials_hg(self):
+        DEST = _get_tmp_dir()
+        clone_url = _construct_url(HG_REPO, dest=DEST)
+        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
+
+        stdout, stderr = _add_files_and_push('hg', DEST, user='bad',
+                                             passwd='name')
+
+        assert 'abort: authorization failed' in stderr
+
+    def test_push_wrong_credentials_git(self):
+        DEST = _get_tmp_dir()
+        clone_url = _construct_url(GIT_REPO, dest=DEST)
+        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
+
+        stdout, stderr = _add_files_and_push('git', DEST, user='bad',
+                                             passwd='name')
+
+        assert 'fatal: Authentication failed' in stderr
+
+    def test_push_back_to_wrong_url_hg(self):
+        DEST = _get_tmp_dir()
+        clone_url = _construct_url(HG_REPO, dest=DEST)
+        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
+
+        stdout, stderr = _add_files_and_push('hg', DEST,
+                                    clone_url='http://127.0.0.1:5000/tmp',)
+
+        assert 'HTTP Error 404: Not Found' in stderr
+
+    def test_push_back_to_wrong_url_git(self):
+        DEST = _get_tmp_dir()
+        clone_url = _construct_url(GIT_REPO, dest=DEST)
+        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
+
+        stdout, stderr = _add_files_and_push('git', DEST,
+                                    clone_url='http://127.0.0.1:5000/tmp',)
+
+        assert 'not found:' in stderr
+
+    def test_clone_and_create_lock_hg(self):
+        # enable locking
+        r = Repository.get_by_repo_name(HG_REPO)
+        r.enable_locking = True
+        Session().add(r)
+        Session().commit()
+        # clone
+        clone_url = _construct_url(HG_REPO)
+        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
+
+        #check if lock was made
+        r = Repository.get_by_repo_name(HG_REPO)
+        assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
+
+    def test_clone_and_create_lock_git(self):
+        # enable locking
+        r = Repository.get_by_repo_name(GIT_REPO)
+        r.enable_locking = True
+        Session().add(r)
+        Session().commit()
+        # clone
+        clone_url = _construct_url(GIT_REPO)
+        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
+
+        #check if lock was made
+        r = Repository.get_by_repo_name(GIT_REPO)
+        assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
+
+    def test_clone_after_repo_was_locked_hg(self):
+        #lock repo
+        r = Repository.get_by_repo_name(HG_REPO)
+        Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
+        #pull fails since repo is locked
+        clone_url = _construct_url(HG_REPO)
+        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
+        msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
+                % (HG_REPO, TEST_USER_ADMIN_LOGIN))
+        assert msg in stderr
+
+    def test_clone_after_repo_was_locked_git(self):
+        #lock repo
+        r = Repository.get_by_repo_name(GIT_REPO)
+        Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
+        #pull fails since repo is locked
+        clone_url = _construct_url(GIT_REPO)
+        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
+        msg = ("""The requested URL returned error: 423""")
+        assert msg in stderr
+
+    def test_push_on_locked_repo_by_other_user_hg(self):
+        #clone some temp
+        DEST = _get_tmp_dir()
+        clone_url = _construct_url(HG_REPO, dest=DEST)
+        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
+
+        #lock repo
+        r = Repository.get_by_repo_name(HG_REPO)
+        # let this user actually push !
+        RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN,
+                                          perm='repository.write')
+        Session().commit()
+        Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
+
+        #push fails repo is locked by other user !
+        stdout, stderr = _add_files_and_push('hg', DEST,
+                                             user=TEST_USER_REGULAR_LOGIN,
+                                             passwd=TEST_USER_REGULAR_PASS)
+        msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
+                % (HG_REPO, TEST_USER_ADMIN_LOGIN))
+        assert msg in stderr
+
+#TODO: fix me ! somehow during tests hooks don't get called on GIT
+#    def test_push_on_locked_repo_by_other_user_git(self):
+#        #clone some temp
+#        DEST = _get_tmp_dir()
+#        clone_url = _construct_url(GIT_REPO, dest=DEST)
+#        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
+#
+#        #lock repo
+#        r = Repository.get_by_repo_name(GIT_REPO)
+#        # let this user actually push !
+#        RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN,
+#                                          perm='repository.write')
+#        Session().commit()
+#        Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
+#
+#        #push fails repo is locked by other user !
+#        stdout, stderr = _add_files_and_push('git', DEST,
+#                                             user=TEST_USER_REGULAR_LOGIN,
+#                                             passwd=TEST_USER_REGULAR_PASS)
+#        msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
+#                % (GIT_REPO, TEST_USER_ADMIN_LOGIN))
+#        #TODO: fix this somehow later on GIT, GIT is stupid and even if we throw
+#        # back 423 to it, it makes ANOTHER request and we fail there with 405 :/
+#        msg = "405 Method Not Allowed"
+#        assert msg in stderr
+
+    def test_push_unlocks_repository_hg(self):
+        # enable locking
+        r = Repository.get_by_repo_name(HG_REPO)
+        r.enable_locking = True
+        Session().add(r)
+        Session().commit()
+        #clone some temp
+        DEST = _get_tmp_dir()
+        clone_url = _construct_url(HG_REPO, dest=DEST)
+        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
+
+        #check for lock repo after clone
+        r = Repository.get_by_repo_name(HG_REPO)
+        assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
+
+        #push is ok and repo is now unlocked
+        stdout, stderr = _add_files_and_push('hg', DEST)
+        assert ('remote: Released lock on repo `%s`' % HG_REPO) in stdout
+        #we need to cleanup the Session Here !
+        Session.remove()
+        r = Repository.get_by_repo_name(HG_REPO)
+        assert r.locked == [None, None]
+
+#TODO: fix me ! somehow during tests hooks don't get called on GIT
+#    def test_push_unlocks_repository_git(self):
+#        # enable locking
+#        r = Repository.get_by_repo_name(GIT_REPO)
+#        r.enable_locking = True
+#        Session().add(r)
+#        Session().commit()
+#        #clone some temp
+#        DEST = _get_tmp_dir()
+#        clone_url = _construct_url(GIT_REPO, dest=DEST)
+#        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
+#
+#        #check for lock repo after clone
+#        r = Repository.get_by_repo_name(GIT_REPO)
+#        assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
+#
+#        #push is ok and repo is now unlocked
+#        stdout, stderr = _add_files_and_push('git', DEST)
+#        #assert ('remote: Released lock on repo `%s`' % GIT_REPO) in stdout
+#        #we need to cleanup the Session Here !
+#        Session.remove()
+#        r = Repository.get_by_repo_name(GIT_REPO)
+#        assert r.locked == [None, None]
+
+    def test_ip_restriction_hg(self):
+        user_model = UserModel()
+        try:
+            user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32')
+            Session().commit()
+            clone_url = _construct_url(HG_REPO)
+            stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
+            assert 'abort: HTTP Error 403: Forbidden' in stderr
+        finally:
+            #release IP restrictions
+            for ip in UserIpMap.getAll():
+                UserIpMap.delete(ip.ip_id)
+            Session().commit()
+
+        time.sleep(2)
+        clone_url = _construct_url(HG_REPO)
+        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
+
+        assert 'requesting all changes' in stdout
+        assert 'adding changesets' in stdout
+        assert 'adding manifests' in stdout
+        assert 'adding file changes' in stdout
+
+        assert stderr == ''
+
+    def test_ip_restriction_git(self):
+        user_model = UserModel()
+        try:
+            user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32')
+            Session().commit()
+            clone_url = _construct_url(GIT_REPO)
+            stdout, stderr = Command('/tmp').execute('git clone', clone_url)
+            msg = ("""The requested URL returned error: 403""")
+            assert msg in stderr
+        finally:
+            #release IP restrictions
+            for ip in UserIpMap.getAll():
+                UserIpMap.delete(ip.ip_id)
+            Session().commit()
+
+        time.sleep(2)
+        clone_url = _construct_url(GIT_REPO)
+        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
+
+        assert 'Cloning into' in stdout
+        assert stderr == ''
--- a/rhodecode/tests/scripts/test_vcs_operations.py	Wed May 08 17:43:59 2013 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,507 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
-    rhodecode.tests.test_scm_operations
-    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-    Test suite for making push/pull operations.
-    Run using after doing paster serve test.ini::
-     RC_WHOOSH_TEST_DISABLE=1 RC_NO_TMP_PATH=1 nosetests rhodecode/tests/scripts/test_vcs_operations.py
-
-    You must have git > 1.8.1 for tests to work fine
-
-    :created_on: Dec 30, 2010
-    :author: marcink
-    :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
-    :license: GPLv3, see COPYING for more details.
-"""
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program.  If not, see <http://www.gnu.org/licenses/>.
-
-import os
-import tempfile
-import unittest
-import time
-from os.path import join as jn
-from os.path import dirname as dn
-
-from tempfile import _RandomNameSequence
-from subprocess import Popen, PIPE
-
-from rhodecode.tests import *
-from rhodecode.model.db import User, Repository, UserLog, UserIpMap,\
-    CacheInvalidation
-from rhodecode.model.meta import Session
-from rhodecode.model.repo import RepoModel
-from rhodecode.model.user import UserModel
-
-DEBUG = True
-HOST = '127.0.0.1:5000'  # test host
-
-
-class Command(object):
-
-    def __init__(self, cwd):
-        self.cwd = cwd
-
-    def execute(self, cmd, *args):
-        """
-        Runs command on the system with given ``args``.
-        """
-
-        command = cmd + ' ' + ' '.join(args)
-        if DEBUG:
-            print '*** CMD %s ***' % command
-        p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd)
-        stdout, stderr = p.communicate()
-        if DEBUG:
-            print stdout, stderr
-        return stdout, stderr
-
-
-def _get_tmp_dir():
-    return tempfile.mkdtemp(prefix='rc_integration_test')
-
-
-def _construct_url(repo, dest=None, **kwargs):
-    if dest is None:
-        #make temp clone
-        dest = _get_tmp_dir()
-    params = {
-        'user': TEST_USER_ADMIN_LOGIN,
-        'passwd': TEST_USER_ADMIN_PASS,
-        'host': HOST,
-        'cloned_repo': repo,
-        'dest': dest
-    }
-    params.update(**kwargs)
-    if params['user'] and params['passwd']:
-        _url = 'http://%(user)s:%(passwd)s@%(host)s/%(cloned_repo)s %(dest)s' % params
-    else:
-        _url = 'http://(host)s/%(cloned_repo)s %(dest)s' % params
-    return _url
-
-
-def _add_files_and_push(vcs, DEST, **kwargs):
-    """
-    Generate some files, add it to DEST repo and push back
-    vcs is git or hg and defines what VCS we want to make those files for
-
-    :param vcs:
-    :param DEST:
-    """
-    # commit some stuff into this repo
-    cwd = path = jn(DEST)
-    #added_file = jn(path, '%ssetupążźć.py' % _RandomNameSequence().next())
-    added_file = jn(path, '%ssetup.py' % _RandomNameSequence().next())
-    Command(cwd).execute('touch %s' % added_file)
-    Command(cwd).execute('%s add %s' % (vcs, added_file))
-
-    for i in xrange(kwargs.get('files_no', 3)):
-        cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
-        Command(cwd).execute(cmd)
-        author_str = 'Marcin Kuźminski <me@email.com>'
-        if vcs == 'hg':
-            cmd = """hg commit -m 'commited new %s' -u '%s' %s """ % (
-                i, author_str, added_file
-            )
-        elif vcs == 'git':
-            cmd = """git commit -m 'commited new %s' --author '%s' %s """ % (
-                i, author_str, added_file
-            )
-        Command(cwd).execute(cmd)
-    # PUSH it back
-    if vcs == 'hg':
-        _REPO = HG_REPO
-    elif vcs == 'git':
-        _REPO = GIT_REPO
-
-    kwargs['dest'] = ''
-    clone_url = _construct_url(_REPO, **kwargs)
-    if 'clone_url' in kwargs:
-        clone_url = kwargs['clone_url']
-    if vcs == 'hg':
-        stdout, stderr = Command(cwd).execute('hg push --verbose', clone_url)
-    elif vcs == 'git':
-        stdout, stderr = Command(cwd).execute('git push --verbose', clone_url + " master")
-
-    return stdout, stderr
-
-
-def set_anonymous_access(enable=True):
-    user = User.get_by_username(User.DEFAULT_USER)
-    user.active = enable
-    Session().add(user)
-    Session().commit()
-    print '\tanonymous access is now:', enable
-    if enable != User.get_by_username(User.DEFAULT_USER).active:
-        raise Exception('Cannot set anonymous access')
-
-
-#==============================================================================
-# TESTS
-#==============================================================================
-
-class TestVCSOperations(unittest.TestCase):
-
-    @classmethod
-    def setup_class(cls):
-        #DISABLE ANONYMOUS ACCESS
-        set_anonymous_access(False)
-
-    def setUp(self):
-        r = Repository.get_by_repo_name(GIT_REPO)
-        Repository.unlock(r)
-        r.enable_locking = False
-        Session().add(r)
-        Session().commit()
-
-        r = Repository.get_by_repo_name(HG_REPO)
-        Repository.unlock(r)
-        r.enable_locking = False
-        Session().add(r)
-        Session().commit()
-
-    def test_clone_hg_repo_by_admin(self):
-        clone_url = _construct_url(HG_REPO)
-        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
-
-        assert 'requesting all changes' in stdout
-        assert 'adding changesets' in stdout
-        assert 'adding manifests' in stdout
-        assert 'adding file changes' in stdout
-
-        assert stderr == ''
-
-    def test_clone_git_repo_by_admin(self):
-        clone_url = _construct_url(GIT_REPO)
-        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
-
-        assert 'Cloning into' in stdout
-        assert stderr == ''
-
-    def test_clone_wrong_credentials_hg(self):
-        clone_url = _construct_url(HG_REPO, passwd='bad!')
-        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
-        assert 'abort: authorization failed' in stderr
-
-    def test_clone_wrong_credentials_git(self):
-        clone_url = _construct_url(GIT_REPO, passwd='bad!')
-        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
-        assert 'fatal: Authentication failed' in stderr
-
-    def test_clone_git_dir_as_hg(self):
-        clone_url = _construct_url(GIT_REPO)
-        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
-        assert 'HTTP Error 404: Not Found' in stderr
-
-    def test_clone_hg_repo_as_git(self):
-        clone_url = _construct_url(HG_REPO)
-        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
-        assert 'not found:' in stderr
-
-    def test_clone_non_existing_path_hg(self):
-        clone_url = _construct_url('trololo')
-        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
-        assert 'HTTP Error 404: Not Found' in stderr
-
-    def test_clone_non_existing_path_git(self):
-        clone_url = _construct_url('trololo')
-        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
-        assert 'not found:' in stderr
-
-    def test_push_new_file_hg(self):
-        DEST = _get_tmp_dir()
-        clone_url = _construct_url(HG_REPO, dest=DEST)
-        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
-
-        stdout, stderr = _add_files_and_push('hg', DEST)
-
-        assert 'pushing to' in stdout
-        assert 'Repository size' in stdout
-        assert 'Last revision is now' in stdout
-
-    def test_push_new_file_git(self):
-        DEST = _get_tmp_dir()
-        clone_url = _construct_url(GIT_REPO, dest=DEST)
-        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
-
-        # commit some stuff into this repo
-        stdout, stderr = _add_files_and_push('git', DEST)
-
-        #WTF git stderr ?!
-        assert 'master -> master' in stderr
-
-    def test_push_invalidates_cache_hg(self):
-        key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
-                                               ==HG_REPO).one()
-        key.cache_active = True
-        Session().add(key)
-        Session().commit()
-
-        DEST = _get_tmp_dir()
-        clone_url = _construct_url(HG_REPO, dest=DEST)
-        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
-
-        stdout, stderr = _add_files_and_push('hg', DEST, files_no=1)
-        key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
-                                               ==HG_REPO).one()
-        self.assertEqual(key.cache_active, False)
-
-    def test_push_invalidates_cache_git(self):
-        key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
-                                               ==GIT_REPO).one()
-        key.cache_active = True
-        Session().add(key)
-        Session().commit()
-
-        DEST = _get_tmp_dir()
-        clone_url = _construct_url(GIT_REPO, dest=DEST)
-        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
-
-        # commit some stuff into this repo
-        stdout, stderr = _add_files_and_push('git', DEST, files_no=1)
-
-        key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
-                                               ==GIT_REPO).one()
-        self.assertEqual(key.cache_active, False)
-
-    def test_push_wrong_credentials_hg(self):
-        DEST = _get_tmp_dir()
-        clone_url = _construct_url(HG_REPO, dest=DEST)
-        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
-
-        stdout, stderr = _add_files_and_push('hg', DEST, user='bad',
-                                             passwd='name')
-
-        assert 'abort: authorization failed' in stderr
-
-    def test_push_wrong_credentials_git(self):
-        DEST = _get_tmp_dir()
-        clone_url = _construct_url(GIT_REPO, dest=DEST)
-        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
-
-        stdout, stderr = _add_files_and_push('git', DEST, user='bad',
-                                             passwd='name')
-
-        assert 'fatal: Authentication failed' in stderr
-
-    def test_push_back_to_wrong_url_hg(self):
-        DEST = _get_tmp_dir()
-        clone_url = _construct_url(HG_REPO, dest=DEST)
-        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
-
-        stdout, stderr = _add_files_and_push('hg', DEST,
-                                    clone_url='http://127.0.0.1:5000/tmp',)
-
-        assert 'HTTP Error 404: Not Found' in stderr
-
-    def test_push_back_to_wrong_url_git(self):
-        DEST = _get_tmp_dir()
-        clone_url = _construct_url(GIT_REPO, dest=DEST)
-        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
-
-        stdout, stderr = _add_files_and_push('git', DEST,
-                                    clone_url='http://127.0.0.1:5000/tmp',)
-
-        assert 'not found:' in stderr
-
-    def test_clone_and_create_lock_hg(self):
-        # enable locking
-        r = Repository.get_by_repo_name(HG_REPO)
-        r.enable_locking = True
-        Session().add(r)
-        Session().commit()
-        # clone
-        clone_url = _construct_url(HG_REPO)
-        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
-
-        #check if lock was made
-        r = Repository.get_by_repo_name(HG_REPO)
-        assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
-
-    def test_clone_and_create_lock_git(self):
-        # enable locking
-        r = Repository.get_by_repo_name(GIT_REPO)
-        r.enable_locking = True
-        Session().add(r)
-        Session().commit()
-        # clone
-        clone_url = _construct_url(GIT_REPO)
-        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
-
-        #check if lock was made
-        r = Repository.get_by_repo_name(GIT_REPO)
-        assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
-
-    def test_clone_after_repo_was_locked_hg(self):
-        #lock repo
-        r = Repository.get_by_repo_name(HG_REPO)
-        Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
-        #pull fails since repo is locked
-        clone_url = _construct_url(HG_REPO)
-        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
-        msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
-                % (HG_REPO, TEST_USER_ADMIN_LOGIN))
-        assert msg in stderr
-
-    def test_clone_after_repo_was_locked_git(self):
-        #lock repo
-        r = Repository.get_by_repo_name(GIT_REPO)
-        Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
-        #pull fails since repo is locked
-        clone_url = _construct_url(GIT_REPO)
-        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
-        msg = ("""The requested URL returned error: 423""")
-        assert msg in stderr
-
-    def test_push_on_locked_repo_by_other_user_hg(self):
-        #clone some temp
-        DEST = _get_tmp_dir()
-        clone_url = _construct_url(HG_REPO, dest=DEST)
-        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
-
-        #lock repo
-        r = Repository.get_by_repo_name(HG_REPO)
-        # let this user actually push !
-        RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN,
-                                          perm='repository.write')
-        Session().commit()
-        Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
-
-        #push fails repo is locked by other user !
-        stdout, stderr = _add_files_and_push('hg', DEST,
-                                             user=TEST_USER_REGULAR_LOGIN,
-                                             passwd=TEST_USER_REGULAR_PASS)
-        msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
-                % (HG_REPO, TEST_USER_ADMIN_LOGIN))
-        assert msg in stderr
-
-#TODO: fix me ! somehow during tests hooks don't get called on GIT
-#    def test_push_on_locked_repo_by_other_user_git(self):
-#        #clone some temp
-#        DEST = _get_tmp_dir()
-#        clone_url = _construct_url(GIT_REPO, dest=DEST)
-#        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
-#
-#        #lock repo
-#        r = Repository.get_by_repo_name(GIT_REPO)
-#        # let this user actually push !
-#        RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN,
-#                                          perm='repository.write')
-#        Session().commit()
-#        Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
-#
-#        #push fails repo is locked by other user !
-#        stdout, stderr = _add_files_and_push('git', DEST,
-#                                             user=TEST_USER_REGULAR_LOGIN,
-#                                             passwd=TEST_USER_REGULAR_PASS)
-#        msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
-#                % (GIT_REPO, TEST_USER_ADMIN_LOGIN))
-#        #TODO: fix this somehow later on GIT, GIT is stupid and even if we throw
-#        # back 423 to it, it makes ANOTHER request and we fail there with 405 :/
-#        msg = "405 Method Not Allowed"
-#        assert msg in stderr
-
-    def test_push_unlocks_repository_hg(self):
-        # enable locking
-        r = Repository.get_by_repo_name(HG_REPO)
-        r.enable_locking = True
-        Session().add(r)
-        Session().commit()
-        #clone some temp
-        DEST = _get_tmp_dir()
-        clone_url = _construct_url(HG_REPO, dest=DEST)
-        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
-
-        #check for lock repo after clone
-        r = Repository.get_by_repo_name(HG_REPO)
-        assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
-
-        #push is ok and repo is now unlocked
-        stdout, stderr = _add_files_and_push('hg', DEST)
-        assert ('remote: Released lock on repo `%s`' % HG_REPO) in stdout
-        #we need to cleanup the Session Here !
-        Session.remove()
-        r = Repository.get_by_repo_name(HG_REPO)
-        assert r.locked == [None, None]
-
-#TODO: fix me ! somehow during tests hooks don't get called on GIT
-#    def test_push_unlocks_repository_git(self):
-#        # enable locking
-#        r = Repository.get_by_repo_name(GIT_REPO)
-#        r.enable_locking = True
-#        Session().add(r)
-#        Session().commit()
-#        #clone some temp
-#        DEST = _get_tmp_dir()
-#        clone_url = _construct_url(GIT_REPO, dest=DEST)
-#        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
-#
-#        #check for lock repo after clone
-#        r = Repository.get_by_repo_name(GIT_REPO)
-#        assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
-#
-#        #push is ok and repo is now unlocked
-#        stdout, stderr = _add_files_and_push('git', DEST)
-#        #assert ('remote: Released lock on repo `%s`' % GIT_REPO) in stdout
-#        #we need to cleanup the Session Here !
-#        Session.remove()
-#        r = Repository.get_by_repo_name(GIT_REPO)
-#        assert r.locked == [None, None]
-
-    def test_ip_restriction_hg(self):
-        user_model = UserModel()
-        try:
-            user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32')
-            Session().commit()
-            clone_url = _construct_url(HG_REPO)
-            stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
-            assert 'abort: HTTP Error 403: Forbidden' in stderr
-        finally:
-            #release IP restrictions
-            for ip in UserIpMap.getAll():
-                UserIpMap.delete(ip.ip_id)
-            Session().commit()
-
-        time.sleep(2)
-        clone_url = _construct_url(HG_REPO)
-        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
-
-        assert 'requesting all changes' in stdout
-        assert 'adding changesets' in stdout
-        assert 'adding manifests' in stdout
-        assert 'adding file changes' in stdout
-
-        assert stderr == ''
-
-    def test_ip_restriction_git(self):
-        user_model = UserModel()
-        try:
-            user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32')
-            Session().commit()
-            clone_url = _construct_url(GIT_REPO)
-            stdout, stderr = Command('/tmp').execute('git clone', clone_url)
-            msg = ("""The requested URL returned error: 403""")
-            assert msg in stderr
-        finally:
-            #release IP restrictions
-            for ip in UserIpMap.getAll():
-                UserIpMap.delete(ip.ip_id)
-            Session().commit()
-
-        time.sleep(2)
-        clone_url = _construct_url(GIT_REPO)
-        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
-
-        assert 'Cloning into' in stdout
-        assert stderr == ''
--- a/rhodecode/tests/test_libs.py	Wed May 08 17:43:59 2013 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,295 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
-    rhodecode.tests.test_libs
-    ~~~~~~~~~~~~~~~~~~~~~~~~~
-
-
-    Package for testing various lib/helper functions in rhodecode
-
-    :created_on: Jun 9, 2011
-    :copyright: (C) 2011-2012 Marcin Kuzminski <marcin@python-works.com>
-    :license: GPLv3, see COPYING for more details.
-"""
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program.  If not, see <http://www.gnu.org/licenses/>.
-from __future__ import with_statement
-import unittest
-import datetime
-import hashlib
-import mock
-from rhodecode.tests import *
-
-proto = 'http'
-TEST_URLS = [
-    ('%s://127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
-     '%s://127.0.0.1' % proto),
-    ('%s://marcink@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
-     '%s://127.0.0.1' % proto),
-    ('%s://marcink:pass@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
-     '%s://127.0.0.1' % proto),
-    ('%s://127.0.0.1:8080' % proto, ['%s://' % proto, '127.0.0.1', '8080'],
-     '%s://127.0.0.1:8080' % proto),
-    ('%s://domain.org' % proto, ['%s://' % proto, 'domain.org'],
-     '%s://domain.org' % proto),
-    ('%s://user:pass@domain.org:8080' % proto, ['%s://' % proto, 'domain.org',
-                                                '8080'],
-     '%s://domain.org:8080' % proto),
-]
-
-proto = 'https'
-TEST_URLS += [
-    ('%s://127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
-     '%s://127.0.0.1' % proto),
-    ('%s://marcink@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
-     '%s://127.0.0.1' % proto),
-    ('%s://marcink:pass@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
-     '%s://127.0.0.1' % proto),
-    ('%s://127.0.0.1:8080' % proto, ['%s://' % proto, '127.0.0.1', '8080'],
-     '%s://127.0.0.1:8080' % proto),
-    ('%s://domain.org' % proto, ['%s://' % proto, 'domain.org'],
-     '%s://domain.org' % proto),
-    ('%s://user:pass@domain.org:8080' % proto, ['%s://' % proto, 'domain.org',
-                                                '8080'],
-     '%s://domain.org:8080' % proto),
-]
-
-
-class TestLibs(unittest.TestCase):
-
-    @parameterized.expand(TEST_URLS)
-    def test_uri_filter(self, test_url, expected, expected_creds):
-        from rhodecode.lib.utils2 import uri_filter
-        self.assertEqual(uri_filter(test_url), expected)
-
-    @parameterized.expand(TEST_URLS)
-    def test_credentials_filter(self, test_url, expected, expected_creds):
-        from rhodecode.lib.utils2 import credentials_filter
-        self.assertEqual(credentials_filter(test_url), expected_creds)
-
-    @parameterized.expand([('t', True),
-                           ('true', True),
-                           ('y', True),
-                           ('yes', True),
-                           ('on', True),
-                           ('1', True),
-                           ('Y', True),
-                           ('yeS', True),
-                           ('Y', True),
-                           ('TRUE', True),
-                           ('T', True),
-                           ('False', False),
-                           ('F', False),
-                           ('FALSE', False),
-                           ('0', False),
-                           ('-1', False),
-                           ('', False)
-    ])
-    def test_str2bool(self, str_bool, expected):
-        from rhodecode.lib.utils2 import str2bool
-        self.assertEqual(str2bool(str_bool), expected)
-
-    def test_mention_extractor(self):
-        from rhodecode.lib.utils2 import extract_mentioned_users
-        sample = (
-            "@first hi there @marcink here's my email marcin@email.com "
-            "@lukaszb check @one_more22 it pls @ ttwelve @D[] @one@two@three "
-            "@MARCIN    @maRCiN @2one_more22 @john please see this http://org.pl "
-            "@marian.user just do it @marco-polo and next extract @marco_polo "
-            "user.dot  hej ! not-needed maril@domain.org"
-        )
-
-        s = sorted([
-        'first', 'marcink', 'lukaszb', 'one_more22', 'MARCIN', 'maRCiN', 'john',
-        'marian.user', 'marco-polo', 'marco_polo'
-        ], key=lambda k: k.lower())
-        self.assertEqual(s, extract_mentioned_users(sample))
-
-    @parameterized.expand([
-        (dict(), u'just now'),
-        (dict(seconds= -1), u'1 second ago'),
-        (dict(seconds= -60 * 2), u'2 minutes ago'),
-        (dict(hours= -1), u'1 hour ago'),
-        (dict(hours= -24), u'1 day ago'),
-        (dict(hours= -24 * 5), u'5 days ago'),
-        (dict(months= -1), u'1 month ago'),
-        (dict(months= -1, days= -2), u'1 month and 2 days ago'),
-        (dict(years= -1, months= -1), u'1 year and 1 month ago'),
-    ])
-    def test_age(self, age_args, expected):
-        from rhodecode.lib.utils2 import age
-        from dateutil import relativedelta
-        n = datetime.datetime(year=2012, month=5, day=17)
-        delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs)
-        self.assertEqual(age(n + delt(**age_args), now=n), expected)
-
-    @parameterized.expand([
-
-        (dict(), u'just now'),
-        (dict(seconds=1), u'in 1 second'),
-        (dict(seconds=60 * 2), u'in 2 minutes'),
-        (dict(hours=1), u'in 1 hour'),
-        (dict(hours=24), u'in 1 day'),
-        (dict(hours=24 * 5), u'in 5 days'),
-        (dict(months=1), u'in 1 month'),
-        (dict(months=1, days=1), u'in 1 month and 1 day'),
-        (dict(years=1, months=1), u'in 1 year and 1 month')
-    ])
-    def test_age_in_future(self, age_args, expected):
-        from rhodecode.lib.utils2 import age
-        from dateutil import relativedelta
-        n = datetime.datetime(year=2012, month=5, day=17)
-        delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs)
-        self.assertEqual(age(n + delt(**age_args), now=n), expected)
-
-    def test_tag_exctrator(self):
-        sample = (
-            "hello pta[tag] gog [[]] [[] sda ero[or]d [me =>>< sa]"
-            "[requires] [stale] [see<>=>] [see => http://url.com]"
-            "[requires => url] [lang => python] [just a tag]"
-            "[,d] [ => ULR ] [obsolete] [desc]]"
-        )
-        from rhodecode.lib.helpers import desc_stylize
-        res = desc_stylize(sample)
-        self.assertTrue('<div class="metatag" tag="tag">tag</div>' in res)
-        self.assertTrue('<div class="metatag" tag="obsolete">obsolete</div>' in res)
-        self.assertTrue('<div class="metatag" tag="stale">stale</div>' in res)
-        self.assertTrue('<div class="metatag" tag="lang">python</div>' in res)
-        self.assertTrue('<div class="metatag" tag="requires">requires =&gt; <a href="/url">url</a></div>' in res)
-        self.assertTrue('<div class="metatag" tag="tag">tag</div>' in res)
-
-    def test_alternative_gravatar(self):
-        from rhodecode.lib.helpers import gravatar_url
-        _md5 = lambda s: hashlib.md5(s).hexdigest()
-
-        def fake_conf(**kwargs):
-            from pylons import config
-            config['app_conf'] = {}
-            config['app_conf']['use_gravatar'] = True
-            config['app_conf'].update(kwargs)
-            return config
-
-        class fake_url():
-            @classmethod
-            def current(cls, *args, **kwargs):
-                return 'https://server.com'
-
-        with mock.patch('pylons.url', fake_url):
-            fake = fake_conf(alternative_gravatar_url='http://test.com/{email}')
-            with mock.patch('pylons.config', fake):
-                    from pylons import url
-                    assert url.current() == 'https://server.com'
-                    grav = gravatar_url(email_address='test@foo.com', size=24)
-                    assert grav == 'http://test.com/test@foo.com'
-
-            fake = fake_conf(alternative_gravatar_url='http://test.com/{email}')
-            with mock.patch('pylons.config', fake):
-                grav = gravatar_url(email_address='test@foo.com', size=24)
-                assert grav == 'http://test.com/test@foo.com'
-
-            fake = fake_conf(alternative_gravatar_url='http://test.com/{md5email}')
-            with mock.patch('pylons.config', fake):
-                em = 'test@foo.com'
-                grav = gravatar_url(email_address=em, size=24)
-                assert grav == 'http://test.com/%s' % (_md5(em))
-
-            fake = fake_conf(alternative_gravatar_url='http://test.com/{md5email}/{size}')
-            with mock.patch('pylons.config', fake):
-                em = 'test@foo.com'
-                grav = gravatar_url(email_address=em, size=24)
-                assert grav == 'http://test.com/%s/%s' % (_md5(em), 24)
-
-            fake = fake_conf(alternative_gravatar_url='{scheme}://{netloc}/{md5email}/{size}')
-            with mock.patch('pylons.config', fake):
-                em = 'test@foo.com'
-                grav = gravatar_url(email_address=em, size=24)
-                assert grav == 'https://server.com/%s/%s' % (_md5(em), 24)
-
-    def _quick_url(self, text, tmpl="""<a class="revision-link" href="%s">%s</a>""", url_=None):
-        """
-        Changes `some text url[foo]` => `some text <a href="/">foo</a>
-
-        :param text:
-        """
-        import re
-        # quickly change expected url[] into a link
-        URL_PAT = re.compile(r'(?:url\[)(.+?)(?:\])')
-
-        def url_func(match_obj):
-            _url = match_obj.groups()[0]
-            return tmpl % (url_ or '/some-url', _url)
-        return URL_PAT.sub(url_func, text)
-
-    @parameterized.expand([
-      ("",
-       ""),
-      ("git-svn-id: https://svn.apache.org/repos/asf/libcloud/trunk@1441655 13f79535-47bb-0310-9956-ffa450edef68",
-       "git-svn-id: https://svn.apache.org/repos/asf/libcloud/trunk@1441655 13f79535-47bb-0310-9956-ffa450edef68"),
-      ("from rev 000000000000",
-       "from rev url[000000000000]"),
-      ("from rev 000000000000123123 also rev 000000000000",
-       "from rev url[000000000000123123] also rev url[000000000000]"),
-      ("this should-000 00",
-       "this should-000 00"),
-      ("longtextffffffffff rev 123123123123",
-       "longtextffffffffff rev url[123123123123]"),
-      ("rev ffffffffffffffffffffffffffffffffffffffffffffffffff",
-       "rev ffffffffffffffffffffffffffffffffffffffffffffffffff"),
-      ("ffffffffffff some text traalaa",
-       "url[ffffffffffff] some text traalaa"),
-       ("""Multi line
-       123123123123
-       some text 123123123123
-       sometimes !
-       """,
-       """Multi line
-       url[123123123123]
-       some text url[123123123123]
-       sometimes !
-       """)
-    ])
-    def test_urlify_changesets(self, sample, expected):
-        def fake_url(self, *args, **kwargs):
-            return '/some-url'
-
-        expected = self._quick_url(expected)
-
-        with mock.patch('pylons.url', fake_url):
-            from rhodecode.lib.helpers import urlify_changesets
-            self.assertEqual(urlify_changesets(sample, 'repo_name'), expected)
-
-    @parameterized.expand([
-      ("",
-       "",
-       ""),
-      ("https://svn.apache.org/repos",
-       "url[https://svn.apache.org/repos]",
-       "https://svn.apache.org/repos"),
-      ("http://svn.apache.org/repos",
-       "url[http://svn.apache.org/repos]",
-       "http://svn.apache.org/repos"),
-      ("from rev a also rev http://google.com",
-       "from rev a also rev url[http://google.com]",
-       "http://google.com"),
-       ("""Multi line
-       https://foo.bar.com
-       some text lalala""",
-       """Multi line
-       url[https://foo.bar.com]
-       some text lalala""",
-       "https://foo.bar.com")
-    ])
-    def test_urlify_test(self, sample, expected, url_):
-        from rhodecode.lib.helpers import urlify_text
-        expected = self._quick_url(expected,
-                                   tmpl="""<a href="%s">%s</a>""", url_=url_)
-        self.assertEqual(urlify_text(sample), expected)
--- a/rhodecode/tests/test_validators.py	Wed May 08 17:43:59 2013 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,255 +0,0 @@
-# -*- coding: utf-8 -*-
-import unittest
-import formencode
-
-from rhodecode.tests import *
-
-from rhodecode.model import validators as v
-from rhodecode.model.users_group import UserGroupModel
-
-from rhodecode.model.meta import Session
-from rhodecode.model.repos_group import ReposGroupModel
-from rhodecode.config.routing import ADMIN_PREFIX
-from rhodecode.model.db import ChangesetStatus, Repository
-from rhodecode.model.changeset_status import ChangesetStatusModel
-from rhodecode.tests.fixture import Fixture
-
-fixture = Fixture()
-
-
-class TestReposGroups(unittest.TestCase):
-
-    def setUp(self):
-        pass
-
-    def tearDown(self):
-        Session.remove()
-
-    def test_Message_extractor(self):
-        validator = v.ValidUsername()
-        self.assertRaises(formencode.Invalid, validator.to_python, 'default')
-
-        class StateObj(object):
-            pass
-
-        self.assertRaises(formencode.Invalid,
-                          validator.to_python, 'default', StateObj)
-
-    def test_ValidUsername(self):
-        validator = v.ValidUsername()
-
-        self.assertRaises(formencode.Invalid, validator.to_python, 'default')
-        self.assertRaises(formencode.Invalid, validator.to_python, 'new_user')
-        self.assertRaises(formencode.Invalid, validator.to_python, '.,')
-        self.assertRaises(formencode.Invalid, validator.to_python,
-                          TEST_USER_ADMIN_LOGIN)
-        self.assertEqual('test', validator.to_python('test'))
-
-        validator = v.ValidUsername(edit=True, old_data={'user_id': 1})
-
-    def test_ValidRepoUser(self):
-        validator = v.ValidRepoUser()
-        self.assertRaises(formencode.Invalid, validator.to_python, 'nouser')
-        self.assertEqual(TEST_USER_ADMIN_LOGIN,
-                         validator.to_python(TEST_USER_ADMIN_LOGIN))
-
-    def test_ValidUserGroup(self):
-        validator = v.ValidUserGroup()
-        self.assertRaises(formencode.Invalid, validator.to_python, 'default')
-        self.assertRaises(formencode.Invalid, validator.to_python, '.,')
-
-        gr = fixture.create_user_group('test')
-        gr2 = fixture.create_user_group('tes2')
-        Session().commit()
-        self.assertRaises(formencode.Invalid, validator.to_python, 'test')
-        assert gr.users_group_id != None
-        validator = v.ValidUserGroup(edit=True,
-                                    old_data={'users_group_id':
-                                              gr2.users_group_id})
-
-        self.assertRaises(formencode.Invalid, validator.to_python, 'test')
-        self.assertRaises(formencode.Invalid, validator.to_python, 'TesT')
-        self.assertRaises(formencode.Invalid, validator.to_python, 'TEST')
-        UserGroupModel().delete(gr)
-        UserGroupModel().delete(gr2)
-        Session().commit()
-
-    def test_ValidReposGroup(self):
-        validator = v.ValidReposGroup()
-        model = ReposGroupModel()
-        self.assertRaises(formencode.Invalid, validator.to_python,
-                          {'group_name': HG_REPO, })
-        gr = model.create(group_name='test_gr', group_description='desc',
-                          parent=None,
-                          just_db=True,
-                          owner=TEST_USER_ADMIN_LOGIN)
-        self.assertRaises(formencode.Invalid,
-                          validator.to_python, {'group_name': gr.group_name, })
-
-        validator = v.ValidReposGroup(edit=True,
-                                      old_data={'group_id':  gr.group_id})
-        self.assertRaises(formencode.Invalid,
-                          validator.to_python, {
-                                        'group_name': gr.group_name + 'n',
-                                        'group_parent_id': gr.group_id
-                                        })
-        model.delete(gr)
-
-    def test_ValidPassword(self):
-        validator = v.ValidPassword()
-        self.assertEqual('lol', validator.to_python('lol'))
-        self.assertEqual(None, validator.to_python(None))
-        self.assertRaises(formencode.Invalid, validator.to_python, 'ąćżź')
-
-    def test_ValidPasswordsMatch(self):
-        validator = v.ValidPasswordsMatch()
-        self.assertRaises(formencode.Invalid,
-                    validator.to_python, {'password': 'pass',
-                                          'password_confirmation': 'pass2'})
-
-        self.assertRaises(formencode.Invalid,
-                    validator.to_python, {'new_password': 'pass',
-                                          'password_confirmation': 'pass2'})
-
-        self.assertEqual({'new_password': 'pass',
-                          'password_confirmation': 'pass'},
-                    validator.to_python({'new_password': 'pass',
-                                         'password_confirmation': 'pass'}))
-
-        self.assertEqual({'password': 'pass',
-                          'password_confirmation': 'pass'},
-                    validator.to_python({'password': 'pass',
-                                         'password_confirmation': 'pass'}))
-
-    def test_ValidAuth(self):
-        validator = v.ValidAuth()
-        valid_creds = {
-            'username': TEST_USER_REGULAR2_LOGIN,
-            'password': TEST_USER_REGULAR2_PASS,
-        }
-        invalid_creds = {
-            'username': 'err',
-            'password': 'err',
-        }
-        self.assertEqual(valid_creds, validator.to_python(valid_creds))
-        self.assertRaises(formencode.Invalid,
-                          validator.to_python, invalid_creds)
-
-    def test_ValidAuthToken(self):
-        validator = v.ValidAuthToken()
-        # this is untestable without a threadlocal
-#        self.assertRaises(formencode.Invalid,
-#                          validator.to_python, 'BadToken')
-        validator
-
-    def test_ValidRepoName(self):
-        validator = v.ValidRepoName()
-
-        self.assertRaises(formencode.Invalid,
-                          validator.to_python, {'repo_name': ''})
-
-        self.assertRaises(formencode.Invalid,
-                          validator.to_python, {'repo_name': HG_REPO})
-
-        gr = ReposGroupModel().create(group_name='group_test',
-                                      group_description='desc',
-                                      parent=None,
-                                      owner=TEST_USER_ADMIN_LOGIN)
-        self.assertRaises(formencode.Invalid,
-                          validator.to_python, {'repo_name': gr.group_name})
-
-        #TODO: write an error case for that ie. create a repo withinh a group
-#        self.assertRaises(formencode.Invalid,
-#                          validator.to_python, {'repo_name': 'some',
-#                                                'repo_group': gr.group_id})
-
-    def test_ValidForkName(self):
-        # this uses ValidRepoName validator
-        assert True
-
-    @parameterized.expand([
-        ('test', 'test'), ('lolz!', 'lolz'), ('  aavv', 'aavv'),
-        ('ala ma kota', 'ala-ma-kota'), ('@nooo', 'nooo'),
-        ('$!haha lolz !', 'haha-lolz'), ('$$$$$', ''), ('{}OK!', 'OK'),
-        ('/]re po', 're-po')])
-    def test_SlugifyName(self, name, expected):
-        validator = v.SlugifyName()
-        self.assertEqual(expected, validator.to_python(name))
-
-    def test_ValidCloneUri(self):
-            #TODO: write this one
-            pass
-
-    def test_ValidForkType(self):
-            validator = v.ValidForkType(old_data={'repo_type': 'hg'})
-            self.assertEqual('hg', validator.to_python('hg'))
-            self.assertRaises(formencode.Invalid, validator.to_python, 'git')
-
-    def test_ValidPerms(self):
-            #TODO: write this one
-            pass
-
-    def test_ValidSettings(self):
-        validator = v.ValidSettings()
-        self.assertEqual({'pass': 'pass'},
-                         validator.to_python(value={'user': 'test',
-                                                    'pass': 'pass'}))
-
-        self.assertEqual({'user2': 'test', 'pass': 'pass'},
-                         validator.to_python(value={'user2': 'test',
-                                                    'pass': 'pass'}))
-
-    def test_ValidPath(self):
-            validator = v.ValidPath()
-            self.assertEqual(TESTS_TMP_PATH,
-                             validator.to_python(TESTS_TMP_PATH))
-            self.assertRaises(formencode.Invalid, validator.to_python,
-                              '/no_such_dir')
-
-    def test_UniqSystemEmail(self):
-        validator = v.UniqSystemEmail(old_data={})
-
-        self.assertEqual('mail@python.org',
-                         validator.to_python('MaiL@Python.org'))
-
-        email = TEST_USER_REGULAR2_EMAIL
-        self.assertRaises(formencode.Invalid, validator.to_python, email)
-
-    def test_ValidSystemEmail(self):
-        validator = v.ValidSystemEmail()
-        email = TEST_USER_REGULAR2_EMAIL
-
-        self.assertEqual(email, validator.to_python(email))
-        self.assertRaises(formencode.Invalid, validator.to_python, 'err')
-
-    def test_LdapLibValidator(self):
-        if ldap_lib_installed:
-            validator = v.LdapLibValidator()
-            self.assertEqual("DN", validator.to_python('DN'))
-        else:
-            validator = v.LdapLibValidator()
-            self.assertRaises(v.LdapImportError, validator.to_python, 'err')
-
-    def test_AttrLoginValidator(self):
-        validator = v.AttrLoginValidator()
-        self.assertEqual('DN_attr', validator.to_python('DN_attr'))
-
-    def test_NotReviewedRevisions(self):
-        repo_id = Repository.get_by_repo_name(HG_REPO).repo_id
-        validator = v.NotReviewedRevisions(repo_id)
-        rev = '0' * 40
-        # add status for a rev, that should throw an error because it is already
-        # reviewed
-        new_status = ChangesetStatus()
-        new_status.author = ChangesetStatusModel()._get_user(TEST_USER_ADMIN_LOGIN)
-        new_status.repo = ChangesetStatusModel()._get_repo(HG_REPO)
-        new_status.status = ChangesetStatus.STATUS_APPROVED
-        new_status.comment = None
-        new_status.revision = rev
-        Session().add(new_status)
-        Session().commit()
-        try:
-            self.assertRaises(formencode.Invalid, validator.to_python, [rev])
-        finally:
-            Session().delete(new_status)
-            Session().commit()
Binary file rhodecode/tests/vcs_test_git.tar.gz has changed
Binary file rhodecode/tests/vcs_test_hg.tar.gz has changed