Mercurial > kallithea
view kallithea/lib/celerylib/__init__.py @ 7812:fe4086096758
flake8: fix some F401 '...' imported but unused
author | Mads Kiilerich <mads@kiilerich.com> |
---|---|
date | Wed, 07 Aug 2019 23:48:26 +0200 |
parents | 0a277465fddf |
children | a38e05a0c79e |
line wrap: on
line source
# -*- coding: utf-8 -*-
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
kallithea.lib.celerylib
~~~~~~~~~~~~~~~~~~~~~~~

celery libs for Kallithea

This file was forked by the Kallithea project in July 2014.
Original author and date, and relevant copyright and licensing information is below:
:created_on: Nov 27, 2010
:author: marcink
:copyright: (c) 2013 RhodeCode GmbH, and others.
:license: GPLv3, see LICENSE.md for more details.
"""


import logging
import os
from hashlib import md5

from decorator import decorator
from tg import config

from kallithea import CELERY_EAGER, CELERY_ON
from kallithea.lib.pidlock import DaemonLock, LockHeld
from kallithea.lib.utils2 import safe_str
from kallithea.model import meta


log = logging.getLogger(__name__)


class FakeTask(object):
    """Fake a sync result to make it look like a finished task"""

    def __init__(self, result):
        # Return value of the synchronously executed task function.
        self.result = result

    def failed(self):
        """Always report success: a FakeTask is only built after the task returned."""
        return False

    traceback = None # if failed

    task_id = None


def task(f_org):
    """Wrapper of celery.task.task, running async if CELERY_ON

    :param f_org: the plain function implementing the task.
    :returns: a function taking the same arguments as ``f_org``.

    With CELERY_ON set, calling the returned function submits the task with
    ``runner.apply_async`` and returns the object that call produces. The
    return value of ``f_org`` is discarded (the runner is created with
    ``ignore_result=True``).

    Without CELERY_ON, calling the returned function runs ``f_org`` directly
    and returns its result wrapped in a :class:`FakeTask`. Exceptions are
    logged and re-raised.
    """

    if CELERY_ON:
        def f_async(*args, **kwargs):
            log.info('executing %s task', f_org.__name__)
            try:
                f_org(*args, **kwargs)
            finally:
                # Logged whether the task succeeded or raised.
                log.info('executed %s task', f_org.__name__)
        # Give the function handed to celery the name of the original function.
        f_async.__name__ = f_org.__name__
        # Imported here so that it is only loaded when CELERY_ON is set.
        from kallithea.lib import celerypylons
        runner = celerypylons.task(ignore_result=True)(f_async)

        def f_wrapped(*args, **kwargs):
            t = runner.apply_async(args=args, kwargs=kwargs)
            log.info('executing task %s in async mode - id %s', f_org, t.task_id)
            return t
    else:
        def f_wrapped(*args, **kwargs):
            log.info('executing task %s in sync', f_org.__name__)
            try:
                result = f_org(*args, **kwargs)
            except Exception as e:
                log.error('exception executing sync task %s in sync: %r', f_org.__name__, e)
                raise # TODO: return this in FakeTask as with async tasks?
            return FakeTask(result)

    return f_wrapped


def __get_lockkey(func, *fargs, **fkwargs):
    """Return the lock file name for a task invocation.

    :param func: the task function, or any other object (e.g. a plain name)
        - objects without ``__name__`` are identified by ``str(func)``.
    :param fargs: positional arguments of the invocation.
    :param fkwargs: keyword arguments; each one contributes ``'<key>-<value>'``.
    :returns: ``'task_<md5 hexdigest>.lock'`` where the digest covers the
        function name and all parameters joined with ``'-'``.
    """
    params = list(fargs)
    params.extend(['%s-%s' % ar for ar in fkwargs.items()])

    func_name = str(func.__name__) if hasattr(func, '__name__') else str(func)

    lockkey = 'task_%s.lock' % \
        md5(func_name + '-' + '-'.join(map(safe_str, params))).hexdigest()
    return lockkey


def locked_task(func):
    """Decorator running ``func`` while holding a per-invocation DaemonLock.

    The lock file lives in ``config['tg.cache_dir']`` and is named by
    ``__get_lockkey`` from the function and its arguments.

    :returns: the decorated function. Calling it returns whatever ``func``
        returns, or the string ``'Task with key <lockkey> already running'``
        when LockHeld is raised (presumably by DaemonLock when the lock file
        is already held - confirm against kallithea.lib.pidlock).
    """
    def __wrapper(func, *fargs, **fkwargs):
        lockkey = __get_lockkey(func, *fargs, **fkwargs)
        lockkey_path = config['tg.cache_dir'] # The configured cache_dir is only available under this name ...

        log.info('running task with lockkey %s', lockkey)
        try:
            lock = DaemonLock(os.path.join(lockkey_path, lockkey))
            try:
                return func(*fargs, **fkwargs)
            finally:
                # Release explicitly even when the task raises, so a failed
                # run does not leave the lock behind and block later runs.
                lock.release()
        except LockHeld:
            log.info('LockHeld')
            return 'Task with key %s already running' % lockkey

    return decorator(__wrapper, func)


def get_session():
    """Return the object obtained by calling ``meta.Session()``."""
    sa = meta.Session()
    return sa


def dbsession(func):
    """Decorator calling ``meta.Session.remove()`` after ``func`` has run.

    The removal happens whether ``func`` returns or raises, but only when
    CELERY_ON is set and CELERY_EAGER is not; otherwise the session is left
    untouched.
    """
    def __wrapper(func, *fargs, **fkwargs):
        try:
            ret = func(*fargs, **fkwargs)
            return ret
        finally:
            if CELERY_ON and not CELERY_EAGER:
                meta.Session.remove()

    return decorator(__wrapper, func)