Mercurial > kallithea
comparison rhodecode/lib/utils2.py @ 2109:8ecfed1d8f8b beta
utils/conf
- created temporary utils2
- made config.conf for storing some configurations
- fixed some dependency import problems
- code cleanup
- rc-extensions now properly work for celery
author | Marcin Kuzminski <marcin@python-works.com> |
---|---|
date | Wed, 07 Mar 2012 02:18:22 +0200 |
parents | |
children | ea5ff843b200 |
comparison
equal
deleted
inserted
replaced
2108:9e377342802c | 2109:8ecfed1d8f8b |
---|---|
1 # -*- coding: utf-8 -*- | |
2 """ | |
3 rhodecode.lib.utils2 | |
4 ~~~~~~~~~~~~~~~~~~~~ | |
5 | |
6 Some simple helper functions | |
7 | |
8 :created_on: Jan 5, 2011 | |
9 :author: marcink | |
10 :copyright: (C) 2011-2012 Marcin Kuzminski <marcin@python-works.com> | |
11 :license: GPLv3, see COPYING for more details. | |
12 """ | |
13 # This program is free software: you can redistribute it and/or modify | |
14 # it under the terms of the GNU General Public License as published by | |
15 # the Free Software Foundation, either version 3 of the License, or | |
16 # (at your option) any later version. | |
17 # | |
18 # This program is distributed in the hope that it will be useful, | |
19 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
20 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
21 # GNU General Public License for more details. | |
22 # | |
23 # You should have received a copy of the GNU General Public License | |
24 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
25 | |
26 import re | |
27 from rhodecode.lib.vcs.utils.lazy import LazyProperty | |
28 | |
29 | |
def __get_lem():
    """
    Get language extension map based on what's inside pygments lexers

    Walks ``pygments.lexers.LEXERS`` and, for every pattern found in the
    second-to-last field of a lexer entry, records the lexer name (with
    ``Lexer`` removed) under the lower-cased extension.

    :rtype: dict
    :returns: mapping of lower-cased extension -> list of lexer descriptions
    """
    from pygments import lexers
    from collections import defaultdict

    d = defaultdict(list)

    def __clean(s):
        # drop the leading glob part, e.g. '*.py' -> 'py'
        s = s.lstrip('*')
        s = s.lstrip('.')

        start = s.find('[')
        if start == -1:
            return [s.lower()]

        # expand a character class into one extension per character,
        # e.g. 'php[345]' -> php3, php4, php5; whatever follows the closing
        # bracket is ignored
        stop = s.find(']')
        prefix = s[:start]
        return [(prefix + suffix).lower() for suffix in s[start + 1:stop]]

    # iterate in sorted order so descriptions are appended deterministically
    for lx, t in sorted(lexers.LEXERS.items()):
        desc = lx.replace('Lexer', '')
        # t[-2] is treated as the lexer's filename patterns
        for pattern in t[-2]:
            for ext in __clean(pattern):
                d[ext].append(desc)

    return dict(d)
63 | |
def str2bool(_str):
    """
    returns True/False value from given string, it tries to translate the
    string into boolean

    ``None`` translates to ``False``. Values equal to ``True``/``False``
    (this includes ``1`` and ``0``) are normalized to a real boolean. Anything
    else is stringified, stripped and lower-cased, and is ``True`` only for
    one of: t, true, y, yes, on, 1

    :param _str: string value to translate into boolean
    :rtype: boolean
    :returns: boolean from given string
    """
    if _str is None:
        return False
    if _str in (True, False):
        # 1 == True and 0 == False, so cast to always hand back a real bool
        return bool(_str)
    _str = str(_str).strip().lower()
    return _str in ('t', 'true', 'y', 'yes', 'on', '1')
79 | |
80 | |
def convert_line_endings(line, mode):
    """
    Converts a given line "line end" accordingly to given mode

    Available modes are::
        0 - Unix
        1 - Mac
        2 - DOS

    Any other mode leaves the line untouched.

    :param line: given line to convert
    :param mode: mode to convert to
    :rtype: str
    :return: converted line according to mode
    """
    if mode == 0:
        # collapse DOS endings first so '\r\n' doesn't become two newlines
        line = line.replace('\r\n', '\n')
        line = line.replace('\r', '\n')
    elif mode == 1:
        line = line.replace('\r\n', '\r')
        line = line.replace('\n', '\r')
    elif mode == 2:
        # only touch a lone '\r' or a lone '\n'; existing '\r\n' pairs stay
        line = re.sub("\r(?!\n)|(?<!\r)\n", "\r\n", line)
    return line
106 | |
107 | |
def detect_mode(line, default):
    """
    Figures out which line break terminates the given line; when the line
    doesn't end with any known line break the given default is returned

    :param line: str line
    :param default: value returned when no line break is found
    :rtype: int
    :return: line end mode, one of 0 - Unix, 1 - Mac, 2 - DOS
    """
    # order matters: '\r\n' has to be checked before the bare '\n'
    for ending, mode in (('\r\n', 2), ('\n', 0), ('\r', 1)):
        if line.endswith(ending):
            return mode
    return default
126 | |
127 | |
def generate_api_key(username, salt=None):
    """
    Generates unique API key for given username, if salt is not given
    it'll be generated from the operating system's random source

    Text values are encoded as UTF-8 before hashing, so non-ascii usernames
    are accepted; byte strings are hashed as they are.

    :param username: username as string
    :param salt: salt to hash generate KEY
    :rtype: str
    :returns: sha1 hash from username+salt
    """
    import binascii
    import hashlib
    import os

    def _to_bytes(value):
        # hashlib works on bytes; encode text explicitly instead of relying
        # on an implicit ascii conversion
        if isinstance(value, bytes):
            return value
        return value.encode('utf-8')

    if salt is None:
        # os.urandom instead of the private, non-cryptographic
        # tempfile._RandomNameSequence helper
        salt = binascii.hexlify(os.urandom(20))

    return hashlib.sha1(_to_bytes(username) + _to_bytes(salt)).hexdigest()
145 | |
146 | |
def safe_unicode(str_, from_encoding=None):
    """
    Best-effort conversion of str_ into unicode.

    A plain ``unicode()`` call is tried first, then decoding with
    ``from_encoding`` (taken from the ``default_encoding`` setting of
    ``rhodecode.CONFIG``, ``utf8`` if unset, when not given). When both raise
    UnicodeDecodeError the encoding guessed by the chardet library is used,
    and as a last resort str_ is decoded with errors replaced.

    :param str_: string to decode
    :param from_encoding: encoding to decode from
    :rtype: unicode
    :returns: unicode object
    """
    if isinstance(str_, unicode):
        return str_

    if not from_encoding:
        import rhodecode
        from_encoding = rhodecode.CONFIG.get('default_encoding','utf8')

    # first the implicit conversion, then the explicit encoding
    for args in ((str_,), (str_, from_encoding)):
        try:
            return unicode(*args)
        except UnicodeDecodeError:
            continue

    try:
        import chardet
        detected = chardet.detect(str_)['encoding']
        if detected is None:
            # nothing detected, jump straight to the fallback below
            raise Exception()
        return str_.decode(detected)
    except Exception:
        # chardet missing, detection failed or decoding failed
        return unicode(str_, from_encoding, 'replace')
184 | |
185 | |
def safe_str(unicode_, to_encoding=None):
    """
    safe str function. Does few trick to turn unicode_ into string

    In case of UnicodeEncodeError we try to return it with encoding detected
    by chardet library if it fails fallback to string with errors replaced

    :param unicode_: unicode to encode
    :param to_encoding: target encoding; when not given the
        ``default_encoding`` setting of ``rhodecode.CONFIG`` is used
        (``utf8`` if unset)
    :rtype: str
    :returns: str object
    """

    # if it's not basestr cast to str
    if not isinstance(unicode_, basestring):
        return str(unicode_)

    if isinstance(unicode_, str):
        return unicode_

    if not to_encoding:
        import rhodecode
        to_encoding = rhodecode.CONFIG.get('default_encoding', 'utf8')

    try:
        return unicode_.encode(to_encoding)
    except UnicodeEncodeError:
        pass

    try:
        import chardet
        encoding = chardet.detect(unicode_)['encoding']
        # a None encoding means nothing was detected; fall through to the
        # 'replace' fallback (UnicodeEncodeError can't be raised without
        # arguments, doing so ends up as an uncaught TypeError)
        if encoding is not None:
            return unicode_.encode(encoding)
    except (ImportError, UnicodeEncodeError):
        pass

    return unicode_.encode(to_encoding, 'replace')
227 | |
228 | |
def engine_from_config(configuration, prefix='sqlalchemy.', **kwargs):
    """
    Custom engine_from_config functions that makes sure we use NullPool for
    file based sqlite databases. This prevents errors on sqlite. This only
    applies to sqlalchemy versions < 0.7.0

    For sqlalchemy >= 0.7 the engine is created as-is, and when
    ``configuration['debug']`` is set, cursor-execute listeners are attached
    that log a colored marker before every query and its run time after it.

    :param configuration: configuration mapping handed to sqlalchemy
    :param prefix: prefix of the sqlalchemy related configuration keys
    :param kwargs: extra keyword arguments passed to sqlalchemy's
        engine_from_config
    :returns: sqlalchemy engine
    """
    import sqlalchemy
    from sqlalchemy import engine_from_config as efc
    import logging

    # compare (major, minor); checking the minor part alone would treat
    # e.g. 1.3 as "older than 0.7", and int() on it chokes on '0.7b4'
    version = tuple(int(part) for part in
                    re.match(r'(\d+)\.(\d+)', sqlalchemy.__version__).groups())

    if version < (0, 7):

        # This solution should work for sqlalchemy < 0.7.0, and should use
        # proxy=TimerProxy() for execution time profiling

        from sqlalchemy.pool import NullPool
        url = configuration[prefix + 'url']

        if url.startswith('sqlite'):
            kwargs.update({'poolclass': NullPool})
        return efc(configuration, prefix, **kwargs)
    else:
        import time
        from sqlalchemy import event

        log = logging.getLogger('sqlalchemy.engine')
        # ANSI foreground color code used for the SQL log markers
        YELLOW = 33
        engine = efc(configuration, prefix, **kwargs)

        def color_sql(sql):
            COLOR_SEQ = "\033[1;%dm"
            COLOR_SQL = YELLOW
            normal = '\x1b[0m'
            return ''.join([COLOR_SEQ % COLOR_SQL, sql, normal])

        if configuration['debug']:
            #attach events only for debug configuration

            def before_cursor_execute(conn, cursor, statement,
                                      parameters, context, executemany):
                # remember the start time on the execution context so the
                # "after" listener can compute the duration
                context._query_start_time = time.time()
                log.info(color_sql(">>>>> STARTING QUERY >>>>>"))

            def after_cursor_execute(conn, cursor, statement,
                                     parameters, context, executemany):
                total = time.time() - context._query_start_time
                log.info(color_sql("<<<<< TOTAL TIME: %f <<<<<" % total))

            event.listen(engine, "before_cursor_execute",
                         before_cursor_execute)
            event.listen(engine, "after_cursor_execute",
                         after_cursor_execute)

    return engine
286 | |
287 | |
def age(curdate):
    """
    Describes how long ago the given datetime was, in words.

    The largest unit that fits into the elapsed time is looked up and the
    text is rendered by ``time_ago_in_words`` with the next smaller unit as
    granularity (seconds stay at second granularity). A false value gives an
    empty string; when less than one second has elapsed the result is
    ``just now``.

    :param curdate: datetime object
    :rtype: unicode
    :returns: unicode words describing age
    """

    from datetime import datetime
    from webhelpers.date import time_ago_in_words

    _ = lambda s: s

    if not curdate:
        return ''

    day = 3600 * 24
    agescales = [(_(u"year"), day * 365),
                 (_(u"month"), day * 30),
                 (_(u"day"), day),
                 (_(u"hour"), 3600),
                 (_(u"minute"), 60),
                 (_(u"second"), 1), ]
    last = len(agescales) - 1

    delta = datetime.now() - curdate
    age_seconds = (delta.days * day) + delta.seconds

    for idx, (_unit, seconds) in enumerate(agescales):
        if seconds <= age_seconds:
            # one step finer than the matched unit, capped at seconds
            granularity = agescales[min(idx + 1, last)][0]
            return '%s %s' % (time_ago_in_words(curdate, granularity),
                              _('ago'))

    return _(u'just now')
324 | |
325 | |
def uri_filter(uri):
    """
    Removes user:password from given url string

    The result holds the non-empty parts out of protocol (``https://`` or
    ``http://``), host and port. The "port" part is everything after the
    first ``:`` following the credentials, so it also carries the path when
    a port is present; without a port the path stays in the host part.

    :param uri:
    :rtype: list
    :returns: filtered list of strings, or an empty string for an empty uri
    """
    if not uri:
        return ''

    proto = ''

    for pat in ('https://', 'http://'):
        if uri.startswith(pat):
            uri = uri[len(pat):]
            proto = pat
            break

    # remove passwords and username
    netloc, slash, rest = uri.partition('/')
    if '@' in netloc:
        # cut at the LAST '@' before the path, so a password that itself
        # contains '@' doesn't leak its tail
        uri = netloc.rpartition('@')[2] + slash + rest
    else:
        # no '@' before the first '/': keep stripping up to the first '@'
        # (covers credentials containing a '/')
        uri = uri[uri.find('@') + 1:]

    # get the port
    cred_pos = uri.find(':')
    if cred_pos == -1:
        host, port = uri, None
    else:
        host, port = uri[:cred_pos], uri[cred_pos + 1:]

    # real list (not a lazy filter object) since callers index into it
    return [part for part in (proto, host, port) if part]
356 | |
357 | |
def credentials_filter(uri):
    """
    Returns a url with removed credentials

    :param uri: url string, possibly containing ``user:password@``
    :rtype: str
    :returns: the url put back together from the parts given by uri_filter,
        with the ``:`` restored in front of the port
    """

    parts = list(uri_filter(uri))
    if not parts:
        return ''

    # uri_filter drops empty parts, so the port sits at index 2 only when a
    # protocol is present; without one it directly follows the host
    port_pos = 2 if parts[0] in ('https://', 'http://') else 1
    if len(parts) > port_pos:
        parts[port_pos] = ':' + parts[port_pos]

    return ''.join(parts)
371 | |
372 | |
def get_changeset_safe(repo, rev):
    """
    Safe version of get_changeset if this changeset doesn't exists for a
    repo it returns a Dummy one instead

    :param repo: repository object, has to be a BaseRepository instance
    :param rev: revision to fetch
    :returns: the changeset for ``rev``, or an EmptyChangeset created with
        ``requested_revision=rev`` when the lookup raises RepositoryError
    :raises Exception: when ``repo`` is not a BaseRepository instance
    """
    from rhodecode.lib.vcs.backends.base import BaseRepository
    from rhodecode.lib.vcs.exceptions import RepositoryError
    if not isinstance(repo, BaseRepository):
        # interpolate with %, passing the type as a second argument would
        # leave the '%s' placeholder unformatted in the message
        raise Exception('You must pass an Repository '
                        'object as first argument got %s' % type(repo))

    try:
        cs = repo.get_changeset(rev)
    except RepositoryError:
        # imported here, next to its only use
        from rhodecode.lib.utils import EmptyChangeset
        cs = EmptyChangeset(requested_revision=rev)
    return cs
393 | |
394 | |
def extract_mentioned_users(s):
    """
    Collects the usernames that are @mentioned in the given string s

    A mention is an ``@`` placed at the very beginning of the string or
    right after a whitespace, followed by word characters.

    :param s: string to get mentions
    :returns: sorted list of unique usernames
    """
    mentions = re.findall(r'(?:^@|\s@)(\w+)', s)
    # a set takes care of the duplicates
    return sorted(set(mentions))