Mercurial > kallithea
changeset 7697:226893a56a81
auth: move IP check to AuthUser.make - it is more about accepting authentication than about permissions after authentication
author | Mads Kiilerich <mads@kiilerich.com> |
---|---|
date | Thu, 03 Jan 2019 01:22:56 +0100 |
parents | 077ba994ee03 |
children | 7977ca209b1d |
files | kallithea/controllers/api/__init__.py kallithea/controllers/login.py kallithea/lib/auth.py kallithea/lib/base.py |
diffstat | 4 files changed, 29 insertions(+), 50 deletions(-) [+] |
line wrap: on
line diff
--- a/kallithea/controllers/api/__init__.py Thu Jan 03 01:22:45 2019 +0100 +++ b/kallithea/controllers/api/__init__.py Thu Jan 03 01:22:56 2019 +0100 @@ -103,7 +103,7 @@ environ = state.request.environ start = time.time() - ip_addr = request.ip_addr = self._get_ip_addr(environ) + ip_addr = self._get_ip_addr(environ) self._req_id = None if 'CONTENT_LENGTH' not in environ: log.debug("No Content-Length") @@ -146,21 +146,16 @@ # check if we can find this session using api_key try: u = User.get_by_api_key(self._req_api_key) - auth_user = AuthUser.make(dbuser=u) + auth_user = AuthUser.make(dbuser=u, ip_addr=ip_addr) if auth_user is None: raise JSONRPCErrorResponse(retid=self._req_id, message='Invalid API key') - if not AuthUser.check_ip_allowed(auth_user, ip_addr): - raise JSONRPCErrorResponse(retid=self._req_id, - message='request from IP:%s not allowed' % (ip_addr,)) - else: - log.info('Access for IP:%s allowed', ip_addr) - except Exception as e: raise JSONRPCErrorResponse(retid=self._req_id, message='Invalid API key') request.authuser = auth_user + request.ip_addr = ip_addr self._error = None try:
--- a/kallithea/controllers/login.py Thu Jan 03 01:22:45 2019 +0100 +++ b/kallithea/controllers/login.py Thu Jan 03 01:22:56 2019 +0100 @@ -102,7 +102,7 @@ # Exception itself h.flash(e, 'error') else: - auth_user = log_in_user(user, c.form_result['remember'], is_external_auth=False) + auth_user = log_in_user(user, c.form_result['remember'], is_external_auth=False, ip_addr=request.ip_addr) # TODO: handle auth_user is None as failed authentication? raise HTTPFound(location=c.came_from) else:
--- a/kallithea/lib/auth.py Thu Jan 03 01:22:45 2019 +0100 +++ b/kallithea/lib/auth.py Thu Jan 03 01:22:56 2019 +0100 @@ -399,16 +399,21 @@ """ @classmethod - def make(cls, dbuser=None, authenticating_api_key=None, is_external_auth=False): + def make(cls, dbuser=None, authenticating_api_key=None, is_external_auth=False, ip_addr=None): """Create an AuthUser to be authenticated ... or return None if user for some reason can't be authenticated. - Checks that a non-None dbuser is provided and is active. + Checks that a non-None dbuser is provided, is active, and that the IP address is ok. """ + assert ip_addr is not None if dbuser is None: log.info('No db user for authentication') return None if not dbuser.active: log.info('Db user %s not active', dbuser.username) return None + allowed_ips = AuthUser.get_allowed_ips(dbuser.user_id, cache=True) + if not check_ip_access(source_ip=ip_addr, allowed_ips=allowed_ips): + log.info('Access for %s from %s forbidden - not in %s', dbuser.username, ip_addr, allowed_ips) + return None return cls(dbuser=dbuser, authenticating_api_key=authenticating_api_key, is_external_auth=is_external_auth) @@ -561,21 +566,6 @@ return [x[0] for x in self.permissions['user_groups'].iteritems() if x[1] == 'usergroup.admin'] - @staticmethod - def check_ip_allowed(user, ip_addr): - """ - Check if the given IP address (a `str`) is allowed for the given - user (an `AuthUser` or `db.User`). - """ - allowed_ips = AuthUser.get_allowed_ips(user.user_id, cache=True) - if check_ip_access(source_ip=ip_addr, allowed_ips=allowed_ips): - log.debug('IP:%s is in range of %s', ip_addr, allowed_ips) - return True - else: - log.info('Access for IP:%s forbidden, ' - 'not in %s' % (ip_addr, allowed_ips)) - return False - def __repr__(self): return "<AuthUser('id:%s[%s]')>" % (self.user_id, self.username) @@ -587,13 +577,14 @@ } @staticmethod - def from_cookie(cookie): + def from_cookie(cookie, ip_addr): """ Deserializes an `AuthUser` from a cookie `dict` ... or return None. """ return AuthUser.make( dbuser=UserModel().get(cookie.get('user_id')), is_external_auth=cookie.get('is_external_auth', False), + ip_addr=ip_addr, ) @classmethod
--- a/kallithea/lib/base.py Thu Jan 03 01:22:45 2019 +0100 +++ b/kallithea/lib/base.py Thu Jan 03 01:22:56 2019 +0100 @@ -109,7 +109,7 @@ return path -def log_in_user(user, remember, is_external_auth): +def log_in_user(user, remember, is_external_auth, ip_addr): """ Log a `User` in and update session and cookies. If `remember` is True, the session cookie is set to expire in a year; otherwise, it expires at @@ -120,7 +120,7 @@ # It should not be possible to explicitly log in as the default user. assert not user.is_default_user, user - auth_user = AuthUser.make(dbuser=user, is_external_auth=is_external_auth) + auth_user = AuthUser.make(dbuser=user, is_external_auth=is_external_auth, ip_addr=ip_addr) if auth_user is None: return None @@ -214,11 +214,11 @@ """ # Use anonymous access if allowed for action on repo. default_user = User.get_default_user(cache=True) - default_authuser = AuthUser.make(dbuser=default_user) + default_authuser = AuthUser.make(dbuser=default_user, ip_addr=ip_addr) if default_authuser is None: log.debug('No anonymous access at all') # move on to proper user auth else: - if self._check_permission(action, default_authuser, repo_name, ip_addr): + if self._check_permission(action, default_authuser, repo_name): return default_authuser, None log.debug('Not authorized to access this repository as anonymous user') @@ -255,10 +255,10 @@ log.error(traceback.format_exc()) return None, webob.exc.HTTPInternalServerError() - authuser = AuthUser.make(dbuser=user) + authuser = AuthUser.make(dbuser=user, ip_addr=ip_addr) if authuser is None: return None, webob.exc.HTTPForbidden() - if not self._check_permission(action, authuser, repo_name, ip_addr): + if not self._check_permission(action, authuser, repo_name): return None, webob.exc.HTTPForbidden() return user, None @@ -283,7 +283,7 @@ return '/'.join(data) - def _check_permission(self, action, authuser, repo_name, ip_addr=None): + def _check_permission(self, action, authuser, repo_name): """ Checks permissions using action (push/pull) user and repository name @@ -292,13 +292,6 @@ :param user: `User` instance :param repo_name: repository name """ - # check IP - ip_allowed = AuthUser.check_ip_allowed(authuser, ip_addr) - if ip_allowed: - log.info('Access for IP:%s allowed', ip_addr) - else: - return False - if action == 'push': if not HasPermissionAnyMiddleware('repository.write', 'repository.admin')(authuser, @@ -386,11 +379,11 @@ self.scm_model = ScmModel() @staticmethod - def _determine_auth_user(api_key, bearer_token, session_authuser): + def _determine_auth_user(api_key, bearer_token, session_authuser, ip_addr): """ Create an `AuthUser` object given the API key/bearer token (if any) and the value of the authuser session cookie. - Returns None if no valid user is found (like not active). + Returns None if no valid user is found (like not active or no access for IP). """ # Authenticate by bearer token @@ -400,7 +393,7 @@ # Authenticate by API key if api_key is not None: dbuser = User.get_by_api_key(api_key) - au = AuthUser.make(dbuser=dbuser, authenticating_api_key=api_key, is_external_auth=True) + au = AuthUser.make(dbuser=dbuser, authenticating_api_key=api_key, is_external_auth=True, ip_addr=ip_addr) if au is None or au.is_anonymous: log.warning('API key ****%s is NOT valid', api_key[-4:]) raise webob.exc.HTTPForbidden(_('Invalid API key')) @@ -412,7 +405,7 @@ # v0.3 and earlier included an 'is_authenticated' key; if present, # this must be True. if isinstance(session_authuser, dict) and session_authuser.get('is_authenticated', True): - return AuthUser.from_cookie(session_authuser) + return AuthUser.from_cookie(session_authuser, ip_addr=ip_addr) # Authenticate by auth_container plugin (if enabled) if any( @@ -428,11 +421,11 @@ if user_info is not None: username = user_info['username'] user = User.get_by_username(username, case_insensitive=True) - return log_in_user(user, remember=False, is_external_auth=True) + return log_in_user(user, remember=False, is_external_auth=True, ip_addr=ip_addr) # User is default user (if active) or anonymous default_user = User.get_default_user(cache=True) - authuser = AuthUser.make(dbuser=default_user) + authuser = AuthUser.make(dbuser=default_user, ip_addr=ip_addr) if authuser is None: # fall back to anonymous authuser = AuthUser(dbuser=default_user) # TODO: somehow use .make? return authuser @@ -469,7 +462,7 @@ def __call__(self, environ, context): try: - request.ip_addr = _get_ip_addr(environ) + ip_addr = _get_ip_addr(environ) # make sure that we update permissions each time we call controller self._basic_security_checks() @@ -490,14 +483,14 @@ request.GET.get('api_key'), bearer_token, session.get('authuser'), + ip_addr=ip_addr, ) if authuser is None: log.info('No valid user found') raise webob.exc.HTTPForbidden() - if not AuthUser.check_ip_allowed(authuser, request.ip_addr): - raise webob.exc.HTTPForbidden() request.authuser = authuser + request.ip_addr = ip_addr log.info('IP: %s User: %s accessed %s', request.ip_addr, request.authuser,