Mercurial > kallithea
diff rhodecode/lib/auth.py @ 1117:6eb5bb24a948 beta
Major rewrite of auth objects. Moved parts of filling user data into user model.
Rewrote AuthUser adding access by api key.
author | Marcin Kuzminski <marcin@python-works.com> |
---|---|
date | Wed, 09 Mar 2011 16:34:29 +0100 |
parents | 716911af91e1 |
children | b0e2c949c34b |
line wrap: on
line diff
--- a/rhodecode/lib/auth.py Sun Mar 06 00:06:28 2011 +0100 +++ b/rhodecode/lib/auth.py Wed Mar 09 16:34:29 2011 +0100 @@ -42,19 +42,11 @@ from rhodecode.model import meta from rhodecode.model.user import UserModel -from rhodecode.model.db import User, RepoToPerm, Repository, Permission, \ - UserToPerm, UsersGroupToPerm, UsersGroupMember +from rhodecode.model.db import Permission log = logging.getLogger(__name__) - -PERM_WEIGHTS = {'repository.none':0, - 'repository.read':1, - 'repository.write':3, - 'repository.admin':3} - - class PasswordGenerator(object): """This is a simple class for generating password from different sets of characters @@ -185,21 +177,66 @@ return False class AuthUser(object): - """A simple object that handles a mercurial username for authentication + """ + A simple object that handles all attributes of user in RhodeCode + + It does lookup based on API key,given user, or user present in session + Then it fills all required information for such user. It also checks if + anonymous access is enabled and if so, it returns default user as logged + in """ - def __init__(self): + def __init__(self, user_id=None, api_key=None): + + self.user_id = user_id + self.api_key = api_key + self.username = 'None' self.name = '' self.lastname = '' self.email = '' - self.user_id = None self.is_authenticated = False - self.is_admin = False + self.admin = False self.permissions = {} + self.propagate_data() + + + def propagate_data(self): + user_model = UserModel() + if self.api_key: + #try go get user by api key + log.debug('Auth User lookup by API KEY %s', self.api_key) + user_model.fill_data(self, api_key=self.api_key) + else: + log.debug('Auth User lookup by USER ID %s', self.user_id) + self.anonymous_user = user_model.get_by_username('default', cache=True) + + if self.user_id is not None and self.user_id != self.anonymous_user.user_id: + user_model.fill_data(self, user_id=self.user_id) + else: + if self.anonymous_user.active is True: + user_model.fill_data(self, user_id=self.anonymous_user.user_id) + #then we set this user is logged in + self.is_authenticated = True + else: + self.is_authenticated = False + + log.debug('Auth User is now %s', self) + user_model.fill_perms(self) + + @property + def is_admin(self): + return self.admin def __repr__(self): - return "<AuthUser('id:%s:%s')>" % (self.user_id, self.username) + return "<AuthUser('id:%s:%s|%s')>" % (self.user_id, self.username, + self.is_authenticated) + + def set_authenticated(self, authenticated=True): + + if self.user_id != self.anonymous_user.user_id: + self.is_authenticated = authenticated + def set_available_permissions(config): """This function will propagate pylons globals with all available defined @@ -221,144 +258,42 @@ config['available_permissions'] = [x.permission_name for x in all_perms] -def fill_perms(user): - """Fills user permission attribute with permissions taken from database - works for permissions given for repositories, and for permissions that - as part of beeing group member - - :param user: user instance to fill his perms - """ - - sa = meta.Session() - user.permissions['repositories'] = {} - user.permissions['global'] = set() - - #=========================================================================== - # fetch default permissions - #=========================================================================== - default_user = UserModel().get_by_username('default', cache=True) - - default_perms = sa.query(RepoToPerm, Repository, Permission)\ - .join((Repository, RepoToPerm.repository_id == Repository.repo_id))\ - .join((Permission, RepoToPerm.permission_id == Permission.permission_id))\ - .filter(RepoToPerm.user == default_user).all() - - if user.is_admin: - #======================================================================= - # #admin have all default rights set to admin - #======================================================================= - user.permissions['global'].add('hg.admin') - - for perm in default_perms: - p = 'repository.admin' - user.permissions['repositories'][perm.RepoToPerm.repository.repo_name] = p - - else: - #======================================================================= - # set default permissions - #======================================================================= - - #default global - default_global_perms = sa.query(UserToPerm)\ - .filter(UserToPerm.user == sa.query(User)\ - .filter(User.username == 'default').one()) - - for perm in default_global_perms: - user.permissions['global'].add(perm.permission.permission_name) - - #default for repositories - for perm in default_perms: - if perm.Repository.private and not perm.Repository.user_id == user.user_id: - #disable defaults for private repos, - p = 'repository.none' - elif perm.Repository.user_id == user.user_id: - #set admin if owner - p = 'repository.admin' - else: - p = perm.Permission.permission_name - - user.permissions['repositories'][perm.RepoToPerm.repository.repo_name] = p - - #======================================================================= - # overwrite default with user permissions if any - #======================================================================= - user_perms = sa.query(RepoToPerm, Permission, Repository)\ - .join((Repository, RepoToPerm.repository_id == Repository.repo_id))\ - .join((Permission, RepoToPerm.permission_id == Permission.permission_id))\ - .filter(RepoToPerm.user_id == user.user_id).all() - - for perm in user_perms: - if perm.Repository.user_id == user.user_id:#set admin if owner - p = 'repository.admin' - else: - p = perm.Permission.permission_name - user.permissions['repositories'][perm.RepoToPerm.repository.repo_name] = p - - - #======================================================================= - # check if user is part of groups for this repository and fill in - # (or replace with higher) permissions - #======================================================================= - user_perms_from_users_groups = sa.query(UsersGroupToPerm, Permission, Repository,)\ - .join((Repository, UsersGroupToPerm.repository_id == Repository.repo_id))\ - .join((Permission, UsersGroupToPerm.permission_id == Permission.permission_id))\ - .join((UsersGroupMember, UsersGroupToPerm.users_group_id == UsersGroupMember.users_group_id))\ - .filter(UsersGroupMember.user_id == user.user_id).all() - - for perm in user_perms_from_users_groups: - p = perm.Permission.permission_name - cur_perm = user.permissions['repositories'][perm.UsersGroupToPerm.repository.repo_name] - #overwrite permission only if it's greater than permission given from other sources - if PERM_WEIGHTS[p] > PERM_WEIGHTS[cur_perm]: - user.permissions['repositories'][perm.UsersGroupToPerm.repository.repo_name] = p - - meta.Session.remove() - return user - -def get_user(session): - """Gets user from session, and wraps permissions into user - - :param session: - """ - user = session.get('rhodecode_user', AuthUser()) - #if the user is not logged in we check for anonymous access - #if user is logged and it's a default user check if we still have anonymous - #access enabled - if user.user_id is None or user.username == 'default': - anonymous_user = UserModel().get_by_username('default', cache=True) - if anonymous_user.active is True: - #then we set this user is logged in - user.is_authenticated = True - user.user_id = anonymous_user.user_id - else: - user.is_authenticated = False - - if user.is_authenticated: - user = UserModel().fill_data(user) - - user = fill_perms(user) - session['rhodecode_user'] = user - session.save() - return user - #=============================================================================== # CHECK DECORATORS #=============================================================================== class LoginRequired(object): - """Must be logged in to execute this function else - redirect to login page""" + """ + Must be logged in to execute this function else + redirect to login page + + :param api_access: if enabled this checks only for valid auth token + and grants access based on valid token + """ + + def __init__(self, api_access=False): + self.api_access = api_access def __call__(self, func): return decorator(self.__wrapper, func) def __wrapper(self, func, *fargs, **fkwargs): - user = session.get('rhodecode_user', AuthUser()) - log.debug('Checking login required for user:%s', user.username) - if user.is_authenticated: + cls = fargs[0] + user = cls.rhodecode_user + + api_access_ok = False + if self.api_access: + log.debug('Checking API KEY access for %s', cls) + if user.api_key == request.GET.get('api_key'): + api_access_ok = True + else: + log.debug("API KEY token not valid") + + log.debug('Checking if %s is authenticated @ %s', user.username, cls) + if user.is_authenticated or api_access_ok: log.debug('user %s is authenticated', user.username) return func(*fargs, **fkwargs) else: - log.warn('user %s not authenticated', user.username) + log.warn('user %s NOT authenticated', user.username) p = '' if request.environ.get('SCRIPT_NAME') != '/': @@ -379,10 +314,12 @@ return decorator(self.__wrapper, func) def __wrapper(self, func, *fargs, **fkwargs): - user = session.get('rhodecode_user', AuthUser()) - log.debug('Checking if user is not anonymous') + cls = fargs[0] + self.user = cls.rhodecode_user - anonymous = user.username == 'default' + log.debug('Checking if user is not anonymous @%s', cls) + + anonymous = self.user.username == 'default' if anonymous: p = '' @@ -401,7 +338,7 @@ return func(*fargs, **fkwargs) class PermsDecorator(object): - """Base class for decorators""" + """Base class for controller decorators""" def __init__(self, *required_perms): available_perms = config['available_permissions'] @@ -416,22 +353,19 @@ def __wrapper(self, func, *fargs, **fkwargs): -# _wrapper.__name__ = func.__name__ -# _wrapper.__dict__.update(func.__dict__) -# _wrapper.__doc__ = func.__doc__ - self.user = session.get('rhodecode_user', AuthUser()) + cls = fargs[0] + self.user = cls.rhodecode_user self.user_perms = self.user.permissions log.debug('checking %s permissions %s for %s %s', - self.__class__.__name__, self.required_perms, func.__name__, + self.__class__.__name__, self.required_perms, cls, self.user) if self.check_permissions(): - log.debug('Permission granted for %s %s', func.__name__, self.user) - + log.debug('Permission granted for %s %s', cls, self.user) return func(*fargs, **fkwargs) else: - log.warning('Permission denied for %s %s', func.__name__, self.user) + log.warning('Permission denied for %s %s', cls, self.user) #redirect with forbidden ret code return abort(403) @@ -516,18 +450,18 @@ if not user: return False self.user_perms = user.permissions - self.granted_for = user.username + self.granted_for = user log.debug('checking %s %s %s', self.__class__.__name__, self.required_perms, user) if self.check_permissions(): - log.debug('Permission granted for %s @ %s %s', self.granted_for, - check_Location, user) + log.debug('Permission granted %s @ %s', self.granted_for, + check_Location or 'unspecified location') return True else: - log.warning('Permission denied for %s @ %s %s', self.granted_for, - check_Location, user) + log.warning('Permission denied for %s @ %s', self.granted_for, + check_Location or 'unspecified location') return False def check_permissions(self): @@ -595,14 +529,9 @@ self.required_perms = set(perms) def __call__(self, user, repo_name): - usr = AuthUser() - usr.user_id = user.user_id - usr.username = user.username - usr.is_admin = user.admin - + usr = AuthUser(user.user_id) try: - self.user_perms = set([fill_perms(usr)\ - .permissions['repositories'][repo_name]]) + self.user_perms = set([usr.permissions['repositories'][repo_name]]) except: self.user_perms = set() self.granted_for = ''