comparison rhodecode/lib/celerylib/tasks.py @ 2031:82a88013a3fd

merge 1.3 into stable
author Marcin Kuzminski <marcin@python-works.com>
date Sun, 26 Feb 2012 17:25:09 +0200
parents afe8cfa32a0f 324ac367a4da
children dc2584ba5fbc
comparison
equal deleted inserted replaced
2005:ab0e122b38a7 2031:82a88013a3fd
26 from celery.decorators import task 26 from celery.decorators import task
27 27
28 import os 28 import os
29 import traceback 29 import traceback
30 import logging 30 import logging
31 from os.path import dirname as dn, join as jn 31 from os.path import join as jn
32 32
33 from time import mktime 33 from time import mktime
34 from operator import itemgetter 34 from operator import itemgetter
35 from string import lower 35 from string import lower
36 36
37 from pylons import config, url 37 from pylons import config, url
38 from pylons.i18n.translation import _ 38 from pylons.i18n.translation import _
39 39
40 from rhodecode.lib.vcs import get_backend
41
42 from rhodecode import CELERY_ON
40 from rhodecode.lib import LANGUAGES_EXTENSIONS_MAP, safe_str 43 from rhodecode.lib import LANGUAGES_EXTENSIONS_MAP, safe_str
41 from rhodecode.lib.celerylib import run_task, locked_task, str2bool, \ 44 from rhodecode.lib.celerylib import run_task, locked_task, dbsession, \
42 __get_lockkey, LockHeld, DaemonLock, get_session, dbsession 45 str2bool, __get_lockkey, LockHeld, DaemonLock, get_session
43 from rhodecode.lib.helpers import person 46 from rhodecode.lib.helpers import person
44 from rhodecode.lib.smtp_mailer import SmtpMailer 47 from rhodecode.lib.rcmail.smtp_mailer import SmtpMailer
45 from rhodecode.lib.utils import add_cache 48 from rhodecode.lib.utils import add_cache, action_logger
46 from rhodecode.lib.compat import json, OrderedDict 49 from rhodecode.lib.compat import json, OrderedDict
47 50
48 from rhodecode.model.db import RhodeCodeUi, Statistics, Repository, User 51 from rhodecode.model.db import Statistics, Repository, User
49 52
50 from vcs.backends import get_repo
51 from vcs import get_backend
52 53
53 add_cache(config) 54 add_cache(config)
54 55
55 __all__ = ['whoosh_index', 'get_commits_stats', 56 __all__ = ['whoosh_index', 'get_commits_stats',
56 'reset_user_password', 'send_email'] 57 'reset_user_password', 'send_email']
57 58
58 CELERY_ON = str2bool(config['app_conf'].get('use_celery')) 59
59 60 def get_logger(cls):
60 61 if CELERY_ON:
61 def get_repos_path(): 62 try:
62 sa = get_session() 63 log = cls.get_logger()
63 q = sa.query(RhodeCodeUi).filter(RhodeCodeUi.ui_key == '/').one() 64 except:
64 return q.ui_value 65 log = logging.getLogger(__name__)
66 else:
67 log = logging.getLogger(__name__)
68
69 return log
65 70
66 71
67 @task(ignore_result=True) 72 @task(ignore_result=True)
68 @locked_task 73 @locked_task
69 @dbsession 74 @dbsession
70 def whoosh_index(repo_location, full_index): 75 def whoosh_index(repo_location, full_index):
71 #log = whoosh_index.get_logger()
72 from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon 76 from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon
77 log = whoosh_index.get_logger(whoosh_index)
78 DBS = get_session()
79
73 index_location = config['index_dir'] 80 index_location = config['index_dir']
74 WhooshIndexingDaemon(index_location=index_location, 81 WhooshIndexingDaemon(index_location=index_location,
75 repo_location=repo_location, sa=get_session())\ 82 repo_location=repo_location, sa=DBS)\
76 .run(full_index=full_index) 83 .run(full_index=full_index)
77 84
78 85
79 @task(ignore_result=True) 86 @task(ignore_result=True)
80 @dbsession 87 @dbsession
81 def get_commits_stats(repo_name, ts_min_y, ts_max_y): 88 def get_commits_stats(repo_name, ts_min_y, ts_max_y):
82 try: 89 log = get_logger(get_commits_stats)
83 log = get_commits_stats.get_logger() 90 DBS = get_session()
84 except:
85 log = logging.getLogger(__name__)
86
87 lockkey = __get_lockkey('get_commits_stats', repo_name, ts_min_y, 91 lockkey = __get_lockkey('get_commits_stats', repo_name, ts_min_y,
88 ts_max_y) 92 ts_max_y)
89 lockkey_path = config['here'] 93 lockkey_path = config['here']
90 94
91 log.info('running task with lockkey %s', lockkey) 95 log.info('running task with lockkey %s' % lockkey)
96
92 try: 97 try:
93 sa = get_session()
94 lock = l = DaemonLock(file_=jn(lockkey_path, lockkey)) 98 lock = l = DaemonLock(file_=jn(lockkey_path, lockkey))
95 99
96 # for js data compatibilty cleans the key for person from ' 100 # for js data compatibility cleans the key for person from '
97 akc = lambda k: person(k).replace('"', "") 101 akc = lambda k: person(k).replace('"', "")
98 102
99 co_day_auth_aggr = {} 103 co_day_auth_aggr = {}
100 commits_by_day_aggregate = {} 104 commits_by_day_aggregate = {}
101 repos_path = get_repos_path() 105 repo = Repository.get_by_repo_name(repo_name)
102 repo = get_repo(safe_str(os.path.join(repos_path, repo_name))) 106 if repo is None:
107 return True
108
109 repo = repo.scm_instance
103 repo_size = repo.count() 110 repo_size = repo.count()
104 # return if repo have no revisions 111 # return if repo have no revisions
105 if repo_size < 1: 112 if repo_size < 1:
106 lock.release() 113 lock.release()
107 return True 114 return True
110 parse_limit = int(config['app_conf'].get('commit_parse_limit')) 117 parse_limit = int(config['app_conf'].get('commit_parse_limit'))
111 last_rev = None 118 last_rev = None
112 last_cs = None 119 last_cs = None
113 timegetter = itemgetter('time') 120 timegetter = itemgetter('time')
114 121
115 dbrepo = sa.query(Repository)\ 122 dbrepo = DBS.query(Repository)\
116 .filter(Repository.repo_name == repo_name).scalar() 123 .filter(Repository.repo_name == repo_name).scalar()
117 cur_stats = sa.query(Statistics)\ 124 cur_stats = DBS.query(Statistics)\
118 .filter(Statistics.repository == dbrepo).scalar() 125 .filter(Statistics.repository == dbrepo).scalar()
119 126
120 if cur_stats is not None: 127 if cur_stats is not None:
121 last_rev = cur_stats.stat_on_revision 128 last_rev = cur_stats.stat_on_revision
122 129
130 if cur_stats: 137 if cur_stats:
131 commits_by_day_aggregate = OrderedDict(json.loads( 138 commits_by_day_aggregate = OrderedDict(json.loads(
132 cur_stats.commit_activity_combined)) 139 cur_stats.commit_activity_combined))
133 co_day_auth_aggr = json.loads(cur_stats.commit_activity) 140 co_day_auth_aggr = json.loads(cur_stats.commit_activity)
134 141
135 log.debug('starting parsing %s', parse_limit) 142 log.debug('starting parsing %s' % parse_limit)
136 lmktime = mktime 143 lmktime = mktime
137 144
138 last_rev = last_rev + 1 if last_rev >= 0 else 0 145 last_rev = last_rev + 1 if last_rev >= 0 else 0
139 log.debug('Getting revisions from %s to %s' % ( 146 log.debug('Getting revisions from %s to %s' % (
140 last_rev, last_rev + parse_limit) 147 last_rev, last_rev + parse_limit)
205 212
206 stats = cur_stats if cur_stats else Statistics() 213 stats = cur_stats if cur_stats else Statistics()
207 stats.commit_activity = json.dumps(co_day_auth_aggr) 214 stats.commit_activity = json.dumps(co_day_auth_aggr)
208 stats.commit_activity_combined = json.dumps(overview_data) 215 stats.commit_activity_combined = json.dumps(overview_data)
209 216
210 log.debug('last revison %s', last_rev) 217 log.debug('last revison %s' % last_rev)
211 leftovers = len(repo.revisions[last_rev:]) 218 leftovers = len(repo.revisions[last_rev:])
212 log.debug('revisions to parse %s', leftovers) 219 log.debug('revisions to parse %s' % leftovers)
213 220
214 if last_rev == 0 or leftovers < parse_limit: 221 if last_rev == 0 or leftovers < parse_limit:
215 log.debug('getting code trending stats') 222 log.debug('getting code trending stats')
216 stats.languages = json.dumps(__get_codes_stats(repo_name)) 223 stats.languages = json.dumps(__get_codes_stats(repo_name))
217 224
218 try: 225 try:
219 stats.repository = dbrepo 226 stats.repository = dbrepo
220 stats.stat_on_revision = last_cs.revision if last_cs else 0 227 stats.stat_on_revision = last_cs.revision if last_cs else 0
221 sa.add(stats) 228 DBS.add(stats)
222 sa.commit() 229 DBS.commit()
223 except: 230 except:
224 log.error(traceback.format_exc()) 231 log.error(traceback.format_exc())
225 sa.rollback() 232 DBS.rollback()
226 lock.release() 233 lock.release()
227 return False 234 return False
228 235
229 # final release 236 #final release
230 lock.release() 237 lock.release()
231 238
232 # execute another task if celery is enabled 239 #execute another task if celery is enabled
233 if len(repo.revisions) > 1 and CELERY_ON: 240 if len(repo.revisions) > 1 and CELERY_ON:
234 run_task(get_commits_stats, repo_name, ts_min_y, ts_max_y) 241 run_task(get_commits_stats, repo_name, ts_min_y, ts_max_y)
235 return True 242 return True
236 except LockHeld: 243 except LockHeld:
237 log.info('LockHeld') 244 log.info('LockHeld')
238 return 'Task with key %s already running' % lockkey 245 return 'Task with key %s already running' % lockkey
239 246
240 @task(ignore_result=True) 247 @task(ignore_result=True)
241 @dbsession 248 @dbsession
242 def send_password_link(user_email): 249 def send_password_link(user_email):
250 from rhodecode.model.notification import EmailNotificationModel
251
252 log = get_logger(send_password_link)
253 DBS = get_session()
254
243 try: 255 try:
244 log = reset_user_password.get_logger() 256 user = User.get_by_email(user_email)
245 except:
246 log = logging.getLogger(__name__)
247
248 from rhodecode.lib import auth
249
250 try:
251 sa = get_session()
252 user = sa.query(User).filter(User.email == user_email).scalar()
253
254 if user: 257 if user:
258 log.debug('password reset user found %s' % user)
255 link = url('reset_password_confirmation', key=user.api_key, 259 link = url('reset_password_confirmation', key=user.api_key,
256 qualified=True) 260 qualified=True)
257 tmpl = """ 261 reg_type = EmailNotificationModel.TYPE_PASSWORD_RESET
258 Hello %s 262 body = EmailNotificationModel().get_email_tmpl(reg_type,
259 263 **{'user':user.short_contact,
260 We received a request to create a new password for your account. 264 'reset_url':link})
261 265 log.debug('sending email')
262 You can generate it by clicking following URL:
263
264 %s
265
266 If you didn't request new password please ignore this email.
267 """
268 run_task(send_email, user_email, 266 run_task(send_email, user_email,
269 "RhodeCode password reset link", 267 _("password reset link"), body)
270 tmpl % (user.short_contact, link)) 268 log.info('send new password mail to %s' % user_email)
271 log.info('send new password mail to %s', user_email) 269 else:
272 270 log.debug("password reset email %s not found" % user_email)
273 except: 271 except:
274 log.error('Failed to update user password')
275 log.error(traceback.format_exc()) 272 log.error(traceback.format_exc())
276 return False 273 return False
277 274
278 return True 275 return True
279 276
280 @task(ignore_result=True) 277 @task(ignore_result=True)
281 @dbsession 278 @dbsession
282 def reset_user_password(user_email): 279 def reset_user_password(user_email):
283 try:
284 log = reset_user_password.get_logger()
285 except:
286 log = logging.getLogger(__name__)
287
288 from rhodecode.lib import auth 280 from rhodecode.lib import auth
281
282 log = get_logger(reset_user_password)
283 DBS = get_session()
289 284
290 try: 285 try:
291 try: 286 try:
292 sa = get_session() 287 user = User.get_by_email(user_email)
293 user = sa.query(User).filter(User.email == user_email).scalar()
294 new_passwd = auth.PasswordGenerator().gen_password(8, 288 new_passwd = auth.PasswordGenerator().gen_password(8,
295 auth.PasswordGenerator.ALPHABETS_BIG_SMALL) 289 auth.PasswordGenerator.ALPHABETS_BIG_SMALL)
296 if user: 290 if user:
297 user.password = auth.get_crypt_password(new_passwd) 291 user.password = auth.get_crypt_password(new_passwd)
298 user.api_key = auth.generate_api_key(user.username) 292 user.api_key = auth.generate_api_key(user.username)
299 sa.add(user) 293 DBS.add(user)
300 sa.commit() 294 DBS.commit()
301 log.info('change password for %s', user_email) 295 log.info('change password for %s' % user_email)
302 if new_passwd is None: 296 if new_passwd is None:
303 raise Exception('unable to generate new password') 297 raise Exception('unable to generate new password')
304
305 except: 298 except:
306 log.error(traceback.format_exc()) 299 log.error(traceback.format_exc())
307 sa.rollback() 300 DBS.rollback()
308 301
309 run_task(send_email, user_email, 302 run_task(send_email, user_email,
310 "Your new RhodeCode password", 303 'Your new password',
311 'Your new RhodeCode password:%s' % (new_passwd)) 304 'Your new RhodeCode password:%s' % (new_passwd))
312 log.info('send new password mail to %s', user_email) 305 log.info('send new password mail to %s' % user_email)
313 306
314 except: 307 except:
315 log.error('Failed to update user password') 308 log.error('Failed to update user password')
316 log.error(traceback.format_exc()) 309 log.error(traceback.format_exc())
317 310
318 return True 311 return True
319 312
320 313
321 @task(ignore_result=True) 314 @task(ignore_result=True)
322 @dbsession 315 @dbsession
323 def send_email(recipients, subject, body): 316 def send_email(recipients, subject, body, html_body=''):
324 """ 317 """
325 Sends an email with defined parameters from the .ini files. 318 Sends an email with defined parameters from the .ini files.
326 319
327 :param recipients: list of recipients, it this is empty the defined email 320 :param recipients: list of recipients, it this is empty the defined email
328 address from field 'email_to' is used instead 321 address from field 'email_to' is used instead
329 :param subject: subject of the mail 322 :param subject: subject of the mail
330 :param body: body of the mail 323 :param body: body of the mail
324 :param html_body: html version of body
331 """ 325 """
332 try: 326 log = get_logger(send_email)
333 log = send_email.get_logger() 327 DBS = get_session()
334 except: 328
335 log = logging.getLogger(__name__)
336
337 sa = get_session()
338 email_config = config 329 email_config = config
339 330 subject = "%s %s" % (email_config.get('email_prefix'), subject)
340 if not recipients: 331 if not recipients:
341 # if recipients are not defined we send to email_config + all admins 332 # if recipients are not defined we send to email_config + all admins
342 admins = [ 333 admins = [u.email for u in User.query()
343 u.email for u in sa.query(User).filter(User.admin==True).all() 334 .filter(User.admin == True).all()]
344 ]
345 recipients = [email_config.get('email_to')] + admins 335 recipients = [email_config.get('email_to')] + admins
346 336
347 mail_from = email_config.get('app_email_from') 337 mail_from = email_config.get('app_email_from', 'RhodeCode')
348 user = email_config.get('smtp_username') 338 user = email_config.get('smtp_username')
349 passwd = email_config.get('smtp_password') 339 passwd = email_config.get('smtp_password')
350 mail_server = email_config.get('smtp_server') 340 mail_server = email_config.get('smtp_server')
351 mail_port = email_config.get('smtp_port') 341 mail_port = email_config.get('smtp_port')
352 tls = str2bool(email_config.get('smtp_use_tls')) 342 tls = str2bool(email_config.get('smtp_use_tls'))
353 ssl = str2bool(email_config.get('smtp_use_ssl')) 343 ssl = str2bool(email_config.get('smtp_use_ssl'))
354 debug = str2bool(config.get('debug')) 344 debug = str2bool(config.get('debug'))
355 smtp_auth = email_config.get('smtp_auth') 345 smtp_auth = email_config.get('smtp_auth')
356 346
357 try: 347 try:
358 m = SmtpMailer(mail_from, user, passwd, mail_server,smtp_auth, 348 m = SmtpMailer(mail_from, user, passwd, mail_server, smtp_auth,
359 mail_port, ssl, tls, debug=debug) 349 mail_port, ssl, tls, debug=debug)
360 m.send(recipients, subject, body) 350 m.send(recipients, subject, body, html_body)
361 except: 351 except:
362 log.error('Mail sending failed') 352 log.error('Mail sending failed')
363 log.error(traceback.format_exc()) 353 log.error(traceback.format_exc())
364 return False 354 return False
365 return True 355 return True
366 356
367 357
368 @task(ignore_result=True) 358 @task(ignore_result=True)
369 @dbsession 359 @dbsession
370 def create_repo_fork(form_data, cur_user): 360 def create_repo_fork(form_data, cur_user):
361 """
362 Creates a fork of repository using interval VCS methods
363
364 :param form_data:
365 :param cur_user:
366 """
371 from rhodecode.model.repo import RepoModel 367 from rhodecode.model.repo import RepoModel
372 368
373 try: 369 log = get_logger(create_repo_fork)
374 log = create_repo_fork.get_logger() 370 DBS = get_session()
375 except: 371
376 log = logging.getLogger(__name__) 372 base_path = Repository.base_path()
377 373
378 repo_model = RepoModel(get_session()) 374 RepoModel(DBS).create(form_data, cur_user, just_db=True, fork=True)
379 repo_model.create(form_data, cur_user, just_db=True, fork=True) 375
380 repo_name = form_data['repo_name']
381 repos_path = get_repos_path()
382 repo_path = os.path.join(repos_path, repo_name)
383 repo_fork_path = os.path.join(repos_path, form_data['fork_name'])
384 alias = form_data['repo_type'] 376 alias = form_data['repo_type']
385 377 org_repo_name = form_data['org_path']
386 log.info('creating repo fork %s as %s', repo_name, repo_path) 378 fork_name = form_data['repo_name_full']
379 update_after_clone = form_data['update_after_clone']
380 source_repo_path = os.path.join(base_path, org_repo_name)
381 destination_fork_path = os.path.join(base_path, fork_name)
382
383 log.info('creating fork of %s as %s', source_repo_path,
384 destination_fork_path)
387 backend = get_backend(alias) 385 backend = get_backend(alias)
388 backend(str(repo_fork_path), create=True, src_url=str(repo_path)) 386 backend(safe_str(destination_fork_path), create=True,
389 387 src_url=safe_str(source_repo_path),
388 update_after_clone=update_after_clone)
389 action_logger(cur_user, 'user_forked_repo:%s' % fork_name,
390 org_repo_name, '', DBS)
391
392 action_logger(cur_user, 'user_created_fork:%s' % fork_name,
393 fork_name, '', DBS)
394 # finally commit at latest possible stage
395 DBS.commit()
390 396
391 def __get_codes_stats(repo_name): 397 def __get_codes_stats(repo_name):
392 repos_path = get_repos_path() 398 repo = Repository.get_by_repo_name(repo_name).scm_instance
393 repo = get_repo(safe_str(os.path.join(repos_path, repo_name))) 399
394 tip = repo.get_changeset() 400 tip = repo.get_changeset()
395 code_stats = {} 401 code_stats = {}
396 402
397 def aggregate(cs): 403 def aggregate(cs):
398 for f in cs[2]: 404 for f in cs[2]: