view kallithea/lib/utils2.py @ 5227:148360f533a4

Merge stable
author Mads Kiilerich <madski@unity3d.com>
date Mon, 13 Jul 2015 19:37:39 +0200
parents c082a017b366 9a02f9ef28d7
children fa8b8df51ad0
line wrap: on
line source

# -*- coding: utf-8 -*-
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""
kallithea.lib.utils
~~~~~~~~~~~~~~~~~~~

Some simple helper functions

This file was forked by the Kallithea project in July 2014.
Original author and date, and relevant copyright and licensing information is below:
:created_on: Jan 5, 2011
:author: marcink
:copyright: (c) 2013 RhodeCode GmbH, and others.
:license: GPLv3, see LICENSE.md for more details.
"""


import os
import re
import sys
import time
import uuid
import datetime
import urllib
import binascii

import webob
import urlobject

from pylons.i18n.translation import _, ungettext
from kallithea.lib.vcs.utils.lazy import LazyProperty
from kallithea.lib.compat import json


def __get_lem():
    """
    Build a map of file extensions to lexer names from pygments' lexers.

    Every filename pattern listed in ``pygments.lexers.LEXERS`` is stripped
    of its leading ``*`` and ``.`` characters and lower-cased. A pattern
    with a character class, like ``*.[ch]``, gives one extension per
    character in the class (anything after the closing bracket is dropped).

    :rtype: dict
    :returns: extension -> list of LEXERS keys with 'Lexer' removed
    """
    from collections import defaultdict
    from pygments import lexers

    def expand(pattern):
        # '*.py' -> ['py'], '*.[ch]' -> ['c', 'h']
        name = pattern.lstrip('*').lstrip('.')
        open_at = name.find('[')
        if open_at == -1:
            return [name.lower()]
        close_at = name.find(']')
        stem = name[:open_at]
        return [(stem + char).lower() for char in name[open_at + 1:close_at]]

    ext_map = defaultdict(list)
    for lexer_name, lexer_info in sorted(lexers.LEXERS.items()):
        description = lexer_name.replace('Lexer', '')
        # second to last item of the LEXERS entry holds the filename patterns
        for pattern in lexer_info[-2]:
            for ext in expand(pattern):
                ext_map[ext].append(description)

    return dict(ext_map)


def str2bool(_str):
    """
    Returns True/False value from given string, it tries to translate the
    string into boolean

    None gives False. Values that compare equal to True or False (the
    booleans themselves, but also 0 and 1) are converted with bool(). Anything
    else is converted to a stripped, lower-cased string and is True only if
    that string is one of 't', 'true', 'y', 'yes', 'on' or '1'.

    :param _str: string value to translate into boolean
    :rtype: boolean
    :returns: boolean from given string
    """
    if _str is None:
        return False
    if _str in (True, False):
        # 0 and 1 compare equal to False and True; bool() makes sure a real
        # boolean (and not the number itself) is handed back
        return bool(_str)
    _str = str(_str).strip().lower()
    return _str in ('t', 'true', 'y', 'yes', 'on', '1')


def aslist(obj, sep=None, strip=True):
    """
    Turn obj into a list (lists and tuples are handed back untouched).

    :param obj: a string is split on sep, None gives an empty list, a list
        or tuple is returned as is, anything else is wrapped in a list
    :param sep: separator handed to split(); None splits on whitespace
    :param strip: strip whitespace from every item of a split string
    """
    if obj is None:
        return []
    if isinstance(obj, (list, tuple)):
        return obj
    if not isinstance(obj, basestring):
        return [obj]

    parts = obj.split(sep)
    if strip:
        parts = [part.strip() for part in parts]
    return parts


def convert_line_endings(line, mode):
    """
    Converts a given line  "line end" according to given mode

    Available modes are::
        0 - Unix
        1 - Mac
        2 - DOS

    Any other mode leaves the line unchanged.

    :param line: given line to convert
    :param mode: mode to convert to
    :rtype: str
    :return: converted line according to mode
    """
    if mode == 0:
        # DOS endings first, so that their '\r' isn't converted on its own
        line = line.replace('\r\n', '\n').replace('\r', '\n')
    elif mode == 1:
        line = line.replace('\r\n', '\r').replace('\n', '\r')
    elif mode == 2:
        # only touch a '\r' not followed by '\n' and a '\n' not preceded by
        # '\r', existing DOS line ends are left alone
        line = re.sub("\r(?!\n)|(?<!\r)\n", "\r\n", line)
    return line


def detect_mode(line, default):
    """
    Tell which kind of line break the given line ends with.

    :param line: str line
    :param default: value to return if the line doesn't end with a line break
    :rtype: int
    :return: 2 for DOS ('\\r\\n'), 0 for Unix ('\\n'), 1 for Mac ('\\r'),
        else the given default
    """
    # DOS must be checked first, a DOS line end also ends with '\n'
    for ending, mode in (('\r\n', 2), ('\n', 0), ('\r', 1)):
        if line.endswith(ending):
            return mode
    return default


def generate_api_key():
    """
    Returns a new random API key: 20 bytes from os.urandom, hex encoded
    (40 hex digits).
    """
    random_bytes = os.urandom(20)
    return binascii.hexlify(random_bytes)


def safe_int(val, default=None):
    """
    Convert val with int(), giving default back when that fails with a
    ValueError or TypeError.

    :param val: value to convert
    :param default: what to return if val can't be converted
    """
    try:
        return int(val)
    except (ValueError, TypeError):
        return default


def safe_unicode(str_, from_encoding=None):
    """
    Best effort conversion of str_ to unicode.

    The attempts are, in order: plain unicode(str_), every encoding in
    from_encoding, the encoding guessed by the chardet library, and finally
    the first encoding of from_encoding with undecodable bytes replaced.

    :param str_: string to decode
    :param from_encoding: encoding name or list/tuple of encoding names; if
        not given the comma separated 'default_encoding' setting from
        kallithea.CONFIG is used (defaulting to 'utf8')
    :rtype: unicode
    :returns: unicode object
    """
    if isinstance(str_, unicode):
        return str_

    if not from_encoding:
        import kallithea
        configured = kallithea.CONFIG.get('default_encoding', 'utf8')
        from_encoding = aslist(configured, sep=',')

    if not isinstance(from_encoding, (list, tuple)):
        from_encoding = [from_encoding]

    try:
        return unicode(str_)
    except UnicodeDecodeError:
        pass

    for candidate in from_encoding:
        try:
            return unicode(str_, candidate)
        except UnicodeDecodeError:
            continue

    # let chardet have a guess; a missing library, no guess at all or any
    # failure while decoding takes us to the lossy fallback below
    try:
        import chardet
        detected = chardet.detect(str_)['encoding']
        if detected is not None:
            return str_.decode(detected)
    except Exception:
        pass

    return unicode(str_, from_encoding[0], 'replace')


def safe_str(unicode_, to_encoding=None):
    """
    safe str function. Does few trick to turn unicode_ into string

    Objects that aren't strings are converted with str(), byte strings are
    returned as is. Unicode is encoded with the first encoding of to_encoding
    that works; if none does, the encoding suggested by the chardet library is
    tried, and as a last resort the first encoding of to_encoding is used with
    unencodable characters replaced.

    :param unicode_: unicode to encode
    :param to_encoding: encoding name or list/tuple of encoding names; if not
        given the comma separated 'default_encoding' setting from
        kallithea.CONFIG is used (defaulting to 'utf8')
    :rtype: str
    :returns: str object
    """

    # if it's not basestr cast to str
    if not isinstance(unicode_, basestring):
        return str(unicode_)

    if isinstance(unicode_, str):
        return unicode_

    if not to_encoding:
        import kallithea
        DEFAULT_ENCODINGS = aslist(kallithea.CONFIG.get('default_encoding',
                                                        'utf8'), sep=',')
        to_encoding = DEFAULT_ENCODINGS

    if not isinstance(to_encoding, (list, tuple)):
        to_encoding = [to_encoding]

    for enc in to_encoding:
        try:
            return unicode_.encode(enc)
        except UnicodeEncodeError:
            pass

    try:
        import chardet
        encoding = chardet.detect(unicode_)['encoding']
        if encoding is not None:
            return unicode_.encode(encoding)
    except (ImportError, LookupError, TypeError, ValueError):
        # ImportError: chardet isn't installed
        # LookupError: chardet suggested an encoding Python doesn't know
        # ValueError: includes UnicodeEncodeError from the encode() above
        # TypeError/ValueError: presumably also what chardet raises when it
        # refuses unicode input - TODO confirm for the chardet version in use
        pass

    # Nothing worked (or chardet had no suggestion): lossy fallback. An
    # argument-less "raise UnicodeEncodeError()" can't be used to get here,
    # it fails with a TypeError that would escape this function.
    return unicode_.encode(to_encoding[0], 'replace')


def remove_suffix(s, suffix):
    """
    Returns s without the given suffix, or s unchanged if it doesn't end
    with suffix.

    :param s: string to remove the suffix from
    :param suffix: the ending to remove; an empty suffix removes nothing
    """
    # the explicit check for an empty suffix is needed: everything ends with
    # '' and s[:-0] would be the empty string
    if suffix and s.endswith(suffix):
        s = s[:-len(suffix)]
    return s


def remove_prefix(s, prefix):
    """
    Returns s without the given prefix, or s unchanged if it doesn't start
    with prefix.
    """
    return s[len(prefix):] if s.startswith(prefix) else s


def engine_from_config(configuration, prefix='sqlalchemy.', **kwargs):
    """
    Custom engine_from_config functions that makes sure we use NullPool for
    file based sqlite databases. This prevents errors on sqlite. This only
    applies to sqlalchemy versions < 0.7.0

    On sqlalchemy >= 0.7 the engine is created as is, and if
    configuration['debug'] is true, listeners that log a marker before every
    query and the time it took afterwards are attached to it (logger
    'sqlalchemy.engine', level info).

    :param configuration: mapping with the settings; prefix + 'url' is read
        on sqlalchemy < 0.7 and 'debug' on newer versions
    :param prefix: prefix of the configuration keys meant for sqlalchemy
    :param kwargs: passed on to sqlalchemy's engine_from_config
    :returns: the engine created by sqlalchemy's engine_from_config
    """
    import logging
    import sqlalchemy
    from sqlalchemy import engine_from_config as efc

    # Compare (major, minor) as a tuple. Looking at the minor number alone
    # would take 1.0.x for something older than 0.7.
    sa_version = tuple(int(number) for number in
                       re.findall(r'\d+', sqlalchemy.__version__)[:2])

    if sa_version < (0, 7):

        # This solution should work for sqlalchemy < 0.7.0, and should use
        # proxy=TimerProxy() for execution time profiling

        from sqlalchemy.pool import NullPool
        url = configuration[prefix + 'url']

        if url.startswith('sqlite'):
            kwargs.update({'poolclass': NullPool})
        return efc(configuration, prefix, **kwargs)

    from sqlalchemy import event

    log = logging.getLogger('sqlalchemy.engine')
    YELLOW = 33  # ANSI foreground colour code
    engine = efc(configuration, prefix, **kwargs)

    def color_sql(sql):
        COLOR_SEQ = "\033[1;%dm"
        normal = '\x1b[0m'
        return ''.join([COLOR_SEQ % YELLOW, sql, normal])

    if configuration['debug']:
        #attach events only for debug configuration

        def before_cursor_execute(conn, cursor, statement,
                                  parameters, context, executemany):
            # remember the start time on the execution context so that the
            # "after" listener of the same query can find it
            context._query_start_time = time.time()
            log.info(color_sql(">>>>> STARTING QUERY >>>>>"))

        def after_cursor_execute(conn, cursor, statement,
                                 parameters, context, executemany):
            total = time.time() - context._query_start_time
            log.info(color_sql("<<<<< TOTAL TIME: %f <<<<<" % total))

        event.listen(engine, "before_cursor_execute",
                     before_cursor_execute)
        event.listen(engine, "after_cursor_execute",
                     after_cursor_execute)

    return engine


def age(prevdate, show_short_version=False, now=None):
    """
    turns a datetime into an age string.
    If show_short_version is True, then it will generate a not so accurate but shorter string,
    example: 2days ago, instead of 2 days and 23 hours ago.

    A prevdate later than now is described as being in the future ("in ...").

    :param prevdate: datetime object
    :param show_short_version: if it should approximate the date and return a shorter string
    :param now: datetime to measure the age against, defaults to
        datetime.datetime.now()
    :rtype: unicode
    :returns: unicode words describing age
    """
    now = now or datetime.datetime.now()
    order = ['year', 'month', 'day', 'hour', 'minute', 'second']
    future = False

    if prevdate > now:
        # Swap so that prevdate is always the earlier of the two
        now, prevdate = prevdate, now
        future = True
        # NOTE(review): after the swap this drops the microseconds of the
        # original "now" - presumably so that a date a whole number of
        # seconds ahead isn't reported as one second less. TODO confirm
        prevdate = prevdate.replace(microsecond=0)

    # Get date parts deltas. The difference is the same for every part, so it
    # is computed only once.
    from dateutil import relativedelta
    delta = relativedelta.relativedelta(now, prevdate)
    deltas = dict((part, getattr(delta, part + 's')) for part in order)

    # Fix negative offsets (there is 1 second between 10:59:59 and 11:00:00,
    # not 1 hour, -59 minutes and -59 seconds)
    for num, length in [(5, 60), (4, 60), (3, 24)]:  # seconds, minutes, hours
        part = order[num]
        carry_part = order[num - 1]

        if deltas[part] < 0:
            deltas[part] += length
            deltas[carry_part] -= 1

    # Same thing for days except that the increment depends on the (variable)
    # number of days in the month
    month_lengths = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
    if deltas['day'] < 0:
        if prevdate.month == 2 and (prevdate.year % 4 == 0 and
            (prevdate.year % 100 != 0 or prevdate.year % 400 == 0)):
            # February of a leap year
            deltas['day'] += 29
        else:
            deltas['day'] += month_lengths[prevdate.month - 1]

        deltas['month'] -= 1

    if deltas['month'] < 0:
        deltas['month'] += 12
        deltas['year'] -= 1

    # In short version, we want nicer handling of ages of more than a year
    if show_short_version:
        if deltas['year'] == 1:
            # ages between 1 and 2 years: show as months
            deltas['month'] += 12
            deltas['year'] = 0
        if deltas['year'] >= 2:
            # ages 2+ years: round
            if deltas['month'] > 6:
                deltas['year'] += 1
                deltas['month'] = 0

    # Format the result
    fmt_funcs = {
        'year': lambda d: ungettext(u'%d year', '%d years', d) % d,
        'month': lambda d: ungettext(u'%d month', '%d months', d) % d,
        'day': lambda d: ungettext(u'%d day', '%d days', d) % d,
        'hour': lambda d: ungettext(u'%d hour', '%d hours', d) % d,
        'minute': lambda d: ungettext(u'%d minute', '%d minutes', d) % d,
        'second': lambda d: ungettext(u'%d second', '%d seconds', d) % d,
    }

    # The biggest part that isn't zero decides the wording; the part right
    # below it is added in the long version if it isn't zero either.
    for i, part in enumerate(order):
        value = deltas[part]
        if value == 0:
            continue

        if i < 5:
            sub_part = order[i + 1]
            sub_value = deltas[sub_part]
        else:
            # seconds have no smaller part
            sub_value = 0

        if sub_value == 0 or show_short_version:
            if future:
                return _(u'in %s') % fmt_funcs[part](value)
            else:
                return _(u'%s ago') % fmt_funcs[part](value)
        if future:
            return _(u'in %s and %s') % (fmt_funcs[part](value),
                fmt_funcs[sub_part](sub_value))
        else:
            return _(u'%s and %s ago') % (fmt_funcs[part](value),
                fmt_funcs[sub_part](sub_value))

    # every part is zero
    return _(u'just now')


def _split_uri(uri):
    """
    Splits uri into a (proto, host, port) tuple with user:password removed.

    proto is one of 'https://', 'http://', 'git://' or ''. host is what is
    found up to the first ':' after the credentials, port is everything after
    that ':' (including a path, if any) or '' if there is no ':'.
    """
    proto = ''
    for pat in ('https://', 'http://', 'git://'):
        if uri.startswith(pat):
            uri = uri[len(pat):]
            proto = pat
            break

    # remove passwords and username
    # NOTE(review): everything up to the first '@' is dropped, wherever it
    # is. That also hits an '@' in the path of an uri without credentials.
    uri = uri[uri.find('@') + 1:]

    host, _sep, port = uri.partition(':')
    return proto, host, port


def uri_filter(uri):
    """
    Removes user:password from given url string

    :param uri:
    :rtype: list
    :returns: list with the parts of the uri that aren't empty, in the order
        protocol, host and port (the "port" is everything after the first ':'
        of the host part, path included). An empty uri gives ''.
    """
    if not uri:
        return ''

    return [part for part in _split_uri(uri) if part]


def credentials_filter(uri):
    """
    Returns a url with removed credentials

    :param uri:
    """
    if not uri:
        return ''

    proto, host, port = _split_uri(uri)
    # Put back the ':' that was split on. This is decided on the port itself
    # and not on its position in the uri_filter() list, that position depends
    # on whether a known protocol was found.
    if port:
        port = ':' + port

    return ''.join([proto, host, port])


def get_clone_url(uri_tmpl, qualified_home_url, repo_name, repo_id, **override):
    """
    Fill in the placeholders of the clone uri template uri_tmpl.

    The placeholders are {scheme}, {user}, {netloc}, {prefix}, {repo} and
    {repoid}; scheme, netloc and prefix come from qualified_home_url.

    :param uri_tmpl: template with {name} placeholders
    :param qualified_home_url: url of the front page of this instance
    :param repo_name: value for {repo}
    :param repo_id: value for {repoid}
    :param override: values that replace (or add to) the ones above, for
        example user
    :rtype: unicode
    """
    home_url = urlobject.URLObject(qualified_home_url)
    url_prefix = safe_unicode(urllib.unquote(home_url.path.rstrip('/')))
    substitutions = {
        'scheme': home_url.scheme,
        'user': '',
        'netloc': home_url.netloc + url_prefix,  # path if we use proxy-prefix
        'prefix': url_prefix,
        'repo': repo_name,
        'repoid': str(repo_id)
    }
    substitutions.update(override)
    # the user name is url quoted, also if it came in through override
    substitutions['user'] = urllib.quote(safe_str(substitutions['user']))

    for key, value in substitutions.items():
        uri_tmpl = uri_tmpl.replace('{%s}' % key, value)

    # an empty user leaves a dangling '@' in front of the host, drop it
    expanded = urlobject.URLObject(uri_tmpl)
    without_at = expanded.with_netloc(expanded.netloc.lstrip('@'))

    return safe_unicode(without_at)


def get_changeset_safe(repo, rev):
    """
    Safe version of get_changeset if this changeset doesn't exists for a
    repo it returns a Dummy one instead

    :param repo: BaseRepository instance to get the changeset from
    :param rev: revision to look up
    :returns: the changeset, or an EmptyChangeset for the requested revision
        if the lookup fails with RepositoryError or LookupError
    :raises Exception: if repo is not a BaseRepository
    """
    from kallithea.lib.vcs.backends.base import BaseRepository
    from kallithea.lib.vcs.exceptions import RepositoryError
    from kallithea.lib.vcs.backends.base import EmptyChangeset
    if not isinstance(repo, BaseRepository):
        # the type must be formatted into the message with %, as a second
        # argument to Exception it would never make it into the text
        raise Exception('You must pass an Repository '
                        'object as first argument got %s' % type(repo))

    try:
        cs = repo.get_changeset(rev)
    except (RepositoryError, LookupError):
        cs = EmptyChangeset(requested_revision=rev)
    return cs


def datetime_to_time(dt):
    """
    Returns the timestamp time.mktime() gives for the time tuple of the
    datetime dt, or None if dt is None (or otherwise false).
    """
    if not dt:
        return None
    return time.mktime(dt.timetuple())


def time_to_datetime(tm):
    """
    Returns the datetime for the timestamp tm, which can be a number or a
    string holding one.

    None is returned if tm is false (None, 0, '') or a string that can't be
    converted to a float.
    """
    if not tm:
        return None
    if isinstance(tm, basestring):
        try:
            tm = float(tm)
        except ValueError:
            return None
    return datetime.datetime.fromtimestamp(tm)

# Must match regexp in kallithea/public/js/base.js MentionsAutoComplete()
# Check the char before @ - it must not look like we are in an email address.
# Matching is greedy so we don't have to look beyond the end.
MENTIONS_REGEX = re.compile(r'(?:^|(?<=[^a-zA-Z0-9]))@([a-zA-Z0-9][-_.a-zA-Z0-9]*[a-zA-Z0-9])')


def extract_mentioned_users(s):
    r"""
    Returns the usernames that are @mentioned in the string s, without
    duplicates and sorted case insensitively

    :param s: string to get mentions

    >>> extract_mentioned_users('@1-2.a_X,@1234 not@not @ddd@not @n @ee @ff @gg, @gg;@hh @n\n@zz,')
    ['1-2.a_X', '1234', 'ddd', 'ee', 'ff', 'gg', 'hh', 'zz']
    """
    unique_names = set(MENTIONS_REGEX.findall(s))
    return sorted(unique_names, key=lambda name: name.lower())


class AttributeDict(dict):
    """
    dict with items that can also be read, set and deleted as attributes.

    Reading an attribute that isn't a key gives None. Deleting an attribute
    that isn't a key raises KeyError, as deleting a missing item does.
    """

    def __getattr__(self, attr):
        # only called for names that normal attribute lookup doesn't find
        return self.get(attr)

    def __setattr__(self, attr, value):
        dict.__setitem__(self, attr, value)

    def __delattr__(self, attr):
        dict.__delitem__(self, attr)


def fix_PATH(os_=None):
    """
    Get current active python path, and put it in front of the PATH variable
    to fix issues of subprocess calls and different python versions

    Nothing is changed if PATH already starts with the directory of the
    running python executable.

    :param os_: module to use instead of os (its path, environ and, if it
        has one, pathsep are used)
    """
    if os_ is None:
        import os
    else:
        os = os_

    cur_path = os.path.split(sys.executable)[0]
    # the separator of PATH entries is platform specific (';' on Windows);
    # ':' is only assumed for an os_ stand-in that doesn't define pathsep
    pathsep = getattr(os, 'pathsep', ':')
    current = os.environ.get('PATH', '')
    if not current:
        # unset or empty PATH: don't leave a dangling separator behind
        os.environ['PATH'] = cur_path
    elif not current.startswith(cur_path):
        os.environ['PATH'] = pathsep.join([cur_path, current])


def obfuscate_url_pw(engine):
    """
    Returns the database url engine as a string with the password, if it
    has one, replaced by 'XXXXX'.

    If sqlalchemy can't parse the url (ArgumentError), engine is returned
    as it was given.
    """
    from sqlalchemy.engine.url import make_url
    from sqlalchemy.exc import ArgumentError

    try:
        parsed = make_url(engine or '')
    except ArgumentError:
        return engine
    else:
        if parsed.password:
            parsed.password = 'XXXXX'
        return str(parsed)


def get_server_url(environ):
    """
    Returns host_url followed by script_name of the webob request built
    from the given WSGI environ.
    """
    request = webob.Request(environ)
    host_url, script_name = request.host_url, request.script_name
    return host_url + script_name


def _extract_extras(env=None):
    """
    Extracts the Kallithea extras data from os.environ, and wraps it into named
    AttributeDict object

    :param env: mapping to read KALLITHEA_EXTRAS (a JSON document) from,
        os.environ is used if it's not given or empty
    :returns: AttributeDict with the extras
    :raises Exception: if one of the required keys is missing from the
        extras; that is always the case if KALLITHEA_EXTRAS isn't set at all
    """
    if not env:
        env = os.environ

    try:
        extras = json.loads(env['KALLITHEA_EXTRAS'])
    except KeyError:
        extras = {}

    try:
        # the lookups are only done to find out if the keys are there
        for k in ['username', 'repository', 'locked_by', 'scm', 'make_lock',
                  'action', 'ip']:
            extras[k]
    except KeyError as e:
        raise Exception('Missing key %s in os.environ %s' % (e, extras))

    return AttributeDict(extras)


def _set_extras(extras):
    # RC_SCM_DATA can probably be removed in the future, but for compatibilty now...
    os.environ['KALLITHEA_EXTRAS'] = os.environ['RC_SCM_DATA'] = json.dumps(extras)


def unique_id(hexlen=32):
    """
    Returns a random id from suuid() that uses upper and lower case letters
    (and the digits 2-9), cut off at hexlen characters.
    """
    mixed_case_alphabet = "23456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghjklmnpqrstuvwxyz"
    return suuid(alphabet=mixed_case_alphabet, truncate_to=hexlen)


def suuid(url=None, truncate_to=22, alphabet=None):
    """
    Generate and return a short URL safe UUID.

    If the url parameter is provided, set the namespace to the provided
    URL and generate a UUID.

    The integer value of the UUID is written with the characters of the
    alphabet as digits, least significant digit first.

    :param url: url to get the uuid for, a random UUID is used if not given
    :param truncate_to: truncate the result to this many characters
    :param alphabet: characters to use as digits, defaults to
        "23456789ABCDEFGHJKLMNPQRSTUVWXYZ"

    The IDs won't be universally unique any longer, but the probability of
    a collision will still be very low.
    """
    # Define our alphabet.
    _ALPHABET = alphabet or "23456789ABCDEFGHJKLMNPQRSTUVWXYZ"

    # If no URL is given, generate a random UUID.
    if url is None:
        number = uuid.uuid4().int
    else:
        number = uuid.uuid3(uuid.NAMESPACE_URL, url).int

    alphabet_length = len(_ALPHABET)
    output = []
    while number > 0:
        # divmod stays in exact integer arithmetic; a "/" division is only
        # exact for numbers of this size as long as it is an integer division
        number, digit = divmod(number, alphabet_length)
        output.append(_ALPHABET[digit])
    return "".join(output)[:truncate_to]


def get_current_authuser():
    """
    Returns the authuser attribute of the pylons tmpl_context, or None if
    tmpl_context has no such attribute.
    """
    from pylons import tmpl_context
    # NOTE(review): hasattr() is kept here instead of getattr() with a
    # default. Under Python 2, hasattr() gives False for any exception raised
    # by the attribute lookup, not only AttributeError - presumably that
    # matters when no tmpl_context is set up. TODO confirm
    if not hasattr(tmpl_context, 'authuser'):
        return None

    return tmpl_context.authuser


class OptionalAttr(object):
    """
    Marker holding the name of another attribute, meant to be used as the
    value of an Optional. Example::

        def test(apiuser, userid=Optional(OAttr('apiuser'))):
            user = Optional.extract(userid)
            # calls

    """

    def __init__(self, attr_name):
        # name of the attribute this marker refers to
        self.attr_name = attr_name

    def __call__(self):
        # calling the marker just gives the marker itself
        return self

    def __repr__(self):
        return '<OptionalAttr:%s>' % self.attr_name


# short alias for OptionalAttr
OAttr = OptionalAttr


class Optional(object):
    """
    Defines an optional parameter::

        param = param.getval() if isinstance(param, Optional) else param
        param = param() if isinstance(param, Optional) else param

    is equivalent of::

        param = Optional.extract(param)

    """

    def __init__(self, type_):
        # the wrapped default value (or an OAttr naming another attribute)
        self.type_ = type_

    def __repr__(self):
        # repr() and not self.type_.__repr__(): the latter fails with a
        # TypeError if the wrapped value is a class
        return '<Optional:%s>' % repr(self.type_)

    def __call__(self):
        return self.getval()

    def getval(self):
        """
        returns value from this Optional instance

        :return: the wrapped value, or the attribute name if the wrapped
            value is an OAttr
        """
        if isinstance(self.type_, OAttr):
            # use params name
            return self.type_.attr_name
        return self.type_

    @classmethod
    def extract(cls, val):
        """
        Extracts value from Optional() instance

        :param val:
        :return: original value if it's not Optional instance else
            value of instance
        """
        if isinstance(val, cls):
            return val.getval()
        return val

def urlreadable(s, _cleanstringsub=re.compile('[^-a-zA-Z0-9./]+').sub):
    """
    Returns s as a str in which every run of characters other than letters
    a-z/A-Z, digits, '-', '.' and '/' is replaced by one '_'. Underscores at
    the end of the result are removed.
    """
    cleaned = _cleanstringsub('_', safe_str(s))
    return cleaned.rstrip('_')