diff rhodecode/lib/utils2.py @ 2109:8ecfed1d8f8b beta

utils/conf - created temporary utils2 - made config.conf for storing some configurations - fixed some dependency import problems - code cleanup - rc-extensions now properly work for celery
author Marcin Kuzminski <marcin@python-works.com>
date Wed, 07 Mar 2012 02:18:22 +0200
children ea5ff843b200
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rhodecode/lib/utils2.py	Wed Mar 07 02:18:22 2012 +0200
@@ -0,0 +1,405 @@
+# -*- coding: utf-8 -*-
+    rhodecode.lib.utils
+    ~~~~~~~~~~~~~~~~~~~
+    Some simple helper functions
+    :created_on: Jan 5, 2011
+    :author: marcink
+    :copyright: (C) 2011-2012 Marcin Kuzminski <marcin@python-works.com>
+    :license: GPLv3, see COPYING for more details.
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+import re
+from rhodecode.lib.vcs.utils.lazy import LazyProperty
+def __get_lem():
+    """
+    Get language extension map based on what's inside pygments lexers
+    """
+    from pygments import lexers
+    from string import lower
+    from collections import defaultdict
+    d = defaultdict(lambda: [])
+    def __clean(s):
+        s = s.lstrip('*')
+        s = s.lstrip('.')
+        if s.find('[') != -1:
+            exts = []
+            start, stop = s.find('['), s.find(']')
+            for suffix in s[start + 1:stop]:
+                exts.append(s[:s.find('[')] + suffix)
+            return map(lower, exts)
+        else:
+            return map(lower, [s])
+    for lx, t in sorted(lexers.LEXERS.items()):
+        m = map(__clean, t[-2])
+        if m:
+            m = reduce(lambda x, y: x + y, m)
+            for ext in m:
+                desc = lx.replace('Lexer', '')
+                d[ext].append(desc)
+    return dict(d)
+def str2bool(_str):
+    """
+    returs True/False value from given string, it tries to translate the
+    string into boolean
+    :param _str: string value to translate into boolean
+    :rtype: boolean
+    :returns: boolean from given string
+    """
+    if _str is None:
+        return False
+    if _str in (True, False):
+        return _str
+    _str = str(_str).strip().lower()
+    return _str in ('t', 'true', 'y', 'yes', 'on', '1')
+def convert_line_endings(line, mode):
+    """
+    Converts a given line  "line end" accordingly to given mode
+    Available modes are::
+        0 - Unix
+        1 - Mac
+        2 - DOS
+    :param line: given line to convert
+    :param mode: mode to convert to
+    :rtype: str
+    :return: converted line according to mode
+    """
+    from string import replace
+    if mode == 0:
+            line = replace(line, '\r\n', '\n')
+            line = replace(line, '\r', '\n')
+    elif mode == 1:
+            line = replace(line, '\r\n', '\r')
+            line = replace(line, '\n', '\r')
+    elif mode == 2:
+            line = re.sub("\r(?!\n)|(?<!\r)\n", "\r\n", line)
+    return line
+def detect_mode(line, default):
+    """
+    Detects line break for given line, if line break couldn't be found
+    given default value is returned
+    :param line: str line
+    :param default: default
+    :rtype: int
+    :return: value of line end on of 0 - Unix, 1 - Mac, 2 - DOS
+    """
+    if line.endswith('\r\n'):
+        return 2
+    elif line.endswith('\n'):
+        return 0
+    elif line.endswith('\r'):
+        return 1
+    else:
+        return default
+def generate_api_key(username, salt=None):
+    """
+    Generates unique API key for given username, if salt is not given
+    it'll be generated from some random string
+    :param username: username as string
+    :param salt: salt to hash generate KEY
+    :rtype: str
+    :returns: sha1 hash from username+salt
+    """
+    from tempfile import _RandomNameSequence
+    import hashlib
+    if salt is None:
+        salt = _RandomNameSequence().next()
+    return hashlib.sha1(username + salt).hexdigest()
+def safe_unicode(str_, from_encoding=None):
+    """
+    safe unicode function. Does few trick to turn str_ into unicode
+    In case of UnicodeDecode error we try to return it with encoding detected
+    by chardet library if it fails fallback to unicode with errors replaced
+    :param str_: string to decode
+    :rtype: unicode
+    :returns: unicode object
+    """
+    if isinstance(str_, unicode):
+        return str_
+    if not from_encoding:
+        import rhodecode
+        DEFAULT_ENCODING = rhodecode.CONFIG.get('default_encoding','utf8')
+        from_encoding = DEFAULT_ENCODING
+    try:
+        return unicode(str_)
+    except UnicodeDecodeError:
+        pass
+    try:
+        return unicode(str_, from_encoding)
+    except UnicodeDecodeError:
+        pass
+    try:
+        import chardet
+        encoding = chardet.detect(str_)['encoding']
+        if encoding is None:
+            raise Exception()
+        return str_.decode(encoding)
+    except (ImportError, UnicodeDecodeError, Exception):
+        return unicode(str_, from_encoding, 'replace')
+def safe_str(unicode_, to_encoding=None):
+    """
+    safe str function. Does few trick to turn unicode_ into string
+    In case of UnicodeEncodeError we try to return it with encoding detected
+    by chardet library if it fails fallback to string with errors replaced
+    :param unicode_: unicode to encode
+    :rtype: str
+    :returns: str object
+    """
+    # if it's not basestr cast to str
+    if not isinstance(unicode_, basestring):
+        return str(unicode_)
+    if isinstance(unicode_, str):
+        return unicode_
+    if not to_encoding:
+        import rhodecode
+        DEFAULT_ENCODING = rhodecode.CONFIG.get('default_encoding','utf8')
+        to_encoding = DEFAULT_ENCODING
+    try:
+        return unicode_.encode(to_encoding)
+    except UnicodeEncodeError:
+        pass
+    try:
+        import chardet
+        encoding = chardet.detect(unicode_)['encoding']
+        print encoding
+        if encoding is None:
+            raise UnicodeEncodeError()
+        return unicode_.encode(encoding)
+    except (ImportError, UnicodeEncodeError):
+        return unicode_.encode(to_encoding, 'replace')
+    return safe_str
+def engine_from_config(configuration, prefix='sqlalchemy.', **kwargs):
+    """
+    Custom engine_from_config functions that makes sure we use NullPool for
+    file based sqlite databases. This prevents errors on sqlite. This only
+    applies to sqlalchemy versions < 0.7.0
+    """
+    import sqlalchemy
+    from sqlalchemy import engine_from_config as efc
+    import logging
+    if int(sqlalchemy.__version__.split('.')[1]) < 7:
+        # This solution should work for sqlalchemy < 0.7.0, and should use
+        # proxy=TimerProxy() for execution time profiling
+        from sqlalchemy.pool import NullPool
+        url = configuration[prefix + 'url']
+        if url.startswith('sqlite'):
+            kwargs.update({'poolclass': NullPool})
+        return efc(configuration, prefix, **kwargs)
+    else:
+        import time
+        from sqlalchemy import event
+        from sqlalchemy.engine import Engine
+        log = logging.getLogger('sqlalchemy.engine')
+        BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = xrange(30, 38)
+        engine = efc(configuration, prefix, **kwargs)
+        def color_sql(sql):
+            COLOR_SEQ = "\033[1;%dm"
+            COLOR_SQL = YELLOW
+            normal = '\x1b[0m'
+            return ''.join([COLOR_SEQ % COLOR_SQL, sql, normal])
+        if configuration['debug']:
+            #attach events only for debug configuration
+            def before_cursor_execute(conn, cursor, statement,
+                                    parameters, context, executemany):
+                context._query_start_time = time.time()
+                log.info(color_sql(">>>>> STARTING QUERY >>>>>"))
+            def after_cursor_execute(conn, cursor, statement,
+                                    parameters, context, executemany):
+                total = time.time() - context._query_start_time
+                log.info(color_sql("<<<<< TOTAL TIME: %f <<<<<" % total))
+            event.listen(engine, "before_cursor_execute",
+                         before_cursor_execute)
+            event.listen(engine, "after_cursor_execute",
+                         after_cursor_execute)
+    return engine
+def age(curdate):
+    """
+    turns a datetime into an age string.
+    :param curdate: datetime object
+    :rtype: unicode
+    :returns: unicode words describing age
+    """
+    from datetime import datetime
+    from webhelpers.date import time_ago_in_words
+    _ = lambda s: s
+    if not curdate:
+        return ''
+    agescales = [(_(u"year"), 3600 * 24 * 365),
+                 (_(u"month"), 3600 * 24 * 30),
+                 (_(u"day"), 3600 * 24),
+                 (_(u"hour"), 3600),
+                 (_(u"minute"), 60),
+                 (_(u"second"), 1), ]
+    age = datetime.now() - curdate
+    age_seconds = (age.days * agescales[2][1]) + age.seconds
+    pos = 1
+    for scale in agescales:
+        if scale[1] <= age_seconds:
+            if pos == 6:
+                pos = 5
+            return '%s %s' % (time_ago_in_words(curdate,
+                                                agescales[pos][0]), _('ago'))
+        pos += 1
+    return _(u'just now')
+def uri_filter(uri):
+    """
+    Removes user:password from given url string
+    :param uri:
+    :rtype: unicode
+    :returns: filtered list of strings
+    """
+    if not uri:
+        return ''
+    proto = ''
+    for pat in ('https://', 'http://'):
+        if uri.startswith(pat):
+            uri = uri[len(pat):]
+            proto = pat
+            break
+    # remove passwords and username
+    uri = uri[uri.find('@') + 1:]
+    # get the port
+    cred_pos = uri.find(':')
+    if cred_pos == -1:
+        host, port = uri, None
+    else:
+        host, port = uri[:cred_pos], uri[cred_pos + 1:]
+    return filter(None, [proto, host, port])
+def credentials_filter(uri):
+    """
+    Returns a url with removed credentials
+    :param uri:
+    """
+    uri = uri_filter(uri)
+    #check if we have port
+    if len(uri) > 2 and uri[2]:
+        uri[2] = ':' + uri[2]
+    return ''.join(uri)
+def get_changeset_safe(repo, rev):
+    """
+    Safe version of get_changeset if this changeset doesn't exists for a
+    repo it returns a Dummy one instead
+    :param repo:
+    :param rev:
+    """
+    from rhodecode.lib.vcs.backends.base import BaseRepository
+    from rhodecode.lib.vcs.exceptions import RepositoryError
+    if not isinstance(repo, BaseRepository):
+        raise Exception('You must pass an Repository '
+                        'object as first argument got %s', type(repo))
+    try:
+        cs = repo.get_changeset(rev)
+    except RepositoryError:
+        from rhodecode.lib.utils import EmptyChangeset
+        cs = EmptyChangeset(requested_revision=rev)
+    return cs
+def extract_mentioned_users(s):
+    """
+    Returns unique usernames from given string s that have @mention
+    :param s: string to get mentions
+    """
+    usrs = {}
+    for username in re.findall(r'(?:^@|\s@)(\w+)', s):
+        usrs[username] = username
+    return sorted(usrs.keys())