Mercurial > kallithea
changeset 5471:a041321d2aa1
security: apply CSRF check to all non-GET requests
The automatic CSRF protection was broken for POST requests with no
request payload parameters (but possibly containing request URI
parameters); a security hole was narrowly avoided because the code
base quite consistently checks the request method in the same way,
and because of browser protection against PUT/DELETE CSRF attacks.
Since explicit is better than implicit, the better way of checking
the HTTP request method is to simply check request.method, instead
of checking if request.POST is non-empty, which is subtly different
(it doesn't catch POST requests if all parameters are in the query
string) and non-obvious (because it also applies to PUT requests).
The commit also fixes some tests which relied on the CSRF protection
being broken. It does not fix all the controllers that still does
the misleading request.POST check, but since the CSRF check has now
been tightened, those are no longer a potential security issue.
author | Søren Løvborg <sorenl@unity3d.com> |
---|---|
date | Fri, 04 Sep 2015 00:01:20 +0200 |
parents | 4b9370a01c4d |
children | 3598e2a4e051 |
files | kallithea/lib/auth.py kallithea/tests/functional/test_admin_defaults.py kallithea/tests/functional/test_admin_gists.py kallithea/tests/functional/test_admin_notifications.py kallithea/tests/functional/test_admin_repos.py kallithea/tests/functional/test_admin_user_groups.py kallithea/tests/functional/test_admin_users.py kallithea/tests/functional/test_changeset_comments.py kallithea/tests/functional/test_forks.py kallithea/tests/functional/test_my_account.py |
diffstat | 10 files changed, 60 insertions(+), 34 deletions(-) [+] |
line wrap: on
line diff
--- a/kallithea/lib/auth.py Mon Aug 31 17:42:56 2015 +0200 +++ b/kallithea/lib/auth.py Fri Sep 04 00:01:20 2015 +0200 @@ -760,8 +760,13 @@ log.warning('API access to %s is not allowed', loc) return abort(403) - # CSRF protection - POSTs with session auth must contain correct token - if request.POST and user.is_authenticated: + # CSRF protection: Whenever a request has ambient authority (whether + # through a session cookie or its origin IP address), it must include + # the correct token, unless the HTTP method is GET or HEAD (and thus + # guaranteed to be side effect free. + # Note that the 'is_authenticated' flag is True for anonymous users too, + # but not when the user is authenticated by API key. + if user.is_authenticated and request.method not in ['GET', 'HEAD']: token = request.POST.get(secure_form.token_key) if not token or token != secure_form.authentication_token(): log.error('CSRF check failed')
--- a/kallithea/tests/functional/test_admin_defaults.py Mon Aug 31 17:42:56 2015 +0200 +++ b/kallithea/tests/functional/test_admin_defaults.py Fri Sep 04 00:01:20 2015 +0200 @@ -16,7 +16,8 @@ response = self.app.get(url('formatted_defaults', format='xml')) def test_create(self): - response = self.app.post(url('defaults')) + response = self.app.post(url('defaults'), + {'_authentication_token': self.authentication_token()}) def test_new(self): response = self.app.get(url('new_default')) @@ -62,7 +63,8 @@ response = self.app.post(url('default', id=1), params=dict(_method='put', _authentication_token=self.authentication_token())) def test_delete(self): - response = self.app.delete(url('default', id=1)) + # Not possible due to CSRF protection. + response = self.app.delete(url('default', id=1), status=403) def test_delete_browser_fakeout(self): response = self.app.post(url('default', id=1), params=dict(_method='delete', _authentication_token=self.authentication_token()))
--- a/kallithea/tests/functional/test_admin_gists.py Mon Aug 31 17:42:56 2015 +0200 +++ b/kallithea/tests/functional/test_admin_gists.py Fri Sep 04 00:01:20 2015 +0200 @@ -136,19 +136,20 @@ def test_delete(self): self.log_user() gist = _create_gist('delete-me') - response = self.app.delete(url('gist', gist_id=gist.gist_id)) - self.checkSessionFlash(response, 'Deleted gist %s' % gist.gist_id) + response = self.app.post(url('gist', gist_id=gist.gist_id), + params={'_method': 'delete', '_authentication_token': self.authentication_token()}) def test_delete_normal_user_his_gist(self): self.log_user(TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS) gist = _create_gist('delete-me', owner=TEST_USER_REGULAR_LOGIN) - response = self.app.delete(url('gist', gist_id=gist.gist_id)) - self.checkSessionFlash(response, 'Deleted gist %s' % gist.gist_id) + response = self.app.post(url('gist', gist_id=gist.gist_id), + params={'_method': 'delete', '_authentication_token': self.authentication_token()}) def test_delete_normal_user_not_his_own_gist(self): self.log_user(TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS) gist = _create_gist('delete-me') - response = self.app.delete(url('gist', gist_id=gist.gist_id), status=403) + response = self.app.post(url('gist', gist_id=gist.gist_id), status=403, + params={'_method': 'delete', '_authentication_token': self.authentication_token()}) def test_show(self): gist = _create_gist('gist-show-me')
--- a/kallithea/tests/functional/test_admin_notifications.py Mon Aug 31 17:42:56 2015 +0200 +++ b/kallithea/tests/functional/test_admin_notifications.py Fri Sep 04 00:01:20 2015 +0200 @@ -56,9 +56,9 @@ self.assertEqual(get_notif(u2.notifications), [notification]) cur_usr_id = cur_user.user_id - response = self.app.delete(url('notification', - notification_id= - notification.notification_id)) + response = self.app.post( + url('notification', notification_id=notification.notification_id), + params={'_method': 'delete', '_authentication_token': self.authentication_token()}) self.assertEqual(response.body, 'ok') cur_user = User.get(cur_usr_id)
--- a/kallithea/tests/functional/test_admin_repos.py Mon Aug 31 17:42:56 2015 +0200 +++ b/kallithea/tests/functional/test_admin_repos.py Fri Sep 04 00:01:20 2015 +0200 @@ -398,7 +398,8 @@ except vcs.exceptions.VCSError: self.fail('no repo %s in filesystem' % repo_name) - response = self.app.delete(url('delete_repo', repo_name=repo_name)) + response = self.app.post(url('delete_repo', repo_name=repo_name), + params={'_method': 'delete', '_authentication_token': self.authentication_token()}) self.checkSessionFlash(response, 'Deleted repository %s' % (repo_name)) @@ -450,7 +451,8 @@ except vcs.exceptions.VCSError: self.fail('no repo %s in filesystem' % repo_name) - response = self.app.delete(url('delete_repo', repo_name=repo_name)) + response = self.app.post(url('delete_repo', repo_name=repo_name), + params={'_method': 'delete', '_authentication_token': self.authentication_token()}) self.checkSessionFlash(response, 'Deleted repository %s' % (repo_name_unicode)) response.follow()
--- a/kallithea/tests/functional/test_admin_user_groups.py Mon Aug 31 17:42:56 2015 +0200 +++ b/kallithea/tests/functional/test_admin_user_groups.py Fri Sep 04 00:01:20 2015 +0200 @@ -32,7 +32,7 @@ response = self.app.get(url('new_users_group')) def test_update(self): - response = self.app.put(url('users_group', id=1)) + response = self.app.put(url('users_group', id=1), status=403) def test_update_browser_fakeout(self): response = self.app.post(url('users_group', id=1), @@ -54,7 +54,8 @@ gr = Session().query(UserGroup)\ .filter(UserGroup.users_group_name == users_group_name).one() - response = self.app.delete(url('users_group', id=gr.users_group_id)) + response = self.app.post(url('users_group', id=gr.users_group_id), + params={'_method': 'delete', '_authentication_token': self.authentication_token()}) gr = Session().query(UserGroup)\ .filter(UserGroup.users_group_name == users_group_name).scalar() @@ -97,7 +98,8 @@ ## DISABLE REPO CREATE ON A GROUP response = self.app.put( - url('edit_user_group_default_perms', id=ug.users_group_id), {}) + url('edit_user_group_default_perms', id=ug.users_group_id), + params={'_authentication_token': self.authentication_token()}) response.follow() ug = UserGroup.get_by_group_name(users_group_name) @@ -119,7 +121,8 @@ # DELETE ! ug = UserGroup.get_by_group_name(users_group_name) ugid = ug.users_group_id - response = self.app.delete(url('users_group', id=ug.users_group_id)) + response = self.app.post(url('users_group', id=ug.users_group_id), + params={'_method': 'delete', '_authentication_token': self.authentication_token()}) response = response.follow() gr = Session().query(UserGroup)\ .filter(UserGroup.users_group_name == users_group_name).scalar() @@ -167,8 +170,8 @@ [ug.users_group_id, p3.permission_id]])) ## DISABLE REPO CREATE ON A GROUP - response = self.app.put( - url('edit_user_group_default_perms', id=ug.users_group_id), {}) + response = self.app.put(url('edit_user_group_default_perms', id=ug.users_group_id), + params={'_authentication_token': self.authentication_token()}) response.follow() ug = UserGroup.get_by_group_name(users_group_name) @@ -189,7 +192,8 @@ # DELETE ! ug = UserGroup.get_by_group_name(users_group_name) ugid = ug.users_group_id - response = self.app.delete(url('users_group', id=ug.users_group_id)) + response = self.app.post(url('users_group', id=ug.users_group_id), + params={'_method': 'delete', '_authentication_token': self.authentication_token()}) response = response.follow() gr = Session().query(UserGroup)\ .filter(UserGroup.users_group_name ==
--- a/kallithea/tests/functional/test_admin_users.py Mon Aug 31 17:42:56 2015 +0200 +++ b/kallithea/tests/functional/test_admin_users.py Fri Sep 04 00:01:20 2015 +0200 @@ -167,7 +167,8 @@ new_user = Session().query(User)\ .filter(User.username == username).one() - response = self.app.delete(url('user', id=new_user.user_id)) + response = self.app.post(url('user', id=new_user.user_id), + params={'_method': 'delete', '_authentication_token': self.authentication_token()}) self.checkSessionFlash(response, 'Successfully deleted user') @@ -181,16 +182,19 @@ new_user = Session().query(User)\ .filter(User.username == username).one() - response = self.app.delete(url('user', id=new_user.user_id)) + response = self.app.post(url('user', id=new_user.user_id), + params={'_method': 'delete', '_authentication_token': self.authentication_token()}) self.checkSessionFlash(response, 'User "%s" still ' 'owns 1 repositories and cannot be removed. ' 'Switch owners or remove those repositories: ' '%s' % (username, reponame)) - response = self.app.delete(url('delete_repo', repo_name=reponame)) + response = self.app.post(url('delete_repo', repo_name=reponame), + params={'_method': 'delete', '_authentication_token': self.authentication_token()}) self.checkSessionFlash(response, 'Deleted repository %s' % reponame) - response = self.app.delete(url('user', id=new_user.user_id)) + response = self.app.post(url('user', id=new_user.user_id), + params={'_method': 'delete', '_authentication_token': self.authentication_token()}) self.checkSessionFlash(response, 'Successfully deleted user') def test_delete_repo_group_err(self): @@ -203,7 +207,8 @@ new_user = Session().query(User)\ .filter(User.username == username).one() - response = self.app.delete(url('user', id=new_user.user_id)) + response = self.app.post(url('user', id=new_user.user_id), + params={'_method': 'delete', '_authentication_token': self.authentication_token()}) self.checkSessionFlash(response, 'User "%s" still ' 'owns 1 repository groups and cannot be removed. ' 'Switch owners or remove those repository groups: ' @@ -213,10 +218,12 @@ # rg = RepoGroup.get_by_group_name(group_name=groupname) # response = self.app.get(url('repos_groups', id=rg.group_id)) - response = self.app.delete(url('delete_repo_group', group_name=groupname)) + response = self.app.post(url('delete_repo_group', group_name=groupname), + params={'_method': 'delete', '_authentication_token': self.authentication_token()}) self.checkSessionFlash(response, 'Removed repository group %s' % groupname) - response = self.app.delete(url('user', id=new_user.user_id)) + response = self.app.post(url('user', id=new_user.user_id), + params={'_method': 'delete', '_authentication_token': self.authentication_token()}) self.checkSessionFlash(response, 'Successfully deleted user') def test_delete_user_group_err(self): @@ -229,7 +236,8 @@ new_user = Session().query(User)\ .filter(User.username == username).one() - response = self.app.delete(url('user', id=new_user.user_id)) + response = self.app.post(url('user', id=new_user.user_id), + params={'_method': 'delete', '_authentication_token': self.authentication_token()}) self.checkSessionFlash(response, 'User "%s" still ' 'owns 1 user groups and cannot be removed. ' 'Switch owners or remove those user groups: ' @@ -241,7 +249,8 @@ fixture.destroy_user_group(ug.users_group_id) - response = self.app.delete(url('user', id=new_user.user_id)) + response = self.app.post(url('user', id=new_user.user_id), + params={'_method': 'delete', '_authentication_token': self.authentication_token()}) self.checkSessionFlash(response, 'Successfully deleted user') def test_show(self):
--- a/kallithea/tests/functional/test_changeset_comments.py Mon Aug 31 17:42:56 2015 +0200 +++ b/kallithea/tests/functional/test_changeset_comments.py Fri Sep 04 00:01:20 2015 +0200 @@ -132,10 +132,11 @@ self.assertEqual(len(comments), 1) comment_id = comments[0].comment_id - self.app.delete(url(controller='changeset', + self.app.post(url(controller='changeset', action='delete_comment', repo_name=HG_REPO, - comment_id=comment_id)) + comment_id=comment_id), + params={'_method': 'delete', '_authentication_token': self.authentication_token()}) comments = ChangesetComment.query().all() self.assertEqual(len(comments), 0)
--- a/kallithea/tests/functional/test_forks.py Mon Aug 31 17:42:56 2015 +0200 +++ b/kallithea/tests/functional/test_forks.py Fri Sep 04 00:01:20 2015 +0200 @@ -96,7 +96,8 @@ ) # remove this fork - response = self.app.delete(url('delete_repo', repo_name=fork_name)) + response = self.app.post(url('delete_repo', repo_name=fork_name), + params={'_method': 'delete', '_authentication_token': self.authentication_token()}) def test_fork_create_into_group(self): self.log_user()
--- a/kallithea/tests/functional/test_my_account.py Mon Aug 31 17:42:56 2015 +0200 +++ b/kallithea/tests/functional/test_my_account.py Fri Sep 04 00:01:20 2015 +0200 @@ -57,7 +57,8 @@ self.log_user() response = self.app.get(url('my_account_emails')) response.mustcontain('No additional emails specified') - response = self.app.post(url('my_account_emails'),) + response = self.app.post(url('my_account_emails'), + {'_authentication_token': self.authentication_token()}) self.checkSessionFlash(response, 'Please enter an email address') def test_my_account_my_emails_add_remove(self):