changeset 4395:a6dfd14d4b20

Escape shell command parts for Git backend Python 2.x uses pipes.quote and Python 3.3+ uses shlex.quote in GitRepository._run_git_command()
author Daniel Anderson <daniel@dattrix.com>
date Fri, 18 Jul 2014 14:10:58 -0700
parents dc4a768927eb
children df33fcf96df2
files kallithea/lib/vcs/backends/git/repository.py kallithea/tests/vcs/test_git.py
diffstat 2 files changed, 37 insertions(+), 4 deletions(-) [+]
line wrap: on
line diff
--- a/kallithea/lib/vcs/backends/git/repository.py	Mon Jul 07 01:45:19 2014 +0900
+++ b/kallithea/lib/vcs/backends/git/repository.py	Fri Jul 18 14:10:58 2014 -0700
@@ -17,6 +17,12 @@
 import logging
 import posixpath
 import string
+try:
+    # Python <=2.7
+    from pipes import quote
+except ImportError:
+    # Python 3.3+
+    from shlex import quote
 
 from dulwich.objects import Tag
 from dulwich.repo import Repo, NotGitRepository
@@ -654,7 +660,7 @@
             cmd.append('--bare')
         elif not update_after_clone:
             cmd.append('--no-checkout')
-        cmd += ['--', '"%s"' % url, '"%s"' % self.path]
+        cmd += ['--', quote(url), self.path]
         cmd = ' '.join(cmd)
         # If error occurs run_git_command raises RepositoryError already
         self.run_git_command(cmd)
@@ -664,7 +670,7 @@
         Tries to pull changes from external location.
         """
         url = self._get_url(url)
-        cmd = ['pull', "--ff-only", url]
+        cmd = ['pull', "--ff-only", quote(url)]
         cmd = ' '.join(cmd)
         # If error occurs run_git_command raises RepositoryError already
         self.run_git_command(cmd)
@@ -674,13 +680,13 @@
         Tries to pull changes from external location.
         """
         url = self._get_url(url)
-        so, se = self.run_git_command('ls-remote -h %s' % url)
+        so, se = self.run_git_command('ls-remote -h %s' % quote(url))
         refs = []
         for line in (x for x in so.splitlines()):
             sha, ref = line.split('\t')
             refs.append(ref)
         refs = ' '.join(('+%s:%s' % (r, r) for r in refs))
-        cmd = '''fetch %s -- %s''' % (url, refs)
+        cmd = '''fetch %s -- %s''' % (quote(url), refs)
         self.run_git_command(cmd)
 
     def _update_server_info(self):
--- a/kallithea/tests/vcs/test_git.py	Mon Jul 07 01:45:19 2014 +0900
+++ b/kallithea/tests/vcs/test_git.py	Fri Jul 18 14:10:58 2014 -0700
@@ -3,6 +3,7 @@
 import os
 import mock
 import datetime
+import urllib2
 from kallithea.lib.vcs.backends.git import GitRepository, GitChangeset
 from kallithea.lib.vcs.exceptions import RepositoryError, VCSError, NodeDoesNotExistError
 from kallithea.lib.vcs.nodes import NodeKind, FileNode, DirNode, NodeState
@@ -26,6 +27,32 @@
         wrong_repo_path = '/tmp/errorrepo'
         self.assertRaises(RepositoryError, GitRepository, wrong_repo_path)
 
+    def test_git_cmd_injection(self):
+        remote_repo_url = 'https://github.com/codeinn/vcs.git'
+        inject_remote = '%s;%s' % (remote_repo_url, '; echo "Cake";')
+        with self.assertRaises(urllib2.URLError):
+            # Should fail because URL will be: https://github.com/codeinn/vcs.git%3B%3B%20echo%20%22Cake%22%3B
+            urlerror_fail_repo = GitRepository(get_new_dir('injection-repo'), src_url=inject_remote, update_after_clone=True, create=True)
+
+        with self.assertRaises(RepositoryError):
+            # Should fail on direct clone call, which as of this writing does not happen outside of class
+            clone_fail_repo = GitRepository(get_new_dir('injection-repo'), create=True)
+            clone_fail_repo.clone(inject_remote, update_after_clone=True,)
+
+        successfully_cloned = GitRepository(get_new_dir('injection-repo'), src_url=remote_repo_url, update_after_clone=True, create=True)
+        # Repo should have been created
+        self.assertFalse(successfully_cloned._repo.bare)
+
+        with self.assertRaises(RepositoryError):
+            # Should fail because URL will be invalid repo
+            inject_remote_var = '%s;%s' % (remote_repo_url, '; echo $PATH;')
+            successfully_cloned.fetch(inject_remote_var)
+
+        with self.assertRaises(RepositoryError):
+            # Should fail because URL will be invalid repo
+            inject_remote_ls = '%s;%s' % (remote_repo_url, '; ls -1 ~;')
+            successfully_cloned.pull(inject_remote_ls)
+
     def test_repo_clone(self):
         self.__check_for_existing_repo()
         repo = GitRepository(TEST_GIT_REPO)