# HG changeset patch # User Marcin Kuzminski # Date 1277836914 -7200 # Node ID d6e2817734d21f5d146ae4954f148cc7e545faeb # Parent b00945765e7b01634a53654d2633baef0325bb9a Full rewrite of auth module, new functions/decorators. FIxed auth user diff -r b00945765e7b -r d6e2817734d2 pylons_app/lib/auth.py --- a/pylons_app/lib/auth.py Tue Jun 29 20:37:31 2010 +0200 +++ b/pylons_app/lib/auth.py Tue Jun 29 20:41:54 2010 +0200 @@ -2,7 +2,7 @@ # encoding: utf-8 # authentication and permission libraries # Copyright (C) 2009-2010 Marcin Kuzminski - + # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; version 2 @@ -24,15 +24,17 @@ """ from functools import wraps -from pylons import session, url, app_globals as g +from pylons import session, url, request from pylons.controllers.util import abort, redirect from pylons_app.model import meta -from pylons_app.model.db import User, Repo2Perm +from pylons_app.model.db import User, Repo2Perm, Repository, Permission +from pylons_app.lib.utils import get_repo_slug from sqlalchemy.exc import OperationalError from sqlalchemy.orm.exc import NoResultFound, MultipleResultsFound import crypt import logging -log = logging.getLogger(__name__) +from pylons import config +log = logging.getLogger(__name__) def get_crypt_password(password): """ @@ -64,16 +66,12 @@ """ A simple object that handles a mercurial username for authentication """ - username = 'None' - user_id = None - is_authenticated = False - is_admin = False - permissions = set() - group = set() - def __init__(self): - pass - + self.username = 'None' + self.user_id = None + self.is_authenticated = False + self.is_admin = False + self.permissions = {} def set_available_permissions(config): @@ -84,107 +82,304 @@ ie. to decorate new views with the newly created permission @param config: """ - from pylons_app.model.meta import Session - from pylons_app.model.db import Permission - logging.info('getting information about all available permissions') - sa = Session() + log.info('getting information about all available permissions') + sa = meta.Session all_perms = sa.query(Permission).all() - config['pylons.app_globals'].available_permissions = [x.permission_name for x in all_perms] + config['available_permissions'] = [x.permission_name for x in all_perms] + +def set_base_path(config): + config['base_path'] = config['pylons.app_globals'].base_path + +def fill_perms(user): + sa = meta.Session + user.permissions['repositories'] = {} + + #first fetch default permissions + default_perms = sa.query(Repo2Perm, Repository, Permission)\ + .join((Repository, Repo2Perm.repository == Repository.repo_name))\ + .join((Permission, Repo2Perm.permission_id == Permission.permission_id))\ + .filter(Repo2Perm.user_id == sa.query(User).filter(User.username == + 'default').one().user_id).all() + if user.is_admin: + user.permissions['global'] = set(['hg.admin']) + #admin have all rights full + for perm in default_perms: + p = 'repository.admin' + user.permissions['repositories'][perm.Repo2Perm.repository] = p + + else: + user.permissions['global'] = set() + for perm in default_perms: + if perm.Repository.private: + #disable defaults for private repos, + p = 'repository.none' + elif perm.Repository.user_id == user.user_id: + #set admin if owner + p = 'repository.admin' + else: + p = perm.Permission.permission_name + + user.permissions['repositories'][perm.Repo2Perm.repository] = p + + + user_perms = sa.query(Repo2Perm, Permission, Repository)\ + .join((Repository, Repo2Perm.repository == Repository.repo_name))\ + .join((Permission, Repo2Perm.permission_id == Permission.permission_id))\ + .filter(Repo2Perm.user_id == user.user_id).all() + #overwrite userpermissions with defaults + for perm in user_perms: + #set write if owner + if perm.Repository.user_id == user.user_id: + p = 'repository.write' + else: + p = perm.Permission.permission_name + user.permissions['repositories'][perm.Repo2Perm.repository] = p + return user + def get_user(session): """ Gets user from session, and wraps permissions into user @param session: """ user = session.get('hg_app_user', AuthUser()) + if user.is_authenticated: - sa = meta.Session - user.permissions = sa.query(Repo2Perm)\ - .filter(Repo2Perm.user_id == user.user_id).all() - + user = fill_perms(user) + + session['hg_app_user'] = user + session.save() return user #=============================================================================== -# DECORATORS +# CHECK DECORATORS #=============================================================================== class LoginRequired(object): """ Must be logged in to execute this function else redirect to login page """ - def __init__(self): - pass - + def __call__(self, func): - @wraps(func) def _wrapper(*fargs, **fkwargs): user = session.get('hg_app_user', AuthUser()) - log.info('Checking login required for user:%s', user.username) + log.debug('Checking login required for user:%s', user.username) if user.is_authenticated: - log.info('user %s is authenticated', user.username) - func(*fargs) + log.debug('user %s is authenticated', user.username) + func(*fargs) else: - logging.info('user %s not authenticated', user.username) - logging.info('redirecting to login page') + log.warn('user %s not authenticated', user.username) + log.debug('redirecting to login page') return redirect(url('login_home')) return _wrapper class PermsDecorator(object): + """ + Base class for decorators + """ - def __init__(self, *perms): - available_perms = g.available_permissions - for perm in perms: + def __init__(self, *required_perms): + available_perms = config['available_permissions'] + for perm in required_perms: if perm not in available_perms: - raise Exception("'%s' permission in not defined" % perm) - self.required_perms = set(perms) - self.user_perms = set([])#propagate this list from somewhere. + raise Exception("'%s' permission is not defined" % perm) + self.required_perms = set(required_perms) + self.user_perms = None - def __call__(self, func): + def __call__(self, func): @wraps(func) - def _wrapper(*args, **kwargs): - logging.info('checking %s permissions %s for %s', - self.__class__.__name__[-3:], self.required_perms, func.__name__) + def _wrapper(*fargs, **fkwargs): + self.user_perms = session.get('hg_app_user', AuthUser()).permissions + log.debug('checking %s permissions %s for %s', + self.__class__.__name__, self.required_perms, func.__name__) if self.check_permissions(): - logging.info('Permission granted for %s', func.__name__) - return func(*args, **kwargs) + log.debug('Permission granted for %s', func.__name__) + return func(*fargs) else: - logging.warning('Permission denied for %s', func.__name__) + log.warning('Permission denied for %s', func.__name__) #redirect with forbidden ret code - return redirect(url('access_denied'), 403) + return abort(403) return _wrapper def check_permissions(self): """ - Dummy function for overiding + Dummy function for overriding """ raise Exception('You have to write this function in child class') -class CheckPermissionAll(PermsDecorator): +class HasPermissionAllDecorator(PermsDecorator): """ Checks for access permission for all given predicates. All of them have to be meet in order to fulfill the request """ def check_permissions(self): - if self.required_perms.issubset(self.user_perms): + if self.required_perms.issubset(self.user_perms['global']): return True return False -class CheckPermissionAny(PermsDecorator): +class HasPermissionAnyDecorator(PermsDecorator): """ Checks for access permission for any of given predicates. In order to fulfill the request any of predicates must be meet """ def check_permissions(self): + if self.required_perms.intersection(self.user_perms['global']): + return True + return False + +class HasRepoPermissionAllDecorator(PermsDecorator): + """ + Checks for access permission for all given predicates for specific + repository. All of them have to be meet in order to fulfill the request + """ + + def check_permissions(self): + repo_name = get_repo_slug(request) + user_perms = set([self.user_perms['repositories'][repo_name]]) + if self.required_perms.issubset(user_perms): + return True + return False + + +class HasRepoPermissionAnyDecorator(PermsDecorator): + """ + Checks for access permission for any of given predicates for specific + repository. In order to fulfill the request any of predicates must be meet + """ + + def check_permissions(self): + repo_name = get_repo_slug(request) + + user_perms = set([self.user_perms['repositories'][repo_name]]) + if self.required_perms.intersection(user_perms): + return True + return False +#=============================================================================== +# CHECK FUNCTIONS +#=============================================================================== + +class PermsFunction(object): + """ + Base function for other check functions + """ + + def __init__(self, *perms): + available_perms = config['available_permissions'] + + for perm in perms: + if perm not in available_perms: + raise Exception("'%s' permission in not defined" % perm) + self.required_perms = set(perms) + self.user_perms = None + self.granted_for = '' + self.repo_name = None + + def __call__(self, check_Location=''): + user = session['hg_app_user'] + self.user_perms = user.permissions + self.granted_for = user.username + log.debug('checking %s %s', self.__class__.__name__, self.required_perms) + + if self.check_permissions(): + log.debug('Permission granted for %s @%s', self.granted_for, + check_Location) + return True + + else: + log.warning('Permission denied for %s @%s', self.granted_for, + check_Location) + return False + + def check_permissions(self): + """ + Dummy function for overriding + """ + raise Exception('You have to write this function in child class') + +class HasPermissionAll(PermsFunction): + def check_permissions(self): + if self.required_perms.issubset(self.user_perms['global']): + return True + return False + +class HasPermissionAny(PermsFunction): + def check_permissions(self): + if self.required_perms.intersection(self.user_perms['global']): + return True + return False + +class HasRepoPermissionAll(PermsFunction): + + def __call__(self, repo_name=None, check_Location=''): + self.repo_name = repo_name + return super(HasRepoPermissionAll, self).__call__(check_Location) + + def check_permissions(self): + if not self.repo_name: + self.repo_name = get_repo_slug(request) + + self.user_perms = set([self.user_perms['repositories']\ + .get(self.repo_name)]) + self.granted_for = self.repo_name + if self.required_perms.issubset(self.user_perms): + return True + return False + +class HasRepoPermissionAny(PermsFunction): + + + def __call__(self, repo_name=None, check_Location=''): + self.repo_name = repo_name + return super(HasRepoPermissionAny, self).__call__(check_Location) + + def check_permissions(self): + if not self.repo_name: + self.repo_name = get_repo_slug(request) + + self.user_perms = set([self.user_perms['repositories']\ + .get(self.repo_name)]) + self.granted_for = self.repo_name if self.required_perms.intersection(self.user_perms): return True return False +#=============================================================================== +# SPECIAL VERSION TO HANDLE MIDDLEWARE AUTH +#=============================================================================== - +class HasPermissionAnyMiddleware(object): + def __init__(self, *perms): + self.required_perms = set(perms) + + def __call__(self, user, repo_name): + usr = AuthUser() + usr.user_id = user.user_id + usr.username = user.username + usr.is_admin = user.admin + + try: + self.user_perms = set([fill_perms(usr)\ + .permissions['repositories'][repo_name]]) + except: + self.user_perms = set() + self.granted_for = '' + self.username = user.username + self.repo_name = repo_name + return self.check_permissions() + + def check_permissions(self): + log.debug('checking mercurial protocol ' + 'permissions for user:%s repository:%s', + self.username, self.repo_name) + if self.required_perms.intersection(self.user_perms): + log.debug('permission granted') + return True + log.debug('permission denied') + return False