Mercurial > kallithea
changeset 5145:7187c1707eda
tests: restrict pytest test collection to kallithea/tests
When the kallithea root directory contains a populated virtualenv, pytest
would also collect tests in python packages installed there.
Restrict the tests to be considered to any test_*.py file inside
kallithea/tests.
Additionally, by renaming unwanted test files in kallithea/tests/scripts to
_not_ match this pattern, we can completely get rid of the 'norecursedirs'
option.
author | Thomas De Schampheleire <thomas.de.schampheleire@gmail.com> |
---|---|
date | Thu, 14 May 2015 21:42:26 +0200 |
parents | 7a5db341a942 |
children | 08ad393e6866 |
files | kallithea/tests/scripts/manual_test_concurrency.py kallithea/tests/scripts/manual_test_crawler.py kallithea/tests/scripts/test_concurency.py kallithea/tests/scripts/test_crawler.py setup.cfg |
diffstat | 5 files changed, 413 insertions(+), 412 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/kallithea/tests/scripts/manual_test_concurrency.py Thu May 14 21:42:26 2015 +0200 @@ -0,0 +1,222 @@ +# -*- coding: utf-8 -*- +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +kallithea.tests.test_hg_operations +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Test suite for making push/pull operations + +This file was forked by the Kallithea project in July 2014. +Original author and date, and relevant copyright and licensing information is below: +:created_on: Dec 30, 2010 +:author: marcink +:copyright: (c) 2013 RhodeCode GmbH, and others. +:license: GPLv3, see LICENSE.md for more details. + +""" + +import os +import sys +import shutil +import logging +from os.path import join as jn +from os.path import dirname as dn + +from tempfile import _RandomNameSequence +from subprocess import Popen, PIPE + +from paste.deploy import appconfig +from sqlalchemy import engine_from_config + +from kallithea.lib.utils import add_cache +from kallithea.model import init_model +from kallithea.model import meta +from kallithea.model.db import User, Repository +from kallithea.lib.auth import get_crypt_password + +from kallithea.tests import TESTS_TMP_PATH, HG_REPO +from kallithea.config.environment import load_environment + +rel_path = dn(dn(dn(dn(os.path.abspath(__file__))))) +conf = appconfig('config:development.ini', relative_to=rel_path) +load_environment(conf.global_conf, conf.local_conf) + +add_cache(conf) + +USER = 'test_admin' +PASS = 'test12' +HOST = 'server.local' +METHOD = 'pull' +DEBUG = True +log = logging.getLogger(__name__) + + +class Command(object): + + def __init__(self, cwd): + self.cwd = cwd + + def execute(self, cmd, *args): + """Runs command on the system with given ``args``. + """ + + command = cmd + ' ' + ' '.join(args) + log.debug('Executing %s' % command) + if DEBUG: + print command + p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd) + stdout, stderr = p.communicate() + if DEBUG: + print stdout, stderr + return stdout, stderr + + +def get_session(): + engine = engine_from_config(conf, 'sqlalchemy.db1.') + init_model(engine) + sa = meta.Session + return sa + + +def create_test_user(force=True): + print 'creating test user' + sa = get_session() + + user = sa.query(User).filter(User.username == USER).scalar() + + if force and user is not None: + print 'removing current user' + for repo in sa.query(Repository).filter(Repository.user == user).all(): + sa.delete(repo) + sa.delete(user) + sa.commit() + + if user is None or force: + print 'creating new one' + new_usr = User() + new_usr.username = USER + new_usr.password = get_crypt_password(PASS) + new_usr.email = 'mail@mail.com' + new_usr.name = 'test' + new_usr.lastname = 'lasttestname' + new_usr.active = True + new_usr.admin = True + sa.add(new_usr) + sa.commit() + + print 'done' + + +def create_test_repo(force=True): + print 'creating test repo' + from kallithea.model.repo import RepoModel + sa = get_session() + + user = sa.query(User).filter(User.username == USER).scalar() + if user is None: + raise Exception('user not found') + + repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar() + + if repo is None: + print 'repo not found creating' + + form_data = {'repo_name': HG_REPO, + 'repo_type': 'hg', + 'private':False, + 'clone_uri': '' } + rm = RepoModel(sa) + rm.base_path = '/home/hg' + rm.create(form_data, user) + + print 'done' + + +def set_anonymous_access(enable=True): + sa = get_session() + user = sa.query(User).filter(User.username == 'default').one() + user.active = enable + sa.add(user) + sa.commit() + + +def get_anonymous_access(): + sa = get_session() + return sa.query(User).filter(User.username == 'default').one().active + + +#============================================================================== +# TESTS +#============================================================================== +def test_clone_with_credentials(no_errors=False, repo=HG_REPO, method=METHOD, + seq=None, backend='hg'): + cwd = path = jn(TESTS_TMP_PATH, repo) + + if seq is None: + seq = _RandomNameSequence().next() + + try: + shutil.rmtree(path, ignore_errors=True) + os.makedirs(path) + #print 'made dirs %s' % jn(path) + except OSError: + raise + + clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \ + {'user': USER, + 'pass': PASS, + 'host': HOST, + 'cloned_repo': repo, } + + dest = path + seq + if method == 'pull': + stdout, stderr = Command(cwd).execute(backend, method, '--cwd', dest, clone_url) + else: + stdout, stderr = Command(cwd).execute(backend, method, clone_url, dest) + print stdout,'sdasdsadsa' + if not no_errors: + if backend == 'hg': + assert """adding file changes""" in stdout, 'no messages about cloning' + assert """abort""" not in stderr , 'got error from clone' + elif backend == 'git': + assert """Cloning into""" in stdout, 'no messages about cloning' + +if __name__ == '__main__': + try: + create_test_user(force=False) + seq = None + import time + + try: + METHOD = sys.argv[3] + except IndexError: + pass + + try: + backend = sys.argv[4] + except IndexError: + backend = 'hg' + + if METHOD == 'pull': + seq = _RandomNameSequence().next() + test_clone_with_credentials(repo=sys.argv[1], method='clone', + seq=seq, backend=backend) + s = time.time() + for i in range(1, int(sys.argv[2]) + 1): + print 'take', i + test_clone_with_credentials(repo=sys.argv[1], method=METHOD, + seq=seq, backend=backend) + print 'time taken %.3f' % (time.time() - s) + except Exception, e: + sys.exit('stop on %s' % e)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/kallithea/tests/scripts/manual_test_crawler.py Thu May 14 21:42:26 2015 +0200 @@ -0,0 +1,189 @@ +# -*- coding: utf-8 -*- +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +kallithea.tests.test_crawer +~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Test for crawling a project for memory usage +This should be runned just as regular script together +with a watch script that will show memory usage. + +watch -n1 ./kallithea/tests/mem_watch + +This file was forked by the Kallithea project in July 2014. +Original author and date, and relevant copyright and licensing information is below: +:created_on: Apr 21, 2010 +:author: marcink +:copyright: (c) 2013 RhodeCode GmbH, and others. +:license: GPLv3, see LICENSE.md for more details. +""" + + +import cookielib +import urllib +import urllib2 +import time +import os +import sys +from os.path import join as jn +from os.path import dirname as dn + +__here__ = os.path.abspath(__file__) +__root__ = dn(dn(dn(__here__))) +sys.path.append(__root__) + +from kallithea.lib import vcs +from kallithea.lib.compat import OrderedSet +from kallithea.lib.vcs.exceptions import RepositoryError + +PASES = 3 +HOST = 'http://127.0.0.1' +PORT = 5000 +BASE_URI = '%s:%s/' % (HOST, PORT) + +if len(sys.argv) == 2: + BASE_URI = sys.argv[1] + +if not BASE_URI.endswith('/'): + BASE_URI += '/' + +print 'Crawling @ %s' % BASE_URI +BASE_URI += '%s' +PROJECT_PATH = jn('/', 'home', 'username', 'repos') +PROJECTS = [ + #'linux-magx-pbranch', + 'CPython', + 'kallithea', +] + + +cj = cookielib.FileCookieJar('/tmp/rc_test_cookie.txt') +o = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj)) +o.addheaders = [ + ('User-agent', 'kallithea-crawler'), + ('Accept-Language', 'en - us, en;q = 0.5') +] + +urllib2.install_opener(o) + + +def _get_repo(proj): + if isinstance(proj, basestring): + repo = vcs.get_repo(jn(PROJECT_PATH, proj)) + proj = proj + else: + repo = proj + proj = repo.name + + return repo, proj + + +def test_changelog_walk(proj, pages=100): + repo, proj = _get_repo(proj) + + total_time = 0 + for i in range(1, pages): + + page = '/'.join((proj, 'changelog',)) + + full_uri = (BASE_URI % page) + '?' + urllib.urlencode({'page': i}) + s = time.time() + f = o.open(full_uri) + + assert f.url == full_uri, 'URL:%s does not match %s' % (f.url, full_uri) + + size = len(f.read()) + e = time.time() - s + total_time += e + print 'visited %s size:%s req:%s ms' % (full_uri, size, e) + + print 'total_time', total_time + print 'average on req', total_time / float(pages) + + +def test_changeset_walk(proj, limit=None): + repo, proj = _get_repo(proj) + + print 'processing', jn(PROJECT_PATH, proj) + total_time = 0 + + cnt = 0 + for i in repo: + cnt += 1 + raw_cs = '/'.join((proj, 'changeset', i.raw_id)) + if limit and limit == cnt: + break + + full_uri = (BASE_URI % raw_cs) + print '%s visiting %s\%s' % (cnt, full_uri, i) + s = time.time() + f = o.open(full_uri) + size = len(f.read()) + e = time.time() - s + total_time += e + print '%s visited %s\%s size:%s req:%s ms' % (cnt, full_uri, i, size, e) + + print 'total_time', total_time + print 'average on req', total_time / float(cnt) + + +def test_files_walk(proj, limit=100): + repo, proj = _get_repo(proj) + + print 'processing', jn(PROJECT_PATH, proj) + total_time = 0 + + paths_ = OrderedSet(['']) + try: + tip = repo.get_changeset('tip') + for topnode, dirs, files in tip.walk('/'): + + for dir in dirs: + paths_.add(dir.path) + for f in dir: + paths_.add(f.path) + + for f in files: + paths_.add(f.path) + + except RepositoryError, e: + pass + + cnt = 0 + for f in paths_: + cnt += 1 + if limit and limit == cnt: + break + + file_path = '/'.join((proj, 'files', 'tip', f)) + full_uri = (BASE_URI % file_path) + print '%s visiting %s' % (cnt, full_uri) + s = time.time() + f = o.open(full_uri) + size = len(f.read()) + e = time.time() - s + total_time += e + print '%s visited OK size:%s req:%s ms' % (cnt, size, e) + + print 'total_time', total_time + print 'average on req', total_time / float(cnt) + +if __name__ == '__main__': + for path in PROJECTS: + repo = vcs.get_repo(jn(PROJECT_PATH, path)) + for i in range(PASES): + print 'PASS %s/%s' % (i, PASES) + test_changelog_walk(repo, pages=80) + test_changeset_walk(repo, limit=100) + test_files_walk(repo, limit=100)
--- a/kallithea/tests/scripts/test_concurency.py Tue May 12 23:32:41 2015 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,222 +0,0 @@ -# -*- coding: utf-8 -*- -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. -""" -kallithea.tests.test_hg_operations -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -Test suite for making push/pull operations - -This file was forked by the Kallithea project in July 2014. -Original author and date, and relevant copyright and licensing information is below: -:created_on: Dec 30, 2010 -:author: marcink -:copyright: (c) 2013 RhodeCode GmbH, and others. -:license: GPLv3, see LICENSE.md for more details. - -""" - -import os -import sys -import shutil -import logging -from os.path import join as jn -from os.path import dirname as dn - -from tempfile import _RandomNameSequence -from subprocess import Popen, PIPE - -from paste.deploy import appconfig -from sqlalchemy import engine_from_config - -from kallithea.lib.utils import add_cache -from kallithea.model import init_model -from kallithea.model import meta -from kallithea.model.db import User, Repository -from kallithea.lib.auth import get_crypt_password - -from kallithea.tests import TESTS_TMP_PATH, HG_REPO -from kallithea.config.environment import load_environment - -rel_path = dn(dn(dn(dn(os.path.abspath(__file__))))) -conf = appconfig('config:development.ini', relative_to=rel_path) -load_environment(conf.global_conf, conf.local_conf) - -add_cache(conf) - -USER = 'test_admin' -PASS = 'test12' -HOST = 'server.local' -METHOD = 'pull' -DEBUG = True -log = logging.getLogger(__name__) - - -class Command(object): - - def __init__(self, cwd): - self.cwd = cwd - - def execute(self, cmd, *args): - """Runs command on the system with given ``args``. - """ - - command = cmd + ' ' + ' '.join(args) - log.debug('Executing %s' % command) - if DEBUG: - print command - p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd) - stdout, stderr = p.communicate() - if DEBUG: - print stdout, stderr - return stdout, stderr - - -def get_session(): - engine = engine_from_config(conf, 'sqlalchemy.db1.') - init_model(engine) - sa = meta.Session - return sa - - -def create_test_user(force=True): - print 'creating test user' - sa = get_session() - - user = sa.query(User).filter(User.username == USER).scalar() - - if force and user is not None: - print 'removing current user' - for repo in sa.query(Repository).filter(Repository.user == user).all(): - sa.delete(repo) - sa.delete(user) - sa.commit() - - if user is None or force: - print 'creating new one' - new_usr = User() - new_usr.username = USER - new_usr.password = get_crypt_password(PASS) - new_usr.email = 'mail@mail.com' - new_usr.name = 'test' - new_usr.lastname = 'lasttestname' - new_usr.active = True - new_usr.admin = True - sa.add(new_usr) - sa.commit() - - print 'done' - - -def create_test_repo(force=True): - print 'creating test repo' - from kallithea.model.repo import RepoModel - sa = get_session() - - user = sa.query(User).filter(User.username == USER).scalar() - if user is None: - raise Exception('user not found') - - repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar() - - if repo is None: - print 'repo not found creating' - - form_data = {'repo_name': HG_REPO, - 'repo_type': 'hg', - 'private':False, - 'clone_uri': '' } - rm = RepoModel(sa) - rm.base_path = '/home/hg' - rm.create(form_data, user) - - print 'done' - - -def set_anonymous_access(enable=True): - sa = get_session() - user = sa.query(User).filter(User.username == 'default').one() - user.active = enable - sa.add(user) - sa.commit() - - -def get_anonymous_access(): - sa = get_session() - return sa.query(User).filter(User.username == 'default').one().active - - -#============================================================================== -# TESTS -#============================================================================== -def test_clone_with_credentials(no_errors=False, repo=HG_REPO, method=METHOD, - seq=None, backend='hg'): - cwd = path = jn(TESTS_TMP_PATH, repo) - - if seq is None: - seq = _RandomNameSequence().next() - - try: - shutil.rmtree(path, ignore_errors=True) - os.makedirs(path) - #print 'made dirs %s' % jn(path) - except OSError: - raise - - clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \ - {'user': USER, - 'pass': PASS, - 'host': HOST, - 'cloned_repo': repo, } - - dest = path + seq - if method == 'pull': - stdout, stderr = Command(cwd).execute(backend, method, '--cwd', dest, clone_url) - else: - stdout, stderr = Command(cwd).execute(backend, method, clone_url, dest) - print stdout,'sdasdsadsa' - if not no_errors: - if backend == 'hg': - assert """adding file changes""" in stdout, 'no messages about cloning' - assert """abort""" not in stderr , 'got error from clone' - elif backend == 'git': - assert """Cloning into""" in stdout, 'no messages about cloning' - -if __name__ == '__main__': - try: - create_test_user(force=False) - seq = None - import time - - try: - METHOD = sys.argv[3] - except IndexError: - pass - - try: - backend = sys.argv[4] - except IndexError: - backend = 'hg' - - if METHOD == 'pull': - seq = _RandomNameSequence().next() - test_clone_with_credentials(repo=sys.argv[1], method='clone', - seq=seq, backend=backend) - s = time.time() - for i in range(1, int(sys.argv[2]) + 1): - print 'take', i - test_clone_with_credentials(repo=sys.argv[1], method=METHOD, - seq=seq, backend=backend) - print 'time taken %.3f' % (time.time() - s) - except Exception, e: - sys.exit('stop on %s' % e)
--- a/kallithea/tests/scripts/test_crawler.py Tue May 12 23:32:41 2015 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,189 +0,0 @@ -# -*- coding: utf-8 -*- -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. -""" -kallithea.tests.test_crawer -~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -Test for crawling a project for memory usage -This should be runned just as regular script together -with a watch script that will show memory usage. - -watch -n1 ./kallithea/tests/mem_watch - -This file was forked by the Kallithea project in July 2014. -Original author and date, and relevant copyright and licensing information is below: -:created_on: Apr 21, 2010 -:author: marcink -:copyright: (c) 2013 RhodeCode GmbH, and others. -:license: GPLv3, see LICENSE.md for more details. -""" - - -import cookielib -import urllib -import urllib2 -import time -import os -import sys -from os.path import join as jn -from os.path import dirname as dn - -__here__ = os.path.abspath(__file__) -__root__ = dn(dn(dn(__here__))) -sys.path.append(__root__) - -from kallithea.lib import vcs -from kallithea.lib.compat import OrderedSet -from kallithea.lib.vcs.exceptions import RepositoryError - -PASES = 3 -HOST = 'http://127.0.0.1' -PORT = 5000 -BASE_URI = '%s:%s/' % (HOST, PORT) - -if len(sys.argv) == 2: - BASE_URI = sys.argv[1] - -if not BASE_URI.endswith('/'): - BASE_URI += '/' - -print 'Crawling @ %s' % BASE_URI -BASE_URI += '%s' -PROJECT_PATH = jn('/', 'home', 'username', 'repos') -PROJECTS = [ - #'linux-magx-pbranch', - 'CPython', - 'kallithea', -] - - -cj = cookielib.FileCookieJar('/tmp/rc_test_cookie.txt') -o = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj)) -o.addheaders = [ - ('User-agent', 'kallithea-crawler'), - ('Accept-Language', 'en - us, en;q = 0.5') -] - -urllib2.install_opener(o) - - -def _get_repo(proj): - if isinstance(proj, basestring): - repo = vcs.get_repo(jn(PROJECT_PATH, proj)) - proj = proj - else: - repo = proj - proj = repo.name - - return repo, proj - - -def test_changelog_walk(proj, pages=100): - repo, proj = _get_repo(proj) - - total_time = 0 - for i in range(1, pages): - - page = '/'.join((proj, 'changelog',)) - - full_uri = (BASE_URI % page) + '?' + urllib.urlencode({'page': i}) - s = time.time() - f = o.open(full_uri) - - assert f.url == full_uri, 'URL:%s does not match %s' % (f.url, full_uri) - - size = len(f.read()) - e = time.time() - s - total_time += e - print 'visited %s size:%s req:%s ms' % (full_uri, size, e) - - print 'total_time', total_time - print 'average on req', total_time / float(pages) - - -def test_changeset_walk(proj, limit=None): - repo, proj = _get_repo(proj) - - print 'processing', jn(PROJECT_PATH, proj) - total_time = 0 - - cnt = 0 - for i in repo: - cnt += 1 - raw_cs = '/'.join((proj, 'changeset', i.raw_id)) - if limit and limit == cnt: - break - - full_uri = (BASE_URI % raw_cs) - print '%s visiting %s\%s' % (cnt, full_uri, i) - s = time.time() - f = o.open(full_uri) - size = len(f.read()) - e = time.time() - s - total_time += e - print '%s visited %s\%s size:%s req:%s ms' % (cnt, full_uri, i, size, e) - - print 'total_time', total_time - print 'average on req', total_time / float(cnt) - - -def test_files_walk(proj, limit=100): - repo, proj = _get_repo(proj) - - print 'processing', jn(PROJECT_PATH, proj) - total_time = 0 - - paths_ = OrderedSet(['']) - try: - tip = repo.get_changeset('tip') - for topnode, dirs, files in tip.walk('/'): - - for dir in dirs: - paths_.add(dir.path) - for f in dir: - paths_.add(f.path) - - for f in files: - paths_.add(f.path) - - except RepositoryError, e: - pass - - cnt = 0 - for f in paths_: - cnt += 1 - if limit and limit == cnt: - break - - file_path = '/'.join((proj, 'files', 'tip', f)) - full_uri = (BASE_URI % file_path) - print '%s visiting %s' % (cnt, full_uri) - s = time.time() - f = o.open(full_uri) - size = len(f.read()) - e = time.time() - s - total_time += e - print '%s visited OK size:%s req:%s ms' % (cnt, size, e) - - print 'total_time', total_time - print 'average on req', total_time / float(cnt) - -if __name__ == '__main__': - for path in PROJECTS: - repo = vcs.get_repo(jn(PROJECT_PATH, path)) - for i in range(PASES): - print 'PASS %s/%s' % (i, PASES) - test_changelog_walk(repo, pages=80) - test_changeset_walk(repo, limit=100) - test_files_walk(repo, limit=100)
--- a/setup.cfg Tue May 12 23:32:41 2015 +0200 +++ b/setup.cfg Thu May 14 21:42:26 2015 +0200 @@ -11,7 +11,8 @@ nologcapture = 1 [pytest] -norecursedirs = .* *.egg kallithea/tests/scripts +# only look for tests in kallithea/tests +python_files = kallithea/tests/**/test_*.py [compile_catalog] domain = kallithea