comparison rhodecode/lib/utils2.py @ 2109:8ecfed1d8f8b beta

utils/conf - created temporary utils2 - made config.conf for storing some configurations - fixed some dependency import problems - code cleanup - rc-extensions now properly work for celery
author Marcin Kuzminski <marcin@python-works.com>
date Wed, 07 Mar 2012 02:18:22 +0200
parents
children ea5ff843b200
comparison
equal deleted inserted replaced
2108:9e377342802c 2109:8ecfed1d8f8b
1 # -*- coding: utf-8 -*-
2 """
3 rhodecode.lib.utils
4 ~~~~~~~~~~~~~~~~~~~
5
6 Some simple helper functions
7
8 :created_on: Jan 5, 2011
9 :author: marcink
10 :copyright: (C) 2011-2012 Marcin Kuzminski <marcin@python-works.com>
11 :license: GPLv3, see COPYING for more details.
12 """
13 # This program is free software: you can redistribute it and/or modify
14 # it under the terms of the GNU General Public License as published by
15 # the Free Software Foundation, either version 3 of the License, or
16 # (at your option) any later version.
17 #
18 # This program is distributed in the hope that it will be useful,
19 # but WITHOUT ANY WARRANTY; without even the implied warranty of
20 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 # GNU General Public License for more details.
22 #
23 # You should have received a copy of the GNU General Public License
24 # along with this program. If not, see <http://www.gnu.org/licenses/>.
25
26 import re
27 from rhodecode.lib.vcs.utils.lazy import LazyProperty
28
29
def __get_lem():
    """
    Build the language extension map out of the lexers registered in pygments

    :rtype: dict
    :returns: dict mapping a lower-cased extension to the list of lexer
        names (with the ``Lexer`` part removed) registered for it
    """
    from pygments import lexers
    from collections import defaultdict

    ext_map = defaultdict(list)

    def __expand(pattern):
        # drop the leading glob characters, e.g. '*.py' becomes 'py'
        pattern = pattern.lstrip('*').lstrip('.')

        opening = pattern.find('[')
        if opening == -1:
            return [pattern.lower()]

        # a bracket group such as 'php[345]' yields one extension for each
        # character found between the brackets
        closing = pattern.find(']')
        stem = pattern[:opening]
        return [(stem + char).lower()
                for char in pattern[opening + 1:closing]]

    for lexer_name, lexer_info in sorted(lexers.LEXERS.items()):
        description = lexer_name.replace('Lexer', '')
        # NOTE(review): lexer_info[-2] is presumably the tuple of filename
        # patterns of the lexer - confirm against the pygments LEXERS layout
        for pattern in lexer_info[-2]:
            for ext in __expand(pattern):
                ext_map[ext].append(description)

    return dict(ext_map)
63
def str2bool(_str):
    """
    Returns True/False value from given string, it tries to translate the
    string into boolean

    :param _str: string value to translate into boolean
    :rtype: boolean
    :returns: boolean from given string, ``None`` translates to False
    """
    if _str is None:
        return False
    if _str in (True, False):
        # the membership test also matches 0, 1 and 1.0 (they compare equal
        # to False/True), so coerce to make sure a real boolean is returned
        return bool(_str)
    _str = str(_str).strip().lower()
    return _str in ('t', 'true', 'y', 'yes', 'on', '1')
79
80
def convert_line_endings(line, mode):
    """
    Converts a given line "line end" accordingly to given mode

    Available modes are::
        0 - Unix
        1 - Mac
        2 - DOS

    :param line: given line to convert
    :param mode: mode to convert to, any other value than 0, 1 or 2 leaves
        the line untouched
    :rtype: str
    :return: converted line according to mode
    """
    if mode == 0:
        # DOS endings go first so that '\r\n' does not turn into '\n\n'
        line = line.replace('\r\n', '\n').replace('\r', '\n')
    elif mode == 1:
        line = line.replace('\r\n', '\r').replace('\n', '\r')
    elif mode == 2:
        # only touch a lone '\r' or a lone '\n', existing '\r\n' pairs stay
        line = re.sub(r"\r(?!\n)|(?<!\r)\n", "\r\n", line)
    return line
106
107
def detect_mode(line, default):
    """
    Tells which kind of line break terminates the given line

    :param line: str line
    :param default: value handed back when the line has no line break at
        its end
    :rtype: int
    :return: one of 0 - Unix, 1 - Mac, 2 - DOS, or the given default
    """
    # DOS has to be probed first, a '\r\n' ending is a '\n' ending as well
    for ending, mode in (('\r\n', 2), ('\n', 0), ('\r', 1)):
        if line.endswith(ending):
            return mode
    return default
126
127
def generate_api_key(username, salt=None):
    """
    Generates unique API key for given username, if salt is not given
    it'll be generated from some random string

    :param username: username as string
    :param salt: salt to hash generate KEY
    :rtype: str
    :returns: sha1 hash from username+salt
    """
    import hashlib
    import random

    if salt is None:
        # an API key is a credential, so the salt is taken from the OS
        # entropy source instead of the private tempfile name generator
        salt = '%032x' % random.SystemRandom().getrandbits(128)

    data = username + salt
    try:
        return hashlib.sha1(data).hexdigest()
    except (TypeError, UnicodeEncodeError):
        # text that cannot be hashed directly (non-ascii unicode, or any
        # text string on Python 3) is hashed through its utf-8 bytes
        return hashlib.sha1(data.encode('utf-8')).hexdigest()
145
146
def safe_unicode(str_, from_encoding=None):
    """
    Turns str_ into unicode, trying harder and harder on failures

    The plain conversion is tried first, then a decode with from_encoding,
    then a decode with the encoding detected by the chardet library. When
    everything fails the string is decoded using from_encoding with the
    errors replaced

    :param str_: string to decode
    :param from_encoding: encoding to decode from, when empty the
        ``default_encoding`` of the rhodecode config is used (``utf8`` if
        that one is not set)
    :rtype: unicode
    :returns: unicode object
    """
    if isinstance(str_, unicode):
        return str_

    if not from_encoding:
        import rhodecode
        from_encoding = rhodecode.CONFIG.get('default_encoding', 'utf8')

    # implicit conversion goes first, then the explicit encoding
    for args in ((str_,), (str_, from_encoding)):
        try:
            return unicode(*args)
        except UnicodeDecodeError:
            continue

    try:
        import chardet
        detected = chardet.detect(str_)['encoding']
        if detected is None:
            raise Exception()
        return str_.decode(detected)
    except Exception:
        # missing chardet, no detected encoding or a failed decode all end
        # up here
        return unicode(str_, from_encoding, 'replace')
184
185
def safe_str(unicode_, to_encoding=None):
    """
    safe str function. Does few trick to turn unicode_ into string

    In case of UnicodeEncodeError we try to return it with encoding detected
    by chardet library if it fails fallback to string with errors replaced

    :param unicode_: unicode to encode, objects that are not strings are
        passed through ``str()`` and byte strings are returned untouched
    :param to_encoding: encoding to encode to, when empty the
        ``default_encoding`` of the rhodecode config is used (``utf8`` if
        that one is not set)
    :rtype: str
    :returns: str object
    """

    # if it's not basestr cast to str
    if not isinstance(unicode_, basestring):
        return str(unicode_)

    if isinstance(unicode_, str):
        return unicode_

    if not to_encoding:
        import rhodecode
        to_encoding = rhodecode.CONFIG.get('default_encoding', 'utf8')

    try:
        return unicode_.encode(to_encoding)
    except UnicodeEncodeError:
        pass

    try:
        import chardet
        # NOTE(review): chardet is handed a unicode object here, it is
        # presumably meant for byte strings - confirm this step is useful
        encoding = chardet.detect(unicode_)['encoding']
        if encoding is not None:
            return unicode_.encode(encoding)
    except (ImportError, LookupError, TypeError, ValueError):
        # chardet missing, input rejected, unknown codec or the detected
        # encoding cannot represent the text (UnicodeEncodeError is a
        # ValueError) - use the lossy fallback below
        pass

    return unicode_.encode(to_encoding, 'replace')
227
228
def engine_from_config(configuration, prefix='sqlalchemy.', **kwargs):
    """
    Custom engine_from_config functions that makes sure we use NullPool for
    file based sqlite databases. This prevents errors on sqlite. This only
    applies to sqlalchemy versions < 0.7.0

    For sqlalchemy >= 0.7.0 and a true ``configuration['debug']`` cursor
    execute listeners are attached to the engine, they log the start of
    each query and the time it took.

    :param configuration: config mapping, has to hold ``prefix + 'url'``
        for sqlalchemy < 0.7.0 and ``debug`` for newer versions
    :param prefix: prefix of the sqlalchemy keys inside configuration
    :param kwargs: passed to sqlalchemy's own ``engine_from_config``
    :returns: engine created by sqlalchemy's ``engine_from_config``
    """
    import sqlalchemy
    from sqlalchemy import engine_from_config as efc
    import logging

    # compare (major, minor) as a tuple: looking at the minor number alone
    # mistakes 1.x for a pre 0.7 release, and int() fails on tags like '7b1'
    version = tuple(int(num) for num in
                    re.findall(r'\d+', sqlalchemy.__version__)[:2])

    if version < (0, 7):

        # This solution should work for sqlalchemy < 0.7.0, and should use
        # proxy=TimerProxy() for execution time profiling

        from sqlalchemy.pool import NullPool
        url = configuration[prefix + 'url']

        if url.startswith('sqlite'):
            kwargs.update({'poolclass': NullPool})
        return efc(configuration, prefix, **kwargs)

    import time
    from sqlalchemy import event

    log = logging.getLogger('sqlalchemy.engine')
    YELLOW = 33
    engine = efc(configuration, prefix, **kwargs)

    def color_sql(sql):
        # wrap the text into a bold yellow terminal escape sequence and
        # reset the attributes afterwards
        COLOR_SEQ = "\033[1;%dm"
        normal = '\x1b[0m'
        return ''.join([COLOR_SEQ % YELLOW, sql, normal])

    if configuration['debug']:
        #attach events only for debug configuration

        def before_cursor_execute(conn, cursor, statement,
                                  parameters, context, executemany):
            # remember the start on the execution context so that the
            # matching "after" listener can compute the elapsed time
            context._query_start_time = time.time()
            log.info(color_sql(">>>>> STARTING QUERY >>>>>"))

        def after_cursor_execute(conn, cursor, statement,
                                 parameters, context, executemany):
            total = time.time() - context._query_start_time
            log.info(color_sql("<<<<< TOTAL TIME: %f <<<<<" % total))

        event.listen(engine, "before_cursor_execute",
                     before_cursor_execute)
        event.listen(engine, "after_cursor_execute",
                     after_cursor_execute)

    return engine
286
287
def age(curdate):
    """
    Describes how long ago the given datetime was.

    :param curdate: datetime object
    :rtype: unicode
    :returns: words describing the age followed by ``ago``, ``just now``
        when less than a second has passed, or an empty string for an empty
        curdate
    """

    from datetime import datetime
    from webhelpers.date import time_ago_in_words

    def _(s):
        # stand-in that hands the string back unchanged
        return s

    if not curdate:
        return ''

    minute = 60
    hour = 60 * minute
    day = 24 * hour
    units = [(_(u"year"), day * 365),
             (_(u"month"), day * 30),
             (_(u"day"), day),
             (_(u"hour"), hour),
             (_(u"minute"), minute),
             (_(u"second"), 1), ]

    delta = datetime.now() - curdate
    age_seconds = (delta.days * day) + delta.seconds

    last = len(units) - 1
    for idx, (unit_name, unit_seconds) in enumerate(units):
        if unit_seconds > age_seconds:
            continue
        # the first unit that fits decides, the name of the unit right
        # below it (or of the last one) is what time_ago_in_words receives
        finer_unit = units[min(idx + 1, last)][0]
        return '%s %s' % (time_ago_in_words(curdate, finer_unit), _('ago'))

    return _(u'just now')
324
325
def uri_filter(uri):
    """
    Removes user:password from given url string

    :param uri: url string, may be empty
    :rtype: list
    :returns: list with the non-empty parts out of ``[proto, host, port]``
        where proto is ``http://`` or ``https://`` and port is everything
        that follows the first ``:`` after the credentials were removed.
        An empty string is returned for an empty uri
    """
    if not uri:
        return ''

    proto = ''

    for pat in ('https://', 'http://'):
        if uri.startswith(pat):
            uri = uri[len(pat):]
            proto = pat
            break

    # Credentials can only live in the part in front of the first path
    # separator. Searching the whole string would cut urls with an '@'
    # inside of the path, and taking the first '@' would leave the password
    # in place for usernames like me@example.com
    auth_start = 0
    if not proto:
        # skip over a scheme other than http(s), e.g. 'ssh://'
        scheme_end = uri.find('://')
        if scheme_end != -1 and '/' not in uri[:scheme_end]:
            auth_start = scheme_end + 3
    auth_end = uri.find('/', auth_start)
    if auth_end == -1:
        auth_end = len(uri)

    # remove passwords and username, rfind gives -1 when there is nothing
    # to remove which keeps the whole string
    uri = uri[uri.rfind('@', auth_start, auth_end) + 1:]

    # get the port
    cred_pos = uri.find(':')
    if cred_pos == -1:
        host, port = uri, None
    else:
        host, port = uri[:cred_pos], uri[cred_pos + 1:]

    return [part for part in (proto, host, port) if part]
356
357
def credentials_filter(uri):
    """
    Returns a url with removed credentials

    :param uri: url string, may be empty
    :rtype: str
    :returns: the parts given by ``uri_filter`` put back together, an empty
        string for an empty uri
    """

    parts = list(uri_filter(uri))
    if not parts:
        return ''

    proto = ''
    if parts[0] in ('https://', 'http://'):
        proto = parts.pop(0)

    # What is left is [host] or [host, port]. The ':' has to be put back
    # between them also when there was no http(s) prefix, otherwise
    # 'host:8080' would come out as 'host8080'
    return proto + ':'.join(parts)
371
372
def get_changeset_safe(repo, rev):
    """
    Safe version of get_changeset if this changeset doesn't exists for a
    repo it returns a Dummy one instead

    :param repo: instance of a ``BaseRepository`` subclass
    :param rev: revision passed to ``repo.get_changeset``
    :returns: the changeset, or an ``EmptyChangeset`` created for rev when
        ``repo.get_changeset`` raises ``RepositoryError``
    :raises Exception: when repo is not a ``BaseRepository`` instance
    """
    from rhodecode.lib.vcs.backends.base import BaseRepository
    from rhodecode.lib.vcs.exceptions import RepositoryError
    if not isinstance(repo, BaseRepository):
        # the type has to be interpolated here, handing it over as a second
        # exception argument leaves a raw '%s' in the message
        raise Exception('You must pass an Repository '
                        'object as first argument got %s' % type(repo))

    try:
        cs = repo.get_changeset(rev)
    except RepositoryError:
        # imported here and not at the top of the function so that it is
        # only loaded when the fallback is really needed
        from rhodecode.lib.utils import EmptyChangeset
        cs = EmptyChangeset(requested_revision=rev)
    return cs
393
394
def extract_mentioned_users(s):
    """
    Collects the usernames that are @mentioned inside of string s

    A mention is an ``@`` placed at the very beginning of the string or
    right after a whitespace character, followed by word characters.

    :param s: string to get mentions
    :rtype: list
    :returns: sorted list of usernames, each of them listed once
    """
    mentions = re.findall(r'(?:^@|\s@)(\w+)', s)
    return sorted(set(mentions))