# HG changeset patch # User Marcin Kuzminski # Date 1346138889 -7200 # Node ID 6d904a0cd48de3fb0aeea047a680925177b2b564 # Parent e291f25ea87f888504f9109b22ee24d0e3049134 added new suite of tests for VCS operations - locking tests - push pull tests - added new ENV option to bypass re-creation of test enviroment diff -r e291f25ea87f -r 6d904a0cd48d rhodecode/config/environment.py --- a/rhodecode/config/environment.py Tue Aug 28 09:05:27 2012 +0200 +++ b/rhodecode/config/environment.py Tue Aug 28 09:28:09 2012 +0200 @@ -78,7 +78,10 @@ from rhodecode.lib.utils import create_test_env, create_test_index from rhodecode.tests import TESTS_TMP_PATH - create_test_env(TESTS_TMP_PATH, config) + # set RC_NO_TMP_PATH=1 to disable re-creating the database and + # test repos + if not int(os.environ.get('RC_NO_TMP_PATH', 0)): + create_test_env(TESTS_TMP_PATH, config) # set RC_WHOOSH_TEST_DISABLE=1 to disable whoosh index during tests if not int(os.environ.get('RC_WHOOSH_TEST_DISABLE', 0)): create_test_index(TESTS_TMP_PATH, config, True) diff -r e291f25ea87f -r 6d904a0cd48d rhodecode/lib/base.py --- a/rhodecode/lib/base.py Tue Aug 28 09:05:27 2012 +0200 +++ b/rhodecode/lib/base.py Tue Aug 28 09:28:09 2012 +0200 @@ -171,7 +171,7 @@ do the locking. Think about this as signals passed to hooks what to do. """ - locked = False + locked = False # defines that locked error should be thrown to user make_lock = None repo = Repository.get_by_repo_name(repo) user = User.get(user_id) @@ -187,7 +187,7 @@ #check if it's already locked !, if it is compare users user_id, _date = repo.locked if user.user_id == user_id: - log.debug('Got push from user, now unlocking' % (user)) + log.debug('Got push from user %s, now unlocking' % (user)) # unlock if we have push from user who locked make_lock = False else: @@ -202,7 +202,8 @@ else: log.debug('Repository %s do not have locking enabled' % (repo)) - + log.debug('FINAL locking values make_lock:%s,locked:%s,locked_by:%s' + % (make_lock, locked, locked_by)) return make_lock, locked, locked_by def __call__(self, environ, start_response): diff -r e291f25ea87f -r 6d904a0cd48d rhodecode/tests/scripts/test_scm_operations.py --- a/rhodecode/tests/scripts/test_scm_operations.py Tue Aug 28 09:05:27 2012 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,242 +0,0 @@ -# -*- coding: utf-8 -*- -""" - rhodecode.tests.test_scm_operations - ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - - Test suite for making push/pull operations. - Run using:: - - RC_WHOOSH_TEST_DISABLE=1 nosetests rhodecode/tests/scripts/test_scm_operations.py - - :created_on: Dec 30, 2010 - :author: marcink - :copyright: (C) 2010-2012 Marcin Kuzminski - :license: GPLv3, see COPYING for more details. -""" -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - -import os -import tempfile -from os.path import join as jn -from os.path import dirname as dn - -from tempfile import _RandomNameSequence -from subprocess import Popen, PIPE - -from rhodecode.tests import * -from rhodecode.model.db import User, Repository, UserLog -from rhodecode.model.meta import Session - -DEBUG = True -HOST = '127.0.0.1:5000' # test host - - -class Command(object): - - def __init__(self, cwd): - self.cwd = cwd - - def execute(self, cmd, *args): - """ - Runs command on the system with given ``args``. - """ - - command = cmd + ' ' + ' '.join(args) - if DEBUG: - print '*** CMD %s ***' % command - p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd) - stdout, stderr = p.communicate() - if DEBUG: - print stdout, stderr - return stdout, stderr - - -def _get_tmp_dir(): - return tempfile.mkdtemp(prefix='rc_integration_test') - - -def _construct_url(repo, dest=None, **kwargs): - if dest is None: - #make temp clone - dest = _get_tmp_dir() - params = { - 'user': TEST_USER_ADMIN_LOGIN, - 'passwd': TEST_USER_ADMIN_PASS, - 'host': HOST, - 'cloned_repo': repo, - 'dest': dest - } - params.update(**kwargs) - if params['user'] and params['passwd']: - _url = 'http://%(user)s:%(passwd)s@%(host)s/%(cloned_repo)s %(dest)s' % params - else: - _url = 'http://(host)s/%(cloned_repo)s %(dest)s' % params - return _url - - -def set_anonymous_access(enable=True): - user = User.get_by_username(User.DEFAULT_USER) - user.active = enable - Session().add(user) - Session().commit() - print '\tanonymous access is now:', enable - if enable != User.get_by_username(User.DEFAULT_USER).active: - raise Exception('Cannot set anonymous access') - - -def setup_module(): - #DISABLE ANONYMOUS ACCESS - set_anonymous_access(False) - - -def test_clone_hg_repo_by_admin(): - clone_url = _construct_url(HG_REPO) - stdout, stderr = Command('/tmp').execute('hg clone', clone_url) - - assert 'requesting all changes' in stdout - assert 'adding changesets' in stdout - assert 'adding manifests' in stdout - assert 'adding file changes' in stdout - - assert stderr == '' - - -def test_clone_git_repo_by_admin(): - clone_url = _construct_url(GIT_REPO) - stdout, stderr = Command('/tmp').execute('git clone', clone_url) - - assert 'Cloning into' in stdout - assert stderr == '' - - -def test_clone_wrong_credentials_hg(): - clone_url = _construct_url(HG_REPO, passwd='bad!') - stdout, stderr = Command('/tmp').execute('hg clone', clone_url) - assert 'abort: authorization failed' in stderr - - -def test_clone_wrong_credentials_git(): - clone_url = _construct_url(GIT_REPO, passwd='bad!') - stdout, stderr = Command('/tmp').execute('git clone', clone_url) - assert 'fatal: Authentication failed' in stderr - - -def test_clone_git_dir_as_hg(): - clone_url = _construct_url(GIT_REPO) - stdout, stderr = Command('/tmp').execute('hg clone', clone_url) - assert 'HTTP Error 404: Not Found' in stderr - - -def test_clone_hg_repo_as_git(): - clone_url = _construct_url(HG_REPO) - stdout, stderr = Command('/tmp').execute('git clone', clone_url) - assert 'not found: did you run git update-server-info on the server' in stderr - - -def test_clone_non_existing_path_hg(): - clone_url = _construct_url('trololo') - stdout, stderr = Command('/tmp').execute('hg clone', clone_url) - assert 'HTTP Error 404: Not Found' in stderr - - -def test_clone_non_existing_path_git(): - clone_url = _construct_url('trololo') - stdout, stderr = Command('/tmp').execute('git clone', clone_url) - assert 'not found: did you run git update-server-info on the server' in stderr - - -def test_push_new_file_hg(): - DEST = _get_tmp_dir() - clone_url = _construct_url(HG_REPO, dest=DEST) - stdout, stderr = Command('/tmp').execute('hg clone', clone_url) - - # commit some stuff into this repo - cwd = path = jn(DEST) - added_file = jn(path, '%ssetupążźć.py' % _RandomNameSequence().next()) - Command(cwd).execute('touch %s' % added_file) - Command(cwd).execute('hg add %s' % added_file) - - for i in xrange(3): - cmd = """echo 'added_line%s' >> %s""" % (i, added_file) - Command(cwd).execute(cmd) - - cmd = """hg ci -m 'commited new %s' -u '%s' %s """ % ( - i, - 'Marcin Kuźminski ', - added_file - ) - Command(cwd).execute(cmd) - # PUSH it back - clone_url = _construct_url(HG_REPO, dest='') - stdout, stderr = Command(cwd).execute('hg push --verbose', clone_url) - - assert 'pushing to' in stdout - assert 'Repository size' in stdout - assert 'Last revision is now' in stdout - - -def test_push_new_file_git(): - DEST = _get_tmp_dir() - clone_url = _construct_url(GIT_REPO, dest=DEST) - stdout, stderr = Command('/tmp').execute('git clone', clone_url) - - # commit some stuff into this repo - cwd = path = jn(DEST) - added_file = jn(path, '%ssetupążźć.py' % _RandomNameSequence().next()) - Command(cwd).execute('touch %s' % added_file) - Command(cwd).execute('git add %s' % added_file) - - for i in xrange(3): - cmd = """echo 'added_line%s' >> %s""" % (i, added_file) - Command(cwd).execute(cmd) - - cmd = """git ci -m 'commited new %s' --author '%s' %s """ % ( - i, - 'Marcin Kuźminski ', - added_file - ) - Command(cwd).execute(cmd) - # PUSH it back - clone_url = _construct_url(GIT_REPO, dest='') - stdout, stderr = Command(cwd).execute('git push --verbose', clone_url) - - #WTF git stderr ?! - assert 'master -> master' in stderr - - -def test_push_modify_existing_file_hg(): - assert 0 - - -def test_push_modify_existing_file_git(): - assert 0 - - -def test_push_wrong_credentials_hg(): - assert 0 - - -def test_push_wrong_credentials_git(): - assert 0 - - -def test_push_back_to_wrong_url_hg(): - assert 0 - - -def test_push_back_to_wrong_url_git(): - assert 0 - - -#TODO: write all locking tests diff -r e291f25ea87f -r 6d904a0cd48d rhodecode/tests/scripts/test_vcs_operations.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rhodecode/tests/scripts/test_vcs_operations.py Tue Aug 28 09:28:09 2012 +0200 @@ -0,0 +1,425 @@ +# -*- coding: utf-8 -*- +""" + rhodecode.tests.test_scm_operations + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + Test suite for making push/pull operations. + Run using:: + + RC_WHOOSH_TEST_DISABLE=1 RC_NO_TMP_PATH=1 nosetests rhodecode/tests/scripts/test_vcs_operations.py + + :created_on: Dec 30, 2010 + :author: marcink + :copyright: (C) 2010-2012 Marcin Kuzminski + :license: GPLv3, see COPYING for more details. +""" +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import os +import tempfile +import unittest +from os.path import join as jn +from os.path import dirname as dn + +from tempfile import _RandomNameSequence +from subprocess import Popen, PIPE + +from rhodecode.tests import * +from rhodecode.model.db import User, Repository, UserLog +from rhodecode.model.meta import Session +from rhodecode.model.repo import RepoModel + +DEBUG = True +HOST = '127.0.0.1:5000' # test host + + +class Command(object): + + def __init__(self, cwd): + self.cwd = cwd + + def execute(self, cmd, *args): + """ + Runs command on the system with given ``args``. + """ + + command = cmd + ' ' + ' '.join(args) + if DEBUG: + print '*** CMD %s ***' % command + p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd) + stdout, stderr = p.communicate() + if DEBUG: + print stdout, stderr + return stdout, stderr + + +def _get_tmp_dir(): + return tempfile.mkdtemp(prefix='rc_integration_test') + + +def _construct_url(repo, dest=None, **kwargs): + if dest is None: + #make temp clone + dest = _get_tmp_dir() + params = { + 'user': TEST_USER_ADMIN_LOGIN, + 'passwd': TEST_USER_ADMIN_PASS, + 'host': HOST, + 'cloned_repo': repo, + 'dest': dest + } + params.update(**kwargs) + if params['user'] and params['passwd']: + _url = 'http://%(user)s:%(passwd)s@%(host)s/%(cloned_repo)s %(dest)s' % params + else: + _url = 'http://(host)s/%(cloned_repo)s %(dest)s' % params + return _url + + +def _add_files_and_push(vcs, DEST, **kwargs): + """ + Generate some files, add it to DEST repo and push back + vcs is git or hg and defines what VCS we want to make those files for + + :param vcs: + :param DEST: + """ + # commit some stuff into this repo + cwd = path = jn(DEST) + #added_file = jn(path, '%ssetupążźć.py' % _RandomNameSequence().next()) + added_file = jn(path, '%ssetup.py' % _RandomNameSequence().next()) + Command(cwd).execute('touch %s' % added_file) + Command(cwd).execute('%s add %s' % (vcs, added_file)) + + for i in xrange(3): + cmd = """echo 'added_line%s' >> %s""" % (i, added_file) + Command(cwd).execute(cmd) + if vcs == 'hg': + cmd = """hg commit -m 'commited new %s' -u '%s' %s """ % ( + i, 'Marcin Kuźminski ', added_file + ) + elif vcs == 'git': + cmd = """git ci -m 'commited new %s' --author '%s' %s """ % ( + i, 'Marcin Kuźminski ', added_file + ) + Command(cwd).execute(cmd) + # PUSH it back + if vcs == 'hg': + _REPO = HG_REPO + elif vcs == 'git': + _REPO = GIT_REPO + + kwargs['dest'] = '' + clone_url = _construct_url(_REPO, **kwargs) + if 'clone_url' in kwargs: + clone_url = kwargs['clone_url'] + if vcs == 'hg': + stdout, stderr = Command(cwd).execute('hg push --verbose', clone_url) + elif vcs == 'git': + stdout, stderr = Command(cwd).execute('git push', clone_url + " master") + + return stdout, stderr + + +def set_anonymous_access(enable=True): + user = User.get_by_username(User.DEFAULT_USER) + user.active = enable + Session().add(user) + Session().commit() + print '\tanonymous access is now:', enable + if enable != User.get_by_username(User.DEFAULT_USER).active: + raise Exception('Cannot set anonymous access') + + +#============================================================================== +# TESTS +#============================================================================== + +class TestVCSOperations(unittest.TestCase): + + @classmethod + def setup_class(cls): + #DISABLE ANONYMOUS ACCESS + set_anonymous_access(False) + + def setUp(self): + r = Repository.get_by_repo_name(GIT_REPO) + Repository.unlock(r) + r.enable_locking = False + Session().add(r) + Session().commit() + + r = Repository.get_by_repo_name(HG_REPO) + Repository.unlock(r) + r.enable_locking = False + Session().add(r) + Session().commit() + + def test_clone_hg_repo_by_admin(self): + clone_url = _construct_url(HG_REPO) + stdout, stderr = Command('/tmp').execute('hg clone', clone_url) + + assert 'requesting all changes' in stdout + assert 'adding changesets' in stdout + assert 'adding manifests' in stdout + assert 'adding file changes' in stdout + + assert stderr == '' + + def test_clone_git_repo_by_admin(self): + clone_url = _construct_url(GIT_REPO) + stdout, stderr = Command('/tmp').execute('git clone', clone_url) + + assert 'Cloning into' in stdout + assert stderr == '' + + def test_clone_wrong_credentials_hg(self): + clone_url = _construct_url(HG_REPO, passwd='bad!') + stdout, stderr = Command('/tmp').execute('hg clone', clone_url) + assert 'abort: authorization failed' in stderr + + def test_clone_wrong_credentials_git(self): + clone_url = _construct_url(GIT_REPO, passwd='bad!') + stdout, stderr = Command('/tmp').execute('git clone', clone_url) + assert 'fatal: Authentication failed' in stderr + + def test_clone_git_dir_as_hg(self): + clone_url = _construct_url(GIT_REPO) + stdout, stderr = Command('/tmp').execute('hg clone', clone_url) + assert 'HTTP Error 404: Not Found' in stderr + + def test_clone_hg_repo_as_git(self): + clone_url = _construct_url(HG_REPO) + stdout, stderr = Command('/tmp').execute('git clone', clone_url) + assert 'not found:' in stderr + + def test_clone_non_existing_path_hg(self): + clone_url = _construct_url('trololo') + stdout, stderr = Command('/tmp').execute('hg clone', clone_url) + assert 'HTTP Error 404: Not Found' in stderr + + def test_clone_non_existing_path_git(self): + clone_url = _construct_url('trololo') + stdout, stderr = Command('/tmp').execute('git clone', clone_url) + assert 'not found:' in stderr + + def test_push_new_file_hg(self): + DEST = _get_tmp_dir() + clone_url = _construct_url(HG_REPO, dest=DEST) + stdout, stderr = Command('/tmp').execute('hg clone', clone_url) + + stdout, stderr = _add_files_and_push('hg', DEST) + + assert 'pushing to' in stdout + assert 'Repository size' in stdout + assert 'Last revision is now' in stdout + + def test_push_new_file_git(self): + DEST = _get_tmp_dir() + clone_url = _construct_url(GIT_REPO, dest=DEST) + stdout, stderr = Command('/tmp').execute('git clone', clone_url) + + # commit some stuff into this repo + stdout, stderr = _add_files_and_push('git', DEST) + + #WTF git stderr ?! + assert 'master -> master' in stderr + + def test_push_wrong_credentials_hg(self): + DEST = _get_tmp_dir() + clone_url = _construct_url(HG_REPO, dest=DEST) + stdout, stderr = Command('/tmp').execute('hg clone', clone_url) + + stdout, stderr = _add_files_and_push('hg', DEST, user='bad', + passwd='name') + + assert 'abort: authorization failed' in stderr + + def test_push_wrong_credentials_git(self): + DEST = _get_tmp_dir() + clone_url = _construct_url(GIT_REPO, dest=DEST) + stdout, stderr = Command('/tmp').execute('git clone', clone_url) + + stdout, stderr = _add_files_and_push('git', DEST, user='bad', + passwd='name') + + assert 'fatal: Authentication failed' in stderr + + def test_push_back_to_wrong_url_hg(self): + DEST = _get_tmp_dir() + clone_url = _construct_url(HG_REPO, dest=DEST) + stdout, stderr = Command('/tmp').execute('hg clone', clone_url) + + stdout, stderr = _add_files_and_push('hg', DEST, + clone_url='http://127.0.0.1:5000/tmp',) + + assert 'HTTP Error 404: Not Found' in stderr + + def test_push_back_to_wrong_url_git(self): + DEST = _get_tmp_dir() + clone_url = _construct_url(GIT_REPO, dest=DEST) + stdout, stderr = Command('/tmp').execute('git clone', clone_url) + + stdout, stderr = _add_files_and_push('git', DEST, + clone_url='http://127.0.0.1:5000/tmp',) + + assert 'not found:' in stderr + + def test_clone_and_create_lock_hg(self): + # enable locking + r = Repository.get_by_repo_name(HG_REPO) + r.enable_locking = True + Session().add(r) + Session().commit() + # clone + clone_url = _construct_url(HG_REPO) + stdout, stderr = Command('/tmp').execute('hg clone', clone_url) + + #check if lock was made + r = Repository.get_by_repo_name(HG_REPO) + assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id + + def test_clone_and_create_lock_git(self): + # enable locking + r = Repository.get_by_repo_name(GIT_REPO) + r.enable_locking = True + Session().add(r) + Session().commit() + # clone + clone_url = _construct_url(GIT_REPO) + stdout, stderr = Command('/tmp').execute('git clone', clone_url) + + #check if lock was made + r = Repository.get_by_repo_name(GIT_REPO) + assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id + + def test_clone_after_repo_was_locked_hg(self): + #lock repo + r = Repository.get_by_repo_name(HG_REPO) + Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id) + #pull fails since repo is locked + clone_url = _construct_url(HG_REPO) + stdout, stderr = Command('/tmp').execute('hg clone', clone_url) + msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`""" + % (HG_REPO, TEST_USER_ADMIN_LOGIN)) + assert msg in stderr + + def test_clone_after_repo_was_locked_git(self): + #lock repo + r = Repository.get_by_repo_name(GIT_REPO) + Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id) + #pull fails since repo is locked + clone_url = _construct_url(GIT_REPO) + stdout, stderr = Command('/tmp').execute('git clone', clone_url) + msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`""" + % (GIT_REPO, TEST_USER_ADMIN_LOGIN)) + #TODO: fix this somehow later on GIT, GIT is stupid and even if we throw + # back 423 to it, it makes ANOTHER request and we fail there with 405 :/ + msg = "405 Method Not Allowed" + assert msg in stderr + + def test_push_on_locked_repo_by_other_user_hg(self): + #clone some temp + DEST = _get_tmp_dir() + clone_url = _construct_url(HG_REPO, dest=DEST) + stdout, stderr = Command('/tmp').execute('hg clone', clone_url) + + #lock repo + r = Repository.get_by_repo_name(HG_REPO) + # let this user actually push ! + RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN, + perm='repository.write') + Session().commit() + Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id) + + #push fails repo is locked by other user ! + stdout, stderr = _add_files_and_push('hg', DEST, + user=TEST_USER_REGULAR_LOGIN, + passwd=TEST_USER_REGULAR_PASS) + msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`""" + % (HG_REPO, TEST_USER_ADMIN_LOGIN)) + assert msg in stderr + +#TODO: fix me ! somehow during tests hooks don't get called on GIT +# def test_push_on_locked_repo_by_other_user_git(self): +# #clone some temp +# DEST = _get_tmp_dir() +# clone_url = _construct_url(GIT_REPO, dest=DEST) +# stdout, stderr = Command('/tmp').execute('git clone', clone_url) +# +# #lock repo +# r = Repository.get_by_repo_name(GIT_REPO) +# # let this user actually push ! +# RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN, +# perm='repository.write') +# Session().commit() +# Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id) +# +# #push fails repo is locked by other user ! +# stdout, stderr = _add_files_and_push('git', DEST, +# user=TEST_USER_REGULAR_LOGIN, +# passwd=TEST_USER_REGULAR_PASS) +# msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`""" +# % (GIT_REPO, TEST_USER_ADMIN_LOGIN)) +# #TODO: fix this somehow later on GIT, GIT is stupid and even if we throw +# # back 423 to it, it makes ANOTHER request and we fail there with 405 :/ +# msg = "405 Method Not Allowed" +# assert msg in stderr + + def test_push_unlocks_repository_hg(self): + # enable locking + r = Repository.get_by_repo_name(HG_REPO) + r.enable_locking = True + Session().add(r) + Session().commit() + #clone some temp + DEST = _get_tmp_dir() + clone_url = _construct_url(HG_REPO, dest=DEST) + stdout, stderr = Command('/tmp').execute('hg clone', clone_url) + + #check for lock repo after clone + r = Repository.get_by_repo_name(HG_REPO) + assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id + + #push is ok and repo is now unlocked + stdout, stderr = _add_files_and_push('hg', DEST) + assert ('remote: Released lock on repo `%s`' % HG_REPO) in stdout + #we need to cleanup the Session Here ! + Session.remove() + r = Repository.get_by_repo_name(HG_REPO) + assert r.locked == [None, None] + +#TODO: fix me ! somehow during tests hooks don't get called on GIT +# def test_push_unlocks_repository_git(self): +# # enable locking +# r = Repository.get_by_repo_name(GIT_REPO) +# r.enable_locking = True +# Session().add(r) +# Session().commit() +# #clone some temp +# DEST = _get_tmp_dir() +# clone_url = _construct_url(GIT_REPO, dest=DEST) +# stdout, stderr = Command('/tmp').execute('git clone', clone_url) +# +# #check for lock repo after clone +# r = Repository.get_by_repo_name(GIT_REPO) +# assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id +# +# #push is ok and repo is now unlocked +# stdout, stderr = _add_files_and_push('git', DEST) +# #assert ('remote: Released lock on repo `%s`' % GIT_REPO) in stdout +# #we need to cleanup the Session Here ! +# Session.remove() +# r = Repository.get_by_repo_name(GIT_REPO) +# assert r.locked == [None, None]