# HG changeset patch # User Mads Kiilerich # Date 1609520656 -3600 # Node ID 853717af31d3c395d3fc50b486a2c6b549e5c4c8 # Parent 01cb988e82a561d470ffd23d677d827374f5e19f celery: let async tasks choose at runtime if they should use immediate execution or dispatch to the Celery worker Make it completely safe to use task annotation at import time, before global config has been set. diff -r 01cb988e82a5 -r 853717af31d3 kallithea/lib/celerylib/__init__.py --- a/kallithea/lib/celerylib/__init__.py Fri Jan 01 18:04:16 2021 +0100 +++ b/kallithea/lib/celerylib/__init__.py Fri Jan 01 18:04:16 2021 +0100 @@ -43,29 +43,30 @@ def task(f_org): - """Wrapper of celery.task.task, running async if CELERY_APP + """Wrapper of celery.task.task, run at import time, before kallithea.CONFIG has been set, and before kallithea.CELERY_APP has been configured. """ - if asbool(kallithea.CONFIG.get('use_celery')): - def f_async(*args, **kwargs): - log.info('executing %s task', f_org.__name__) - try: - f_org(*args, **kwargs) - finally: - meta.Session.remove() # prevent reuse of auto created db sessions - log.info('executed %s task', f_org.__name__) - runner = kallithea.CELERY_APP.task(name=f_org.__name__, ignore_result=True)(f_async) + def f_async(*args, **kwargs): + log.info('executing async task %s', f_org.__name__) + try: + f_org(*args, **kwargs) + finally: + meta.Session.remove() # prevent reuse of auto created db sessions + log.info('executed async task %s', f_org.__name__) - def f_wrapped(*args, **kwargs): + runner = kallithea.CELERY_APP.task(name=f_org.__name__, ignore_result=True)(f_async) + + def f_wrapped(*args, **kwargs): + if asbool(kallithea.CONFIG.get('use_celery')): t = runner.apply_async(args=args, kwargs=kwargs) - log.info('executing task %s in async mode - id %s', f_org, t.task_id) - else: - def f_wrapped(*args, **kwargs): - log.info('executing task %s in sync', f_org.__name__) + log.info('executing async task %s - id %s', f_org, t.task_id) + else: + # invoke f_org directly, without the meta.Session.remove in f_async + log.info('executing sync task %s', f_org.__name__) try: f_org(*args, **kwargs) except Exception as e: - log.error('exception executing sync task %s in sync: %r', f_org.__name__, e) + log.error('exception executing sync task %s: %r', f_org.__name__, e) raise # TODO: report errors differently ... and consistently between sync and async return f_wrapped