Mercurial > kallithea
view kallithea/lib/vcs/backends/git/repository.py @ 8918:da519b9782e8 stable
hg: support Mercurial 6.1 without util.url
author | Mads Kiilerich <mads@kiilerich.com> |
---|---|
date | Mon, 07 Mar 2022 17:47:13 +0100 |
parents | de59ad8185e1 |
children |
line wrap: on
line source
# -*- coding: utf-8 -*- """ vcs.backends.git.repository ~~~~~~~~~~~~~~~~~~~~~~~~~~~ Git repository implementation. :created_on: Apr 8, 2010 :copyright: (c) 2010-2011 by Marcin Kuzminski, Lukasz Balcerzak. """ import errno import logging import os import re import time import urllib.error import urllib.parse import urllib.request from collections import OrderedDict try: from mercurial.utils.urlutil import url as hg_url except ImportError: # urlutil was introduced in Mercurial 5.8 from mercurial.util import url as hg_url from dulwich.client import SubprocessGitClient from dulwich.config import ConfigFile from dulwich.objects import Tag from dulwich.repo import NotGitRepository, Repo from dulwich.server import update_server_info from kallithea.lib.vcs import subprocessio from kallithea.lib.vcs.backends.base import BaseRepository, CollectionGenerator from kallithea.lib.vcs.conf import settings from kallithea.lib.vcs.exceptions import (BranchDoesNotExistError, ChangesetDoesNotExistError, EmptyRepositoryError, RepositoryError, TagAlreadyExistError, TagDoesNotExistError) from kallithea.lib.vcs.utils import ascii_bytes, ascii_str, date_fromtimestamp, makedate, safe_bytes, safe_str from kallithea.lib.vcs.utils.helpers import get_urllib_request_handlers from kallithea.lib.vcs.utils.lazy import LazyProperty from kallithea.lib.vcs.utils.paths import abspath, get_user_home from . import changeset, inmemory, workdir SHA_PATTERN = re.compile(r'^([0-9a-fA-F]{12}|[0-9a-fA-F]{40})$') log = logging.getLogger(__name__) class GitRepository(BaseRepository): """ Git repository backend. """ DEFAULT_BRANCH_NAME = 'master' scm = 'git' def __init__(self, repo_path, create=False, src_url=None, update_after_clone=False, bare=False, baseui=None): baseui # unused self.path = abspath(repo_path) self.repo = self._get_repo(create, src_url, update_after_clone, bare) self.bare = self.repo.bare @property def _config_files(self): return [ self.bare and abspath(self.path, 'config') or abspath(self.path, '.git', 'config'), abspath(get_user_home(), '.gitconfig'), ] @property def _repo(self): return self.repo @property def head(self): try: return self._repo.head() except KeyError: return None @property def _empty(self): """ Checks if repository is empty ie. without any changesets """ try: self.revisions[0] except (KeyError, IndexError): return True return False @LazyProperty def revisions(self): """ Returns list of revisions' ids, in ascending order. Being lazy attribute allows external tools to inject shas from cache. """ return self._get_all_revisions() @classmethod def _run_git_command(cls, cmd, cwd=None): """ Runs given ``cmd`` as git command and returns output bytes in a tuple (stdout, stderr) ... or raise RepositoryError. :param cmd: git command to be executed :param cwd: passed directly to subprocess """ # need to clean fix GIT_DIR ! gitenv = dict(os.environ) gitenv.pop('GIT_DIR', None) gitenv['GIT_CONFIG_NOGLOBAL'] = '1' assert isinstance(cmd, list), cmd cmd = [settings.GIT_EXECUTABLE_PATH, '-c', 'core.quotepath=false'] + cmd try: p = subprocessio.SubprocessIOChunker(cmd, cwd=cwd, env=gitenv, shell=False) except (EnvironmentError, OSError) as err: # output from the failing process is in str(EnvironmentError) msg = ("Couldn't run git command %s.\n" "Subprocess failed with '%s': %s\n" % (cmd, type(err).__name__, err) ).strip() log.error(msg) raise RepositoryError(msg) try: stdout = b''.join(p.output) stderr = b''.join(p.error) finally: p.close() # TODO: introduce option to make commands fail if they have any stderr output? if stderr: log.debug('stderr from %s:\n%s', cmd, stderr) else: log.debug('stderr from %s: None', cmd) return stdout, stderr def run_git_command(self, cmd): """ Runs given ``cmd`` as git command with cwd set to current repo. Returns stdout as unicode str ... or raise RepositoryError. """ cwd = None if os.path.isdir(self.path): cwd = self.path stdout, _stderr = self._run_git_command(cmd, cwd=cwd) return safe_str(stdout) @staticmethod def _check_url(url): r""" Raise URLError if url doesn't seem like a valid safe Git URL. We only allow http, https, git, and ssh URLs. For http and https URLs, make a connection and probe to see if it is valid. >>> GitRepository._check_url('git://example.com/my%20fine repo') >>> GitRepository._check_url('http://example.com:65537/repo') Traceback (most recent call last): ... urllib.error.URLError: <urlopen error Error parsing URL: 'http://example.com:65537/repo'> >>> GitRepository._check_url('foo') Traceback (most recent call last): ... urllib.error.URLError: <urlopen error Unsupported protocol in URL 'foo'> >>> GitRepository._check_url('file:///repo') Traceback (most recent call last): ... urllib.error.URLError: <urlopen error Unsupported protocol in URL 'file:///repo'> >>> GitRepository._check_url('git+http://example.com/repo') Traceback (most recent call last): ... urllib.error.URLError: <urlopen error Unsupported protocol in URL 'git+http://example.com/repo'> >>> GitRepository._check_url('git://example.com/%09') Traceback (most recent call last): ... urllib.error.URLError: <urlopen error Invalid escape character in path: '%'> >>> GitRepository._check_url('git://example.com/%x00') Traceback (most recent call last): ... urllib.error.URLError: <urlopen error Invalid escape character in path: '%'> >>> GitRepository._check_url(r'git://example.com/\u0009') Traceback (most recent call last): ... urllib.error.URLError: <urlopen error Invalid escape character in path: '\'> >>> GitRepository._check_url(r'git://example.com/\t') Traceback (most recent call last): ... urllib.error.URLError: <urlopen error Invalid escape character in path: '\'> >>> GitRepository._check_url('git://example.com/\t') Traceback (most recent call last): ... urllib.error.URLError: <urlopen error Invalid ...> The failure above will be one of, depending on the level of WhatWG support: urllib.error.URLError: <urlopen error Invalid whitespace character in path: '\t'> urllib.error.URLError: <urlopen error Invalid url: 'git://example.com/ ' normalizes to 'git://example.com/'> """ try: parsed_url = urllib.parse.urlparse(url) parsed_url.port # trigger netloc parsing which might raise ValueError except ValueError: raise urllib.error.URLError("Error parsing URL: %r" % url) # check first if it's not an local url if os.path.isabs(url) and os.path.isdir(url): return unparsed_url = urllib.parse.urlunparse(parsed_url) if unparsed_url != url: raise urllib.error.URLError("Invalid url: '%s' normalizes to '%s'" % (url, unparsed_url)) if parsed_url.scheme == 'git': # Mitigate problems elsewhere with incorrect handling of encoded paths. # Don't trust urllib.parse.unquote but be prepared for more flexible implementations elsewhere. # Space is the only allowed whitespace character - directly or % encoded. No other % or \ is allowed. for c in parsed_url.path.replace('%20', ' '): if c in '%\\': raise urllib.error.URLError("Invalid escape character in path: '%s'" % c) if c.isspace() and c != ' ': raise urllib.error.URLError("Invalid whitespace character in path: %r" % c) return if parsed_url.scheme not in ['http', 'https']: raise urllib.error.URLError("Unsupported protocol in URL %r" % url) url_obj = hg_url(safe_bytes(url)) test_uri, handlers = get_urllib_request_handlers(url_obj) if not test_uri.endswith(b'info/refs'): test_uri = test_uri.rstrip(b'/') + b'/info/refs' url_obj.passwd = b'*****' cleaned_uri = str(url_obj) o = urllib.request.build_opener(*handlers) o.addheaders = [('User-Agent', 'git/1.7.8.0')] # fake some git req = urllib.request.Request( "%s?%s" % ( safe_str(test_uri), urllib.parse.urlencode({"service": 'git-upload-pack'}) )) try: resp = o.open(req) if resp.code != 200: raise Exception('Return Code is not 200') except Exception as e: # means it cannot be cloned raise urllib.error.URLError("[%s] org_exc: %s" % (cleaned_uri, e)) # now detect if it's proper git repo gitdata = resp.read() if b'service=git-upload-pack' not in gitdata: raise urllib.error.URLError( "url [%s] does not look like an git" % cleaned_uri) def _get_repo(self, create, src_url=None, update_after_clone=False, bare=False): if create and os.path.exists(self.path): raise RepositoryError("Location already exist") if src_url and not create: raise RepositoryError("Create should be set to True if src_url is " "given (clone operation creates repository)") try: if create and src_url: GitRepository._check_url(src_url) self.clone(src_url, update_after_clone, bare) return Repo(self.path) elif create: os.makedirs(self.path) if bare: return Repo.init_bare(self.path) else: return Repo.init(self.path) else: return Repo(self.path) except (NotGitRepository, OSError) as err: raise RepositoryError(err) def _get_all_revisions(self): # we must check if this repo is not empty, since later command # fails if it is. And it's cheaper to ask than throw the subprocess # errors try: self._repo.head() except KeyError: return [] rev_filter = settings.GIT_REV_FILTER cmd = ['rev-list', rev_filter, '--reverse', '--date-order'] try: so = self.run_git_command(cmd) except RepositoryError: # Can be raised for empty repositories return [] return so.splitlines() def _get_all_revisions2(self): # alternate implementation using dulwich includes = [ascii_str(sha) for key, (sha, type_) in self._parsed_refs.items() if type_ != b'T'] return [c.commit.id for c in self._repo.get_walker(include=includes)] def _get_revision(self, revision): """ Given any revision identifier, returns a 40 char string with revision hash. """ if self._empty: raise EmptyRepositoryError("There are no changesets yet") if revision in (None, '', 'tip', 'HEAD', 'head', -1): revision = -1 if isinstance(revision, int): try: return self.revisions[revision] except IndexError: msg = "Revision %r does not exist for %s" % (revision, self.name) raise ChangesetDoesNotExistError(msg) if isinstance(revision, str): if revision.isdigit() and (len(revision) < 12 or len(revision) == revision.count('0')): try: return self.revisions[int(revision)] except IndexError: msg = "Revision %r does not exist for %s" % (revision, self) raise ChangesetDoesNotExistError(msg) # get by branch/tag name _ref_revision = self._parsed_refs.get(safe_bytes(revision)) if _ref_revision: # and _ref_revision[1] in [b'H', b'RH', b'T']: return ascii_str(_ref_revision[0]) if revision in self.revisions: return revision # maybe it's a tag ? we don't have them in self.revisions if revision in self.tags.values(): return revision if SHA_PATTERN.match(revision): msg = "Revision %r does not exist for %s" % (revision, self.name) raise ChangesetDoesNotExistError(msg) raise ChangesetDoesNotExistError("Given revision %r not recognized" % revision) def get_ref_revision(self, ref_type, ref_name): """ Returns ``GitChangeset`` object representing repository's changeset at the given ``revision``. """ return self._get_revision(ref_name) def _get_archives(self, archive_name='tip'): for i in [('zip', '.zip'), ('gz', '.tar.gz'), ('bz2', '.tar.bz2')]: yield {"type": i[0], "extension": i[1], "node": archive_name} def _get_url(self, url): """ Returns normalized url. If schema is not given, would fall to filesystem (``file:///``) schema. """ if url != 'default' and '://' not in url: url = ':///'.join(('file', url)) return url @LazyProperty def name(self): return os.path.basename(self.path) @LazyProperty def last_change(self): """ Returns last change made on this repository as datetime object """ return date_fromtimestamp(self._get_mtime(), makedate()[1]) def _get_mtime(self): try: return time.mktime(self.get_changeset().date.timetuple()) except RepositoryError: idx_loc = '' if self.bare else '.git' # fallback to filesystem in_path = os.path.join(self.path, idx_loc, "index") he_path = os.path.join(self.path, idx_loc, "HEAD") if os.path.exists(in_path): return os.stat(in_path).st_mtime else: return os.stat(he_path).st_mtime @LazyProperty def description(self): return safe_str(self._repo.get_description() or b'unknown') @property def branches(self): if not self.revisions: return {} _branches = [(safe_str(key), ascii_str(sha)) for key, (sha, type_) in self._parsed_refs.items() if type_ == b'H'] return OrderedDict(sorted(_branches, key=(lambda ctx: ctx[0]), reverse=False)) @LazyProperty def closed_branches(self): return {} @LazyProperty def tags(self): return self._get_tags() def _get_tags(self): if not self.revisions: return {} _tags = [(safe_str(key), ascii_str(sha)) for key, (sha, type_) in self._parsed_refs.items() if type_ == b'T'] return OrderedDict(sorted(_tags, key=(lambda ctx: ctx[0]), reverse=True)) def tag(self, name, user, revision=None, message=None, date=None, **kwargs): """ Creates and returns a tag for the given ``revision``. :param name: name for new tag :param user: full username, i.e.: "Joe Doe <joe.doe@example.com>" :param revision: changeset id for which new tag would be created :param message: message of the tag's commit :param date: date of tag's commit :raises TagAlreadyExistError: if tag with same name already exists """ if name in self.tags: raise TagAlreadyExistError("Tag %s already exists" % name) changeset = self.get_changeset(revision) message = message or "Added tag %s for commit %s" % (name, changeset.raw_id) self._repo.refs[b"refs/tags/%s" % safe_bytes(name)] = changeset._commit.id self._parsed_refs = self._get_parsed_refs() self.tags = self._get_tags() return changeset def remove_tag(self, name, user, message=None, date=None): """ Removes tag with the given ``name``. :param name: name of the tag to be removed :param user: full username, i.e.: "Joe Doe <joe.doe@example.com>" :param message: message of the tag's removal commit :param date: date of tag's removal commit :raises TagDoesNotExistError: if tag with given name does not exists """ if name not in self.tags: raise TagDoesNotExistError("Tag %s does not exist" % name) # self._repo.refs is a DiskRefsContainer, and .path gives the full absolute path of '.git' tagpath = os.path.join(safe_str(self._repo.refs.path), 'refs', 'tags', name) try: os.remove(tagpath) self._parsed_refs = self._get_parsed_refs() self.tags = self._get_tags() except OSError as e: raise RepositoryError(e.strerror) @LazyProperty def bookmarks(self): """ Gets bookmarks for this repository """ return {} @LazyProperty def _parsed_refs(self): return self._get_parsed_refs() def _get_parsed_refs(self): """Return refs as a dict, like: { b'v0.2.0': [b'599ba911aa24d2981225f3966eb659dfae9e9f30', b'T'] } """ _repo = self._repo refs = _repo.get_refs() keys = [(b'refs/heads/', b'H'), (b'refs/remotes/origin/', b'RH'), (b'refs/tags/', b'T')] _refs = {} for ref, sha in refs.items(): for k, type_ in keys: if ref.startswith(k): _key = ref[len(k):] if type_ == b'T': obj = _repo.get_object(sha) if isinstance(obj, Tag): sha = _repo.get_object(sha).object[1] _refs[_key] = [sha, type_] break return _refs def _heads(self, reverse=False): refs = self._repo.get_refs() heads = {} for key, val in refs.items(): for ref_key in [b'refs/heads/', b'refs/remotes/origin/']: if key.startswith(ref_key): n = key[len(ref_key):] if n not in [b'HEAD']: heads[n] = val return heads if reverse else dict((y, x) for x, y in heads.items()) def get_changeset(self, revision=None): """ Returns ``GitChangeset`` object representing commit from git repository at the given revision or head (most recent commit) if None given. """ if isinstance(revision, changeset.GitChangeset): return revision return changeset.GitChangeset(repository=self, revision=self._get_revision(revision)) def get_changesets(self, start=None, end=None, start_date=None, end_date=None, branch_name=None, reverse=False, max_revisions=None): """ Returns iterator of ``GitChangeset`` objects from start to end (both are inclusive), in ascending date order (unless ``reverse`` is set). :param start: changeset ID, as str; first returned changeset :param end: changeset ID, as str; last returned changeset :param start_date: if specified, changesets with commit date less than ``start_date`` would be filtered out from returned set :param end_date: if specified, changesets with commit date greater than ``end_date`` would be filtered out from returned set :param branch_name: if specified, changesets not reachable from given branch would be filtered out from returned set :param reverse: if ``True``, returned generator would be reversed (meaning that returned changesets would have descending date order) :raise BranchDoesNotExistError: If given ``branch_name`` does not exist. :raise ChangesetDoesNotExistError: If changeset for given ``start`` or ``end`` could not be found. """ if branch_name and branch_name not in self.branches: raise BranchDoesNotExistError("Branch '%s' not found" % branch_name) # actually we should check now if it's not an empty repo to not spaw # subprocess commands if self._empty: raise EmptyRepositoryError("There are no changesets yet") # %H at format means (full) commit hash, initial hashes are retrieved # in ascending date order cmd = ['log', '--date-order', '--reverse', '--pretty=format:%H'] if max_revisions: cmd += ['--max-count=%s' % max_revisions] if start_date: cmd += ['--since', start_date.strftime('%m/%d/%y %H:%M:%S')] if end_date: cmd += ['--until', end_date.strftime('%m/%d/%y %H:%M:%S')] if branch_name: cmd.append(branch_name) else: cmd.append(settings.GIT_REV_FILTER) revs = self.run_git_command(cmd).splitlines() start_pos = 0 end_pos = len(revs) if start: _start = self._get_revision(start) try: start_pos = revs.index(_start) except ValueError: pass if end is not None: _end = self._get_revision(end) try: end_pos = revs.index(_end) except ValueError: pass if None not in [start, end] and start_pos > end_pos: raise RepositoryError('start cannot be after end') if end_pos is not None: end_pos += 1 revs = revs[start_pos:end_pos] if reverse: revs.reverse() return CollectionGenerator(self, revs) def get_diff_changesets(self, org_rev, other_repo, other_rev): """ Returns lists of changesets that can be merged from this repo @org_rev to other_repo @other_rev ... and the other way ... and the ancestors that would be used for merge :param org_rev: the revision we want our compare to be made :param other_repo: repo object, most likely the fork of org_repo. It has all changesets that we need to obtain :param other_rev: revision we want out compare to be made on other_repo """ org_changesets = [] ancestors = None if org_rev == other_rev: other_changesets = [] elif self != other_repo: gitrepo = Repo(self.path) SubprocessGitClient(thin_packs=False).fetch(other_repo.path, gitrepo) gitrepo_remote = Repo(other_repo.path) SubprocessGitClient(thin_packs=False).fetch(self.path, gitrepo_remote) revs = [ ascii_str(x.commit.id) for x in gitrepo_remote.get_walker(include=[ascii_bytes(other_rev)], exclude=[ascii_bytes(org_rev)]) ] other_changesets = [other_repo.get_changeset(rev) for rev in reversed(revs)] if other_changesets: ancestors = [other_changesets[0].parents[0].raw_id] else: # no changesets from other repo, ancestor is the other_rev ancestors = [other_rev] gitrepo.close() gitrepo_remote.close() else: so = self.run_git_command( ['log', '--reverse', '--pretty=format:%H', '-s', '%s..%s' % (org_rev, other_rev)] ) other_changesets = [self.get_changeset(cs) for cs in re.findall(r'[0-9a-fA-F]{40}', so)] so = self.run_git_command( ['merge-base', org_rev, other_rev] ) ancestors = [re.findall(r'[0-9a-fA-F]{40}', so)[0]] return other_changesets, org_changesets, ancestors def get_diff(self, rev1, rev2, path=None, ignore_whitespace=False, context=3): """ Returns (git like) *diff*, as plain bytes text. Shows changes introduced by ``rev2`` since ``rev1``. :param rev1: Entry point from which diff is shown. Can be ``self.EMPTY_CHANGESET`` - in this case, patch showing all the changes since empty state of the repository until ``rev2`` :param rev2: Until which revision changes should be shown. :param ignore_whitespace: If set to ``True``, would not show whitespace changes. Defaults to ``False``. :param context: How many lines before/after changed lines should be shown. Defaults to ``3``. Due to limitations in Git, if value passed-in is greater than ``2**31-1`` (``2147483647``), it will be set to ``2147483647`` instead. If negative value is passed-in, it will be set to ``0`` instead. """ # Git internally uses a signed long int for storing context # size (number of lines to show before and after the # differences). This can result in integer overflow, so we # ensure the requested context is smaller by one than the # number that would cause the overflow. It is highly unlikely # that a single file will contain that many lines, so this # kind of change should not cause any realistic consequences. overflowed_long_int = 2**31 if context >= overflowed_long_int: context = overflowed_long_int - 1 # Negative context values make no sense, and will result in # errors. Ensure this does not happen. if context < 0: context = 0 flags = ['-U%s' % context, '--full-index', '--binary', '-p', '-M', '--abbrev=40'] if ignore_whitespace: flags.append('-w') if hasattr(rev1, 'raw_id'): rev1 = getattr(rev1, 'raw_id') if hasattr(rev2, 'raw_id'): rev2 = getattr(rev2, 'raw_id') if rev1 == self.EMPTY_CHANGESET: rev2 = self.get_changeset(rev2).raw_id cmd = ['show'] + flags + [rev2] else: rev1 = self.get_changeset(rev1).raw_id rev2 = self.get_changeset(rev2).raw_id cmd = ['diff'] + flags + [rev1, rev2] if path: cmd += ['--', path] stdout, stderr = self._run_git_command(cmd, cwd=self.path) # If we used 'show' command, strip first few lines (until actual diff # starts) if rev1 == self.EMPTY_CHANGESET: parts = stdout.split(b'\ndiff ', 1) if len(parts) > 1: stdout = b'diff ' + parts[1] return stdout @LazyProperty def in_memory_changeset(self): """ Returns ``GitInMemoryChangeset`` object for this repository. """ return inmemory.GitInMemoryChangeset(self) def clone(self, url, update_after_clone=True, bare=False): """ Tries to clone changes from external location. :param update_after_clone: If set to ``False``, git won't checkout working directory :param bare: If set to ``True``, repository would be cloned into *bare* git repository (no working directory at all). """ url = self._get_url(url) cmd = ['clone', '-q'] if bare: cmd.append('--bare') elif not update_after_clone: cmd.append('--no-checkout') cmd += ['--', url, self.path] # If error occurs run_git_command raises RepositoryError already self.run_git_command(cmd) def pull(self, url): """ Tries to pull changes from external location. """ url = self._get_url(url) cmd = ['pull', '--ff-only', url] # If error occurs run_git_command raises RepositoryError already self.run_git_command(cmd) def fetch(self, url): """ Tries to pull changes from external location. """ url = self._get_url(url) so = self.run_git_command(['ls-remote', '-h', url]) cmd = ['fetch', url, '--'] for line in so.splitlines(): sha, ref = line.split('\t') cmd.append('+%s:%s' % (ref, ref)) self.run_git_command(cmd) def _update_server_info(self): """ runs gits update-server-info command in this repo instance """ try: update_server_info(self._repo) except OSError as e: if e.errno not in [errno.ENOENT, errno.EROFS]: raise # Workaround for dulwich crashing on for example its own dulwich/tests/data/repos/simple_merge.git/info/refs.lock log.error('Ignoring %s running update-server-info: %s', type(e).__name__, e) @LazyProperty def workdir(self): """ Returns ``Workdir`` instance for this repository. """ return workdir.GitWorkdir(self) def get_config_value(self, section, name, config_file=None): """ Returns configuration value for a given [``section``] and ``name``. :param section: Section we want to retrieve value from :param name: Name of configuration we want to retrieve :param config_file: A path to file which should be used to retrieve configuration from (might also be a list of file paths) """ if config_file is None: config_file = [] elif isinstance(config_file, str): config_file = [config_file] def gen_configs(): for path in config_file + self._config_files: try: yield ConfigFile.from_path(path) except (IOError, OSError, ValueError): continue for config in gen_configs(): try: value = config.get(section, name) except KeyError: continue return None if value is None else safe_str(value) return None def get_user_name(self, config_file=None): """ Returns user's name from global configuration file. :param config_file: A path to file which should be used to retrieve configuration from (might also be a list of file paths) """ return self.get_config_value('user', 'name', config_file) def get_user_email(self, config_file=None): """ Returns user's email from global configuration file. :param config_file: A path to file which should be used to retrieve configuration from (might also be a list of file paths) """ return self.get_config_value('user', 'email', config_file)