view kallithea/lib/vcs/backends/git/repository.py @ 8918:da519b9782e8 stable

hg: support Mercurial 6.1 without util.url
author Mads Kiilerich <mads@kiilerich.com>
date Mon, 07 Mar 2022 17:47:13 +0100
parents de59ad8185e1
children
line wrap: on
line source

# -*- coding: utf-8 -*-
"""
    vcs.backends.git.repository
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~

    Git repository implementation.

    :created_on: Apr 8, 2010
    :copyright: (c) 2010-2011 by Marcin Kuzminski, Lukasz Balcerzak.
"""

import errno
import logging
import os
import re
import time
import urllib.error
import urllib.parse
import urllib.request
from collections import OrderedDict


try:
    from mercurial.utils.urlutil import url as hg_url
except ImportError:  # urlutil was introduced in Mercurial 5.8
    from mercurial.util import url as hg_url

from dulwich.client import SubprocessGitClient
from dulwich.config import ConfigFile
from dulwich.objects import Tag
from dulwich.repo import NotGitRepository, Repo
from dulwich.server import update_server_info

from kallithea.lib.vcs import subprocessio
from kallithea.lib.vcs.backends.base import BaseRepository, CollectionGenerator
from kallithea.lib.vcs.conf import settings
from kallithea.lib.vcs.exceptions import (BranchDoesNotExistError, ChangesetDoesNotExistError, EmptyRepositoryError, RepositoryError, TagAlreadyExistError,
                                          TagDoesNotExistError)
from kallithea.lib.vcs.utils import ascii_bytes, ascii_str, date_fromtimestamp, makedate, safe_bytes, safe_str
from kallithea.lib.vcs.utils.helpers import get_urllib_request_handlers
from kallithea.lib.vcs.utils.lazy import LazyProperty
from kallithea.lib.vcs.utils.paths import abspath, get_user_home

from . import changeset, inmemory, workdir


SHA_PATTERN = re.compile(r'^([0-9a-fA-F]{12}|[0-9a-fA-F]{40})$')

log = logging.getLogger(__name__)


class GitRepository(BaseRepository):
    """
    Git repository backend.
    """
    DEFAULT_BRANCH_NAME = 'master'
    scm = 'git'

    def __init__(self, repo_path, create=False, src_url=None,
                 update_after_clone=False, bare=False, baseui=None):
        baseui  # unused
        self.path = abspath(repo_path)
        self.repo = self._get_repo(create, src_url, update_after_clone, bare)
        self.bare = self.repo.bare

    @property
    def _config_files(self):
        return [
            self.bare and abspath(self.path, 'config')
                      or abspath(self.path, '.git', 'config'),
             abspath(get_user_home(), '.gitconfig'),
         ]

    @property
    def _repo(self):
        return self.repo

    @property
    def head(self):
        try:
            return self._repo.head()
        except KeyError:
            return None

    @property
    def _empty(self):
        """
        Checks if repository is empty ie. without any changesets
        """

        try:
            self.revisions[0]
        except (KeyError, IndexError):
            return True
        return False

    @LazyProperty
    def revisions(self):
        """
        Returns list of revisions' ids, in ascending order.  Being lazy
        attribute allows external tools to inject shas from cache.
        """
        return self._get_all_revisions()

    @classmethod
    def _run_git_command(cls, cmd, cwd=None):
        """
        Runs given ``cmd`` as git command and returns output bytes in a tuple
        (stdout, stderr) ... or raise RepositoryError.

        :param cmd: git command to be executed
        :param cwd: passed directly to subprocess
        """
        # need to clean fix GIT_DIR !
        gitenv = dict(os.environ)
        gitenv.pop('GIT_DIR', None)
        gitenv['GIT_CONFIG_NOGLOBAL'] = '1'

        assert isinstance(cmd, list), cmd
        cmd = [settings.GIT_EXECUTABLE_PATH, '-c', 'core.quotepath=false'] + cmd
        try:
            p = subprocessio.SubprocessIOChunker(cmd, cwd=cwd, env=gitenv, shell=False)
        except (EnvironmentError, OSError) as err:
            # output from the failing process is in str(EnvironmentError)
            msg = ("Couldn't run git command %s.\n"
                   "Subprocess failed with '%s': %s\n" %
                   (cmd, type(err).__name__, err)
            ).strip()
            log.error(msg)
            raise RepositoryError(msg)

        try:
            stdout = b''.join(p.output)
            stderr = b''.join(p.error)
        finally:
            p.close()
        # TODO: introduce option to make commands fail if they have any stderr output?
        if stderr:
            log.debug('stderr from %s:\n%s', cmd, stderr)
        else:
            log.debug('stderr from %s: None', cmd)
        return stdout, stderr

    def run_git_command(self, cmd):
        """
        Runs given ``cmd`` as git command with cwd set to current repo.
        Returns stdout as unicode str ... or raise RepositoryError.
        """
        cwd = None
        if os.path.isdir(self.path):
            cwd = self.path
        stdout, _stderr = self._run_git_command(cmd, cwd=cwd)
        return safe_str(stdout)

    @staticmethod
    def _check_url(url):
        r"""
        Raise URLError if url doesn't seem like a valid safe Git URL. We
        only allow http, https, git, and ssh URLs.

        For http and https URLs, make a connection and probe to see if it is valid.

        >>> GitRepository._check_url('git://example.com/my%20fine repo')

        >>> GitRepository._check_url('http://example.com:65537/repo')
        Traceback (most recent call last):
        ...
        urllib.error.URLError: <urlopen error Error parsing URL: 'http://example.com:65537/repo'>
        >>> GitRepository._check_url('foo')
        Traceback (most recent call last):
        ...
        urllib.error.URLError: <urlopen error Unsupported protocol in URL 'foo'>
        >>> GitRepository._check_url('file:///repo')
        Traceback (most recent call last):
        ...
        urllib.error.URLError: <urlopen error Unsupported protocol in URL 'file:///repo'>
        >>> GitRepository._check_url('git+http://example.com/repo')
        Traceback (most recent call last):
        ...
        urllib.error.URLError: <urlopen error Unsupported protocol in URL 'git+http://example.com/repo'>
        >>> GitRepository._check_url('git://example.com/%09')
        Traceback (most recent call last):
        ...
        urllib.error.URLError: <urlopen error Invalid escape character in path: '%'>
        >>> GitRepository._check_url('git://example.com/%x00')
        Traceback (most recent call last):
        ...
        urllib.error.URLError: <urlopen error Invalid escape character in path: '%'>
        >>> GitRepository._check_url(r'git://example.com/\u0009')
        Traceback (most recent call last):
        ...
        urllib.error.URLError: <urlopen error Invalid escape character in path: '\'>
        >>> GitRepository._check_url(r'git://example.com/\t')
        Traceback (most recent call last):
        ...
        urllib.error.URLError: <urlopen error Invalid escape character in path: '\'>
        >>> GitRepository._check_url('git://example.com/\t')
        Traceback (most recent call last):
        ...
        urllib.error.URLError: <urlopen error Invalid ...>

        The failure above will be one of, depending on the level of WhatWG support:
        urllib.error.URLError: <urlopen error Invalid whitespace character in path: '\t'>
        urllib.error.URLError: <urlopen error Invalid url: 'git://example.com/    ' normalizes to 'git://example.com/'>
        """
        try:
            parsed_url = urllib.parse.urlparse(url)
            parsed_url.port  # trigger netloc parsing which might raise ValueError
        except ValueError:
            raise urllib.error.URLError("Error parsing URL: %r" % url)

        # check first if it's not an local url
        if os.path.isabs(url) and os.path.isdir(url):
            return

        unparsed_url = urllib.parse.urlunparse(parsed_url)
        if unparsed_url != url:
            raise urllib.error.URLError("Invalid url: '%s' normalizes to '%s'" % (url, unparsed_url))

        if parsed_url.scheme == 'git':
            # Mitigate problems elsewhere with incorrect handling of encoded paths.
            # Don't trust urllib.parse.unquote but be prepared for more flexible implementations elsewhere.
            # Space is the only allowed whitespace character - directly or % encoded. No other % or \ is allowed.
            for c in parsed_url.path.replace('%20', ' '):
                if c in '%\\':
                    raise urllib.error.URLError("Invalid escape character in path: '%s'" % c)
                if c.isspace() and c != ' ':
                    raise urllib.error.URLError("Invalid whitespace character in path: %r" % c)
            return

        if parsed_url.scheme not in ['http', 'https']:
            raise urllib.error.URLError("Unsupported protocol in URL %r" % url)

        url_obj = hg_url(safe_bytes(url))
        test_uri, handlers = get_urllib_request_handlers(url_obj)
        if not test_uri.endswith(b'info/refs'):
            test_uri = test_uri.rstrip(b'/') + b'/info/refs'

        url_obj.passwd = b'*****'
        cleaned_uri = str(url_obj)

        o = urllib.request.build_opener(*handlers)
        o.addheaders = [('User-Agent', 'git/1.7.8.0')]  # fake some git

        req = urllib.request.Request(
            "%s?%s" % (
                safe_str(test_uri),
                urllib.parse.urlencode({"service": 'git-upload-pack'})
            ))

        try:
            resp = o.open(req)
            if resp.code != 200:
                raise Exception('Return Code is not 200')
        except Exception as e:
            # means it cannot be cloned
            raise urllib.error.URLError("[%s] org_exc: %s" % (cleaned_uri, e))

        # now detect if it's proper git repo
        gitdata = resp.read()
        if b'service=git-upload-pack' not in gitdata:
            raise urllib.error.URLError(
                "url [%s] does not look like an git" % cleaned_uri)

    def _get_repo(self, create, src_url=None, update_after_clone=False,
                  bare=False):
        if create and os.path.exists(self.path):
            raise RepositoryError("Location already exist")
        if src_url and not create:
            raise RepositoryError("Create should be set to True if src_url is "
                                  "given (clone operation creates repository)")
        try:
            if create and src_url:
                GitRepository._check_url(src_url)
                self.clone(src_url, update_after_clone, bare)
                return Repo(self.path)
            elif create:
                os.makedirs(self.path)
                if bare:
                    return Repo.init_bare(self.path)
                else:
                    return Repo.init(self.path)
            else:
                return Repo(self.path)
        except (NotGitRepository, OSError) as err:
            raise RepositoryError(err)

    def _get_all_revisions(self):
        # we must check if this repo is not empty, since later command
        # fails if it is. And it's cheaper to ask than throw the subprocess
        # errors
        try:
            self._repo.head()
        except KeyError:
            return []

        rev_filter = settings.GIT_REV_FILTER
        cmd = ['rev-list', rev_filter, '--reverse', '--date-order']
        try:
            so = self.run_git_command(cmd)
        except RepositoryError:
            # Can be raised for empty repositories
            return []
        return so.splitlines()

    def _get_all_revisions2(self):
        # alternate implementation using dulwich
        includes = [ascii_str(sha) for key, (sha, type_) in self._parsed_refs.items()
                    if type_ != b'T']
        return [c.commit.id for c in self._repo.get_walker(include=includes)]

    def _get_revision(self, revision):
        """
        Given any revision identifier, returns a 40 char string with revision hash.
        """
        if self._empty:
            raise EmptyRepositoryError("There are no changesets yet")

        if revision in (None, '', 'tip', 'HEAD', 'head', -1):
            revision = -1

        if isinstance(revision, int):
            try:
                return self.revisions[revision]
            except IndexError:
                msg = "Revision %r does not exist for %s" % (revision, self.name)
                raise ChangesetDoesNotExistError(msg)

        if isinstance(revision, str):
            if revision.isdigit() and (len(revision) < 12 or len(revision) == revision.count('0')):
                try:
                    return self.revisions[int(revision)]
                except IndexError:
                    msg = "Revision %r does not exist for %s" % (revision, self)
                    raise ChangesetDoesNotExistError(msg)

            # get by branch/tag name
            _ref_revision = self._parsed_refs.get(safe_bytes(revision))
            if _ref_revision:  # and _ref_revision[1] in [b'H', b'RH', b'T']:
                return ascii_str(_ref_revision[0])

            if revision in self.revisions:
                return revision

            # maybe it's a tag ? we don't have them in self.revisions
            if revision in self.tags.values():
                return revision

            if SHA_PATTERN.match(revision):
                msg = "Revision %r does not exist for %s" % (revision, self.name)
                raise ChangesetDoesNotExistError(msg)

        raise ChangesetDoesNotExistError("Given revision %r not recognized" % revision)

    def get_ref_revision(self, ref_type, ref_name):
        """
        Returns ``GitChangeset`` object representing repository's
        changeset at the given ``revision``.
        """
        return self._get_revision(ref_name)

    def _get_archives(self, archive_name='tip'):

        for i in [('zip', '.zip'), ('gz', '.tar.gz'), ('bz2', '.tar.bz2')]:
            yield {"type": i[0], "extension": i[1], "node": archive_name}

    def _get_url(self, url):
        """
        Returns normalized url. If schema is not given, would fall to
        filesystem (``file:///``) schema.
        """
        if url != 'default' and '://' not in url:
            url = ':///'.join(('file', url))
        return url

    @LazyProperty
    def name(self):
        return os.path.basename(self.path)

    @LazyProperty
    def last_change(self):
        """
        Returns last change made on this repository as datetime object
        """
        return date_fromtimestamp(self._get_mtime(), makedate()[1])

    def _get_mtime(self):
        try:
            return time.mktime(self.get_changeset().date.timetuple())
        except RepositoryError:
            idx_loc = '' if self.bare else '.git'
            # fallback to filesystem
            in_path = os.path.join(self.path, idx_loc, "index")
            he_path = os.path.join(self.path, idx_loc, "HEAD")
            if os.path.exists(in_path):
                return os.stat(in_path).st_mtime
            else:
                return os.stat(he_path).st_mtime

    @LazyProperty
    def description(self):
        return safe_str(self._repo.get_description() or b'unknown')

    @property
    def branches(self):
        if not self.revisions:
            return {}
        _branches = [(safe_str(key), ascii_str(sha))
                     for key, (sha, type_) in self._parsed_refs.items() if type_ == b'H']
        return OrderedDict(sorted(_branches, key=(lambda ctx: ctx[0]), reverse=False))

    @LazyProperty
    def closed_branches(self):
        return {}

    @LazyProperty
    def tags(self):
        return self._get_tags()

    def _get_tags(self):
        if not self.revisions:
            return {}
        _tags = [(safe_str(key), ascii_str(sha))
                 for key, (sha, type_) in self._parsed_refs.items() if type_ == b'T']
        return OrderedDict(sorted(_tags, key=(lambda ctx: ctx[0]), reverse=True))

    def tag(self, name, user, revision=None, message=None, date=None,
            **kwargs):
        """
        Creates and returns a tag for the given ``revision``.

        :param name: name for new tag
        :param user: full username, i.e.: "Joe Doe <joe.doe@example.com>"
        :param revision: changeset id for which new tag would be created
        :param message: message of the tag's commit
        :param date: date of tag's commit

        :raises TagAlreadyExistError: if tag with same name already exists
        """
        if name in self.tags:
            raise TagAlreadyExistError("Tag %s already exists" % name)
        changeset = self.get_changeset(revision)
        message = message or "Added tag %s for commit %s" % (name,
            changeset.raw_id)
        self._repo.refs[b"refs/tags/%s" % safe_bytes(name)] = changeset._commit.id

        self._parsed_refs = self._get_parsed_refs()
        self.tags = self._get_tags()
        return changeset

    def remove_tag(self, name, user, message=None, date=None):
        """
        Removes tag with the given ``name``.

        :param name: name of the tag to be removed
        :param user: full username, i.e.: "Joe Doe <joe.doe@example.com>"
        :param message: message of the tag's removal commit
        :param date: date of tag's removal commit

        :raises TagDoesNotExistError: if tag with given name does not exists
        """
        if name not in self.tags:
            raise TagDoesNotExistError("Tag %s does not exist" % name)
        # self._repo.refs is a DiskRefsContainer, and .path gives the full absolute path of '.git'
        tagpath = os.path.join(safe_str(self._repo.refs.path), 'refs', 'tags', name)
        try:
            os.remove(tagpath)
            self._parsed_refs = self._get_parsed_refs()
            self.tags = self._get_tags()
        except OSError as e:
            raise RepositoryError(e.strerror)

    @LazyProperty
    def bookmarks(self):
        """
        Gets bookmarks for this repository
        """
        return {}

    @LazyProperty
    def _parsed_refs(self):
        return self._get_parsed_refs()

    def _get_parsed_refs(self):
        """Return refs as a dict, like:
        { b'v0.2.0': [b'599ba911aa24d2981225f3966eb659dfae9e9f30', b'T'] }
        """
        _repo = self._repo
        refs = _repo.get_refs()
        keys = [(b'refs/heads/', b'H'),
                (b'refs/remotes/origin/', b'RH'),
                (b'refs/tags/', b'T')]
        _refs = {}
        for ref, sha in refs.items():
            for k, type_ in keys:
                if ref.startswith(k):
                    _key = ref[len(k):]
                    if type_ == b'T':
                        obj = _repo.get_object(sha)
                        if isinstance(obj, Tag):
                            sha = _repo.get_object(sha).object[1]
                    _refs[_key] = [sha, type_]
                    break
        return _refs

    def _heads(self, reverse=False):
        refs = self._repo.get_refs()
        heads = {}

        for key, val in refs.items():
            for ref_key in [b'refs/heads/', b'refs/remotes/origin/']:
                if key.startswith(ref_key):
                    n = key[len(ref_key):]
                    if n not in [b'HEAD']:
                        heads[n] = val

        return heads if reverse else dict((y, x) for x, y in heads.items())

    def get_changeset(self, revision=None):
        """
        Returns ``GitChangeset`` object representing commit from git repository
        at the given revision or head (most recent commit) if None given.
        """
        if isinstance(revision, changeset.GitChangeset):
            return revision
        return changeset.GitChangeset(repository=self, revision=self._get_revision(revision))

    def get_changesets(self, start=None, end=None, start_date=None,
           end_date=None, branch_name=None, reverse=False, max_revisions=None):
        """
        Returns iterator of ``GitChangeset`` objects from start to end (both
        are inclusive), in ascending date order (unless ``reverse`` is set).

        :param start: changeset ID, as str; first returned changeset
        :param end: changeset ID, as str; last returned changeset
        :param start_date: if specified, changesets with commit date less than
          ``start_date`` would be filtered out from returned set
        :param end_date: if specified, changesets with commit date greater than
          ``end_date`` would be filtered out from returned set
        :param branch_name: if specified, changesets not reachable from given
          branch would be filtered out from returned set
        :param reverse: if ``True``, returned generator would be reversed
          (meaning that returned changesets would have descending date order)

        :raise BranchDoesNotExistError: If given ``branch_name`` does not
            exist.
        :raise ChangesetDoesNotExistError: If changeset for given ``start`` or
          ``end`` could not be found.

        """
        if branch_name and branch_name not in self.branches:
            raise BranchDoesNotExistError("Branch '%s' not found"
                                          % branch_name)
        # actually we should check now if it's not an empty repo to not spaw
        # subprocess commands
        if self._empty:
            raise EmptyRepositoryError("There are no changesets yet")

        # %H at format means (full) commit hash, initial hashes are retrieved
        # in ascending date order
        cmd = ['log', '--date-order', '--reverse', '--pretty=format:%H']
        if max_revisions:
            cmd += ['--max-count=%s' % max_revisions]
        if start_date:
            cmd += ['--since', start_date.strftime('%m/%d/%y %H:%M:%S')]
        if end_date:
            cmd += ['--until', end_date.strftime('%m/%d/%y %H:%M:%S')]
        if branch_name:
            cmd.append(branch_name)
        else:
            cmd.append(settings.GIT_REV_FILTER)

        revs = self.run_git_command(cmd).splitlines()
        start_pos = 0
        end_pos = len(revs)
        if start:
            _start = self._get_revision(start)
            try:
                start_pos = revs.index(_start)
            except ValueError:
                pass

        if end is not None:
            _end = self._get_revision(end)
            try:
                end_pos = revs.index(_end)
            except ValueError:
                pass

        if None not in [start, end] and start_pos > end_pos:
            raise RepositoryError('start cannot be after end')

        if end_pos is not None:
            end_pos += 1

        revs = revs[start_pos:end_pos]
        if reverse:
            revs.reverse()

        return CollectionGenerator(self, revs)

    def get_diff_changesets(self, org_rev, other_repo, other_rev):
        """
        Returns lists of changesets that can be merged from this repo @org_rev
        to other_repo @other_rev
        ... and the other way
        ... and the ancestors that would be used for merge

        :param org_rev: the revision we want our compare to be made
        :param other_repo: repo object, most likely the fork of org_repo. It has
            all changesets that we need to obtain
        :param other_rev: revision we want out compare to be made on other_repo
        """
        org_changesets = []
        ancestors = None
        if org_rev == other_rev:
            other_changesets = []
        elif self != other_repo:
            gitrepo = Repo(self.path)
            SubprocessGitClient(thin_packs=False).fetch(other_repo.path, gitrepo)

            gitrepo_remote = Repo(other_repo.path)
            SubprocessGitClient(thin_packs=False).fetch(self.path, gitrepo_remote)

            revs = [
                ascii_str(x.commit.id)
                for x in gitrepo_remote.get_walker(include=[ascii_bytes(other_rev)],
                                                   exclude=[ascii_bytes(org_rev)])
            ]
            other_changesets = [other_repo.get_changeset(rev) for rev in reversed(revs)]
            if other_changesets:
                ancestors = [other_changesets[0].parents[0].raw_id]
            else:
                # no changesets from other repo, ancestor is the other_rev
                ancestors = [other_rev]

            gitrepo.close()
            gitrepo_remote.close()

        else:
            so = self.run_git_command(
                ['log', '--reverse', '--pretty=format:%H',
                 '-s', '%s..%s' % (org_rev, other_rev)]
            )
            other_changesets = [self.get_changeset(cs)
                          for cs in re.findall(r'[0-9a-fA-F]{40}', so)]
            so = self.run_git_command(
                ['merge-base', org_rev, other_rev]
            )
            ancestors = [re.findall(r'[0-9a-fA-F]{40}', so)[0]]

        return other_changesets, org_changesets, ancestors

    def get_diff(self, rev1, rev2, path=None, ignore_whitespace=False,
                 context=3):
        """
        Returns (git like) *diff*, as plain bytes text. Shows changes
        introduced by ``rev2`` since ``rev1``.

        :param rev1: Entry point from which diff is shown. Can be
          ``self.EMPTY_CHANGESET`` - in this case, patch showing all
          the changes since empty state of the repository until ``rev2``
        :param rev2: Until which revision changes should be shown.
        :param ignore_whitespace: If set to ``True``, would not show whitespace
          changes. Defaults to ``False``.
        :param context: How many lines before/after changed lines should be
          shown. Defaults to ``3``. Due to limitations in Git, if
          value passed-in is greater than ``2**31-1``
          (``2147483647``), it will be set to ``2147483647``
          instead. If negative value is passed-in, it will be set to
          ``0`` instead.
        """

        # Git internally uses a signed long int for storing context
        # size (number of lines to show before and after the
        # differences). This can result in integer overflow, so we
        # ensure the requested context is smaller by one than the
        # number that would cause the overflow. It is highly unlikely
        # that a single file will contain that many lines, so this
        # kind of change should not cause any realistic consequences.
        overflowed_long_int = 2**31

        if context >= overflowed_long_int:
            context = overflowed_long_int - 1

        # Negative context values make no sense, and will result in
        # errors. Ensure this does not happen.
        if context < 0:
            context = 0

        flags = ['-U%s' % context, '--full-index', '--binary', '-p', '-M', '--abbrev=40']
        if ignore_whitespace:
            flags.append('-w')

        if hasattr(rev1, 'raw_id'):
            rev1 = getattr(rev1, 'raw_id')

        if hasattr(rev2, 'raw_id'):
            rev2 = getattr(rev2, 'raw_id')

        if rev1 == self.EMPTY_CHANGESET:
            rev2 = self.get_changeset(rev2).raw_id
            cmd = ['show'] + flags + [rev2]
        else:
            rev1 = self.get_changeset(rev1).raw_id
            rev2 = self.get_changeset(rev2).raw_id
            cmd = ['diff'] + flags + [rev1, rev2]

        if path:
            cmd += ['--', path]

        stdout, stderr = self._run_git_command(cmd, cwd=self.path)
        # If we used 'show' command, strip first few lines (until actual diff
        # starts)
        if rev1 == self.EMPTY_CHANGESET:
            parts = stdout.split(b'\ndiff ', 1)
            if len(parts) > 1:
                stdout = b'diff ' + parts[1]
        return stdout

    @LazyProperty
    def in_memory_changeset(self):
        """
        Returns ``GitInMemoryChangeset`` object for this repository.
        """
        return inmemory.GitInMemoryChangeset(self)

    def clone(self, url, update_after_clone=True, bare=False):
        """
        Tries to clone changes from external location.

        :param update_after_clone: If set to ``False``, git won't checkout
          working directory
        :param bare: If set to ``True``, repository would be cloned into
          *bare* git repository (no working directory at all).
        """
        url = self._get_url(url)
        cmd = ['clone', '-q']
        if bare:
            cmd.append('--bare')
        elif not update_after_clone:
            cmd.append('--no-checkout')
        cmd += ['--', url, self.path]
        # If error occurs run_git_command raises RepositoryError already
        self.run_git_command(cmd)

    def pull(self, url):
        """
        Tries to pull changes from external location.
        """
        url = self._get_url(url)
        cmd = ['pull', '--ff-only', url]
        # If error occurs run_git_command raises RepositoryError already
        self.run_git_command(cmd)

    def fetch(self, url):
        """
        Tries to pull changes from external location.
        """
        url = self._get_url(url)
        so = self.run_git_command(['ls-remote', '-h', url])
        cmd = ['fetch', url, '--']
        for line in so.splitlines():
            sha, ref = line.split('\t')
            cmd.append('+%s:%s' % (ref, ref))
        self.run_git_command(cmd)

    def _update_server_info(self):
        """
        runs gits update-server-info command in this repo instance
        """
        try:
            update_server_info(self._repo)
        except OSError as e:
            if e.errno not in [errno.ENOENT, errno.EROFS]:
                raise
            # Workaround for dulwich crashing on for example its own dulwich/tests/data/repos/simple_merge.git/info/refs.lock
            log.error('Ignoring %s running update-server-info: %s', type(e).__name__, e)

    @LazyProperty
    def workdir(self):
        """
        Returns ``Workdir`` instance for this repository.
        """
        return workdir.GitWorkdir(self)

    def get_config_value(self, section, name, config_file=None):
        """
        Returns configuration value for a given [``section``] and ``name``.

        :param section: Section we want to retrieve value from
        :param name: Name of configuration we want to retrieve
        :param config_file: A path to file which should be used to retrieve
          configuration from (might also be a list of file paths)
        """
        if config_file is None:
            config_file = []
        elif isinstance(config_file, str):
            config_file = [config_file]

        def gen_configs():
            for path in config_file + self._config_files:
                try:
                    yield ConfigFile.from_path(path)
                except (IOError, OSError, ValueError):
                    continue

        for config in gen_configs():
            try:
                value = config.get(section, name)
            except KeyError:
                continue
            return None if value is None else safe_str(value)
        return None

    def get_user_name(self, config_file=None):
        """
        Returns user's name from global configuration file.

        :param config_file: A path to file which should be used to retrieve
          configuration from (might also be a list of file paths)
        """
        return self.get_config_value('user', 'name', config_file)

    def get_user_email(self, config_file=None):
        """
        Returns user's email from global configuration file.

        :param config_file: A path to file which should be used to retrieve
          configuration from (might also be a list of file paths)
        """
        return self.get_config_value('user', 'email', config_file)