view kallithea/lib/celerylib/__init__.py @ 7812:fe4086096758

flake8: fix some F401 '...' imported but unused
author Mads Kiilerich <mads@kiilerich.com>
date Wed, 07 Aug 2019 23:48:26 +0200
parents 0a277465fddf
children a38e05a0c79e
line wrap: on
line source

# -*- coding: utf-8 -*-
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""
kallithea.lib.celerylib
~~~~~~~~~~~~~~~~~~~~~~~

celery libs for Kallithea

This file was forked by the Kallithea project in July 2014.
Original author and date, and relevant copyright and licensing information is below:
:created_on: Nov 27, 2010
:author: marcink
:copyright: (c) 2013 RhodeCode GmbH, and others.
:license: GPLv3, see LICENSE.md for more details.
"""


import logging
import os
from hashlib import md5

from decorator import decorator
from tg import config

from kallithea import CELERY_EAGER, CELERY_ON
from kallithea.lib.pidlock import DaemonLock, LockHeld
from kallithea.lib.utils2 import safe_str
from kallithea.model import meta


log = logging.getLogger(__name__)


class FakeTask(object):
    """Fake a sync result to make it look like a finished task"""

    def __init__(self, result):
        self.result = result

    def failed(self):
        return False

    traceback = None # if failed

    task_id = None


def task(f_org):
    """Wrapper of celery.task.task, running async if CELERY_ON
    """

    if CELERY_ON:
        def f_async(*args, **kwargs):
            log.info('executing %s task', f_org.__name__)
            try:
                f_org(*args, **kwargs)
            finally:
                log.info('executed %s task', f_org.__name__)
        f_async.__name__ = f_org.__name__
        from kallithea.lib import celerypylons
        runner = celerypylons.task(ignore_result=True)(f_async)

        def f_wrapped(*args, **kwargs):
            t = runner.apply_async(args=args, kwargs=kwargs)
            log.info('executing task %s in async mode - id %s', f_org, t.task_id)
            return t
    else:
        def f_wrapped(*args, **kwargs):
            log.info('executing task %s in sync', f_org.__name__)
            try:
                result = f_org(*args, **kwargs)
            except Exception as e:
                log.error('exception executing sync task %s in sync: %r', f_org.__name__, e)
                raise # TODO: return this in FakeTask as with async tasks?
            return FakeTask(result)

    return f_wrapped


def __get_lockkey(func, *fargs, **fkwargs):
    params = list(fargs)
    params.extend(['%s-%s' % ar for ar in fkwargs.items()])

    func_name = str(func.__name__) if hasattr(func, '__name__') else str(func)

    lockkey = 'task_%s.lock' % \
        md5(func_name + '-' + '-'.join(map(safe_str, params))).hexdigest()
    return lockkey


def locked_task(func):
    def __wrapper(func, *fargs, **fkwargs):
        lockkey = __get_lockkey(func, *fargs, **fkwargs)
        lockkey_path = config['tg.cache_dir']  # The configured cache_dir is only available under this name ...

        log.info('running task with lockkey %s', lockkey)
        try:
            l = DaemonLock(os.path.join(lockkey_path, lockkey))
            ret = func(*fargs, **fkwargs)
            l.release()
            return ret
        except LockHeld:
            log.info('LockHeld')
            return 'Task with key %s already running' % lockkey

    return decorator(__wrapper, func)


def get_session():
    sa = meta.Session()
    return sa


def dbsession(func):
    def __wrapper(func, *fargs, **fkwargs):
        try:
            ret = func(*fargs, **fkwargs)
            return ret
        finally:
            if CELERY_ON and not CELERY_EAGER:
                meta.Session.remove()

    return decorator(__wrapper, func)