view rhodecode/lib/vcs/utils/helpers.py @ 3092:cd25fa6f4008 beta

remove unused function get_repo_paths It should probably handle removed repositories somehow if it should live on.
author Mads Kiilerich <madski@unity3d.com>
date Wed, 12 Dec 2012 18:13:31 +0100
parents 291be8fa4d4f
children ba2e2514a01a
line wrap: on
line source

"""
Utitlites aimed to help achieve mostly basic tasks.
"""
from __future__ import division

import re
import time
import datetime
import os.path
from subprocess import Popen, PIPE
from rhodecode.lib.vcs.exceptions import VCSError
from rhodecode.lib.vcs.exceptions import RepositoryError
from rhodecode.lib.vcs.utils.paths import abspath

ALIASES = ['hg', 'git']


def get_scm(path, search_up=False, explicit_alias=None):
    """
    Returns one of alias from ``ALIASES`` (in order of precedence same as
    shortcuts given in ``ALIASES``) and top working dir path for the given
    argument. If no scm-specific directory is found or more than one scm is
    found at that directory, ``VCSError`` is raised.

    :param search_up: if set to ``True``, this function would try to
      move up to parent directory every time no scm is recognized for the
      currently checked path. Default: ``False``.
    :param explicit_alias: can be one of available backend aliases, when given
      it will return given explicit alias in repositories under more than one
      version control, if explicit_alias is different than found it will raise
      VCSError
    """
    if not os.path.isdir(path):
        raise VCSError("Given path %s is not a directory" % path)

    def get_scms(path):
        return [(scm, path) for scm in get_scms_for_path(path)]

    found_scms = get_scms(path)
    while not found_scms and search_up:
        newpath = abspath(path, '..')
        if newpath == path:
            break
        path = newpath
        found_scms = get_scms(path)

    if len(found_scms) > 1:
        for scm in found_scms:
            if scm[0] == explicit_alias:
                return scm
        raise VCSError('More than one [%s] scm found at given path %s'
                       % (','.join((x[0] for x in found_scms)), path))

    if len(found_scms) is 0:
        raise VCSError('No scm found at given path %s' % path)

    return found_scms[0]


def get_scms_for_path(path):
    """
    Returns all scm's found at the given path. If no scm is recognized
    - empty list is returned.

    :param path: path to directory which should be checked. May be callable.

    :raises VCSError: if given ``path`` is not a directory
    """
    from rhodecode.lib.vcs.backends import get_backend
    if hasattr(path, '__call__'):
        path = path()
    if not os.path.isdir(path):
        raise VCSError("Given path %r is not a directory" % path)

    result = []
    for key in ALIASES:
        dirname = os.path.join(path, '.' + key)
        if os.path.isdir(dirname):
            result.append(key)
            continue
        dirname = os.path.join(path, 'rm__.' + key)
        if os.path.isdir(dirname):
            return [None]
        # We still need to check if it's not bare repository as
        # bare repos don't have working directories
        try:
            get_backend(key)(path)
            result.append(key)
            continue
        except RepositoryError:
            # Wrong backend
            pass
        except VCSError:
            # No backend at all
            pass
    return result


def run_command(cmd, *args):
    """
    Runs command on the system with given ``args``.
    """
    command = ' '.join((cmd, args))
    p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE)
    stdout, stderr = p.communicate()
    return p.retcode, stdout, stderr


def get_highlighted_code(name, code, type='terminal'):
    """
    If pygments are available on the system
    then returned output is colored. Otherwise
    unchanged content is returned.
    """
    import logging
    try:
        import pygments
        pygments
    except ImportError:
        return code
    from pygments import highlight
    from pygments.lexers import guess_lexer_for_filename, ClassNotFound
    from pygments.formatters import TerminalFormatter

    try:
        lexer = guess_lexer_for_filename(name, code)
        formatter = TerminalFormatter()
        content = highlight(code, lexer, formatter)
    except ClassNotFound:
        logging.debug("Couldn't guess Lexer, will not use pygments.")
        content = code
    return content

def parse_changesets(text):
    """
    Returns dictionary with *start*, *main* and *end* ids.

    Examples::

        >>> parse_changesets('aaabbb')
        {'start': None, 'main': 'aaabbb', 'end': None}
        >>> parse_changesets('aaabbb..cccddd')
        {'start': 'aaabbb', 'main': None, 'end': 'cccddd'}

    """
    text = text.strip()
    CID_RE = r'[a-zA-Z0-9]+'
    if not '..' in text:
        m = re.match(r'^(?P<cid>%s)$' % CID_RE, text)
        if m:
            return {
                'start': None,
                'main': text,
                'end': None,
            }
    else:
        RE = r'^(?P<start>%s)?\.{2,3}(?P<end>%s)?$' % (CID_RE, CID_RE)
        m = re.match(RE, text)
        if m:
            result = m.groupdict()
            result['main'] = None
            return result
    raise ValueError("IDs not recognized")

def parse_datetime(text):
    """
    Parses given text and returns ``datetime.datetime`` instance or raises
    ``ValueError``.

    :param text: string of desired date/datetime or something more verbose,
      like *yesterday*, *2weeks 3days*, etc.
    """

    text = text.strip().lower()

    INPUT_FORMATS = (
        '%Y-%m-%d %H:%M:%S',
        '%Y-%m-%d %H:%M',
        '%Y-%m-%d',
        '%m/%d/%Y %H:%M:%S',
        '%m/%d/%Y %H:%M',
        '%m/%d/%Y',
        '%m/%d/%y %H:%M:%S',
        '%m/%d/%y %H:%M',
        '%m/%d/%y',
    )
    for format in INPUT_FORMATS:
        try:
            return datetime.datetime(*time.strptime(text, format)[:6])
        except ValueError:
            pass

    # Try descriptive texts
    if text == 'tomorrow':
        future = datetime.datetime.now() + datetime.timedelta(days=1)
        args = future.timetuple()[:3] + (23, 59, 59)
        return datetime.datetime(*args)
    elif text == 'today':
        return datetime.datetime(*datetime.datetime.today().timetuple()[:3])
    elif text == 'now':
        return datetime.datetime.now()
    elif text == 'yesterday':
        past = datetime.datetime.now() - datetime.timedelta(days=1)
        return datetime.datetime(*past.timetuple()[:3])
    else:
        days = 0
        matched = re.match(
            r'^((?P<weeks>\d+) ?w(eeks?)?)? ?((?P<days>\d+) ?d(ays?)?)?$', text)
        if matched:
            groupdict = matched.groupdict()
            if groupdict['days']:
                days += int(matched.groupdict()['days'])
            if groupdict['weeks']:
                days += int(matched.groupdict()['weeks']) * 7
            past = datetime.datetime.now() - datetime.timedelta(days=days)
            return datetime.datetime(*past.timetuple()[:3])

    raise ValueError('Wrong date: "%s"' % text)


def get_dict_for_attrs(obj, attrs):
    """
    Returns dictionary for each attribute from given ``obj``.
    """
    data = {}
    for attr in attrs:
        data[attr] = getattr(obj, attr)
    return data


def get_total_seconds(timedelta):
    """
    Backported for Python 2.5.

    See http://docs.python.org/library/datetime.html.
    """
    return ((timedelta.microseconds + (
            timedelta.seconds +
            timedelta.days * 24 * 60 * 60
        ) * 10**6) / 10**6)