# HG changeset patch # User Marcin Kuzminski # Date 1368029182 -7200 # Node ID a5746b83123f5e4495311b47c38930d1ab4034f6 # Parent 972ad33c5610825338421a1129ba15d446e68d5c moved top-level tests to rhodecode/tests/other. - There are no toplevel tests - moved proper files to fixtures where they should be in the first place diff -r 972ad33c5610 -r a5746b83123f rhodecode/lib/utils.py --- a/rhodecode/lib/utils.py Wed May 08 17:43:59 2013 +0200 +++ b/rhodecode/lib/utils.py Wed May 08 18:06:22 2013 +0200 @@ -657,12 +657,12 @@ #CREATE DEFAULT TEST REPOS cur_dir = dn(dn(abspath(__file__))) - tar = tarfile.open(jn(cur_dir, 'tests', "vcs_test_hg.tar.gz")) + tar = tarfile.open(jn(cur_dir, 'tests', 'fixtures', "vcs_test_hg.tar.gz")) tar.extractall(jn(TESTS_TMP_PATH, HG_REPO)) tar.close() cur_dir = dn(dn(abspath(__file__))) - tar = tarfile.open(jn(cur_dir, 'tests', "vcs_test_git.tar.gz")) + tar = tarfile.open(jn(cur_dir, 'tests', 'fixtures', "vcs_test_git.tar.gz")) tar.extractall(jn(TESTS_TMP_PATH, GIT_REPO)) tar.close() diff -r 972ad33c5610 -r a5746b83123f rhodecode/tests/fixtures/vcs_test_git.tar.gz Binary file rhodecode/tests/fixtures/vcs_test_git.tar.gz has changed diff -r 972ad33c5610 -r a5746b83123f rhodecode/tests/fixtures/vcs_test_hg.tar.gz Binary file rhodecode/tests/fixtures/vcs_test_hg.tar.gz has changed diff -r 972ad33c5610 -r a5746b83123f rhodecode/tests/other/__init__.py diff -r 972ad33c5610 -r a5746b83123f rhodecode/tests/other/test_libs.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rhodecode/tests/other/test_libs.py Wed May 08 18:06:22 2013 +0200 @@ -0,0 +1,295 @@ +# -*- coding: utf-8 -*- +""" + rhodecode.tests.test_libs + ~~~~~~~~~~~~~~~~~~~~~~~~~ + + + Package for testing various lib/helper functions in rhodecode + + :created_on: Jun 9, 2011 + :copyright: (C) 2011-2012 Marcin Kuzminski + :license: GPLv3, see COPYING for more details. +""" +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +from __future__ import with_statement +import unittest +import datetime +import hashlib +import mock +from rhodecode.tests import * + +proto = 'http' +TEST_URLS = [ + ('%s://127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'], + '%s://127.0.0.1' % proto), + ('%s://marcink@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'], + '%s://127.0.0.1' % proto), + ('%s://marcink:pass@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'], + '%s://127.0.0.1' % proto), + ('%s://127.0.0.1:8080' % proto, ['%s://' % proto, '127.0.0.1', '8080'], + '%s://127.0.0.1:8080' % proto), + ('%s://domain.org' % proto, ['%s://' % proto, 'domain.org'], + '%s://domain.org' % proto), + ('%s://user:pass@domain.org:8080' % proto, ['%s://' % proto, 'domain.org', + '8080'], + '%s://domain.org:8080' % proto), +] + +proto = 'https' +TEST_URLS += [ + ('%s://127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'], + '%s://127.0.0.1' % proto), + ('%s://marcink@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'], + '%s://127.0.0.1' % proto), + ('%s://marcink:pass@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'], + '%s://127.0.0.1' % proto), + ('%s://127.0.0.1:8080' % proto, ['%s://' % proto, '127.0.0.1', '8080'], + '%s://127.0.0.1:8080' % proto), + ('%s://domain.org' % proto, ['%s://' % proto, 'domain.org'], + '%s://domain.org' % proto), + ('%s://user:pass@domain.org:8080' % proto, ['%s://' % proto, 'domain.org', + '8080'], + '%s://domain.org:8080' % proto), +] + + +class TestLibs(unittest.TestCase): + + @parameterized.expand(TEST_URLS) + def test_uri_filter(self, test_url, expected, expected_creds): + from rhodecode.lib.utils2 import uri_filter + self.assertEqual(uri_filter(test_url), expected) + + @parameterized.expand(TEST_URLS) + def test_credentials_filter(self, test_url, expected, expected_creds): + from rhodecode.lib.utils2 import credentials_filter + self.assertEqual(credentials_filter(test_url), expected_creds) + + @parameterized.expand([('t', True), + ('true', True), + ('y', True), + ('yes', True), + ('on', True), + ('1', True), + ('Y', True), + ('yeS', True), + ('Y', True), + ('TRUE', True), + ('T', True), + ('False', False), + ('F', False), + ('FALSE', False), + ('0', False), + ('-1', False), + ('', False) + ]) + def test_str2bool(self, str_bool, expected): + from rhodecode.lib.utils2 import str2bool + self.assertEqual(str2bool(str_bool), expected) + + def test_mention_extractor(self): + from rhodecode.lib.utils2 import extract_mentioned_users + sample = ( + "@first hi there @marcink here's my email marcin@email.com " + "@lukaszb check @one_more22 it pls @ ttwelve @D[] @one@two@three " + "@MARCIN @maRCiN @2one_more22 @john please see this http://org.pl " + "@marian.user just do it @marco-polo and next extract @marco_polo " + "user.dot hej ! not-needed maril@domain.org" + ) + + s = sorted([ + 'first', 'marcink', 'lukaszb', 'one_more22', 'MARCIN', 'maRCiN', 'john', + 'marian.user', 'marco-polo', 'marco_polo' + ], key=lambda k: k.lower()) + self.assertEqual(s, extract_mentioned_users(sample)) + + @parameterized.expand([ + (dict(), u'just now'), + (dict(seconds= -1), u'1 second ago'), + (dict(seconds= -60 * 2), u'2 minutes ago'), + (dict(hours= -1), u'1 hour ago'), + (dict(hours= -24), u'1 day ago'), + (dict(hours= -24 * 5), u'5 days ago'), + (dict(months= -1), u'1 month ago'), + (dict(months= -1, days= -2), u'1 month and 2 days ago'), + (dict(years= -1, months= -1), u'1 year and 1 month ago'), + ]) + def test_age(self, age_args, expected): + from rhodecode.lib.utils2 import age + from dateutil import relativedelta + n = datetime.datetime(year=2012, month=5, day=17) + delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs) + self.assertEqual(age(n + delt(**age_args), now=n), expected) + + @parameterized.expand([ + + (dict(), u'just now'), + (dict(seconds=1), u'in 1 second'), + (dict(seconds=60 * 2), u'in 2 minutes'), + (dict(hours=1), u'in 1 hour'), + (dict(hours=24), u'in 1 day'), + (dict(hours=24 * 5), u'in 5 days'), + (dict(months=1), u'in 1 month'), + (dict(months=1, days=1), u'in 1 month and 1 day'), + (dict(years=1, months=1), u'in 1 year and 1 month') + ]) + def test_age_in_future(self, age_args, expected): + from rhodecode.lib.utils2 import age + from dateutil import relativedelta + n = datetime.datetime(year=2012, month=5, day=17) + delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs) + self.assertEqual(age(n + delt(**age_args), now=n), expected) + + def test_tag_exctrator(self): + sample = ( + "hello pta[tag] gog [[]] [[] sda ero[or]d [me =>>< sa]" + "[requires] [stale] [see<>=>] [see => http://url.com]" + "[requires => url] [lang => python] [just a tag]" + "[,d] [ => ULR ] [obsolete] [desc]]" + ) + from rhodecode.lib.helpers import desc_stylize + res = desc_stylize(sample) + self.assertTrue('
tag
' in res) + self.assertTrue('
obsolete
' in res) + self.assertTrue('
stale
' in res) + self.assertTrue('
python
' in res) + self.assertTrue('
requires => url
' in res) + self.assertTrue('
tag
' in res) + + def test_alternative_gravatar(self): + from rhodecode.lib.helpers import gravatar_url + _md5 = lambda s: hashlib.md5(s).hexdigest() + + def fake_conf(**kwargs): + from pylons import config + config['app_conf'] = {} + config['app_conf']['use_gravatar'] = True + config['app_conf'].update(kwargs) + return config + + class fake_url(): + @classmethod + def current(cls, *args, **kwargs): + return 'https://server.com' + + with mock.patch('pylons.url', fake_url): + fake = fake_conf(alternative_gravatar_url='http://test.com/{email}') + with mock.patch('pylons.config', fake): + from pylons import url + assert url.current() == 'https://server.com' + grav = gravatar_url(email_address='test@foo.com', size=24) + assert grav == 'http://test.com/test@foo.com' + + fake = fake_conf(alternative_gravatar_url='http://test.com/{email}') + with mock.patch('pylons.config', fake): + grav = gravatar_url(email_address='test@foo.com', size=24) + assert grav == 'http://test.com/test@foo.com' + + fake = fake_conf(alternative_gravatar_url='http://test.com/{md5email}') + with mock.patch('pylons.config', fake): + em = 'test@foo.com' + grav = gravatar_url(email_address=em, size=24) + assert grav == 'http://test.com/%s' % (_md5(em)) + + fake = fake_conf(alternative_gravatar_url='http://test.com/{md5email}/{size}') + with mock.patch('pylons.config', fake): + em = 'test@foo.com' + grav = gravatar_url(email_address=em, size=24) + assert grav == 'http://test.com/%s/%s' % (_md5(em), 24) + + fake = fake_conf(alternative_gravatar_url='{scheme}://{netloc}/{md5email}/{size}') + with mock.patch('pylons.config', fake): + em = 'test@foo.com' + grav = gravatar_url(email_address=em, size=24) + assert grav == 'https://server.com/%s/%s' % (_md5(em), 24) + + def _quick_url(self, text, tmpl="""%s""", url_=None): + """ + Changes `some text url[foo]` => `some text foo + + :param text: + """ + import re + # quickly change expected url[] into a link + URL_PAT = re.compile(r'(?:url\[)(.+?)(?:\])') + + def url_func(match_obj): + _url = match_obj.groups()[0] + return tmpl % (url_ or '/some-url', _url) + return URL_PAT.sub(url_func, text) + + @parameterized.expand([ + ("", + ""), + ("git-svn-id: https://svn.apache.org/repos/asf/libcloud/trunk@1441655 13f79535-47bb-0310-9956-ffa450edef68", + "git-svn-id: https://svn.apache.org/repos/asf/libcloud/trunk@1441655 13f79535-47bb-0310-9956-ffa450edef68"), + ("from rev 000000000000", + "from rev url[000000000000]"), + ("from rev 000000000000123123 also rev 000000000000", + "from rev url[000000000000123123] also rev url[000000000000]"), + ("this should-000 00", + "this should-000 00"), + ("longtextffffffffff rev 123123123123", + "longtextffffffffff rev url[123123123123]"), + ("rev ffffffffffffffffffffffffffffffffffffffffffffffffff", + "rev ffffffffffffffffffffffffffffffffffffffffffffffffff"), + ("ffffffffffff some text traalaa", + "url[ffffffffffff] some text traalaa"), + ("""Multi line + 123123123123 + some text 123123123123 + sometimes ! + """, + """Multi line + url[123123123123] + some text url[123123123123] + sometimes ! + """) + ]) + def test_urlify_changesets(self, sample, expected): + def fake_url(self, *args, **kwargs): + return '/some-url' + + expected = self._quick_url(expected) + + with mock.patch('pylons.url', fake_url): + from rhodecode.lib.helpers import urlify_changesets + self.assertEqual(urlify_changesets(sample, 'repo_name'), expected) + + @parameterized.expand([ + ("", + "", + ""), + ("https://svn.apache.org/repos", + "url[https://svn.apache.org/repos]", + "https://svn.apache.org/repos"), + ("http://svn.apache.org/repos", + "url[http://svn.apache.org/repos]", + "http://svn.apache.org/repos"), + ("from rev a also rev http://google.com", + "from rev a also rev url[http://google.com]", + "http://google.com"), + ("""Multi line + https://foo.bar.com + some text lalala""", + """Multi line + url[https://foo.bar.com] + some text lalala""", + "https://foo.bar.com") + ]) + def test_urlify_test(self, sample, expected, url_): + from rhodecode.lib.helpers import urlify_text + expected = self._quick_url(expected, + tmpl="""%s""", url_=url_) + self.assertEqual(urlify_text(sample), expected) diff -r 972ad33c5610 -r a5746b83123f rhodecode/tests/other/test_validators.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rhodecode/tests/other/test_validators.py Wed May 08 18:06:22 2013 +0200 @@ -0,0 +1,254 @@ +# -*- coding: utf-8 -*- +import unittest +import formencode + +from rhodecode.tests import * + +from rhodecode.model import validators as v +from rhodecode.model.users_group import UserGroupModel + +from rhodecode.model.meta import Session +from rhodecode.model.repos_group import ReposGroupModel +from rhodecode.model.db import ChangesetStatus, Repository +from rhodecode.model.changeset_status import ChangesetStatusModel +from rhodecode.tests.fixture import Fixture + +fixture = Fixture() + + +class TestReposGroups(unittest.TestCase): + + def setUp(self): + pass + + def tearDown(self): + Session.remove() + + def test_Message_extractor(self): + validator = v.ValidUsername() + self.assertRaises(formencode.Invalid, validator.to_python, 'default') + + class StateObj(object): + pass + + self.assertRaises(formencode.Invalid, + validator.to_python, 'default', StateObj) + + def test_ValidUsername(self): + validator = v.ValidUsername() + + self.assertRaises(formencode.Invalid, validator.to_python, 'default') + self.assertRaises(formencode.Invalid, validator.to_python, 'new_user') + self.assertRaises(formencode.Invalid, validator.to_python, '.,') + self.assertRaises(formencode.Invalid, validator.to_python, + TEST_USER_ADMIN_LOGIN) + self.assertEqual('test', validator.to_python('test')) + + validator = v.ValidUsername(edit=True, old_data={'user_id': 1}) + + def test_ValidRepoUser(self): + validator = v.ValidRepoUser() + self.assertRaises(formencode.Invalid, validator.to_python, 'nouser') + self.assertEqual(TEST_USER_ADMIN_LOGIN, + validator.to_python(TEST_USER_ADMIN_LOGIN)) + + def test_ValidUserGroup(self): + validator = v.ValidUserGroup() + self.assertRaises(formencode.Invalid, validator.to_python, 'default') + self.assertRaises(formencode.Invalid, validator.to_python, '.,') + + gr = fixture.create_user_group('test') + gr2 = fixture.create_user_group('tes2') + Session().commit() + self.assertRaises(formencode.Invalid, validator.to_python, 'test') + assert gr.users_group_id != None + validator = v.ValidUserGroup(edit=True, + old_data={'users_group_id': + gr2.users_group_id}) + + self.assertRaises(formencode.Invalid, validator.to_python, 'test') + self.assertRaises(formencode.Invalid, validator.to_python, 'TesT') + self.assertRaises(formencode.Invalid, validator.to_python, 'TEST') + UserGroupModel().delete(gr) + UserGroupModel().delete(gr2) + Session().commit() + + def test_ValidReposGroup(self): + validator = v.ValidReposGroup() + model = ReposGroupModel() + self.assertRaises(formencode.Invalid, validator.to_python, + {'group_name': HG_REPO, }) + gr = model.create(group_name='test_gr', group_description='desc', + parent=None, + just_db=True, + owner=TEST_USER_ADMIN_LOGIN) + self.assertRaises(formencode.Invalid, + validator.to_python, {'group_name': gr.group_name, }) + + validator = v.ValidReposGroup(edit=True, + old_data={'group_id': gr.group_id}) + self.assertRaises(formencode.Invalid, + validator.to_python, { + 'group_name': gr.group_name + 'n', + 'group_parent_id': gr.group_id + }) + model.delete(gr) + + def test_ValidPassword(self): + validator = v.ValidPassword() + self.assertEqual('lol', validator.to_python('lol')) + self.assertEqual(None, validator.to_python(None)) + self.assertRaises(formencode.Invalid, validator.to_python, 'ąćżź') + + def test_ValidPasswordsMatch(self): + validator = v.ValidPasswordsMatch() + self.assertRaises(formencode.Invalid, + validator.to_python, {'password': 'pass', + 'password_confirmation': 'pass2'}) + + self.assertRaises(formencode.Invalid, + validator.to_python, {'new_password': 'pass', + 'password_confirmation': 'pass2'}) + + self.assertEqual({'new_password': 'pass', + 'password_confirmation': 'pass'}, + validator.to_python({'new_password': 'pass', + 'password_confirmation': 'pass'})) + + self.assertEqual({'password': 'pass', + 'password_confirmation': 'pass'}, + validator.to_python({'password': 'pass', + 'password_confirmation': 'pass'})) + + def test_ValidAuth(self): + validator = v.ValidAuth() + valid_creds = { + 'username': TEST_USER_REGULAR2_LOGIN, + 'password': TEST_USER_REGULAR2_PASS, + } + invalid_creds = { + 'username': 'err', + 'password': 'err', + } + self.assertEqual(valid_creds, validator.to_python(valid_creds)) + self.assertRaises(formencode.Invalid, + validator.to_python, invalid_creds) + + def test_ValidAuthToken(self): + validator = v.ValidAuthToken() + # this is untestable without a threadlocal +# self.assertRaises(formencode.Invalid, +# validator.to_python, 'BadToken') + validator + + def test_ValidRepoName(self): + validator = v.ValidRepoName() + + self.assertRaises(formencode.Invalid, + validator.to_python, {'repo_name': ''}) + + self.assertRaises(formencode.Invalid, + validator.to_python, {'repo_name': HG_REPO}) + + gr = ReposGroupModel().create(group_name='group_test', + group_description='desc', + parent=None, + owner=TEST_USER_ADMIN_LOGIN) + self.assertRaises(formencode.Invalid, + validator.to_python, {'repo_name': gr.group_name}) + + #TODO: write an error case for that ie. create a repo withinh a group +# self.assertRaises(formencode.Invalid, +# validator.to_python, {'repo_name': 'some', +# 'repo_group': gr.group_id}) + + def test_ValidForkName(self): + # this uses ValidRepoName validator + assert True + + @parameterized.expand([ + ('test', 'test'), ('lolz!', 'lolz'), (' aavv', 'aavv'), + ('ala ma kota', 'ala-ma-kota'), ('@nooo', 'nooo'), + ('$!haha lolz !', 'haha-lolz'), ('$$$$$', ''), ('{}OK!', 'OK'), + ('/]re po', 're-po')]) + def test_SlugifyName(self, name, expected): + validator = v.SlugifyName() + self.assertEqual(expected, validator.to_python(name)) + + def test_ValidCloneUri(self): + #TODO: write this one + pass + + def test_ValidForkType(self): + validator = v.ValidForkType(old_data={'repo_type': 'hg'}) + self.assertEqual('hg', validator.to_python('hg')) + self.assertRaises(formencode.Invalid, validator.to_python, 'git') + + def test_ValidPerms(self): + #TODO: write this one + pass + + def test_ValidSettings(self): + validator = v.ValidSettings() + self.assertEqual({'pass': 'pass'}, + validator.to_python(value={'user': 'test', + 'pass': 'pass'})) + + self.assertEqual({'user2': 'test', 'pass': 'pass'}, + validator.to_python(value={'user2': 'test', + 'pass': 'pass'})) + + def test_ValidPath(self): + validator = v.ValidPath() + self.assertEqual(TESTS_TMP_PATH, + validator.to_python(TESTS_TMP_PATH)) + self.assertRaises(formencode.Invalid, validator.to_python, + '/no_such_dir') + + def test_UniqSystemEmail(self): + validator = v.UniqSystemEmail(old_data={}) + + self.assertEqual('mail@python.org', + validator.to_python('MaiL@Python.org')) + + email = TEST_USER_REGULAR2_EMAIL + self.assertRaises(formencode.Invalid, validator.to_python, email) + + def test_ValidSystemEmail(self): + validator = v.ValidSystemEmail() + email = TEST_USER_REGULAR2_EMAIL + + self.assertEqual(email, validator.to_python(email)) + self.assertRaises(formencode.Invalid, validator.to_python, 'err') + + def test_LdapLibValidator(self): + if ldap_lib_installed: + validator = v.LdapLibValidator() + self.assertEqual("DN", validator.to_python('DN')) + else: + validator = v.LdapLibValidator() + self.assertRaises(v.LdapImportError, validator.to_python, 'err') + + def test_AttrLoginValidator(self): + validator = v.AttrLoginValidator() + self.assertEqual('DN_attr', validator.to_python('DN_attr')) + + def test_NotReviewedRevisions(self): + repo_id = Repository.get_by_repo_name(HG_REPO).repo_id + validator = v.NotReviewedRevisions(repo_id) + rev = '0' * 40 + # add status for a rev, that should throw an error because it is already + # reviewed + new_status = ChangesetStatus() + new_status.author = ChangesetStatusModel()._get_user(TEST_USER_ADMIN_LOGIN) + new_status.repo = ChangesetStatusModel()._get_repo(HG_REPO) + new_status.status = ChangesetStatus.STATUS_APPROVED + new_status.comment = None + new_status.revision = rev + Session().add(new_status) + Session().commit() + try: + self.assertRaises(formencode.Invalid, validator.to_python, [rev]) + finally: + Session().delete(new_status) + Session().commit() diff -r 972ad33c5610 -r a5746b83123f rhodecode/tests/other/test_vcs_operations.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rhodecode/tests/other/test_vcs_operations.py Wed May 08 18:06:22 2013 +0200 @@ -0,0 +1,507 @@ +# -*- coding: utf-8 -*- +""" + rhodecode.tests.test_scm_operations + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + Test suite for making push/pull operations. + Run using after doing paster serve test.ini:: + RC_WHOOSH_TEST_DISABLE=1 RC_NO_TMP_PATH=1 nosetests rhodecode/tests/scripts/test_vcs_operations.py + + You must have git > 1.8.1 for tests to work fine + + :created_on: Dec 30, 2010 + :author: marcink + :copyright: (C) 2010-2012 Marcin Kuzminski + :license: GPLv3, see COPYING for more details. +""" +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import os +import tempfile +import unittest +import time +from os.path import join as jn +from os.path import dirname as dn + +from tempfile import _RandomNameSequence +from subprocess import Popen, PIPE + +from rhodecode.tests import * +from rhodecode.model.db import User, Repository, UserLog, UserIpMap,\ + CacheInvalidation +from rhodecode.model.meta import Session +from rhodecode.model.repo import RepoModel +from rhodecode.model.user import UserModel + +DEBUG = True +HOST = '127.0.0.1:5000' # test host + + +class Command(object): + + def __init__(self, cwd): + self.cwd = cwd + + def execute(self, cmd, *args): + """ + Runs command on the system with given ``args``. + """ + + command = cmd + ' ' + ' '.join(args) + if DEBUG: + print '*** CMD %s ***' % command + p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd) + stdout, stderr = p.communicate() + if DEBUG: + print stdout, stderr + return stdout, stderr + + +def _get_tmp_dir(): + return tempfile.mkdtemp(prefix='rc_integration_test') + + +def _construct_url(repo, dest=None, **kwargs): + if dest is None: + #make temp clone + dest = _get_tmp_dir() + params = { + 'user': TEST_USER_ADMIN_LOGIN, + 'passwd': TEST_USER_ADMIN_PASS, + 'host': HOST, + 'cloned_repo': repo, + 'dest': dest + } + params.update(**kwargs) + if params['user'] and params['passwd']: + _url = 'http://%(user)s:%(passwd)s@%(host)s/%(cloned_repo)s %(dest)s' % params + else: + _url = 'http://(host)s/%(cloned_repo)s %(dest)s' % params + return _url + + +def _add_files_and_push(vcs, DEST, **kwargs): + """ + Generate some files, add it to DEST repo and push back + vcs is git or hg and defines what VCS we want to make those files for + + :param vcs: + :param DEST: + """ + # commit some stuff into this repo + cwd = path = jn(DEST) + #added_file = jn(path, '%ssetupążźć.py' % _RandomNameSequence().next()) + added_file = jn(path, '%ssetup.py' % _RandomNameSequence().next()) + Command(cwd).execute('touch %s' % added_file) + Command(cwd).execute('%s add %s' % (vcs, added_file)) + + for i in xrange(kwargs.get('files_no', 3)): + cmd = """echo 'added_line%s' >> %s""" % (i, added_file) + Command(cwd).execute(cmd) + author_str = 'Marcin Kuźminski ' + if vcs == 'hg': + cmd = """hg commit -m 'commited new %s' -u '%s' %s """ % ( + i, author_str, added_file + ) + elif vcs == 'git': + cmd = """git commit -m 'commited new %s' --author '%s' %s """ % ( + i, author_str, added_file + ) + Command(cwd).execute(cmd) + # PUSH it back + if vcs == 'hg': + _REPO = HG_REPO + elif vcs == 'git': + _REPO = GIT_REPO + + kwargs['dest'] = '' + clone_url = _construct_url(_REPO, **kwargs) + if 'clone_url' in kwargs: + clone_url = kwargs['clone_url'] + if vcs == 'hg': + stdout, stderr = Command(cwd).execute('hg push --verbose', clone_url) + elif vcs == 'git': + stdout, stderr = Command(cwd).execute('git push --verbose', clone_url + " master") + + return stdout, stderr + + +def set_anonymous_access(enable=True): + user = User.get_by_username(User.DEFAULT_USER) + user.active = enable + Session().add(user) + Session().commit() + print '\tanonymous access is now:', enable + if enable != User.get_by_username(User.DEFAULT_USER).active: + raise Exception('Cannot set anonymous access') + + +#============================================================================== +# TESTS +#============================================================================== + +class TestVCSOperations(unittest.TestCase): + + @classmethod + def setup_class(cls): + #DISABLE ANONYMOUS ACCESS + set_anonymous_access(False) + + def setUp(self): + r = Repository.get_by_repo_name(GIT_REPO) + Repository.unlock(r) + r.enable_locking = False + Session().add(r) + Session().commit() + + r = Repository.get_by_repo_name(HG_REPO) + Repository.unlock(r) + r.enable_locking = False + Session().add(r) + Session().commit() + + def test_clone_hg_repo_by_admin(self): + clone_url = _construct_url(HG_REPO) + stdout, stderr = Command('/tmp').execute('hg clone', clone_url) + + assert 'requesting all changes' in stdout + assert 'adding changesets' in stdout + assert 'adding manifests' in stdout + assert 'adding file changes' in stdout + + assert stderr == '' + + def test_clone_git_repo_by_admin(self): + clone_url = _construct_url(GIT_REPO) + stdout, stderr = Command('/tmp').execute('git clone', clone_url) + + assert 'Cloning into' in stdout + assert stderr == '' + + def test_clone_wrong_credentials_hg(self): + clone_url = _construct_url(HG_REPO, passwd='bad!') + stdout, stderr = Command('/tmp').execute('hg clone', clone_url) + assert 'abort: authorization failed' in stderr + + def test_clone_wrong_credentials_git(self): + clone_url = _construct_url(GIT_REPO, passwd='bad!') + stdout, stderr = Command('/tmp').execute('git clone', clone_url) + assert 'fatal: Authentication failed' in stderr + + def test_clone_git_dir_as_hg(self): + clone_url = _construct_url(GIT_REPO) + stdout, stderr = Command('/tmp').execute('hg clone', clone_url) + assert 'HTTP Error 404: Not Found' in stderr + + def test_clone_hg_repo_as_git(self): + clone_url = _construct_url(HG_REPO) + stdout, stderr = Command('/tmp').execute('git clone', clone_url) + assert 'not found:' in stderr + + def test_clone_non_existing_path_hg(self): + clone_url = _construct_url('trololo') + stdout, stderr = Command('/tmp').execute('hg clone', clone_url) + assert 'HTTP Error 404: Not Found' in stderr + + def test_clone_non_existing_path_git(self): + clone_url = _construct_url('trololo') + stdout, stderr = Command('/tmp').execute('git clone', clone_url) + assert 'not found:' in stderr + + def test_push_new_file_hg(self): + DEST = _get_tmp_dir() + clone_url = _construct_url(HG_REPO, dest=DEST) + stdout, stderr = Command('/tmp').execute('hg clone', clone_url) + + stdout, stderr = _add_files_and_push('hg', DEST) + + assert 'pushing to' in stdout + assert 'Repository size' in stdout + assert 'Last revision is now' in stdout + + def test_push_new_file_git(self): + DEST = _get_tmp_dir() + clone_url = _construct_url(GIT_REPO, dest=DEST) + stdout, stderr = Command('/tmp').execute('git clone', clone_url) + + # commit some stuff into this repo + stdout, stderr = _add_files_and_push('git', DEST) + + #WTF git stderr ?! + assert 'master -> master' in stderr + + def test_push_invalidates_cache_hg(self): + key = CacheInvalidation.query().filter(CacheInvalidation.cache_key + ==HG_REPO).one() + key.cache_active = True + Session().add(key) + Session().commit() + + DEST = _get_tmp_dir() + clone_url = _construct_url(HG_REPO, dest=DEST) + stdout, stderr = Command('/tmp').execute('hg clone', clone_url) + + stdout, stderr = _add_files_and_push('hg', DEST, files_no=1) + key = CacheInvalidation.query().filter(CacheInvalidation.cache_key + ==HG_REPO).one() + self.assertEqual(key.cache_active, False) + + def test_push_invalidates_cache_git(self): + key = CacheInvalidation.query().filter(CacheInvalidation.cache_key + ==GIT_REPO).one() + key.cache_active = True + Session().add(key) + Session().commit() + + DEST = _get_tmp_dir() + clone_url = _construct_url(GIT_REPO, dest=DEST) + stdout, stderr = Command('/tmp').execute('git clone', clone_url) + + # commit some stuff into this repo + stdout, stderr = _add_files_and_push('git', DEST, files_no=1) + + key = CacheInvalidation.query().filter(CacheInvalidation.cache_key + ==GIT_REPO).one() + self.assertEqual(key.cache_active, False) + + def test_push_wrong_credentials_hg(self): + DEST = _get_tmp_dir() + clone_url = _construct_url(HG_REPO, dest=DEST) + stdout, stderr = Command('/tmp').execute('hg clone', clone_url) + + stdout, stderr = _add_files_and_push('hg', DEST, user='bad', + passwd='name') + + assert 'abort: authorization failed' in stderr + + def test_push_wrong_credentials_git(self): + DEST = _get_tmp_dir() + clone_url = _construct_url(GIT_REPO, dest=DEST) + stdout, stderr = Command('/tmp').execute('git clone', clone_url) + + stdout, stderr = _add_files_and_push('git', DEST, user='bad', + passwd='name') + + assert 'fatal: Authentication failed' in stderr + + def test_push_back_to_wrong_url_hg(self): + DEST = _get_tmp_dir() + clone_url = _construct_url(HG_REPO, dest=DEST) + stdout, stderr = Command('/tmp').execute('hg clone', clone_url) + + stdout, stderr = _add_files_and_push('hg', DEST, + clone_url='http://127.0.0.1:5000/tmp',) + + assert 'HTTP Error 404: Not Found' in stderr + + def test_push_back_to_wrong_url_git(self): + DEST = _get_tmp_dir() + clone_url = _construct_url(GIT_REPO, dest=DEST) + stdout, stderr = Command('/tmp').execute('git clone', clone_url) + + stdout, stderr = _add_files_and_push('git', DEST, + clone_url='http://127.0.0.1:5000/tmp',) + + assert 'not found:' in stderr + + def test_clone_and_create_lock_hg(self): + # enable locking + r = Repository.get_by_repo_name(HG_REPO) + r.enable_locking = True + Session().add(r) + Session().commit() + # clone + clone_url = _construct_url(HG_REPO) + stdout, stderr = Command('/tmp').execute('hg clone', clone_url) + + #check if lock was made + r = Repository.get_by_repo_name(HG_REPO) + assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id + + def test_clone_and_create_lock_git(self): + # enable locking + r = Repository.get_by_repo_name(GIT_REPO) + r.enable_locking = True + Session().add(r) + Session().commit() + # clone + clone_url = _construct_url(GIT_REPO) + stdout, stderr = Command('/tmp').execute('git clone', clone_url) + + #check if lock was made + r = Repository.get_by_repo_name(GIT_REPO) + assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id + + def test_clone_after_repo_was_locked_hg(self): + #lock repo + r = Repository.get_by_repo_name(HG_REPO) + Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id) + #pull fails since repo is locked + clone_url = _construct_url(HG_REPO) + stdout, stderr = Command('/tmp').execute('hg clone', clone_url) + msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`""" + % (HG_REPO, TEST_USER_ADMIN_LOGIN)) + assert msg in stderr + + def test_clone_after_repo_was_locked_git(self): + #lock repo + r = Repository.get_by_repo_name(GIT_REPO) + Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id) + #pull fails since repo is locked + clone_url = _construct_url(GIT_REPO) + stdout, stderr = Command('/tmp').execute('git clone', clone_url) + msg = ("""The requested URL returned error: 423""") + assert msg in stderr + + def test_push_on_locked_repo_by_other_user_hg(self): + #clone some temp + DEST = _get_tmp_dir() + clone_url = _construct_url(HG_REPO, dest=DEST) + stdout, stderr = Command('/tmp').execute('hg clone', clone_url) + + #lock repo + r = Repository.get_by_repo_name(HG_REPO) + # let this user actually push ! + RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN, + perm='repository.write') + Session().commit() + Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id) + + #push fails repo is locked by other user ! + stdout, stderr = _add_files_and_push('hg', DEST, + user=TEST_USER_REGULAR_LOGIN, + passwd=TEST_USER_REGULAR_PASS) + msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`""" + % (HG_REPO, TEST_USER_ADMIN_LOGIN)) + assert msg in stderr + +#TODO: fix me ! somehow during tests hooks don't get called on GIT +# def test_push_on_locked_repo_by_other_user_git(self): +# #clone some temp +# DEST = _get_tmp_dir() +# clone_url = _construct_url(GIT_REPO, dest=DEST) +# stdout, stderr = Command('/tmp').execute('git clone', clone_url) +# +# #lock repo +# r = Repository.get_by_repo_name(GIT_REPO) +# # let this user actually push ! +# RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN, +# perm='repository.write') +# Session().commit() +# Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id) +# +# #push fails repo is locked by other user ! +# stdout, stderr = _add_files_and_push('git', DEST, +# user=TEST_USER_REGULAR_LOGIN, +# passwd=TEST_USER_REGULAR_PASS) +# msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`""" +# % (GIT_REPO, TEST_USER_ADMIN_LOGIN)) +# #TODO: fix this somehow later on GIT, GIT is stupid and even if we throw +# # back 423 to it, it makes ANOTHER request and we fail there with 405 :/ +# msg = "405 Method Not Allowed" +# assert msg in stderr + + def test_push_unlocks_repository_hg(self): + # enable locking + r = Repository.get_by_repo_name(HG_REPO) + r.enable_locking = True + Session().add(r) + Session().commit() + #clone some temp + DEST = _get_tmp_dir() + clone_url = _construct_url(HG_REPO, dest=DEST) + stdout, stderr = Command('/tmp').execute('hg clone', clone_url) + + #check for lock repo after clone + r = Repository.get_by_repo_name(HG_REPO) + assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id + + #push is ok and repo is now unlocked + stdout, stderr = _add_files_and_push('hg', DEST) + assert ('remote: Released lock on repo `%s`' % HG_REPO) in stdout + #we need to cleanup the Session Here ! + Session.remove() + r = Repository.get_by_repo_name(HG_REPO) + assert r.locked == [None, None] + +#TODO: fix me ! somehow during tests hooks don't get called on GIT +# def test_push_unlocks_repository_git(self): +# # enable locking +# r = Repository.get_by_repo_name(GIT_REPO) +# r.enable_locking = True +# Session().add(r) +# Session().commit() +# #clone some temp +# DEST = _get_tmp_dir() +# clone_url = _construct_url(GIT_REPO, dest=DEST) +# stdout, stderr = Command('/tmp').execute('git clone', clone_url) +# +# #check for lock repo after clone +# r = Repository.get_by_repo_name(GIT_REPO) +# assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id +# +# #push is ok and repo is now unlocked +# stdout, stderr = _add_files_and_push('git', DEST) +# #assert ('remote: Released lock on repo `%s`' % GIT_REPO) in stdout +# #we need to cleanup the Session Here ! +# Session.remove() +# r = Repository.get_by_repo_name(GIT_REPO) +# assert r.locked == [None, None] + + def test_ip_restriction_hg(self): + user_model = UserModel() + try: + user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32') + Session().commit() + clone_url = _construct_url(HG_REPO) + stdout, stderr = Command('/tmp').execute('hg clone', clone_url) + assert 'abort: HTTP Error 403: Forbidden' in stderr + finally: + #release IP restrictions + for ip in UserIpMap.getAll(): + UserIpMap.delete(ip.ip_id) + Session().commit() + + time.sleep(2) + clone_url = _construct_url(HG_REPO) + stdout, stderr = Command('/tmp').execute('hg clone', clone_url) + + assert 'requesting all changes' in stdout + assert 'adding changesets' in stdout + assert 'adding manifests' in stdout + assert 'adding file changes' in stdout + + assert stderr == '' + + def test_ip_restriction_git(self): + user_model = UserModel() + try: + user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32') + Session().commit() + clone_url = _construct_url(GIT_REPO) + stdout, stderr = Command('/tmp').execute('git clone', clone_url) + msg = ("""The requested URL returned error: 403""") + assert msg in stderr + finally: + #release IP restrictions + for ip in UserIpMap.getAll(): + UserIpMap.delete(ip.ip_id) + Session().commit() + + time.sleep(2) + clone_url = _construct_url(GIT_REPO) + stdout, stderr = Command('/tmp').execute('git clone', clone_url) + + assert 'Cloning into' in stdout + assert stderr == '' diff -r 972ad33c5610 -r a5746b83123f rhodecode/tests/scripts/test_vcs_operations.py --- a/rhodecode/tests/scripts/test_vcs_operations.py Wed May 08 17:43:59 2013 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,507 +0,0 @@ -# -*- coding: utf-8 -*- -""" - rhodecode.tests.test_scm_operations - ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - - Test suite for making push/pull operations. - Run using after doing paster serve test.ini:: - RC_WHOOSH_TEST_DISABLE=1 RC_NO_TMP_PATH=1 nosetests rhodecode/tests/scripts/test_vcs_operations.py - - You must have git > 1.8.1 for tests to work fine - - :created_on: Dec 30, 2010 - :author: marcink - :copyright: (C) 2010-2012 Marcin Kuzminski - :license: GPLv3, see COPYING for more details. -""" -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - -import os -import tempfile -import unittest -import time -from os.path import join as jn -from os.path import dirname as dn - -from tempfile import _RandomNameSequence -from subprocess import Popen, PIPE - -from rhodecode.tests import * -from rhodecode.model.db import User, Repository, UserLog, UserIpMap,\ - CacheInvalidation -from rhodecode.model.meta import Session -from rhodecode.model.repo import RepoModel -from rhodecode.model.user import UserModel - -DEBUG = True -HOST = '127.0.0.1:5000' # test host - - -class Command(object): - - def __init__(self, cwd): - self.cwd = cwd - - def execute(self, cmd, *args): - """ - Runs command on the system with given ``args``. - """ - - command = cmd + ' ' + ' '.join(args) - if DEBUG: - print '*** CMD %s ***' % command - p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd) - stdout, stderr = p.communicate() - if DEBUG: - print stdout, stderr - return stdout, stderr - - -def _get_tmp_dir(): - return tempfile.mkdtemp(prefix='rc_integration_test') - - -def _construct_url(repo, dest=None, **kwargs): - if dest is None: - #make temp clone - dest = _get_tmp_dir() - params = { - 'user': TEST_USER_ADMIN_LOGIN, - 'passwd': TEST_USER_ADMIN_PASS, - 'host': HOST, - 'cloned_repo': repo, - 'dest': dest - } - params.update(**kwargs) - if params['user'] and params['passwd']: - _url = 'http://%(user)s:%(passwd)s@%(host)s/%(cloned_repo)s %(dest)s' % params - else: - _url = 'http://(host)s/%(cloned_repo)s %(dest)s' % params - return _url - - -def _add_files_and_push(vcs, DEST, **kwargs): - """ - Generate some files, add it to DEST repo and push back - vcs is git or hg and defines what VCS we want to make those files for - - :param vcs: - :param DEST: - """ - # commit some stuff into this repo - cwd = path = jn(DEST) - #added_file = jn(path, '%ssetupążźć.py' % _RandomNameSequence().next()) - added_file = jn(path, '%ssetup.py' % _RandomNameSequence().next()) - Command(cwd).execute('touch %s' % added_file) - Command(cwd).execute('%s add %s' % (vcs, added_file)) - - for i in xrange(kwargs.get('files_no', 3)): - cmd = """echo 'added_line%s' >> %s""" % (i, added_file) - Command(cwd).execute(cmd) - author_str = 'Marcin Kuźminski ' - if vcs == 'hg': - cmd = """hg commit -m 'commited new %s' -u '%s' %s """ % ( - i, author_str, added_file - ) - elif vcs == 'git': - cmd = """git commit -m 'commited new %s' --author '%s' %s """ % ( - i, author_str, added_file - ) - Command(cwd).execute(cmd) - # PUSH it back - if vcs == 'hg': - _REPO = HG_REPO - elif vcs == 'git': - _REPO = GIT_REPO - - kwargs['dest'] = '' - clone_url = _construct_url(_REPO, **kwargs) - if 'clone_url' in kwargs: - clone_url = kwargs['clone_url'] - if vcs == 'hg': - stdout, stderr = Command(cwd).execute('hg push --verbose', clone_url) - elif vcs == 'git': - stdout, stderr = Command(cwd).execute('git push --verbose', clone_url + " master") - - return stdout, stderr - - -def set_anonymous_access(enable=True): - user = User.get_by_username(User.DEFAULT_USER) - user.active = enable - Session().add(user) - Session().commit() - print '\tanonymous access is now:', enable - if enable != User.get_by_username(User.DEFAULT_USER).active: - raise Exception('Cannot set anonymous access') - - -#============================================================================== -# TESTS -#============================================================================== - -class TestVCSOperations(unittest.TestCase): - - @classmethod - def setup_class(cls): - #DISABLE ANONYMOUS ACCESS - set_anonymous_access(False) - - def setUp(self): - r = Repository.get_by_repo_name(GIT_REPO) - Repository.unlock(r) - r.enable_locking = False - Session().add(r) - Session().commit() - - r = Repository.get_by_repo_name(HG_REPO) - Repository.unlock(r) - r.enable_locking = False - Session().add(r) - Session().commit() - - def test_clone_hg_repo_by_admin(self): - clone_url = _construct_url(HG_REPO) - stdout, stderr = Command('/tmp').execute('hg clone', clone_url) - - assert 'requesting all changes' in stdout - assert 'adding changesets' in stdout - assert 'adding manifests' in stdout - assert 'adding file changes' in stdout - - assert stderr == '' - - def test_clone_git_repo_by_admin(self): - clone_url = _construct_url(GIT_REPO) - stdout, stderr = Command('/tmp').execute('git clone', clone_url) - - assert 'Cloning into' in stdout - assert stderr == '' - - def test_clone_wrong_credentials_hg(self): - clone_url = _construct_url(HG_REPO, passwd='bad!') - stdout, stderr = Command('/tmp').execute('hg clone', clone_url) - assert 'abort: authorization failed' in stderr - - def test_clone_wrong_credentials_git(self): - clone_url = _construct_url(GIT_REPO, passwd='bad!') - stdout, stderr = Command('/tmp').execute('git clone', clone_url) - assert 'fatal: Authentication failed' in stderr - - def test_clone_git_dir_as_hg(self): - clone_url = _construct_url(GIT_REPO) - stdout, stderr = Command('/tmp').execute('hg clone', clone_url) - assert 'HTTP Error 404: Not Found' in stderr - - def test_clone_hg_repo_as_git(self): - clone_url = _construct_url(HG_REPO) - stdout, stderr = Command('/tmp').execute('git clone', clone_url) - assert 'not found:' in stderr - - def test_clone_non_existing_path_hg(self): - clone_url = _construct_url('trololo') - stdout, stderr = Command('/tmp').execute('hg clone', clone_url) - assert 'HTTP Error 404: Not Found' in stderr - - def test_clone_non_existing_path_git(self): - clone_url = _construct_url('trololo') - stdout, stderr = Command('/tmp').execute('git clone', clone_url) - assert 'not found:' in stderr - - def test_push_new_file_hg(self): - DEST = _get_tmp_dir() - clone_url = _construct_url(HG_REPO, dest=DEST) - stdout, stderr = Command('/tmp').execute('hg clone', clone_url) - - stdout, stderr = _add_files_and_push('hg', DEST) - - assert 'pushing to' in stdout - assert 'Repository size' in stdout - assert 'Last revision is now' in stdout - - def test_push_new_file_git(self): - DEST = _get_tmp_dir() - clone_url = _construct_url(GIT_REPO, dest=DEST) - stdout, stderr = Command('/tmp').execute('git clone', clone_url) - - # commit some stuff into this repo - stdout, stderr = _add_files_and_push('git', DEST) - - #WTF git stderr ?! - assert 'master -> master' in stderr - - def test_push_invalidates_cache_hg(self): - key = CacheInvalidation.query().filter(CacheInvalidation.cache_key - ==HG_REPO).one() - key.cache_active = True - Session().add(key) - Session().commit() - - DEST = _get_tmp_dir() - clone_url = _construct_url(HG_REPO, dest=DEST) - stdout, stderr = Command('/tmp').execute('hg clone', clone_url) - - stdout, stderr = _add_files_and_push('hg', DEST, files_no=1) - key = CacheInvalidation.query().filter(CacheInvalidation.cache_key - ==HG_REPO).one() - self.assertEqual(key.cache_active, False) - - def test_push_invalidates_cache_git(self): - key = CacheInvalidation.query().filter(CacheInvalidation.cache_key - ==GIT_REPO).one() - key.cache_active = True - Session().add(key) - Session().commit() - - DEST = _get_tmp_dir() - clone_url = _construct_url(GIT_REPO, dest=DEST) - stdout, stderr = Command('/tmp').execute('git clone', clone_url) - - # commit some stuff into this repo - stdout, stderr = _add_files_and_push('git', DEST, files_no=1) - - key = CacheInvalidation.query().filter(CacheInvalidation.cache_key - ==GIT_REPO).one() - self.assertEqual(key.cache_active, False) - - def test_push_wrong_credentials_hg(self): - DEST = _get_tmp_dir() - clone_url = _construct_url(HG_REPO, dest=DEST) - stdout, stderr = Command('/tmp').execute('hg clone', clone_url) - - stdout, stderr = _add_files_and_push('hg', DEST, user='bad', - passwd='name') - - assert 'abort: authorization failed' in stderr - - def test_push_wrong_credentials_git(self): - DEST = _get_tmp_dir() - clone_url = _construct_url(GIT_REPO, dest=DEST) - stdout, stderr = Command('/tmp').execute('git clone', clone_url) - - stdout, stderr = _add_files_and_push('git', DEST, user='bad', - passwd='name') - - assert 'fatal: Authentication failed' in stderr - - def test_push_back_to_wrong_url_hg(self): - DEST = _get_tmp_dir() - clone_url = _construct_url(HG_REPO, dest=DEST) - stdout, stderr = Command('/tmp').execute('hg clone', clone_url) - - stdout, stderr = _add_files_and_push('hg', DEST, - clone_url='http://127.0.0.1:5000/tmp',) - - assert 'HTTP Error 404: Not Found' in stderr - - def test_push_back_to_wrong_url_git(self): - DEST = _get_tmp_dir() - clone_url = _construct_url(GIT_REPO, dest=DEST) - stdout, stderr = Command('/tmp').execute('git clone', clone_url) - - stdout, stderr = _add_files_and_push('git', DEST, - clone_url='http://127.0.0.1:5000/tmp',) - - assert 'not found:' in stderr - - def test_clone_and_create_lock_hg(self): - # enable locking - r = Repository.get_by_repo_name(HG_REPO) - r.enable_locking = True - Session().add(r) - Session().commit() - # clone - clone_url = _construct_url(HG_REPO) - stdout, stderr = Command('/tmp').execute('hg clone', clone_url) - - #check if lock was made - r = Repository.get_by_repo_name(HG_REPO) - assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id - - def test_clone_and_create_lock_git(self): - # enable locking - r = Repository.get_by_repo_name(GIT_REPO) - r.enable_locking = True - Session().add(r) - Session().commit() - # clone - clone_url = _construct_url(GIT_REPO) - stdout, stderr = Command('/tmp').execute('git clone', clone_url) - - #check if lock was made - r = Repository.get_by_repo_name(GIT_REPO) - assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id - - def test_clone_after_repo_was_locked_hg(self): - #lock repo - r = Repository.get_by_repo_name(HG_REPO) - Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id) - #pull fails since repo is locked - clone_url = _construct_url(HG_REPO) - stdout, stderr = Command('/tmp').execute('hg clone', clone_url) - msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`""" - % (HG_REPO, TEST_USER_ADMIN_LOGIN)) - assert msg in stderr - - def test_clone_after_repo_was_locked_git(self): - #lock repo - r = Repository.get_by_repo_name(GIT_REPO) - Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id) - #pull fails since repo is locked - clone_url = _construct_url(GIT_REPO) - stdout, stderr = Command('/tmp').execute('git clone', clone_url) - msg = ("""The requested URL returned error: 423""") - assert msg in stderr - - def test_push_on_locked_repo_by_other_user_hg(self): - #clone some temp - DEST = _get_tmp_dir() - clone_url = _construct_url(HG_REPO, dest=DEST) - stdout, stderr = Command('/tmp').execute('hg clone', clone_url) - - #lock repo - r = Repository.get_by_repo_name(HG_REPO) - # let this user actually push ! - RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN, - perm='repository.write') - Session().commit() - Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id) - - #push fails repo is locked by other user ! - stdout, stderr = _add_files_and_push('hg', DEST, - user=TEST_USER_REGULAR_LOGIN, - passwd=TEST_USER_REGULAR_PASS) - msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`""" - % (HG_REPO, TEST_USER_ADMIN_LOGIN)) - assert msg in stderr - -#TODO: fix me ! somehow during tests hooks don't get called on GIT -# def test_push_on_locked_repo_by_other_user_git(self): -# #clone some temp -# DEST = _get_tmp_dir() -# clone_url = _construct_url(GIT_REPO, dest=DEST) -# stdout, stderr = Command('/tmp').execute('git clone', clone_url) -# -# #lock repo -# r = Repository.get_by_repo_name(GIT_REPO) -# # let this user actually push ! -# RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN, -# perm='repository.write') -# Session().commit() -# Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id) -# -# #push fails repo is locked by other user ! -# stdout, stderr = _add_files_and_push('git', DEST, -# user=TEST_USER_REGULAR_LOGIN, -# passwd=TEST_USER_REGULAR_PASS) -# msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`""" -# % (GIT_REPO, TEST_USER_ADMIN_LOGIN)) -# #TODO: fix this somehow later on GIT, GIT is stupid and even if we throw -# # back 423 to it, it makes ANOTHER request and we fail there with 405 :/ -# msg = "405 Method Not Allowed" -# assert msg in stderr - - def test_push_unlocks_repository_hg(self): - # enable locking - r = Repository.get_by_repo_name(HG_REPO) - r.enable_locking = True - Session().add(r) - Session().commit() - #clone some temp - DEST = _get_tmp_dir() - clone_url = _construct_url(HG_REPO, dest=DEST) - stdout, stderr = Command('/tmp').execute('hg clone', clone_url) - - #check for lock repo after clone - r = Repository.get_by_repo_name(HG_REPO) - assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id - - #push is ok and repo is now unlocked - stdout, stderr = _add_files_and_push('hg', DEST) - assert ('remote: Released lock on repo `%s`' % HG_REPO) in stdout - #we need to cleanup the Session Here ! - Session.remove() - r = Repository.get_by_repo_name(HG_REPO) - assert r.locked == [None, None] - -#TODO: fix me ! somehow during tests hooks don't get called on GIT -# def test_push_unlocks_repository_git(self): -# # enable locking -# r = Repository.get_by_repo_name(GIT_REPO) -# r.enable_locking = True -# Session().add(r) -# Session().commit() -# #clone some temp -# DEST = _get_tmp_dir() -# clone_url = _construct_url(GIT_REPO, dest=DEST) -# stdout, stderr = Command('/tmp').execute('git clone', clone_url) -# -# #check for lock repo after clone -# r = Repository.get_by_repo_name(GIT_REPO) -# assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id -# -# #push is ok and repo is now unlocked -# stdout, stderr = _add_files_and_push('git', DEST) -# #assert ('remote: Released lock on repo `%s`' % GIT_REPO) in stdout -# #we need to cleanup the Session Here ! -# Session.remove() -# r = Repository.get_by_repo_name(GIT_REPO) -# assert r.locked == [None, None] - - def test_ip_restriction_hg(self): - user_model = UserModel() - try: - user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32') - Session().commit() - clone_url = _construct_url(HG_REPO) - stdout, stderr = Command('/tmp').execute('hg clone', clone_url) - assert 'abort: HTTP Error 403: Forbidden' in stderr - finally: - #release IP restrictions - for ip in UserIpMap.getAll(): - UserIpMap.delete(ip.ip_id) - Session().commit() - - time.sleep(2) - clone_url = _construct_url(HG_REPO) - stdout, stderr = Command('/tmp').execute('hg clone', clone_url) - - assert 'requesting all changes' in stdout - assert 'adding changesets' in stdout - assert 'adding manifests' in stdout - assert 'adding file changes' in stdout - - assert stderr == '' - - def test_ip_restriction_git(self): - user_model = UserModel() - try: - user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32') - Session().commit() - clone_url = _construct_url(GIT_REPO) - stdout, stderr = Command('/tmp').execute('git clone', clone_url) - msg = ("""The requested URL returned error: 403""") - assert msg in stderr - finally: - #release IP restrictions - for ip in UserIpMap.getAll(): - UserIpMap.delete(ip.ip_id) - Session().commit() - - time.sleep(2) - clone_url = _construct_url(GIT_REPO) - stdout, stderr = Command('/tmp').execute('git clone', clone_url) - - assert 'Cloning into' in stdout - assert stderr == '' diff -r 972ad33c5610 -r a5746b83123f rhodecode/tests/test_libs.py --- a/rhodecode/tests/test_libs.py Wed May 08 17:43:59 2013 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,295 +0,0 @@ -# -*- coding: utf-8 -*- -""" - rhodecode.tests.test_libs - ~~~~~~~~~~~~~~~~~~~~~~~~~ - - - Package for testing various lib/helper functions in rhodecode - - :created_on: Jun 9, 2011 - :copyright: (C) 2011-2012 Marcin Kuzminski - :license: GPLv3, see COPYING for more details. -""" -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -from __future__ import with_statement -import unittest -import datetime -import hashlib -import mock -from rhodecode.tests import * - -proto = 'http' -TEST_URLS = [ - ('%s://127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'], - '%s://127.0.0.1' % proto), - ('%s://marcink@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'], - '%s://127.0.0.1' % proto), - ('%s://marcink:pass@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'], - '%s://127.0.0.1' % proto), - ('%s://127.0.0.1:8080' % proto, ['%s://' % proto, '127.0.0.1', '8080'], - '%s://127.0.0.1:8080' % proto), - ('%s://domain.org' % proto, ['%s://' % proto, 'domain.org'], - '%s://domain.org' % proto), - ('%s://user:pass@domain.org:8080' % proto, ['%s://' % proto, 'domain.org', - '8080'], - '%s://domain.org:8080' % proto), -] - -proto = 'https' -TEST_URLS += [ - ('%s://127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'], - '%s://127.0.0.1' % proto), - ('%s://marcink@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'], - '%s://127.0.0.1' % proto), - ('%s://marcink:pass@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'], - '%s://127.0.0.1' % proto), - ('%s://127.0.0.1:8080' % proto, ['%s://' % proto, '127.0.0.1', '8080'], - '%s://127.0.0.1:8080' % proto), - ('%s://domain.org' % proto, ['%s://' % proto, 'domain.org'], - '%s://domain.org' % proto), - ('%s://user:pass@domain.org:8080' % proto, ['%s://' % proto, 'domain.org', - '8080'], - '%s://domain.org:8080' % proto), -] - - -class TestLibs(unittest.TestCase): - - @parameterized.expand(TEST_URLS) - def test_uri_filter(self, test_url, expected, expected_creds): - from rhodecode.lib.utils2 import uri_filter - self.assertEqual(uri_filter(test_url), expected) - - @parameterized.expand(TEST_URLS) - def test_credentials_filter(self, test_url, expected, expected_creds): - from rhodecode.lib.utils2 import credentials_filter - self.assertEqual(credentials_filter(test_url), expected_creds) - - @parameterized.expand([('t', True), - ('true', True), - ('y', True), - ('yes', True), - ('on', True), - ('1', True), - ('Y', True), - ('yeS', True), - ('Y', True), - ('TRUE', True), - ('T', True), - ('False', False), - ('F', False), - ('FALSE', False), - ('0', False), - ('-1', False), - ('', False) - ]) - def test_str2bool(self, str_bool, expected): - from rhodecode.lib.utils2 import str2bool - self.assertEqual(str2bool(str_bool), expected) - - def test_mention_extractor(self): - from rhodecode.lib.utils2 import extract_mentioned_users - sample = ( - "@first hi there @marcink here's my email marcin@email.com " - "@lukaszb check @one_more22 it pls @ ttwelve @D[] @one@two@three " - "@MARCIN @maRCiN @2one_more22 @john please see this http://org.pl " - "@marian.user just do it @marco-polo and next extract @marco_polo " - "user.dot hej ! not-needed maril@domain.org" - ) - - s = sorted([ - 'first', 'marcink', 'lukaszb', 'one_more22', 'MARCIN', 'maRCiN', 'john', - 'marian.user', 'marco-polo', 'marco_polo' - ], key=lambda k: k.lower()) - self.assertEqual(s, extract_mentioned_users(sample)) - - @parameterized.expand([ - (dict(), u'just now'), - (dict(seconds= -1), u'1 second ago'), - (dict(seconds= -60 * 2), u'2 minutes ago'), - (dict(hours= -1), u'1 hour ago'), - (dict(hours= -24), u'1 day ago'), - (dict(hours= -24 * 5), u'5 days ago'), - (dict(months= -1), u'1 month ago'), - (dict(months= -1, days= -2), u'1 month and 2 days ago'), - (dict(years= -1, months= -1), u'1 year and 1 month ago'), - ]) - def test_age(self, age_args, expected): - from rhodecode.lib.utils2 import age - from dateutil import relativedelta - n = datetime.datetime(year=2012, month=5, day=17) - delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs) - self.assertEqual(age(n + delt(**age_args), now=n), expected) - - @parameterized.expand([ - - (dict(), u'just now'), - (dict(seconds=1), u'in 1 second'), - (dict(seconds=60 * 2), u'in 2 minutes'), - (dict(hours=1), u'in 1 hour'), - (dict(hours=24), u'in 1 day'), - (dict(hours=24 * 5), u'in 5 days'), - (dict(months=1), u'in 1 month'), - (dict(months=1, days=1), u'in 1 month and 1 day'), - (dict(years=1, months=1), u'in 1 year and 1 month') - ]) - def test_age_in_future(self, age_args, expected): - from rhodecode.lib.utils2 import age - from dateutil import relativedelta - n = datetime.datetime(year=2012, month=5, day=17) - delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs) - self.assertEqual(age(n + delt(**age_args), now=n), expected) - - def test_tag_exctrator(self): - sample = ( - "hello pta[tag] gog [[]] [[] sda ero[or]d [me =>>< sa]" - "[requires] [stale] [see<>=>] [see => http://url.com]" - "[requires => url] [lang => python] [just a tag]" - "[,d] [ => ULR ] [obsolete] [desc]]" - ) - from rhodecode.lib.helpers import desc_stylize - res = desc_stylize(sample) - self.assertTrue('
tag
' in res) - self.assertTrue('
obsolete
' in res) - self.assertTrue('
stale
' in res) - self.assertTrue('
python
' in res) - self.assertTrue('
requires => url
' in res) - self.assertTrue('
tag
' in res) - - def test_alternative_gravatar(self): - from rhodecode.lib.helpers import gravatar_url - _md5 = lambda s: hashlib.md5(s).hexdigest() - - def fake_conf(**kwargs): - from pylons import config - config['app_conf'] = {} - config['app_conf']['use_gravatar'] = True - config['app_conf'].update(kwargs) - return config - - class fake_url(): - @classmethod - def current(cls, *args, **kwargs): - return 'https://server.com' - - with mock.patch('pylons.url', fake_url): - fake = fake_conf(alternative_gravatar_url='http://test.com/{email}') - with mock.patch('pylons.config', fake): - from pylons import url - assert url.current() == 'https://server.com' - grav = gravatar_url(email_address='test@foo.com', size=24) - assert grav == 'http://test.com/test@foo.com' - - fake = fake_conf(alternative_gravatar_url='http://test.com/{email}') - with mock.patch('pylons.config', fake): - grav = gravatar_url(email_address='test@foo.com', size=24) - assert grav == 'http://test.com/test@foo.com' - - fake = fake_conf(alternative_gravatar_url='http://test.com/{md5email}') - with mock.patch('pylons.config', fake): - em = 'test@foo.com' - grav = gravatar_url(email_address=em, size=24) - assert grav == 'http://test.com/%s' % (_md5(em)) - - fake = fake_conf(alternative_gravatar_url='http://test.com/{md5email}/{size}') - with mock.patch('pylons.config', fake): - em = 'test@foo.com' - grav = gravatar_url(email_address=em, size=24) - assert grav == 'http://test.com/%s/%s' % (_md5(em), 24) - - fake = fake_conf(alternative_gravatar_url='{scheme}://{netloc}/{md5email}/{size}') - with mock.patch('pylons.config', fake): - em = 'test@foo.com' - grav = gravatar_url(email_address=em, size=24) - assert grav == 'https://server.com/%s/%s' % (_md5(em), 24) - - def _quick_url(self, text, tmpl="""%s""", url_=None): - """ - Changes `some text url[foo]` => `some text foo - - :param text: - """ - import re - # quickly change expected url[] into a link - URL_PAT = re.compile(r'(?:url\[)(.+?)(?:\])') - - def url_func(match_obj): - _url = match_obj.groups()[0] - return tmpl % (url_ or '/some-url', _url) - return URL_PAT.sub(url_func, text) - - @parameterized.expand([ - ("", - ""), - ("git-svn-id: https://svn.apache.org/repos/asf/libcloud/trunk@1441655 13f79535-47bb-0310-9956-ffa450edef68", - "git-svn-id: https://svn.apache.org/repos/asf/libcloud/trunk@1441655 13f79535-47bb-0310-9956-ffa450edef68"), - ("from rev 000000000000", - "from rev url[000000000000]"), - ("from rev 000000000000123123 also rev 000000000000", - "from rev url[000000000000123123] also rev url[000000000000]"), - ("this should-000 00", - "this should-000 00"), - ("longtextffffffffff rev 123123123123", - "longtextffffffffff rev url[123123123123]"), - ("rev ffffffffffffffffffffffffffffffffffffffffffffffffff", - "rev ffffffffffffffffffffffffffffffffffffffffffffffffff"), - ("ffffffffffff some text traalaa", - "url[ffffffffffff] some text traalaa"), - ("""Multi line - 123123123123 - some text 123123123123 - sometimes ! - """, - """Multi line - url[123123123123] - some text url[123123123123] - sometimes ! - """) - ]) - def test_urlify_changesets(self, sample, expected): - def fake_url(self, *args, **kwargs): - return '/some-url' - - expected = self._quick_url(expected) - - with mock.patch('pylons.url', fake_url): - from rhodecode.lib.helpers import urlify_changesets - self.assertEqual(urlify_changesets(sample, 'repo_name'), expected) - - @parameterized.expand([ - ("", - "", - ""), - ("https://svn.apache.org/repos", - "url[https://svn.apache.org/repos]", - "https://svn.apache.org/repos"), - ("http://svn.apache.org/repos", - "url[http://svn.apache.org/repos]", - "http://svn.apache.org/repos"), - ("from rev a also rev http://google.com", - "from rev a also rev url[http://google.com]", - "http://google.com"), - ("""Multi line - https://foo.bar.com - some text lalala""", - """Multi line - url[https://foo.bar.com] - some text lalala""", - "https://foo.bar.com") - ]) - def test_urlify_test(self, sample, expected, url_): - from rhodecode.lib.helpers import urlify_text - expected = self._quick_url(expected, - tmpl="""%s""", url_=url_) - self.assertEqual(urlify_text(sample), expected) diff -r 972ad33c5610 -r a5746b83123f rhodecode/tests/test_validators.py --- a/rhodecode/tests/test_validators.py Wed May 08 17:43:59 2013 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,255 +0,0 @@ -# -*- coding: utf-8 -*- -import unittest -import formencode - -from rhodecode.tests import * - -from rhodecode.model import validators as v -from rhodecode.model.users_group import UserGroupModel - -from rhodecode.model.meta import Session -from rhodecode.model.repos_group import ReposGroupModel -from rhodecode.config.routing import ADMIN_PREFIX -from rhodecode.model.db import ChangesetStatus, Repository -from rhodecode.model.changeset_status import ChangesetStatusModel -from rhodecode.tests.fixture import Fixture - -fixture = Fixture() - - -class TestReposGroups(unittest.TestCase): - - def setUp(self): - pass - - def tearDown(self): - Session.remove() - - def test_Message_extractor(self): - validator = v.ValidUsername() - self.assertRaises(formencode.Invalid, validator.to_python, 'default') - - class StateObj(object): - pass - - self.assertRaises(formencode.Invalid, - validator.to_python, 'default', StateObj) - - def test_ValidUsername(self): - validator = v.ValidUsername() - - self.assertRaises(formencode.Invalid, validator.to_python, 'default') - self.assertRaises(formencode.Invalid, validator.to_python, 'new_user') - self.assertRaises(formencode.Invalid, validator.to_python, '.,') - self.assertRaises(formencode.Invalid, validator.to_python, - TEST_USER_ADMIN_LOGIN) - self.assertEqual('test', validator.to_python('test')) - - validator = v.ValidUsername(edit=True, old_data={'user_id': 1}) - - def test_ValidRepoUser(self): - validator = v.ValidRepoUser() - self.assertRaises(formencode.Invalid, validator.to_python, 'nouser') - self.assertEqual(TEST_USER_ADMIN_LOGIN, - validator.to_python(TEST_USER_ADMIN_LOGIN)) - - def test_ValidUserGroup(self): - validator = v.ValidUserGroup() - self.assertRaises(formencode.Invalid, validator.to_python, 'default') - self.assertRaises(formencode.Invalid, validator.to_python, '.,') - - gr = fixture.create_user_group('test') - gr2 = fixture.create_user_group('tes2') - Session().commit() - self.assertRaises(formencode.Invalid, validator.to_python, 'test') - assert gr.users_group_id != None - validator = v.ValidUserGroup(edit=True, - old_data={'users_group_id': - gr2.users_group_id}) - - self.assertRaises(formencode.Invalid, validator.to_python, 'test') - self.assertRaises(formencode.Invalid, validator.to_python, 'TesT') - self.assertRaises(formencode.Invalid, validator.to_python, 'TEST') - UserGroupModel().delete(gr) - UserGroupModel().delete(gr2) - Session().commit() - - def test_ValidReposGroup(self): - validator = v.ValidReposGroup() - model = ReposGroupModel() - self.assertRaises(formencode.Invalid, validator.to_python, - {'group_name': HG_REPO, }) - gr = model.create(group_name='test_gr', group_description='desc', - parent=None, - just_db=True, - owner=TEST_USER_ADMIN_LOGIN) - self.assertRaises(formencode.Invalid, - validator.to_python, {'group_name': gr.group_name, }) - - validator = v.ValidReposGroup(edit=True, - old_data={'group_id': gr.group_id}) - self.assertRaises(formencode.Invalid, - validator.to_python, { - 'group_name': gr.group_name + 'n', - 'group_parent_id': gr.group_id - }) - model.delete(gr) - - def test_ValidPassword(self): - validator = v.ValidPassword() - self.assertEqual('lol', validator.to_python('lol')) - self.assertEqual(None, validator.to_python(None)) - self.assertRaises(formencode.Invalid, validator.to_python, 'ąćżź') - - def test_ValidPasswordsMatch(self): - validator = v.ValidPasswordsMatch() - self.assertRaises(formencode.Invalid, - validator.to_python, {'password': 'pass', - 'password_confirmation': 'pass2'}) - - self.assertRaises(formencode.Invalid, - validator.to_python, {'new_password': 'pass', - 'password_confirmation': 'pass2'}) - - self.assertEqual({'new_password': 'pass', - 'password_confirmation': 'pass'}, - validator.to_python({'new_password': 'pass', - 'password_confirmation': 'pass'})) - - self.assertEqual({'password': 'pass', - 'password_confirmation': 'pass'}, - validator.to_python({'password': 'pass', - 'password_confirmation': 'pass'})) - - def test_ValidAuth(self): - validator = v.ValidAuth() - valid_creds = { - 'username': TEST_USER_REGULAR2_LOGIN, - 'password': TEST_USER_REGULAR2_PASS, - } - invalid_creds = { - 'username': 'err', - 'password': 'err', - } - self.assertEqual(valid_creds, validator.to_python(valid_creds)) - self.assertRaises(formencode.Invalid, - validator.to_python, invalid_creds) - - def test_ValidAuthToken(self): - validator = v.ValidAuthToken() - # this is untestable without a threadlocal -# self.assertRaises(formencode.Invalid, -# validator.to_python, 'BadToken') - validator - - def test_ValidRepoName(self): - validator = v.ValidRepoName() - - self.assertRaises(formencode.Invalid, - validator.to_python, {'repo_name': ''}) - - self.assertRaises(formencode.Invalid, - validator.to_python, {'repo_name': HG_REPO}) - - gr = ReposGroupModel().create(group_name='group_test', - group_description='desc', - parent=None, - owner=TEST_USER_ADMIN_LOGIN) - self.assertRaises(formencode.Invalid, - validator.to_python, {'repo_name': gr.group_name}) - - #TODO: write an error case for that ie. create a repo withinh a group -# self.assertRaises(formencode.Invalid, -# validator.to_python, {'repo_name': 'some', -# 'repo_group': gr.group_id}) - - def test_ValidForkName(self): - # this uses ValidRepoName validator - assert True - - @parameterized.expand([ - ('test', 'test'), ('lolz!', 'lolz'), (' aavv', 'aavv'), - ('ala ma kota', 'ala-ma-kota'), ('@nooo', 'nooo'), - ('$!haha lolz !', 'haha-lolz'), ('$$$$$', ''), ('{}OK!', 'OK'), - ('/]re po', 're-po')]) - def test_SlugifyName(self, name, expected): - validator = v.SlugifyName() - self.assertEqual(expected, validator.to_python(name)) - - def test_ValidCloneUri(self): - #TODO: write this one - pass - - def test_ValidForkType(self): - validator = v.ValidForkType(old_data={'repo_type': 'hg'}) - self.assertEqual('hg', validator.to_python('hg')) - self.assertRaises(formencode.Invalid, validator.to_python, 'git') - - def test_ValidPerms(self): - #TODO: write this one - pass - - def test_ValidSettings(self): - validator = v.ValidSettings() - self.assertEqual({'pass': 'pass'}, - validator.to_python(value={'user': 'test', - 'pass': 'pass'})) - - self.assertEqual({'user2': 'test', 'pass': 'pass'}, - validator.to_python(value={'user2': 'test', - 'pass': 'pass'})) - - def test_ValidPath(self): - validator = v.ValidPath() - self.assertEqual(TESTS_TMP_PATH, - validator.to_python(TESTS_TMP_PATH)) - self.assertRaises(formencode.Invalid, validator.to_python, - '/no_such_dir') - - def test_UniqSystemEmail(self): - validator = v.UniqSystemEmail(old_data={}) - - self.assertEqual('mail@python.org', - validator.to_python('MaiL@Python.org')) - - email = TEST_USER_REGULAR2_EMAIL - self.assertRaises(formencode.Invalid, validator.to_python, email) - - def test_ValidSystemEmail(self): - validator = v.ValidSystemEmail() - email = TEST_USER_REGULAR2_EMAIL - - self.assertEqual(email, validator.to_python(email)) - self.assertRaises(formencode.Invalid, validator.to_python, 'err') - - def test_LdapLibValidator(self): - if ldap_lib_installed: - validator = v.LdapLibValidator() - self.assertEqual("DN", validator.to_python('DN')) - else: - validator = v.LdapLibValidator() - self.assertRaises(v.LdapImportError, validator.to_python, 'err') - - def test_AttrLoginValidator(self): - validator = v.AttrLoginValidator() - self.assertEqual('DN_attr', validator.to_python('DN_attr')) - - def test_NotReviewedRevisions(self): - repo_id = Repository.get_by_repo_name(HG_REPO).repo_id - validator = v.NotReviewedRevisions(repo_id) - rev = '0' * 40 - # add status for a rev, that should throw an error because it is already - # reviewed - new_status = ChangesetStatus() - new_status.author = ChangesetStatusModel()._get_user(TEST_USER_ADMIN_LOGIN) - new_status.repo = ChangesetStatusModel()._get_repo(HG_REPO) - new_status.status = ChangesetStatus.STATUS_APPROVED - new_status.comment = None - new_status.revision = rev - Session().add(new_status) - Session().commit() - try: - self.assertRaises(formencode.Invalid, validator.to_python, [rev]) - finally: - Session().delete(new_status) - Session().commit() diff -r 972ad33c5610 -r a5746b83123f rhodecode/tests/vcs_test_git.tar.gz Binary file rhodecode/tests/vcs_test_git.tar.gz has changed diff -r 972ad33c5610 -r a5746b83123f rhodecode/tests/vcs_test_hg.tar.gz Binary file rhodecode/tests/vcs_test_hg.tar.gz has changed