# HG changeset patch # User Mads Kiilerich # Date 1473115878 -7200 # Node ID 82662f9faaf4d1317f8643bea54b55ea99f49b96 # Parent 81c13cdbe91f14b183f4f1208ae1c322bb34540e celeryd: annotate tasks so they can be run directly without run_task This also makes the system less forgiving about celery configuration problems and thus easier to debug. I like that. diff -r 81c13cdbe91f -r 82662f9faaf4 kallithea/controllers/admin/settings.py --- a/kallithea/controllers/admin/settings.py Tue Sep 06 00:51:18 2016 +0200 +++ b/kallithea/controllers/admin/settings.py Tue Sep 06 00:51:18 2016 +0200 @@ -37,7 +37,7 @@ from kallithea.lib import helpers as h from kallithea.lib.auth import LoginRequired, HasPermissionAnyDecorator from kallithea.lib.base import BaseController, render -from kallithea.lib.celerylib import tasks, run_task +from kallithea.lib.celerylib import tasks from kallithea.lib.exceptions import HgsubversionImportError from kallithea.lib.utils import repo2db_mapper, set_app_settings from kallithea.model.db import Ui, Repository, Setting @@ -325,8 +325,8 @@ recipients = [test_email] if test_email else None - run_task(tasks.send_email, recipients, test_email_subj, - test_email_txt_body, test_email_html_body) + tasks.send_email(recipients, test_email_subj, + test_email_txt_body, test_email_html_body) h.flash(_('Send email task created'), category='success') raise HTTPFound(location=url('admin_settings_email')) @@ -398,7 +398,7 @@ if request.POST: repo_location = self._get_hg_ui_settings()['paths_root_path'] full_index = request.POST.get('full_index', False) - run_task(tasks.whoosh_index, repo_location, full_index) + tasks.whoosh_index(repo_location, full_index) h.flash(_('Whoosh reindex task scheduled'), category='success') raise HTTPFound(location=url('admin_settings_search')) diff -r 81c13cdbe91f -r 82662f9faaf4 kallithea/controllers/summary.py --- a/kallithea/controllers/summary.py Tue Sep 06 00:51:18 2016 +0200 +++ b/kallithea/controllers/summary.py Tue Sep 06 00:51:18 2016 +0200 @@ -49,7 +49,6 @@ from kallithea.lib.base import BaseRepoController, render from kallithea.lib.vcs.backends.base import EmptyChangeset from kallithea.lib.markup_renderer import MarkupRenderer -from kallithea.lib.celerylib import run_task from kallithea.lib.celerylib.tasks import get_commits_stats from kallithea.lib.compat import json from kallithea.lib.vcs.nodes import FileNode @@ -225,6 +224,5 @@ c.no_data = True recurse_limit = 500 # don't recurse more than 500 times when parsing - run_task(get_commits_stats, c.db_repo.repo_name, ts_min_y, - ts_max_y, recurse_limit) + get_commits_stats(c.db_repo.repo_name, ts_min_y, ts_max_y, recurse_limit) return render('summary/statistics.html') diff -r 81c13cdbe91f -r 82662f9faaf4 kallithea/lib/celerylib/__init__.py --- a/kallithea/lib/celerylib/__init__.py Tue Sep 06 00:51:18 2016 +0200 +++ b/kallithea/lib/celerylib/__init__.py Tue Sep 06 00:51:18 2016 +0200 @@ -27,8 +27,6 @@ import os -import socket -import traceback import logging from pylons import config @@ -62,32 +60,35 @@ task_id = None -def run_task(task, *args, **kwargs): - global CELERY_ON - if CELERY_ON: - try: - t = task.apply_async(args=args, kwargs=kwargs) - log.info('running task %s:%s', t.task_id, task) - return t +def task(f_org): + """Wrapper of celery.task.task, running async if CELERY_ON + """ - except socket.error as e: - if isinstance(e, IOError) and e.errno == 111: - log.debug('Unable to connect to celeryd. Sync execution') - CELERY_ON = False - else: - log.error(traceback.format_exc()) - except KeyError as e: - log.debug('Unable to connect to celeryd. Sync execution') - except Exception as e: - log.error(traceback.format_exc()) + if CELERY_ON: + def f_async(*args, **kwargs): + log.info('executing %s task', f_org.__name__) + try: + f_org(*args, **kwargs) + finally: + log.info('executed %s task', f_org.__name__) + f_async.__name__ = f_org.__name__ + import celery.task + runner = celery.task.task(ignore_result=True)(f_async) + def f_wrapped(*args, **kwargs): + t = runner.apply_async(args=args, kwargs=kwargs) + log.info('executing task %s in async mode - id %s', f_org, t.task_id) + return t + else: + def f_wrapped(*args, **kwargs): + log.info('executing task %s in sync', f_org.__name__) + try: + result = f_org(*args, **kwargs) + except Exception as e: + log.error('exception executing sync task %s in sync', f_org.__name__, e) + raise # TODO: return this in FakeTask as with async tasks? + return FakeTask(result) - log.debug('executing task %s in sync mode', task) - try: - result = task(*args, **kwargs) - except Exception as e: - log.error('exception running sync task %s: %s', task, e) - raise # TODO: return this in FakeTask as with async tasks? - return FakeTask(result) + return f_wrapped def __get_lockkey(func, *fargs, **fkwargs): diff -r 81c13cdbe91f -r 82662f9faaf4 kallithea/lib/celerylib/tasks.py --- a/kallithea/lib/celerylib/tasks.py Tue Sep 06 00:51:18 2016 +0200 +++ b/kallithea/lib/celerylib/tasks.py Tue Sep 06 00:51:18 2016 +0200 @@ -26,8 +26,6 @@ :license: GPLv3, see LICENSE.md for more details. """ -from celery.task import task - import os import traceback import logging @@ -40,7 +38,8 @@ from pylons import config from kallithea import CELERY_ON -from kallithea.lib.celerylib import run_task, locked_task, dbsession, \ +from kallithea.lib import celerylib +from kallithea.lib.celerylib import locked_task, dbsession, \ str2bool, __get_lockkey, LockHeld, DaemonLock, get_session from kallithea.lib.helpers import person from kallithea.lib.rcmail.smtp_mailer import SmtpMailer @@ -60,7 +59,7 @@ log = logging.getLogger(__name__) -@task(ignore_result=True) +@celerylib.task @locked_task @dbsession def whoosh_index(repo_location, full_index): @@ -73,7 +72,7 @@ .run(full_index=full_index) -@task(ignore_result=True) +@celerylib.task @dbsession def get_commits_stats(repo_name, ts_min_y, ts_max_y, recurse_limit=100): DBS = get_session() @@ -228,18 +227,17 @@ # execute another task if celery is enabled if len(repo.revisions) > 1 and CELERY_ON and recurse_limit > 0: - recurse_limit -= 1 - run_task(get_commits_stats, repo_name, ts_min_y, ts_max_y, - recurse_limit) - if recurse_limit <= 0: - log.debug('Breaking recursive mode due to reach of recurse limit') - return True + get_commits_stats(repo_name, ts_min_y, ts_max_y, recurse_limit - 1) + elif recurse_limit <= 0: + log.debug('Not recursing - limit has been reached') + else: + log.debug('Not recursing') except LockHeld: log.info('Task with key %s already running', lockkey) return 'Task with key %s already running' % lockkey -@task(ignore_result=True) +@celerylib.task @dbsession def send_email(recipients, subject, body='', html_body='', headers=None, author=None): """ @@ -326,7 +324,7 @@ return False return True -@task(ignore_result=False) +@celerylib.task @dbsession def create_repo(form_data, cur_user): from kallithea.model.repo import RepoModel @@ -410,7 +408,7 @@ return True -@task(ignore_result=False) +@celerylib.task @dbsession def create_repo_fork(form_data, cur_user): """ diff -r 81c13cdbe91f -r 82662f9faaf4 kallithea/model/notification.py --- a/kallithea/model/notification.py Tue Sep 06 00:51:18 2016 +0200 +++ b/kallithea/model/notification.py Tue Sep 06 00:51:18 2016 +0200 @@ -62,7 +62,7 @@ :param with_email: send email with this notification :param email_kwargs: additional dict to pass as args to email template """ - from kallithea.lib.celerylib import tasks, run_task + from kallithea.lib.celerylib import tasks email_kwargs = email_kwargs or {} if recipients and not getattr(recipients, '__iter__', False): raise Exception('recipients must be a list or iterable') @@ -132,7 +132,7 @@ email_html_body = EmailNotificationModel() \ .get_email_tmpl(type_, 'html', **html_kwargs) - run_task(tasks.send_email, [rec.email], email_subject, email_txt_body, + tasks.send_email([rec.email], email_subject, email_txt_body, email_html_body, headers, author=created_by_obj) return notif diff -r 81c13cdbe91f -r 82662f9faaf4 kallithea/model/repo.py --- a/kallithea/model/repo.py Tue Sep 06 00:51:18 2016 +0200 +++ b/kallithea/model/repo.py Tue Sep 06 00:51:18 2016 +0200 @@ -473,8 +473,8 @@ :param form_data: :param cur_user: """ - from kallithea.lib.celerylib import tasks, run_task - return run_task(tasks.create_repo, form_data, cur_user) + from kallithea.lib.celerylib import tasks + return tasks.create_repo(form_data, cur_user) def _update_permissions(self, repo, perms_new=None, perms_updates=None, check_perms=True): @@ -522,8 +522,8 @@ :param form_data: :param cur_user: """ - from kallithea.lib.celerylib import tasks, run_task - return run_task(tasks.create_repo_fork, form_data, cur_user) + from kallithea.lib.celerylib import tasks + return tasks.create_repo_fork(form_data, cur_user) def delete(self, repo, forks=None, fs_remove=True, cur_user=None): """ diff -r 81c13cdbe91f -r 82662f9faaf4 kallithea/model/user.py --- a/kallithea/model/user.py Tue Sep 06 00:51:18 2016 +0200 +++ b/kallithea/model/user.py Tue Sep 06 00:51:18 2016 +0200 @@ -329,7 +329,7 @@ allowing users to copy-paste or manually enter the token from the email. """ - from kallithea.lib.celerylib import tasks, run_task + from kallithea.lib.celerylib import tasks from kallithea.model.notification import EmailNotificationModel import kallithea.lib.helpers as h @@ -364,8 +364,7 @@ reset_token=token, reset_url=link) log.debug('sending email') - run_task(tasks.send_email, [user_email], - _("Password reset link"), body, html_body) + tasks.send_email([user_email], _("Password reset link"), body, html_body) log.info('send new password mail to %s', user_email) else: log.debug("password reset email %s not found", user_email) @@ -375,7 +374,7 @@ timestamp=timestamp) def verify_reset_password_token(self, email, timestamp, token): - from kallithea.lib.celerylib import tasks, run_task + from kallithea.lib.celerylib import tasks from kallithea.lib import auth import kallithea.lib.helpers as h user = User.get_by_email(email) @@ -401,7 +400,7 @@ return expected_token == token def reset_password(self, user_email, new_passwd): - from kallithea.lib.celerylib import tasks, run_task + from kallithea.lib.celerylib import tasks from kallithea.lib import auth user = User.get_by_email(user_email) if user is not None: @@ -414,7 +413,7 @@ if new_passwd is None: raise Exception('unable to set new password') - run_task(tasks.send_email, [user_email], + tasks.send_email([user_email], _('Password reset notification'), _('The password to your account %s has been changed using password reset form.') % (user.username,)) log.info('send password reset mail to %s', user_email) diff -r 81c13cdbe91f -r 82662f9faaf4 kallithea/tests/models/test_notifications.py --- a/kallithea/tests/models/test_notifications.py Tue Sep 06 00:51:18 2016 +0200 +++ b/kallithea/tests/models/test_notifications.py Tue Sep 06 00:51:18 2016 +0200 @@ -265,7 +265,7 @@ # Email type TYPE_PASSWORD_RESET has no corresponding notification type - test it directly: desc = 'TYPE_PASSWORD_RESET' kwargs = dict(user='John Doe', reset_token='decbf64715098db5b0bd23eab44bd792670ab746', reset_url='http://reset.com/decbf64715098db5b0bd23eab44bd792670ab746') - kallithea.lib.celerylib.run_task(kallithea.lib.celerylib.tasks.send_email, ['john@doe.com'], + kallithea.lib.celerylib.tasks.send_email(['john@doe.com'], "Password reset link", EmailNotificationModel().get_email_tmpl(EmailNotificationModel.TYPE_PASSWORD_RESET, 'txt', **kwargs), EmailNotificationModel().get_email_tmpl(EmailNotificationModel.TYPE_PASSWORD_RESET, 'html', **kwargs),