comparison rhodecode/lib/celerylib/tasks.py @ 547:1e757ac98988

renamed project to rhodecode
author Marcin Kuzminski <marcin@python-works.com>
date Wed, 06 Oct 2010 03:18:16 +0200
parents pylons_app/lib/celerylib/tasks.py@72778dda34cf
children b75b77ef649d
comparison
equal deleted inserted replaced
546:7c2f5e4d7bbf 547:1e757ac98988
1 from celery.decorators import task
2 from celery.task.sets import subtask
3 from celeryconfig import PYLONS_CONFIG as config
4 from operator import itemgetter
5 from pylons.i18n.translation import _
6 from rhodecode.lib.celerylib import run_task, locked_task
7 from rhodecode.lib.helpers import person
8 from rhodecode.lib.smtp_mailer import SmtpMailer
9 from rhodecode.lib.utils import OrderedDict
10 from time import mktime
11 from vcs.backends.hg import MercurialRepository
12 import json
13 import traceback
14
# Names exported by ``from <this module> import *``.
__all__ = [
    'whoosh_index',
    'get_commits_stats',
    'reset_user_password',
    'send_email',
]
17
def get_session():
    """Return a new scoped SQLAlchemy session.

    An engine is built from the ``sqlalchemy.db1.``-prefixed options of the
    ``app:main`` config section on every call, and a fresh ``scoped_session``
    bound to that engine is returned. Callers in this module release it with
    ``.remove()``.
    """
    from sqlalchemy import engine_from_config
    from sqlalchemy.orm import sessionmaker, scoped_session

    app_settings = dict(config.items('app:main'))
    db_engine = engine_from_config(app_settings, 'sqlalchemy.db1.')
    return scoped_session(sessionmaker(bind=db_engine))
24
def get_hg_settings():
    """Load all application settings rows into a flat dict.

    :returns: dict mapping ``'hg_app_' + app_settings_name`` to
        ``app_settings_value`` for every ``HgAppSettings`` row
    :raises Exception: when the settings table yields no rows
    """
    from rhodecode.model.db import HgAppSettings

    # Acquire the session *before* entering the try block: if get_session()
    # itself fails there is nothing to remove, and the cleanup must not mask
    # the original error with an UnboundLocalError.
    sa = get_session()
    try:
        ret = sa.query(HgAppSettings).all()
    finally:
        sa.remove()

    if not ret:
        raise Exception('Could not get application settings !')

    settings = {}
    for each in ret:
        settings['hg_app_' + each.app_settings_name] = each.app_settings_value

    return settings
40
def get_hg_ui_settings():
    """Load all ui settings rows into a flat dict.

    Keys have the form ``<ui_section>_<ui_key>`` where a ``'/'`` key is
    spelled ``root_path`` and dots in the key are replaced by underscores.
    Values are ``ui_value``, except for rows of the ``hooks`` section which
    store their ``ui_active`` flag instead.

    :returns: dict of normalised ui settings
    :raises Exception: when the ui settings table yields no rows
    """
    from rhodecode.model.db import HgAppUi

    # Acquire the session *before* entering the try block: if get_session()
    # itself fails there is nothing to remove, and the cleanup must not mask
    # the original error with an UnboundLocalError.
    sa = get_session()
    try:
        ret = sa.query(HgAppUi).all()
    finally:
        sa.remove()

    if not ret:
        raise Exception('Could not get application ui settings !')

    settings = {}
    for each in ret:
        k = each.ui_key
        if k == '/':
            k = 'root_path'
        # str.replace is a no-op when there is no dot, no need to search first
        k = k.replace('.', '_')

        if each.ui_section == 'hooks':
            v = each.ui_active
        else:
            v = each.ui_value

        settings[each.ui_section + '_' + k] = v

    return settings
67
@task
@locked_task
def whoosh_index(repo_location, full_index):
    """Run the whoosh indexing daemon for ``repo_location``.

    :param repo_location: passed to ``WhooshIndexingDaemon`` as
        ``repo_location``
    :param full_index: passed to the daemon's ``run()`` as ``full_index``
    """
    from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon

    log = whoosh_index.get_logger()
    indexer = WhooshIndexingDaemon(repo_location=repo_location)
    indexer.run(full_index=full_index)
74
@task
@locked_task
def get_commits_stats(repo_name, ts_min_y, ts_max_y):
    """Incrementally aggregate commit statistics for ``repo_name``.

    Each run parses at most ``parse_limit`` changesets, continuing after the
    revision recorded in the repository's ``Statistics`` row, merges them
    into the stored per-author and per-day aggregates, saves the result and
    (for repositories with more than one revision) re-queues itself through
    ``run_task`` so the next batch gets parsed. A re-queued run returns early
    once the stored marker equals the last revision.

    :param repo_name: repository name, appended to the configured root path
    :param ts_min_y: lower timestamp bound for creating per-author entries
    :param ts_max_y: upper timestamp bound for creating per-author entries;
        both bounds are currently bypassed because ``skip_date_limit`` is
        hard-coded to ``True``
    :returns: ``True`` when there was nothing to do or the statistics were
        stored, ``False`` when storing them failed
    """
    from rhodecode.model.db import Statistics, Repository
    log = get_commits_stats.get_logger()
    # double quotes are stripped for js data compatibility
    author_key_cleaner = lambda k: person(k).replace('"', "")

    commits_by_day_author_aggregate = {}
    commits_by_day_aggregate = {}
    repos_path = get_hg_ui_settings()['paths_root_path'].replace('*', '')
    repo = MercurialRepository(repos_path + repo_name)

    skip_date_limit = True
    parse_limit = 350  # max number of changesets parsed by a single run
    last_rev = 0
    last_cs = None
    timegetter = itemgetter('time')

    sa = get_session()
    try:
        dbrepo = sa.query(Repository)\
            .filter(Repository.repo_name == repo_name).scalar()
        cur_stats = sa.query(Statistics)\
            .filter(Statistics.repository == dbrepo).scalar()
        if cur_stats:
            last_rev = cur_stats.stat_on_revision
        if not repo.revisions:
            return True

        if last_rev == repo.revisions[-1] and len(repo.revisions) > 1:
            # pass silently without any work if we're not on first revision
            # or current state of parsing revision (from db marker) is the
            # last revision
            return True

        if cur_stats:
            commits_by_day_aggregate = OrderedDict(
                json.loads(cur_stats.commit_activity_combined))
            commits_by_day_author_aggregate = json.loads(
                cur_stats.commit_activity)

        # The stored marker is the last revision that was already counted,
        # so resume right after it; restarting *at* the marker would count
        # that changeset again on every run.
        start_rev = last_rev + 1 if cur_stats else 0

        log.debug('starting parsing %s', parse_limit)
        # don't fetch too much data at once since we can freeze application
        for rev in repo.revisions[start_rev:start_rev + parse_limit]:
            last_cs = cs = repo.get_changeset(rev)

            # timestamp of the commit's calendar day at 00:00:00
            k = mktime(tuple(cs.date.timetuple()[:3]) + (0,) * 6)

            author = author_key_cleaner(cs.author)
            added = len(cs.added)
            changed = len(cs.changed)
            removed = len(cs.removed)
            in_date_range = skip_date_limit or ts_min_y <= k <= ts_max_y

            author_stats = commits_by_day_author_aggregate.get(author)
            if author_stats is None:
                # first commit seen for this author
                if in_date_range:
                    commits_by_day_author_aggregate[author] = {
                        "label": author,
                        "data": [{"time": k,
                                  "commits": 1,
                                  "added": added,
                                  "changed": changed,
                                  "removed": removed,
                                  }],
                        "schema": ["commits"],
                    }
            else:
                for datadict in author_stats['data']:
                    if timegetter(datadict) == k:
                        # author already has an entry for this day
                        datadict["commits"] += 1
                        datadict["added"] += added
                        datadict["changed"] += changed
                        datadict["removed"] += removed
                        break
                else:
                    # known author, first commit on this day
                    if in_date_range:
                        author_stats['data'].append({"time": k,
                                                     "commits": 1,
                                                     "added": added,
                                                     "changed": changed,
                                                     "removed": removed,
                                                     })

            # gather all data by day
            if k in commits_by_day_aggregate:
                commits_by_day_aggregate[k] += 1
            else:
                commits_by_day_aggregate[k] = 1

        if last_cs is None:
            # no changeset was parsed (marker already at or beyond the last
            # available revision): nothing to store
            return True

        overview_data = sorted(
            ([day, count] for day, count in commits_by_day_aggregate.items()),
            key=itemgetter(0))

        if not commits_by_day_author_aggregate:
            commits_by_day_author_aggregate[author_key_cleaner(repo.contact)] = {
                "label": author_key_cleaner(repo.contact),
                "data": [0, 1],
                "schema": ["commits"],
            }

        stats = cur_stats if cur_stats else Statistics()
        stats.commit_activity = json.dumps(commits_by_day_author_aggregate)
        stats.commit_activity_combined = json.dumps(overview_data)

        log.debug('last revison %s', last_rev)
        leftovers = len(repo.revisions[last_rev:])
        log.debug('revisions to parse %s', leftovers)

        # language stats are only (re)computed on the first run and on the
        # run that reaches the end of the history
        if last_rev == 0 or leftovers < parse_limit:
            stats.languages = json.dumps(__get_codes_stats(repo_name))

        stats.repository = dbrepo
        stats.stat_on_revision = last_cs.revision

        try:
            sa.add(stats)
            sa.commit()
        except Exception:
            log.error(traceback.format_exc())
            sa.rollback()
            return False
    finally:
        # release the scoped session on every exit path
        sa.remove()

    if len(repo.revisions) > 1:
        run_task(get_commits_stats, repo_name, ts_min_y, ts_max_y)

    return True
216
@task
def reset_user_password(user_email):
    """Set a freshly generated password for a user and mail it to them.

    The user is looked up by ``user_email``. The notification mail is queued
    through ``run_task(send_email, ...)`` only after the new password has
    been committed, so a password that was never stored is never mailed.

    :param user_email: e-mail address identifying the user; also the
        recipient of the notification
    :returns: always ``True``; failures are logged, not raised
    """
    log = reset_user_password.get_logger()
    from rhodecode.lib import auth
    from rhodecode.model.db import User

    try:
        sa = get_session()
        try:
            new_passwd = auth.PasswordGenerator().gen_password(8,
                             auth.PasswordGenerator.ALPHABETS_BIG_SMALL)
            # validate before anything is written to the database
            if new_passwd is None:
                raise Exception('unable to generate new password')

            user = sa.query(User).filter(User.email == user_email).scalar()
            if not user:
                log.error('no user found for email %s', user_email)
                return True

            user.password = auth.get_crypt_password(new_passwd)
            sa.add(user)
            sa.commit()
            log.info('change password for %s', user_email)
        except Exception:
            log.error('Failed to update user password')
            log.error(traceback.format_exc())
            sa.rollback()
            return True
        finally:
            sa.remove()

        run_task(send_email, user_email,
                 "Your new hg-app password",
                 'Your new hg-app password:%s' % (new_passwd))
        log.info('send new password mail to %s', user_email)

    except Exception:
        log.error('Failed to update user password')
        log.error(traceback.format_exc())
    return True
251
@task
def send_email(recipients, subject, body):
    """Send a mail through ``SmtpMailer`` using the ``DEFAULT`` config section.

    Sender, credentials, server, port and the TLS switch are read from the
    options ``app_email_from``, ``smtp_username``, ``smtp_password``,
    ``smtp_server``, ``smtp_port`` and ``smtp_use_tls``. SSL is always off.

    :param recipients: passed to ``SmtpMailer.send`` as the recipients
    :param subject: mail subject
    :param body: mail body
    :returns: ``True`` when sending raised no error, ``False`` otherwise
    """
    log = send_email.get_logger()
    email_config = dict(config.items('DEFAULT'))
    mail_from = email_config.get('app_email_from')
    user = email_config.get('smtp_username')
    passwd = email_config.get('smtp_password')
    mail_server = email_config.get('smtp_server')
    mail_port = email_config.get('smtp_port')
    # Option values come back as text, and any non-empty string (including
    # "false") is truthy, so the flag has to be parsed explicitly. A missing
    # option yields None, which parses to False.
    # NOTE(review): assumes SmtpMailer treats ``tls`` as a boolean - confirm.
    tls = str(email_config.get('smtp_use_tls')).strip().lower() in (
        'true', 'yes', 'on', 'y', 't', '1')
    ssl = False

    try:
        m = SmtpMailer(mail_from, user, passwd, mail_server,
                       mail_port, ssl, tls)
        m.send(recipients, subject, body)
    except Exception:
        log.error('Mail sending failed')
        log.error(traceback.format_exc())
        return False
    return True
273
@task
def create_repo_fork(form_data, cur_user):
    """Register a fork in the database, then clone it on disk.

    :param form_data: mapping read for ``repo_name`` (clone source) and
        ``fork_name`` (clone destination); also handed to
        ``RepoModel.create``
    :param cur_user: handed to ``RepoModel.create``
    """
    import os
    from rhodecode.model.repo_model import RepoModel

    # database entry first (just_db=True, fork=True) ...
    repo_model = RepoModel(get_session())
    repo_model.create(form_data, cur_user, just_db=True, fork=True)

    # ... then the clone below the configured repositories root
    root_path = get_hg_ui_settings()['paths_root_path'].replace('*', '')
    source_path = os.path.join(root_path, form_data['repo_name'])
    fork_path = os.path.join(root_path, form_data['fork_name'])

    MercurialRepository(str(fork_path), True, clone_url=str(source_path))
288
289
def __get_codes_stats(repo_name):
    """Count the files at ``tip`` per mimetype.

    Only files whose extension is listed in ``LANGUAGES_EXTENSIONS`` are
    counted.

    :param repo_name: repository name, appended to the configured root path
    :returns: dict mapping mimetype to number of matching files
    """
    LANGUAGES_EXTENSIONS = frozenset([
        'action', 'adp', 'ashx', 'asmx', 'aspx', 'asx', 'axd', 'c',
        'cfg', 'cfm', 'cpp', 'cs', 'diff', 'do', 'el', 'erl',
        'h', 'java', 'js', 'jsp', 'jspx', 'lisp',
        'lua', 'm', 'mako', 'ml', 'pas', 'patch', 'php', 'php3',
        'php4', 'phtml', 'pm', 'py', 'rb', 'rst', 's', 'sh',
        'tpl', 'txt', 'vim', 'wss', 'xhtml', 'xml', 'xsl', 'xslt',
        'yaws',
    ])
    root_path = get_hg_ui_settings()['paths_root_path'].replace('*', '')
    repo = MercurialRepository(root_path + repo_name)

    code_stats = {}
    for _topnode, _dirs, files in repo.walk('/', 'tip'):
        for node in files:
            mimetype = node.mimetype
            if node.extension not in LANGUAGES_EXTENSIONS:
                continue
            code_stats[mimetype] = code_stats.get(mimetype, 0) + 1

    return code_stats
312
313
314
315
316