Mercurial > kallithea
changeset 1628:de71a4bde097 beta
Some code cleanups and fixes
author | Marcin Kuzminski <marcin@python-works.com> |
---|---|
date | Mon, 31 Oct 2011 21:42:41 +0200 |
parents | c622e3a85499 |
children | 2196aa27954b |
files | development.ini production.ini rhodecode/controllers/login.py rhodecode/lib/auth.py rhodecode/lib/base.py rhodecode/model/forms.py rhodecode/model/user.py rhodecode/tests/__init__.py rhodecode/tests/_test_concurency.py rhodecode/tests/test_concurency.py test.ini |
diffstat | 11 files changed, 259 insertions(+), 241 deletions(-) [+] |
line wrap: on
line diff
--- a/development.ini Mon Oct 31 20:52:33 2011 +0200 +++ b/development.ini Mon Oct 31 21:42:41 2011 +0200 @@ -53,6 +53,8 @@ force_https = false commit_parse_limit = 25 use_gravatar = true +container_auth_enabled = false +proxypass_auth_enabled = false #################################### ### CELERY CONFIG ####
--- a/production.ini Mon Oct 31 20:52:33 2011 +0200 +++ b/production.ini Mon Oct 31 21:42:41 2011 +0200 @@ -53,6 +53,8 @@ force_https = false commit_parse_limit = 50 use_gravatar = true +container_auth_enabled = false +proxypass_auth_enabled = false #################################### ### CELERY CONFIG ####
--- a/rhodecode/controllers/login.py Mon Oct 31 20:52:33 2011 +0200 +++ b/rhodecode/controllers/login.py Mon Oct 31 21:42:41 2011 +0200 @@ -49,7 +49,7 @@ super(LoginController, self).__before__() def index(self): - #redirect if already logged in + # redirect if already logged in c.came_from = request.GET.get('came_from', None) if self.rhodecode_user.is_authenticated \ @@ -62,7 +62,7 @@ login_form = LoginForm() try: c.form_result = login_form.to_python(dict(request.POST)) - #form checks for username/password, now we're authenticated + # form checks for username/password, now we're authenticated username = c.form_result['username'] user = User.get_by_username(username, case_insensitive=True) auth_user = AuthUser(user.user_id)
--- a/rhodecode/lib/auth.py Mon Oct 31 20:52:33 2011 +0200 +++ b/rhodecode/lib/auth.py Mon Oct 31 21:42:41 2011 +0200 @@ -125,16 +125,23 @@ def check_password(password, hashed): return RhodeCodeCrypto.hash_check(password, hashed) - -def generate_api_key(username, salt=None): +def generate_api_key(str_, salt=None): + """ + Generates API KEY from given string + + :param str_: + :param salt: + """ + if salt is None: salt = _RandomNameSequence().next() - return hashlib.sha1(username + salt).hexdigest() + return hashlib.sha1(str_ + salt).hexdigest() def authfunc(environ, username, password): - """Dummy authentication function used in Mercurial/Git/ and access control, + """ + Dummy authentication function used in Mercurial/Git/ and access control, :param environ: needed only for using in Basic auth """ @@ -142,7 +149,8 @@ def authenticate(username, password): - """Authentication function used for access control, + """ + Authentication function used for access control, firstly checks for db authentication then if ldap is enabled for ldap authentication, also creates ldap user if not in database @@ -228,33 +236,35 @@ if user is None: user_model = UserModel() user_attrs = { - 'name': username, - 'lastname': None, - 'email': None, - } - if not user_model.create_for_container_auth(username, user_attrs): + 'name': username, + 'lastname': None, + 'email': None, + } + user = user_model.create_for_container_auth(username, user_attrs) + if not user: return None - user = User.get_by_username(username) log.info('User %s was created by container authentication', username) if not user.active: return None user.update_lastlogin() - log.debug('User %s is now logged in by container authentication', user.username) + log.debug('User %s is now logged in by container authentication', + user.username) return user -def get_container_username(environ, cfg=config): +def get_container_username(environ, cfg): from paste.httpheaders import REMOTE_USER from paste.deploy.converters import asbool + proxy_pass_enabled = asbool(cfg.get('proxypass_auth_enabled', False)) username = REMOTE_USER(environ) - - if not username and asbool(cfg.get('proxypass_auth_enabled', False)): + + if not username and proxy_pass_enabled: username = environ.get('HTTP_X_FORWARDED_USER') - if username: - #Removing realm and domain from username + if username and proxy_pass_enabled: + # Removing realm and domain from username username = username.partition('@')[0] username = username.rpartition('\\')[2] log.debug('Received username %s from container', username) @@ -276,7 +286,7 @@ self.user_id = user_id self.api_key = None self.username = username - + self.name = '' self.lastname = '' self.email = '' @@ -290,14 +300,17 @@ user_model = UserModel() self.anonymous_user = User.get_by_username('default') is_user_loaded = False + + # try go get user by api key if self._api_key and self._api_key != self.anonymous_user.api_key: - #try go get user by api key log.debug('Auth User lookup by API KEY %s', self._api_key) is_user_loaded = user_model.fill_data(self, api_key=self._api_key) - elif self.user_id is not None \ - and self.user_id != self.anonymous_user.user_id: + # lookup by userid + elif (self.user_id is not None and + self.user_id != self.anonymous_user.user_id): log.debug('Auth User lookup by USER ID %s', self.user_id) is_user_loaded = user_model.fill_data(self, user_id=self.user_id) + # lookup by username elif self.username: log.debug('Auth User lookup by USER NAME %s', self.username) dbuser = login_container_auth(self.username) @@ -308,10 +321,10 @@ is_user_loaded = True if not is_user_loaded: + # if we cannot authenticate user try anonymous if self.anonymous_user.active is True: - user_model.fill_data(self, - user_id=self.anonymous_user.user_id) - #then we set this user is logged in + user_model.fill_data(self,user_id=self.anonymous_user.user_id) + # then we set this user is logged in self.is_authenticated = True else: self.user_id = None @@ -337,13 +350,13 @@ self.is_authenticated) def set_authenticated(self, authenticated=True): - if self.user_id != self.anonymous_user.user_id: self.is_authenticated = authenticated def set_available_permissions(config): - """This function will propagate pylons globals with all available defined + """ + This function will propagate pylons globals with all available defined permission given in db. We don't want to check each time from db for new permissions since adding a new permission also requires application restart ie. to decorate new views with the newly created permission @@ -474,7 +487,7 @@ return redirect(url('login_home', came_from=p)) else: - #redirect with forbidden ret code + # redirect with forbidden ret code return abort(403) def check_permissions(self): @@ -661,3 +674,4 @@ return True log.debug('permission denied') return False +
--- a/rhodecode/lib/base.py Mon Oct 31 20:52:33 2011 +0200 +++ b/rhodecode/lib/base.py Mon Oct 31 21:42:41 2011 +0200 @@ -33,8 +33,6 @@ self.sa = meta.Session() self.scm_model = ScmModel(self.sa) - #c.unread_journal = scm_model.get_unread_journal() - def __call__(self, environ, start_response): """Invoke the Controller""" # WSGIController.__call__ dispatches to the Controller method @@ -42,15 +40,15 @@ # available in environ['pylons.routes_dict'] start = time.time() try: - # putting this here makes sure that we update permissions each time + # make sure that we update permissions each time we call controller api_key = request.GET.get('api_key') user_id = getattr(session.get('rhodecode_user'), 'user_id', None) if asbool(config.get('container_auth_enabled', False)): username = get_container_username(environ) else: username = None - - self.rhodecode_user = c.rhodecode_user = AuthUser(user_id, api_key, username) + auth_user = AuthUser(user_id, api_key, username) + self.rhodecode_user = c.rhodecode_user = auth_user if not self.rhodecode_user.is_authenticated and \ self.rhodecode_user.user_id is not None: self.rhodecode_user.set_authenticated( @@ -66,11 +64,13 @@ class BaseRepoController(BaseController): """ - Base class for controllers responsible for loading all needed data - for those controllers, loaded items are + Base class for controllers responsible for loading all needed data for + repository loaded items are - c.rhodecode_repo: instance of scm repository (taken from cache) - + c.rhodecode_repo: instance of scm repository + c.rhodecode_db_repo: instance of db + c.repository_followers: number of followers + c.repository_forks: number of forks """ def __before__(self): @@ -86,7 +86,6 @@ redirect(url('home')) - c.repository_followers = \ - self.scm_model.get_followers(c.repo_name) + c.repository_followers = self.scm_model.get_followers(c.repo_name) c.repository_forks = self.scm_model.get_forks(c.repo_name)
--- a/rhodecode/model/forms.py Mon Oct 31 20:52:33 2011 +0200 +++ b/rhodecode/model/forms.py Mon Oct 31 21:42:41 2011 +0200 @@ -208,7 +208,7 @@ password = value['password'] username = value['username'] user = User.get_by_username(username) - + if authenticate(username, password): return value else:
--- a/rhodecode/model/user.py Mon Oct 31 20:52:33 2011 +0200 +++ b/rhodecode/model/user.py Mon Oct 31 21:42:41 2011 +0200 @@ -106,20 +106,20 @@ new_user.password = None new_user.api_key = generate_api_key(username) new_user.email = attrs['email'] - new_user.active = True + new_user.active = attrs.get('active', True) new_user.name = attrs['name'] new_user.lastname = attrs['lastname'] self.sa.add(new_user) self.sa.commit() - return True + return new_user except (DatabaseError,): log.error(traceback.format_exc()) self.sa.rollback() raise - log.debug('User %s already exists. Skipping creation of account for container auth.', - username) - return False + log.debug('User %s already exists. Skipping creation of account' + ' for container auth.', username) + return None def create_ldap(self, username, password, user_dn, attrs): """ @@ -141,21 +141,21 @@ new_user.password = get_crypt_password(password) new_user.api_key = generate_api_key(username) new_user.email = attrs['email'] - new_user.active = attrs.get('active',True) + new_user.active = attrs.get('active', True) new_user.ldap_dn = safe_unicode(user_dn) new_user.name = attrs['name'] new_user.lastname = attrs['lastname'] self.sa.add(new_user) self.sa.commit() - return True + return new_user except (DatabaseError,): log.error(traceback.format_exc()) self.sa.rollback() raise log.debug('this %s user exists skipping creation of ldap account', username) - return False + return None def create_registration(self, form_data): from rhodecode.lib.celerylib import tasks, run_task
--- a/rhodecode/tests/__init__.py Mon Oct 31 20:52:33 2011 +0200 +++ b/rhodecode/tests/__init__.py Mon Oct 31 21:42:41 2011 +0200 @@ -21,7 +21,6 @@ from rhodecode.model import meta import logging - log = logging.getLogger(__name__) import pylons.test @@ -31,7 +30,7 @@ 'TEST_USER_ADMIN_LOGIN', 'TEST_USER_ADMIN_PASS' ] # Invoke websetup with the current config file -#SetupCommand('setup-app').run([config_file]) +# SetupCommand('setup-app').run([config_file]) ##RUNNING DESIRED TESTS # nosetests -x rhodecode.tests.functional.test_admin_settings:TestSettingsController.test_my_account @@ -69,7 +68,7 @@ response = self.app.post(url(controller='login', action='index'), {'username':username, 'password':password}) - + if 'invalid user name' in response.body: self.fail('could not login using %s %s' % (username, password))
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rhodecode/tests/_test_concurency.py Mon Oct 31 21:42:41 2011 +0200 @@ -0,0 +1,189 @@ +# -*- coding: utf-8 -*- +""" + rhodecode.tests.test_hg_operations + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + Test suite for making push/pull operations + + :created_on: Dec 30, 2010 + :copyright: (C) 2009-2011 Marcin Kuzminski <marcin@python-works.com> + :license: GPLv3, see COPYING for more details. +""" +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import os +import sys +import shutil +import logging +from os.path import join as jn +from os.path import dirname as dn + +from tempfile import _RandomNameSequence +from subprocess import Popen, PIPE + +from paste.deploy import appconfig +from pylons import config +from sqlalchemy import engine_from_config + +from rhodecode.lib.utils import add_cache +from rhodecode.model import init_model +from rhodecode.model import meta +from rhodecode.model.db import User, Repository +from rhodecode.lib.auth import get_crypt_password + +from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO +from rhodecode.config.environment import load_environment + +rel_path = dn(dn(dn(os.path.abspath(__file__)))) +conf = appconfig('config:development.ini', relative_to=rel_path) +load_environment(conf.global_conf, conf.local_conf) + +add_cache(conf) + +USER = 'test_admin' +PASS = 'test12' +HOST = '127.0.0.1:5000' +DEBUG = True +log = logging.getLogger(__name__) + + +class Command(object): + + def __init__(self, cwd): + self.cwd = cwd + + def execute(self, cmd, *args): + """Runs command on the system with given ``args``. + """ + + command = cmd + ' ' + ' '.join(args) + log.debug('Executing %s' % command) + if DEBUG: + print command + p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd) + stdout, stderr = p.communicate() + if DEBUG: + print stdout, stderr + return stdout, stderr + +def get_session(): + engine = engine_from_config(conf, 'sqlalchemy.db1.') + init_model(engine) + sa = meta.Session() + return sa + + +def create_test_user(force=True): + print 'creating test user' + sa = get_session() + + user = sa.query(User).filter(User.username == USER).scalar() + + if force and user is not None: + print 'removing current user' + for repo in sa.query(Repository).filter(Repository.user == user).all(): + sa.delete(repo) + sa.delete(user) + sa.commit() + + if user is None or force: + print 'creating new one' + new_usr = User() + new_usr.username = USER + new_usr.password = get_crypt_password(PASS) + new_usr.email = 'mail@mail.com' + new_usr.name = 'test' + new_usr.lastname = 'lasttestname' + new_usr.active = True + new_usr.admin = True + sa.add(new_usr) + sa.commit() + + print 'done' + + +def create_test_repo(force=True): + print 'creating test repo' + from rhodecode.model.repo import RepoModel + sa = get_session() + + user = sa.query(User).filter(User.username == USER).scalar() + if user is None: + raise Exception('user not found') + + + repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar() + + if repo is None: + print 'repo not found creating' + + form_data = {'repo_name':HG_REPO, + 'repo_type':'hg', + 'private':False, + 'clone_uri':'' } + rm = RepoModel(sa) + rm.base_path = '/home/hg' + rm.create(form_data, user) + + print 'done' + +def set_anonymous_access(enable=True): + sa = get_session() + user = sa.query(User).filter(User.username == 'default').one() + user.active = enable + sa.add(user) + sa.commit() + +def get_anonymous_access(): + sa = get_session() + return sa.query(User).filter(User.username == 'default').one().active + + +#============================================================================== +# TESTS +#============================================================================== +def test_clone_with_credentials(no_errors=False, repo=HG_REPO): + cwd = path = jn(TESTS_TMP_PATH, repo) + + + try: + shutil.rmtree(path, ignore_errors=True) + os.makedirs(path) + #print 'made dirs %s' % jn(path) + except OSError: + raise + + + clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \ + {'user':USER, + 'pass':PASS, + 'host':HOST, + 'cloned_repo':repo, + 'dest':path + _RandomNameSequence().next()} + + stdout, stderr = Command(cwd).execute('hg clone', clone_url) + + if no_errors is False: + assert """adding file changes""" in stdout, 'no messages about cloning' + assert """abort""" not in stderr , 'got error from clone' + +if __name__ == '__main__': + try: + create_test_user(force=False) + + for i in range(int(sys.argv[2])): + test_clone_with_credentials(repo=sys.argv[1]) + + except Exception, e: + sys.exit('stop on %s' % e)
--- a/rhodecode/tests/test_concurency.py Mon Oct 31 20:52:33 2011 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,189 +0,0 @@ -# -*- coding: utf-8 -*- -""" - rhodecode.tests.test_hg_operations - ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - - Test suite for making push/pull operations - - :created_on: Dec 30, 2010 - :copyright: (C) 2009-2011 Marcin Kuzminski <marcin@python-works.com> - :license: GPLv3, see COPYING for more details. -""" -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import os -import sys -import shutil -import logging -from os.path import join as jn -from os.path import dirname as dn - -from tempfile import _RandomNameSequence -from subprocess import Popen, PIPE - -from paste.deploy import appconfig -from pylons import config -from sqlalchemy import engine_from_config - -from rhodecode.lib.utils import add_cache -from rhodecode.model import init_model -from rhodecode.model import meta -from rhodecode.model.db import User, Repository -from rhodecode.lib.auth import get_crypt_password - -from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO -from rhodecode.config.environment import load_environment - -rel_path = dn(dn(dn(os.path.abspath(__file__)))) -conf = appconfig('config:development.ini', relative_to=rel_path) -load_environment(conf.global_conf, conf.local_conf) - -add_cache(conf) - -USER = 'test_admin' -PASS = 'test12' -HOST = '127.0.0.1:5000' -DEBUG = True -log = logging.getLogger(__name__) - - -class Command(object): - - def __init__(self, cwd): - self.cwd = cwd - - def execute(self, cmd, *args): - """Runs command on the system with given ``args``. - """ - - command = cmd + ' ' + ' '.join(args) - log.debug('Executing %s' % command) - if DEBUG: - print command - p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd) - stdout, stderr = p.communicate() - if DEBUG: - print stdout, stderr - return stdout, stderr - -def get_session(): - engine = engine_from_config(conf, 'sqlalchemy.db1.') - init_model(engine) - sa = meta.Session() - return sa - - -def create_test_user(force=True): - print 'creating test user' - sa = get_session() - - user = sa.query(User).filter(User.username == USER).scalar() - - if force and user is not None: - print 'removing current user' - for repo in sa.query(Repository).filter(Repository.user == user).all(): - sa.delete(repo) - sa.delete(user) - sa.commit() - - if user is None or force: - print 'creating new one' - new_usr = User() - new_usr.username = USER - new_usr.password = get_crypt_password(PASS) - new_usr.email = 'mail@mail.com' - new_usr.name = 'test' - new_usr.lastname = 'lasttestname' - new_usr.active = True - new_usr.admin = True - sa.add(new_usr) - sa.commit() - - print 'done' - - -def create_test_repo(force=True): - print 'creating test repo' - from rhodecode.model.repo import RepoModel - sa = get_session() - - user = sa.query(User).filter(User.username == USER).scalar() - if user is None: - raise Exception('user not found') - - - repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar() - - if repo is None: - print 'repo not found creating' - - form_data = {'repo_name':HG_REPO, - 'repo_type':'hg', - 'private':False, - 'clone_uri':'' } - rm = RepoModel(sa) - rm.base_path = '/home/hg' - rm.create(form_data, user) - - print 'done' - -def set_anonymous_access(enable=True): - sa = get_session() - user = sa.query(User).filter(User.username == 'default').one() - user.active = enable - sa.add(user) - sa.commit() - -def get_anonymous_access(): - sa = get_session() - return sa.query(User).filter(User.username == 'default').one().active - - -#============================================================================== -# TESTS -#============================================================================== -def test_clone_with_credentials(no_errors=False, repo=HG_REPO): - cwd = path = jn(TESTS_TMP_PATH, repo) - - - try: - shutil.rmtree(path, ignore_errors=True) - os.makedirs(path) - #print 'made dirs %s' % jn(path) - except OSError: - raise - - - clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \ - {'user':USER, - 'pass':PASS, - 'host':HOST, - 'cloned_repo':repo, - 'dest':path + _RandomNameSequence().next()} - - stdout, stderr = Command(cwd).execute('hg clone', clone_url) - - if no_errors is False: - assert """adding file changes""" in stdout, 'no messages about cloning' - assert """abort""" not in stderr , 'got error from clone' - -if __name__ == '__main__': - try: - create_test_user(force=False) - - for i in range(int(sys.argv[2])): - test_clone_with_credentials(repo=sys.argv[1]) - - except Exception, e: - sys.exit('stop on %s' % e)