changeset 5471:a041321d2aa1

security: apply CSRF check to all non-GET requests The automatic CSRF protection was broken for POST requests with no request payload parameters (but possibly containing request URI parameters); a security hole was narrowly avoided because the code base quite consistently checks the request method in the same way, and because of browser protection against PUT/DELETE CSRF attacks. Since explicit is better than implicit, the better way of checking the HTTP request method is to simply check request.method, instead of checking if request.POST is non-empty, which is subtly different (it doesn't catch POST requests if all parameters are in the query string) and non-obvious (because it also applies to PUT requests). The commit also fixes some tests which relied on the CSRF protection being broken. It does not fix all the controllers that still does the misleading request.POST check, but since the CSRF check has now been tightened, those are no longer a potential security issue.
author Søren Løvborg <sorenl@unity3d.com>
date Fri, 04 Sep 2015 00:01:20 +0200
parents 4b9370a01c4d
children 3598e2a4e051
files kallithea/lib/auth.py kallithea/tests/functional/test_admin_defaults.py kallithea/tests/functional/test_admin_gists.py kallithea/tests/functional/test_admin_notifications.py kallithea/tests/functional/test_admin_repos.py kallithea/tests/functional/test_admin_user_groups.py kallithea/tests/functional/test_admin_users.py kallithea/tests/functional/test_changeset_comments.py kallithea/tests/functional/test_forks.py kallithea/tests/functional/test_my_account.py
diffstat 10 files changed, 60 insertions(+), 34 deletions(-) [+]
line wrap: on
line diff
--- a/kallithea/lib/auth.py	Mon Aug 31 17:42:56 2015 +0200
+++ b/kallithea/lib/auth.py	Fri Sep 04 00:01:20 2015 +0200
@@ -760,8 +760,13 @@
                 log.warning('API access to %s is not allowed', loc)
                 return abort(403)
 
-        # CSRF protection - POSTs with session auth must contain correct token
-        if request.POST and user.is_authenticated:
+        # CSRF protection: Whenever a request has ambient authority (whether
+        # through a session cookie or its origin IP address), it must include
+        # the correct token, unless the HTTP method is GET or HEAD (and thus
+        # guaranteed to be side effect free.
+        # Note that the 'is_authenticated' flag is True for anonymous users too,
+        # but not when the user is authenticated by API key.
+        if user.is_authenticated and request.method not in ['GET', 'HEAD']:
             token = request.POST.get(secure_form.token_key)
             if not token or token != secure_form.authentication_token():
                 log.error('CSRF check failed')
--- a/kallithea/tests/functional/test_admin_defaults.py	Mon Aug 31 17:42:56 2015 +0200
+++ b/kallithea/tests/functional/test_admin_defaults.py	Fri Sep 04 00:01:20 2015 +0200
@@ -16,7 +16,8 @@
         response = self.app.get(url('formatted_defaults', format='xml'))
 
     def test_create(self):
-        response = self.app.post(url('defaults'))
+        response = self.app.post(url('defaults'),
+            {'_authentication_token': self.authentication_token()})
 
     def test_new(self):
         response = self.app.get(url('new_default'))
@@ -62,7 +63,8 @@
         response = self.app.post(url('default', id=1), params=dict(_method='put', _authentication_token=self.authentication_token()))
 
     def test_delete(self):
-        response = self.app.delete(url('default', id=1))
+        # Not possible due to CSRF protection.
+        response = self.app.delete(url('default', id=1), status=403)
 
     def test_delete_browser_fakeout(self):
         response = self.app.post(url('default', id=1), params=dict(_method='delete', _authentication_token=self.authentication_token()))
--- a/kallithea/tests/functional/test_admin_gists.py	Mon Aug 31 17:42:56 2015 +0200
+++ b/kallithea/tests/functional/test_admin_gists.py	Fri Sep 04 00:01:20 2015 +0200
@@ -136,19 +136,20 @@
     def test_delete(self):
         self.log_user()
         gist = _create_gist('delete-me')
-        response = self.app.delete(url('gist', gist_id=gist.gist_id))
-        self.checkSessionFlash(response, 'Deleted gist %s' % gist.gist_id)
+        response = self.app.post(url('gist', gist_id=gist.gist_id),
+            params={'_method': 'delete', '_authentication_token': self.authentication_token()})
 
     def test_delete_normal_user_his_gist(self):
         self.log_user(TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
         gist = _create_gist('delete-me', owner=TEST_USER_REGULAR_LOGIN)
-        response = self.app.delete(url('gist', gist_id=gist.gist_id))
-        self.checkSessionFlash(response, 'Deleted gist %s' % gist.gist_id)
+        response = self.app.post(url('gist', gist_id=gist.gist_id),
+            params={'_method': 'delete', '_authentication_token': self.authentication_token()})
 
     def test_delete_normal_user_not_his_own_gist(self):
         self.log_user(TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
         gist = _create_gist('delete-me')
-        response = self.app.delete(url('gist', gist_id=gist.gist_id), status=403)
+        response = self.app.post(url('gist', gist_id=gist.gist_id), status=403,
+            params={'_method': 'delete', '_authentication_token': self.authentication_token()})
 
     def test_show(self):
         gist = _create_gist('gist-show-me')
--- a/kallithea/tests/functional/test_admin_notifications.py	Mon Aug 31 17:42:56 2015 +0200
+++ b/kallithea/tests/functional/test_admin_notifications.py	Fri Sep 04 00:01:20 2015 +0200
@@ -56,9 +56,9 @@
         self.assertEqual(get_notif(u2.notifications), [notification])
         cur_usr_id = cur_user.user_id
 
-        response = self.app.delete(url('notification',
-                                       notification_id=
-                                       notification.notification_id))
+        response = self.app.post(
+            url('notification', notification_id=notification.notification_id),
+            params={'_method': 'delete', '_authentication_token': self.authentication_token()})
         self.assertEqual(response.body, 'ok')
 
         cur_user = User.get(cur_usr_id)
--- a/kallithea/tests/functional/test_admin_repos.py	Mon Aug 31 17:42:56 2015 +0200
+++ b/kallithea/tests/functional/test_admin_repos.py	Fri Sep 04 00:01:20 2015 +0200
@@ -398,7 +398,8 @@
         except vcs.exceptions.VCSError:
             self.fail('no repo %s in filesystem' % repo_name)
 
-        response = self.app.delete(url('delete_repo', repo_name=repo_name))
+        response = self.app.post(url('delete_repo', repo_name=repo_name),
+            params={'_method': 'delete', '_authentication_token': self.authentication_token()})
 
         self.checkSessionFlash(response, 'Deleted repository %s' % (repo_name))
 
@@ -450,7 +451,8 @@
         except vcs.exceptions.VCSError:
             self.fail('no repo %s in filesystem' % repo_name)
 
-        response = self.app.delete(url('delete_repo', repo_name=repo_name))
+        response = self.app.post(url('delete_repo', repo_name=repo_name),
+            params={'_method': 'delete', '_authentication_token': self.authentication_token()})
         self.checkSessionFlash(response, 'Deleted repository %s' % (repo_name_unicode))
         response.follow()
 
--- a/kallithea/tests/functional/test_admin_user_groups.py	Mon Aug 31 17:42:56 2015 +0200
+++ b/kallithea/tests/functional/test_admin_user_groups.py	Fri Sep 04 00:01:20 2015 +0200
@@ -32,7 +32,7 @@
         response = self.app.get(url('new_users_group'))
 
     def test_update(self):
-        response = self.app.put(url('users_group', id=1))
+        response = self.app.put(url('users_group', id=1), status=403)
 
     def test_update_browser_fakeout(self):
         response = self.app.post(url('users_group', id=1),
@@ -54,7 +54,8 @@
         gr = Session().query(UserGroup)\
             .filter(UserGroup.users_group_name == users_group_name).one()
 
-        response = self.app.delete(url('users_group', id=gr.users_group_id))
+        response = self.app.post(url('users_group', id=gr.users_group_id),
+            params={'_method': 'delete', '_authentication_token': self.authentication_token()})
 
         gr = Session().query(UserGroup)\
             .filter(UserGroup.users_group_name == users_group_name).scalar()
@@ -97,7 +98,8 @@
 
         ## DISABLE REPO CREATE ON A GROUP
         response = self.app.put(
-            url('edit_user_group_default_perms', id=ug.users_group_id), {})
+            url('edit_user_group_default_perms', id=ug.users_group_id),
+            params={'_authentication_token': self.authentication_token()})
 
         response.follow()
         ug = UserGroup.get_by_group_name(users_group_name)
@@ -119,7 +121,8 @@
         # DELETE !
         ug = UserGroup.get_by_group_name(users_group_name)
         ugid = ug.users_group_id
-        response = self.app.delete(url('users_group', id=ug.users_group_id))
+        response = self.app.post(url('users_group', id=ug.users_group_id),
+            params={'_method': 'delete', '_authentication_token': self.authentication_token()})
         response = response.follow()
         gr = Session().query(UserGroup)\
             .filter(UserGroup.users_group_name == users_group_name).scalar()
@@ -167,8 +170,8 @@
                     [ug.users_group_id, p3.permission_id]]))
 
         ## DISABLE REPO CREATE ON A GROUP
-        response = self.app.put(
-            url('edit_user_group_default_perms', id=ug.users_group_id), {})
+        response = self.app.put(url('edit_user_group_default_perms', id=ug.users_group_id),
+            params={'_authentication_token': self.authentication_token()})
 
         response.follow()
         ug = UserGroup.get_by_group_name(users_group_name)
@@ -189,7 +192,8 @@
         # DELETE !
         ug = UserGroup.get_by_group_name(users_group_name)
         ugid = ug.users_group_id
-        response = self.app.delete(url('users_group', id=ug.users_group_id))
+        response = self.app.post(url('users_group', id=ug.users_group_id),
+            params={'_method': 'delete', '_authentication_token': self.authentication_token()})
         response = response.follow()
         gr = Session().query(UserGroup)\
                            .filter(UserGroup.users_group_name ==
--- a/kallithea/tests/functional/test_admin_users.py	Mon Aug 31 17:42:56 2015 +0200
+++ b/kallithea/tests/functional/test_admin_users.py	Fri Sep 04 00:01:20 2015 +0200
@@ -167,7 +167,8 @@
 
         new_user = Session().query(User)\
             .filter(User.username == username).one()
-        response = self.app.delete(url('user', id=new_user.user_id))
+        response = self.app.post(url('user', id=new_user.user_id),
+            params={'_method': 'delete', '_authentication_token': self.authentication_token()})
 
         self.checkSessionFlash(response, 'Successfully deleted user')
 
@@ -181,16 +182,19 @@
 
         new_user = Session().query(User)\
             .filter(User.username == username).one()
-        response = self.app.delete(url('user', id=new_user.user_id))
+        response = self.app.post(url('user', id=new_user.user_id),
+            params={'_method': 'delete', '_authentication_token': self.authentication_token()})
         self.checkSessionFlash(response, 'User "%s" still '
                                'owns 1 repositories and cannot be removed. '
                                'Switch owners or remove those repositories: '
                                '%s' % (username, reponame))
 
-        response = self.app.delete(url('delete_repo', repo_name=reponame))
+        response = self.app.post(url('delete_repo', repo_name=reponame),
+            params={'_method': 'delete', '_authentication_token': self.authentication_token()})
         self.checkSessionFlash(response, 'Deleted repository %s' % reponame)
 
-        response = self.app.delete(url('user', id=new_user.user_id))
+        response = self.app.post(url('user', id=new_user.user_id),
+            params={'_method': 'delete', '_authentication_token': self.authentication_token()})
         self.checkSessionFlash(response, 'Successfully deleted user')
 
     def test_delete_repo_group_err(self):
@@ -203,7 +207,8 @@
 
         new_user = Session().query(User)\
             .filter(User.username == username).one()
-        response = self.app.delete(url('user', id=new_user.user_id))
+        response = self.app.post(url('user', id=new_user.user_id),
+            params={'_method': 'delete', '_authentication_token': self.authentication_token()})
         self.checkSessionFlash(response, 'User "%s" still '
                                'owns 1 repository groups and cannot be removed. '
                                'Switch owners or remove those repository groups: '
@@ -213,10 +218,12 @@
         # rg = RepoGroup.get_by_group_name(group_name=groupname)
         # response = self.app.get(url('repos_groups', id=rg.group_id))
 
-        response = self.app.delete(url('delete_repo_group', group_name=groupname))
+        response = self.app.post(url('delete_repo_group', group_name=groupname),
+            params={'_method': 'delete', '_authentication_token': self.authentication_token()})
         self.checkSessionFlash(response, 'Removed repository group %s' % groupname)
 
-        response = self.app.delete(url('user', id=new_user.user_id))
+        response = self.app.post(url('user', id=new_user.user_id),
+            params={'_method': 'delete', '_authentication_token': self.authentication_token()})
         self.checkSessionFlash(response, 'Successfully deleted user')
 
     def test_delete_user_group_err(self):
@@ -229,7 +236,8 @@
 
         new_user = Session().query(User)\
             .filter(User.username == username).one()
-        response = self.app.delete(url('user', id=new_user.user_id))
+        response = self.app.post(url('user', id=new_user.user_id),
+            params={'_method': 'delete', '_authentication_token': self.authentication_token()})
         self.checkSessionFlash(response, 'User "%s" still '
                                'owns 1 user groups and cannot be removed. '
                                'Switch owners or remove those user groups: '
@@ -241,7 +249,8 @@
 
         fixture.destroy_user_group(ug.users_group_id)
 
-        response = self.app.delete(url('user', id=new_user.user_id))
+        response = self.app.post(url('user', id=new_user.user_id),
+            params={'_method': 'delete', '_authentication_token': self.authentication_token()})
         self.checkSessionFlash(response, 'Successfully deleted user')
 
     def test_show(self):
--- a/kallithea/tests/functional/test_changeset_comments.py	Mon Aug 31 17:42:56 2015 +0200
+++ b/kallithea/tests/functional/test_changeset_comments.py	Fri Sep 04 00:01:20 2015 +0200
@@ -132,10 +132,11 @@
         self.assertEqual(len(comments), 1)
         comment_id = comments[0].comment_id
 
-        self.app.delete(url(controller='changeset',
+        self.app.post(url(controller='changeset',
                                     action='delete_comment',
                                     repo_name=HG_REPO,
-                                    comment_id=comment_id))
+                                    comment_id=comment_id),
+            params={'_method': 'delete', '_authentication_token': self.authentication_token()})
 
         comments = ChangesetComment.query().all()
         self.assertEqual(len(comments), 0)
--- a/kallithea/tests/functional/test_forks.py	Mon Aug 31 17:42:56 2015 +0200
+++ b/kallithea/tests/functional/test_forks.py	Fri Sep 04 00:01:20 2015 +0200
@@ -96,7 +96,8 @@
         )
 
         # remove this fork
-        response = self.app.delete(url('delete_repo', repo_name=fork_name))
+        response = self.app.post(url('delete_repo', repo_name=fork_name),
+            params={'_method': 'delete', '_authentication_token': self.authentication_token()})
 
     def test_fork_create_into_group(self):
         self.log_user()
--- a/kallithea/tests/functional/test_my_account.py	Mon Aug 31 17:42:56 2015 +0200
+++ b/kallithea/tests/functional/test_my_account.py	Fri Sep 04 00:01:20 2015 +0200
@@ -57,7 +57,8 @@
         self.log_user()
         response = self.app.get(url('my_account_emails'))
         response.mustcontain('No additional emails specified')
-        response = self.app.post(url('my_account_emails'),)
+        response = self.app.post(url('my_account_emails'),
+            {'_authentication_token': self.authentication_token()})
         self.checkSessionFlash(response, 'Please enter an email address')
 
     def test_my_account_my_emails_add_remove(self):