changeset 6170:82662f9faaf4

celeryd: annotate tasks so they can be run directly without run_task This also makes the system less forgiving about celery configuration problems and thus easier to debug. I like that.
author Mads Kiilerich <madski@unity3d.com>
date Tue, 06 Sep 2016 00:51:18 +0200
parents 81c13cdbe91f
children 54545cc34c36
files kallithea/controllers/admin/settings.py kallithea/controllers/summary.py kallithea/lib/celerylib/__init__.py kallithea/lib/celerylib/tasks.py kallithea/model/notification.py kallithea/model/repo.py kallithea/model/user.py kallithea/tests/models/test_notifications.py
diffstat 8 files changed, 56 insertions(+), 60 deletions(-) [+]
line wrap: on
line diff
--- a/kallithea/controllers/admin/settings.py	Tue Sep 06 00:51:18 2016 +0200
+++ b/kallithea/controllers/admin/settings.py	Tue Sep 06 00:51:18 2016 +0200
@@ -37,7 +37,7 @@
 from kallithea.lib import helpers as h
 from kallithea.lib.auth import LoginRequired, HasPermissionAnyDecorator
 from kallithea.lib.base import BaseController, render
-from kallithea.lib.celerylib import tasks, run_task
+from kallithea.lib.celerylib import tasks
 from kallithea.lib.exceptions import HgsubversionImportError
 from kallithea.lib.utils import repo2db_mapper, set_app_settings
 from kallithea.model.db import Ui, Repository, Setting
@@ -325,8 +325,8 @@
 
             recipients = [test_email] if test_email else None
 
-            run_task(tasks.send_email, recipients, test_email_subj,
-                     test_email_txt_body, test_email_html_body)
+            tasks.send_email(recipients, test_email_subj,
+                             test_email_txt_body, test_email_html_body)
 
             h.flash(_('Send email task created'), category='success')
             raise HTTPFound(location=url('admin_settings_email'))
@@ -398,7 +398,7 @@
         if request.POST:
             repo_location = self._get_hg_ui_settings()['paths_root_path']
             full_index = request.POST.get('full_index', False)
-            run_task(tasks.whoosh_index, repo_location, full_index)
+            tasks.whoosh_index(repo_location, full_index)
             h.flash(_('Whoosh reindex task scheduled'), category='success')
             raise HTTPFound(location=url('admin_settings_search'))
 
--- a/kallithea/controllers/summary.py	Tue Sep 06 00:51:18 2016 +0200
+++ b/kallithea/controllers/summary.py	Tue Sep 06 00:51:18 2016 +0200
@@ -49,7 +49,6 @@
 from kallithea.lib.base import BaseRepoController, render
 from kallithea.lib.vcs.backends.base import EmptyChangeset
 from kallithea.lib.markup_renderer import MarkupRenderer
-from kallithea.lib.celerylib import run_task
 from kallithea.lib.celerylib.tasks import get_commits_stats
 from kallithea.lib.compat import json
 from kallithea.lib.vcs.nodes import FileNode
@@ -225,6 +224,5 @@
             c.no_data = True
 
         recurse_limit = 500  # don't recurse more than 500 times when parsing
-        run_task(get_commits_stats, c.db_repo.repo_name, ts_min_y,
-                 ts_max_y, recurse_limit)
+        get_commits_stats(c.db_repo.repo_name, ts_min_y, ts_max_y, recurse_limit)
         return render('summary/statistics.html')
--- a/kallithea/lib/celerylib/__init__.py	Tue Sep 06 00:51:18 2016 +0200
+++ b/kallithea/lib/celerylib/__init__.py	Tue Sep 06 00:51:18 2016 +0200
@@ -27,8 +27,6 @@
 
 
 import os
-import socket
-import traceback
 import logging
 
 from pylons import config
@@ -62,32 +60,35 @@
     task_id = None
 
 
-def run_task(task, *args, **kwargs):
-    global CELERY_ON
-    if CELERY_ON:
-        try:
-            t = task.apply_async(args=args, kwargs=kwargs)
-            log.info('running task %s:%s', t.task_id, task)
-            return t
+def task(f_org):
+    """Wrapper of celery.task.task, running async if CELERY_ON
+    """
 
-        except socket.error as e:
-            if isinstance(e, IOError) and e.errno == 111:
-                log.debug('Unable to connect to celeryd. Sync execution')
-                CELERY_ON = False
-            else:
-                log.error(traceback.format_exc())
-        except KeyError as e:
-                log.debug('Unable to connect to celeryd. Sync execution')
-        except Exception as e:
-            log.error(traceback.format_exc())
+    if CELERY_ON:
+        def f_async(*args, **kwargs):
+            log.info('executing %s task', f_org.__name__)
+            try:
+                f_org(*args, **kwargs)
+            finally:
+                log.info('executed %s task', f_org.__name__)
+        f_async.__name__ = f_org.__name__
+        import celery.task
+        runner = celery.task.task(ignore_result=True)(f_async)
+        def f_wrapped(*args, **kwargs):
+            t = runner.apply_async(args=args, kwargs=kwargs)
+            log.info('executing task %s in async mode - id %s', f_org, t.task_id)
+            return t
+    else:
+        def f_wrapped(*args, **kwargs):
+            log.info('executing task %s in sync', f_org.__name__)
+            try:
+                result = f_org(*args, **kwargs)
+            except Exception as e:
+                log.error('exception executing sync task %s in sync', f_org.__name__, e)
+                raise # TODO: return this in FakeTask as with async tasks?
+            return FakeTask(result)
 
-    log.debug('executing task %s in sync mode', task)
-    try:
-        result = task(*args, **kwargs)
-    except Exception as e:
-        log.error('exception running sync task %s: %s', task, e)
-        raise # TODO: return this in FakeTask as with async tasks?
-    return FakeTask(result)
+    return f_wrapped
 
 
 def __get_lockkey(func, *fargs, **fkwargs):
--- a/kallithea/lib/celerylib/tasks.py	Tue Sep 06 00:51:18 2016 +0200
+++ b/kallithea/lib/celerylib/tasks.py	Tue Sep 06 00:51:18 2016 +0200
@@ -26,8 +26,6 @@
 :license: GPLv3, see LICENSE.md for more details.
 """
 
-from celery.task import task
-
 import os
 import traceback
 import logging
@@ -40,7 +38,8 @@
 from pylons import config
 
 from kallithea import CELERY_ON
-from kallithea.lib.celerylib import run_task, locked_task, dbsession, \
+from kallithea.lib import celerylib
+from kallithea.lib.celerylib import locked_task, dbsession, \
     str2bool, __get_lockkey, LockHeld, DaemonLock, get_session
 from kallithea.lib.helpers import person
 from kallithea.lib.rcmail.smtp_mailer import SmtpMailer
@@ -60,7 +59,7 @@
 log = logging.getLogger(__name__)
 
 
-@task(ignore_result=True)
+@celerylib.task
 @locked_task
 @dbsession
 def whoosh_index(repo_location, full_index):
@@ -73,7 +72,7 @@
                          .run(full_index=full_index)
 
 
-@task(ignore_result=True)
+@celerylib.task
 @dbsession
 def get_commits_stats(repo_name, ts_min_y, ts_max_y, recurse_limit=100):
     DBS = get_session()
@@ -228,18 +227,17 @@
 
         # execute another task if celery is enabled
         if len(repo.revisions) > 1 and CELERY_ON and recurse_limit > 0:
-            recurse_limit -= 1
-            run_task(get_commits_stats, repo_name, ts_min_y, ts_max_y,
-                     recurse_limit)
-        if recurse_limit <= 0:
-            log.debug('Breaking recursive mode due to reach of recurse limit')
-        return True
+            get_commits_stats(repo_name, ts_min_y, ts_max_y, recurse_limit - 1)
+        elif recurse_limit <= 0:
+            log.debug('Not recursing - limit has been reached')
+        else:
+            log.debug('Not recursing')
     except LockHeld:
         log.info('Task with key %s already running', lockkey)
         return 'Task with key %s already running' % lockkey
 
 
-@task(ignore_result=True)
+@celerylib.task
 @dbsession
 def send_email(recipients, subject, body='', html_body='', headers=None, author=None):
     """
@@ -326,7 +324,7 @@
         return False
     return True
 
-@task(ignore_result=False)
+@celerylib.task
 @dbsession
 def create_repo(form_data, cur_user):
     from kallithea.model.repo import RepoModel
@@ -410,7 +408,7 @@
     return True
 
 
-@task(ignore_result=False)
+@celerylib.task
 @dbsession
 def create_repo_fork(form_data, cur_user):
     """
--- a/kallithea/model/notification.py	Tue Sep 06 00:51:18 2016 +0200
+++ b/kallithea/model/notification.py	Tue Sep 06 00:51:18 2016 +0200
@@ -62,7 +62,7 @@
         :param with_email: send email with this notification
         :param email_kwargs: additional dict to pass as args to email template
         """
-        from kallithea.lib.celerylib import tasks, run_task
+        from kallithea.lib.celerylib import tasks
         email_kwargs = email_kwargs or {}
         if recipients and not getattr(recipients, '__iter__', False):
             raise Exception('recipients must be a list or iterable')
@@ -132,7 +132,7 @@
             email_html_body = EmailNotificationModel() \
                                 .get_email_tmpl(type_, 'html', **html_kwargs)
 
-            run_task(tasks.send_email, [rec.email], email_subject, email_txt_body,
+            tasks.send_email([rec.email], email_subject, email_txt_body,
                      email_html_body, headers, author=created_by_obj)
 
         return notif
--- a/kallithea/model/repo.py	Tue Sep 06 00:51:18 2016 +0200
+++ b/kallithea/model/repo.py	Tue Sep 06 00:51:18 2016 +0200
@@ -473,8 +473,8 @@
         :param form_data:
         :param cur_user:
         """
-        from kallithea.lib.celerylib import tasks, run_task
-        return run_task(tasks.create_repo, form_data, cur_user)
+        from kallithea.lib.celerylib import tasks
+        return tasks.create_repo(form_data, cur_user)
 
     def _update_permissions(self, repo, perms_new=None, perms_updates=None,
                             check_perms=True):
@@ -522,8 +522,8 @@
         :param form_data:
         :param cur_user:
         """
-        from kallithea.lib.celerylib import tasks, run_task
-        return run_task(tasks.create_repo_fork, form_data, cur_user)
+        from kallithea.lib.celerylib import tasks
+        return tasks.create_repo_fork(form_data, cur_user)
 
     def delete(self, repo, forks=None, fs_remove=True, cur_user=None):
         """
--- a/kallithea/model/user.py	Tue Sep 06 00:51:18 2016 +0200
+++ b/kallithea/model/user.py	Tue Sep 06 00:51:18 2016 +0200
@@ -329,7 +329,7 @@
         allowing users to copy-paste or manually enter the token from the
         email.
         """
-        from kallithea.lib.celerylib import tasks, run_task
+        from kallithea.lib.celerylib import tasks
         from kallithea.model.notification import EmailNotificationModel
         import kallithea.lib.helpers as h
 
@@ -364,8 +364,7 @@
                 reset_token=token,
                 reset_url=link)
             log.debug('sending email')
-            run_task(tasks.send_email, [user_email],
-                     _("Password reset link"), body, html_body)
+            tasks.send_email([user_email], _("Password reset link"), body, html_body)
             log.info('send new password mail to %s', user_email)
         else:
             log.debug("password reset email %s not found", user_email)
@@ -375,7 +374,7 @@
                      timestamp=timestamp)
 
     def verify_reset_password_token(self, email, timestamp, token):
-        from kallithea.lib.celerylib import tasks, run_task
+        from kallithea.lib.celerylib import tasks
         from kallithea.lib import auth
         import kallithea.lib.helpers as h
         user = User.get_by_email(email)
@@ -401,7 +400,7 @@
         return expected_token == token
 
     def reset_password(self, user_email, new_passwd):
-        from kallithea.lib.celerylib import tasks, run_task
+        from kallithea.lib.celerylib import tasks
         from kallithea.lib import auth
         user = User.get_by_email(user_email)
         if user is not None:
@@ -414,7 +413,7 @@
         if new_passwd is None:
             raise Exception('unable to set new password')
 
-        run_task(tasks.send_email, [user_email],
+        tasks.send_email([user_email],
                  _('Password reset notification'),
                  _('The password to your account %s has been changed using password reset form.') % (user.username,))
         log.info('send password reset mail to %s', user_email)
--- a/kallithea/tests/models/test_notifications.py	Tue Sep 06 00:51:18 2016 +0200
+++ b/kallithea/tests/models/test_notifications.py	Tue Sep 06 00:51:18 2016 +0200
@@ -265,7 +265,7 @@
             # Email type TYPE_PASSWORD_RESET has no corresponding notification type - test it directly:
             desc = 'TYPE_PASSWORD_RESET'
             kwargs = dict(user='John Doe', reset_token='decbf64715098db5b0bd23eab44bd792670ab746', reset_url='http://reset.com/decbf64715098db5b0bd23eab44bd792670ab746')
-            kallithea.lib.celerylib.run_task(kallithea.lib.celerylib.tasks.send_email, ['john@doe.com'],
+            kallithea.lib.celerylib.tasks.send_email(['john@doe.com'],
                 "Password reset link",
                 EmailNotificationModel().get_email_tmpl(EmailNotificationModel.TYPE_PASSWORD_RESET, 'txt', **kwargs),
                 EmailNotificationModel().get_email_tmpl(EmailNotificationModel.TYPE_PASSWORD_RESET, 'html', **kwargs),