changeset 8778:341e4bb9e227

celery: move async tasks to model - they both use model and are used by model, so they must be pretty much the same
author Mads Kiilerich <mads@kiilerich.com>
date Sat, 07 Nov 2020 18:42:58 +0100
parents 7e9e1f3d7a93
children ea1c608efa3a
files kallithea/controllers/admin/settings.py kallithea/controllers/summary.py kallithea/lib/celerylib/tasks.py kallithea/lib/celerypylons/__init__.py kallithea/model/async_tasks.py kallithea/model/notification.py kallithea/model/repo.py kallithea/model/user.py kallithea/tests/functional/test_login.py kallithea/tests/models/test_notifications.py kallithea/tests/other/test_mail.py scripts/deps.py
diffstat 12 files changed, 562 insertions(+), 567 deletions(-)
--- a/kallithea/controllers/admin/settings.py	Sat Dec 12 23:09:44 2020 +0100
+++ b/kallithea/controllers/admin/settings.py	Sat Nov 07 18:42:58 2020 +0100
@@ -39,12 +39,11 @@
 from kallithea.lib import webutils
 from kallithea.lib.auth import HasPermissionAnyDecorator, LoginRequired
 from kallithea.lib.base import BaseController, render
-from kallithea.lib.celerylib import tasks
 from kallithea.lib.utils import repo2db_mapper, set_app_settings
 from kallithea.lib.utils2 import safe_str
 from kallithea.lib.vcs import VCSError
 from kallithea.lib.webutils import url
-from kallithea.model import db, meta
+from kallithea.model import async_tasks, db, meta
 from kallithea.model.forms import ApplicationSettingsForm, ApplicationUiSettingsForm, ApplicationVisualisationForm
 from kallithea.model.notification import EmailNotificationModel
 from kallithea.model.scm import ScmModel
@@ -301,7 +300,7 @@
 
             recipients = [test_email] if test_email else None
 
-            tasks.send_email(recipients, test_email_subj,
+            async_tasks.send_email(recipients, test_email_subj,
                              test_email_txt_body, test_email_html_body)
 
             webutils.flash(_('Send email task created'), category='success')
@@ -379,7 +378,7 @@
         if request.POST:
             repo_location = self._get_hg_ui_settings()['paths_root_path']
             full_index = request.POST.get('full_index', False)
-            tasks.whoosh_index(repo_location, full_index)
+            async_tasks.whoosh_index(repo_location, full_index)
             webutils.flash(_('Whoosh reindex task scheduled'), category='success')
             raise HTTPFound(location=url('admin_settings_search'))
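Usage sketch (hypothetical helper and addresses, not part of the changeset): after this change, callers import the task functions from kallithea.model.async_tasks instead of kallithea.lib.celerylib.tasks, with unchanged signatures.

from kallithea.model import async_tasks

def send_test_mail(recipient_address):
    # recipients is a list; passing None instead falls back to the configured
    # email_to address plus all admin users (see send_email in async_tasks)
    async_tasks.send_email(
        [recipient_address],
        'Kallithea test email',
        'plain text body',
        '<b>html body</b>',
    )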
 
--- a/kallithea/controllers/summary.py	Sat Dec 12 23:09:44 2020 +0100
+++ b/kallithea/controllers/summary.py	Sat Nov 07 18:42:58 2020 +0100
@@ -41,7 +41,6 @@
 from kallithea.lib import ext_json, webutils
 from kallithea.lib.auth import HasRepoPermissionLevelDecorator, LoginRequired
 from kallithea.lib.base import BaseRepoController, jsonify, render
-from kallithea.lib.celerylib.tasks import get_commits_stats
 from kallithea.lib.conf import ALL_EXTS, ALL_READMES, LANGUAGES_EXTENSIONS_MAP
 from kallithea.lib.markup_renderer import MarkupRenderer
 from kallithea.lib.page import Page
@@ -49,7 +48,7 @@
 from kallithea.lib.vcs.backends.base import EmptyChangeset
 from kallithea.lib.vcs.exceptions import ChangesetError, EmptyRepositoryError, NodeDoesNotExistError
 from kallithea.lib.vcs.nodes import FileNode
-from kallithea.model import db
+from kallithea.model import async_tasks, db
 
 
 log = logging.getLogger(__name__)
@@ -209,5 +208,5 @@
             c.trending_languages = []
 
         recurse_limit = 500  # don't recurse more than 500 times when parsing
-        get_commits_stats(c.db_repo.repo_name, ts_min_y, ts_max_y, recurse_limit)
+        async_tasks.get_commits_stats(c.db_repo.repo_name, ts_min_y, ts_max_y, recurse_limit)
         return render('summary/statistics.html')
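Sketch of a direct call to the relocated statistics task (hypothetical: the timestamp window below simply bounds the current calendar year, whereas the controller computes its own ts_min_y/ts_max_y; 500 mirrors its recurse_limit):

from datetime import datetime
from time import mktime

from kallithea.model import async_tasks

def refresh_statistics(repo_name):
    now = datetime.now()
    ts_min_y = mktime(datetime(now.year, 1, 1).timetuple())    # assumed lower bound
    ts_max_y = mktime(datetime(now.year, 12, 31).timetuple())  # assumed upper bound
    # recurse_limit caps how many times the task re-schedules itself when more
    # revisions remain than commit_parse_limit allows per run
    async_tasks.get_commits_stats(repo_name, ts_min_y, ts_max_y, recurse_limit=500)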
--- a/kallithea/lib/celerylib/tasks.py	Sat Dec 12 23:09:44 2020 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,521 +0,0 @@
-# -*- coding: utf-8 -*-
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program.  If not, see <http://www.gnu.org/licenses/>.
-"""
-kallithea.lib.celerylib.tasks
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-Kallithea task modules, containing all task that suppose to be run
-by celery daemon
-
-This file was forked by the Kallithea project in July 2014.
-Original author and date, and relevant copyright and licensing information is below:
-:created_on: Oct 6, 2010
-:author: marcink
-:copyright: (c) 2013 RhodeCode GmbH, and others.
-:license: GPLv3, see LICENSE.md for more details.
-"""
-
-import email.message
-import email.utils
-import os
-import smtplib
-import time
-import traceback
-from collections import OrderedDict
-from operator import itemgetter
-from time import mktime
-
-import celery.utils.log
-from tg import config
-
-import kallithea
-import kallithea.lib.helpers as h
-from kallithea.lib import celerylib, conf, ext_json, hooks
-from kallithea.lib.indexers.daemon import WhooshIndexingDaemon
-from kallithea.lib.utils2 import asbool, ascii_bytes
-from kallithea.lib.vcs.utils import author_email
-from kallithea.model import db, userlog
-from kallithea.model.repo import RepoModel
-
-
-__all__ = ['whoosh_index', 'get_commits_stats', 'send_email']
-
-
-log = celery.utils.log.get_task_logger(__name__)
-
-
-@celerylib.task
-@celerylib.locked_task
-@celerylib.dbsession
-def whoosh_index(repo_location, full_index):
-    celerylib.get_session() # initialize database connection
-
-    index_location = config['index_dir']
-    WhooshIndexingDaemon(index_location=index_location,
-                         repo_location=repo_location) \
-                         .run(full_index=full_index)
-
-
-# for js data compatibility cleans the key for person from '
-def akc(k):
-    return h.person(k).replace('"', '')
-
-
-@celerylib.task
-@celerylib.dbsession
-def get_commits_stats(repo_name, ts_min_y, ts_max_y, recurse_limit=100):
-    DBS = celerylib.get_session()
-    lockkey = celerylib.__get_lockkey('get_commits_stats', repo_name, ts_min_y,
-                            ts_max_y)
-    lockkey_path = config.get('cache_dir') or config['app_conf']['cache_dir']  # Backward compatibility for TurboGears < 2.4
-
-    log.info('running task with lockkey %s', lockkey)
-
-    try:
-        lock = celerylib.DaemonLock(os.path.join(lockkey_path, lockkey))
-
-        co_day_auth_aggr = {}
-        commits_by_day_aggregate = {}
-        repo = db.Repository.get_by_repo_name(repo_name)
-        if repo is None:
-            return True
-
-        repo = repo.scm_instance
-        repo_size = repo.count()
-        # return if repo have no revisions
-        if repo_size < 1:
-            lock.release()
-            return True
-
-        skip_date_limit = True
-        parse_limit = int(config.get('commit_parse_limit'))
-        last_rev = None
-        last_cs = None
-        timegetter = itemgetter('time')
-
-        dbrepo = DBS.query(db.Repository) \
-            .filter(db.Repository.repo_name == repo_name).scalar()
-        cur_stats = DBS.query(db.Statistics) \
-            .filter(db.Statistics.repository == dbrepo).scalar()
-
-        if cur_stats is not None:
-            last_rev = cur_stats.stat_on_revision
-
-        if last_rev == repo.get_changeset().revision and repo_size > 1:
-            # pass silently without any work if we're not on first revision or
-            # current state of parsing revision(from db marker) is the
-            # last revision
-            lock.release()
-            return True
-
-        if cur_stats:
-            commits_by_day_aggregate = OrderedDict(ext_json.loads(
-                                        cur_stats.commit_activity_combined))
-            co_day_auth_aggr = ext_json.loads(cur_stats.commit_activity)
-
-        log.debug('starting parsing %s', parse_limit)
-
-        last_rev = last_rev + 1 if last_rev and last_rev >= 0 else 0
-        log.debug('Getting revisions from %s to %s',
-             last_rev, last_rev + parse_limit
-        )
-        for cs in repo[last_rev:last_rev + parse_limit]:
-            log.debug('parsing %s', cs)
-            last_cs = cs  # remember last parsed changeset
-            tt = cs.date.timetuple()
-            k = mktime(tt[:3] + (0, 0, 0, 0, 0, 0))
-
-            if akc(cs.author) in co_day_auth_aggr:
-                try:
-                    l = [timegetter(x) for x in
-                         co_day_auth_aggr[akc(cs.author)]['data']]
-                    time_pos = l.index(k)
-                except ValueError:
-                    time_pos = None
-
-                if time_pos is not None and time_pos >= 0:
-                    datadict = \
-                        co_day_auth_aggr[akc(cs.author)]['data'][time_pos]
-
-                    datadict["commits"] += 1
-                    datadict["added"] += len(cs.added)
-                    datadict["changed"] += len(cs.changed)
-                    datadict["removed"] += len(cs.removed)
-
-                else:
-                    if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
-
-                        datadict = {"time": k,
-                                    "commits": 1,
-                                    "added": len(cs.added),
-                                    "changed": len(cs.changed),
-                                    "removed": len(cs.removed),
-                                   }
-                        co_day_auth_aggr[akc(cs.author)]['data'] \
-                            .append(datadict)
-
-            else:
-                if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
-                    co_day_auth_aggr[akc(cs.author)] = {
-                                        "label": akc(cs.author),
-                                        "data": [{"time": k,
-                                                 "commits": 1,
-                                                 "added": len(cs.added),
-                                                 "changed": len(cs.changed),
-                                                 "removed": len(cs.removed),
-                                                 }],
-                                        "schema": ["commits"],
-                                        }
-
-            # gather all data by day
-            if k in commits_by_day_aggregate:
-                commits_by_day_aggregate[k] += 1
-            else:
-                commits_by_day_aggregate[k] = 1
-
-        overview_data = sorted(commits_by_day_aggregate.items(),
-                               key=itemgetter(0))
-
-        if not co_day_auth_aggr:
-            co_day_auth_aggr[akc(repo.contact)] = {
-                "label": akc(repo.contact),
-                "data": [0, 1],
-                "schema": ["commits"],
-            }
-
-        stats = cur_stats if cur_stats else db.Statistics()
-        stats.commit_activity = ascii_bytes(ext_json.dumps(co_day_auth_aggr))
-        stats.commit_activity_combined = ascii_bytes(ext_json.dumps(overview_data))
-
-        log.debug('last revision %s', last_rev)
-        leftovers = len(repo.revisions[last_rev:])
-        log.debug('revisions to parse %s', leftovers)
-
-        if last_rev == 0 or leftovers < parse_limit:
-            log.debug('getting code trending stats')
-            stats.languages = ascii_bytes(ext_json.dumps(__get_codes_stats(repo_name)))
-
-        try:
-            stats.repository = dbrepo
-            stats.stat_on_revision = last_cs.revision if last_cs else 0
-            DBS.add(stats)
-            DBS.commit()
-        except:
-            log.error(traceback.format_exc())
-            DBS.rollback()
-            lock.release()
-            return False
-
-        # final release
-        lock.release()
-
-        # execute another task if celery is enabled
-        if len(repo.revisions) > 1 and kallithea.CELERY_APP and recurse_limit > 0:
-            get_commits_stats(repo_name, ts_min_y, ts_max_y, recurse_limit - 1)
-        elif recurse_limit <= 0:
-            log.debug('Not recursing - limit has been reached')
-        else:
-            log.debug('Not recursing')
-    except celerylib.LockHeld:
-        log.info('Task with key %s already running', lockkey)
-        return 'Task with key %s already running' % lockkey
-
-
-@celerylib.task
-@celerylib.dbsession
-def send_email(recipients, subject, body='', html_body='', headers=None, from_name=None):
-    """
-    Sends an email with defined parameters from the .ini files.
-
-    :param recipients: list of recipients, if this is None, the defined email
-        address from field 'email_to' and all admins is used instead
-    :param subject: subject of the mail
-    :param body: plain text body of the mail
-    :param html_body: html version of body
-    :param headers: dictionary of prepopulated e-mail headers
-    :param from_name: full name to be used as sender of this mail - often a
-    .full_name_or_username value
-    """
-    assert isinstance(recipients, list), recipients
-    if headers is None:
-        headers = {}
-    else:
-        # do not modify the original headers object passed by the caller
-        headers = headers.copy()
-
-    email_config = config
-    email_prefix = email_config.get('email_prefix', '')
-    if email_prefix:
-        subject = "%s %s" % (email_prefix, subject)
-
-    if not recipients:
-        # if recipients are not defined we send to email_config + all admins
-        recipients = [u.email for u in db.User.query()
-                      .filter(db.User.admin == True).all()]
-        if email_config.get('email_to') is not None:
-            recipients += email_config.get('email_to').split(',')
-
-        # If there are still no recipients, there are no admins and no address
-        # configured in email_to, so return.
-        if not recipients:
-            log.error("No recipients specified and no fallback available.")
-            return False
-
-        log.warning("No recipients specified for '%s' - sending to admins %s", subject, ' '.join(recipients))
-
-    # SMTP sender
-    app_email_from = email_config.get('app_email_from', 'Kallithea')
-    # 'From' header
-    if from_name is not None:
-        # set From header based on from_name but with a generic e-mail address
-        # In case app_email_from is in "Some Name <e-mail>" format, we first
-        # extract the e-mail address.
-        envelope_addr = author_email(app_email_from)
-        headers['From'] = '"%s" <%s>' % (
-            email.utils.quote('%s (no-reply)' % from_name),
-            envelope_addr)
-
-    smtp_server = email_config.get('smtp_server')
-    smtp_port = email_config.get('smtp_port')
-    smtp_use_tls = asbool(email_config.get('smtp_use_tls'))
-    smtp_use_ssl = asbool(email_config.get('smtp_use_ssl'))
-    smtp_auth = email_config.get('smtp_auth')  # undocumented - overrule automatic choice of auth mechanism
-    smtp_username = email_config.get('smtp_username')
-    smtp_password = email_config.get('smtp_password')
-
-    logmsg = ("Mail details:\n"
-              "recipients: %s\n"
-              "headers: %s\n"
-              "subject: %s\n"
-              "body:\n%s\n"
-              "html:\n%s\n"
-              % (' '.join(recipients), headers, subject, body, html_body))
-
-    if smtp_server:
-        log.debug("Sending e-mail. " + logmsg)
-    else:
-        log.error("SMTP mail server not configured - cannot send e-mail.")
-        log.warning(logmsg)
-        return False
-
-    msg = email.message.EmailMessage()
-    msg['Subject'] = subject
-    msg['From'] = app_email_from  # fallback - might be overridden by a header
-    msg['To'] = ', '.join(recipients)
-    msg['Date'] = email.utils.formatdate(time.time())
-
-    for key, value in headers.items():
-        del msg[key]  # Delete key first to make sure add_header will replace header (if any), no matter the casing
-        msg.add_header(key, value)
-
-    msg.set_content(body)
-    msg.add_alternative(html_body, subtype='html')
-
-    try:
-        if smtp_use_ssl:
-            smtp_serv = smtplib.SMTP_SSL(smtp_server, smtp_port)
-        else:
-            smtp_serv = smtplib.SMTP(smtp_server, smtp_port)
-
-        if smtp_use_tls:
-            smtp_serv.starttls()
-
-        if smtp_auth:
-            smtp_serv.ehlo()  # populate esmtp_features
-            smtp_serv.esmtp_features["auth"] = smtp_auth
-
-        if smtp_username and smtp_password is not None:
-            smtp_serv.login(smtp_username, smtp_password)
-
-        smtp_serv.sendmail(app_email_from, recipients, msg.as_string())
-        smtp_serv.quit()
-
-        log.info('Mail was sent to: %s' % recipients)
-    except:
-        log.error('Mail sending failed')
-        log.error(traceback.format_exc())
-        return False
-    return True
-
-
-@celerylib.task
-@celerylib.dbsession
-def create_repo(form_data, cur_user):
-    DBS = celerylib.get_session()
-
-    cur_user = db.User.guess_instance(cur_user)
-
-    owner = cur_user
-    repo_name = form_data['repo_name']
-    repo_name_full = form_data['repo_name_full']
-    repo_type = form_data['repo_type']
-    description = form_data['repo_description']
-    private = form_data['repo_private']
-    clone_uri = form_data.get('clone_uri')
-    repo_group = form_data['repo_group']
-    landing_rev = form_data['repo_landing_rev']
-    copy_fork_permissions = form_data.get('copy_permissions')
-    copy_group_permissions = form_data.get('repo_copy_permissions')
-    fork_of = form_data.get('fork_parent_id')
-    state = form_data.get('repo_state', db.Repository.STATE_PENDING)
-
-    # repo creation defaults, private and repo_type are filled in form
-    defs = db.Setting.get_default_repo_settings(strip_prefix=True)
-    enable_statistics = defs.get('repo_enable_statistics')
-    enable_downloads = defs.get('repo_enable_downloads')
-
-    try:
-        repo = RepoModel()._create_repo(
-            repo_name=repo_name_full,
-            repo_type=repo_type,
-            description=description,
-            owner=owner,
-            private=private,
-            clone_uri=clone_uri,
-            repo_group=repo_group,
-            landing_rev=landing_rev,
-            fork_of=fork_of,
-            copy_fork_permissions=copy_fork_permissions,
-            copy_group_permissions=copy_group_permissions,
-            enable_statistics=enable_statistics,
-            enable_downloads=enable_downloads,
-            state=state
-        )
-
-        userlog.action_logger(cur_user, 'user_created_repo',
-                      form_data['repo_name_full'], '')
-
-        DBS.commit()
-        # now create this repo on Filesystem
-        RepoModel()._create_filesystem_repo(
-            repo_name=repo_name,
-            repo_type=repo_type,
-            repo_group=db.RepoGroup.guess_instance(repo_group),
-            clone_uri=clone_uri,
-        )
-        repo = db.Repository.get_by_repo_name(repo_name_full)
-        hooks.log_create_repository(repo.get_dict(), created_by=owner.username)
-
-        # update repo changeset caches initially
-        repo.update_changeset_cache()
-
-        # set new created state
-        repo.set_state(db.Repository.STATE_CREATED)
-        DBS.commit()
-    except Exception as e:
-        log.warning('Exception %s occurred when forking repository, '
-                    'doing cleanup...' % e)
-        # rollback things manually !
-        repo = db.Repository.get_by_repo_name(repo_name_full)
-        if repo:
-            db.Repository.delete(repo.repo_id)
-            DBS.commit()
-            RepoModel()._delete_filesystem_repo(repo)
-        raise
-
-    return True
-
-
-@celerylib.task
-@celerylib.dbsession
-def create_repo_fork(form_data, cur_user):
-    """
-    Creates a fork of repository using interval VCS methods
-
-    :param form_data:
-    :param cur_user:
-    """
-    DBS = celerylib.get_session()
-
-    base_path = kallithea.CONFIG['base_path']
-    cur_user = db.User.guess_instance(cur_user)
-
-    repo_name = form_data['repo_name']  # fork in this case
-    repo_name_full = form_data['repo_name_full']
-
-    repo_type = form_data['repo_type']
-    owner = cur_user
-    private = form_data['private']
-    clone_uri = form_data.get('clone_uri')
-    repo_group = form_data['repo_group']
-    landing_rev = form_data['landing_rev']
-    copy_fork_permissions = form_data.get('copy_permissions')
-
-    try:
-        fork_of = db.Repository.guess_instance(form_data.get('fork_parent_id'))
-
-        RepoModel()._create_repo(
-            repo_name=repo_name_full,
-            repo_type=repo_type,
-            description=form_data['description'],
-            owner=owner,
-            private=private,
-            clone_uri=clone_uri,
-            repo_group=repo_group,
-            landing_rev=landing_rev,
-            fork_of=fork_of,
-            copy_fork_permissions=copy_fork_permissions
-        )
-        userlog.action_logger(cur_user, 'user_forked_repo:%s' % repo_name_full,
-                      fork_of.repo_name, '')
-        DBS.commit()
-
-        source_repo_path = os.path.join(base_path, fork_of.repo_name)
-
-        # now create this repo on Filesystem
-        RepoModel()._create_filesystem_repo(
-            repo_name=repo_name,
-            repo_type=repo_type,
-            repo_group=db.RepoGroup.guess_instance(repo_group),
-            clone_uri=source_repo_path,
-        )
-        repo = db.Repository.get_by_repo_name(repo_name_full)
-        hooks.log_create_repository(repo.get_dict(), created_by=owner.username)
-
-        # update repo changeset caches initially
-        repo.update_changeset_cache()
-
-        # set new created state
-        repo.set_state(db.Repository.STATE_CREATED)
-        DBS.commit()
-    except Exception as e:
-        log.warning('Exception %s occurred when forking repository, '
-                    'doing cleanup...' % e)
-        # rollback things manually !
-        repo = db.Repository.get_by_repo_name(repo_name_full)
-        if repo:
-            db.Repository.delete(repo.repo_id)
-            DBS.commit()
-            RepoModel()._delete_filesystem_repo(repo)
-        raise
-
-    return True
-
-
-def __get_codes_stats(repo_name):
-    repo = db.Repository.get_by_repo_name(repo_name).scm_instance
-
-    tip = repo.get_changeset()
-    code_stats = {}
-
-    for _topnode, _dirnodes, filenodes in tip.walk('/'):
-        for filenode in filenodes:
-            ext = filenode.extension.lower()
-            if ext in conf.LANGUAGES_EXTENSIONS_MAP and not filenode.is_binary:
-                if ext in code_stats:
-                    code_stats[ext] += 1
-                else:
-                    code_stats[ext] = 1
-
-    return code_stats or {}
--- a/kallithea/lib/celerypylons/__init__.py	Sat Dec 12 23:09:44 2020 +0100
+++ b/kallithea/lib/celerypylons/__init__.py	Sat Nov 07 18:42:58 2020 +0100
@@ -23,7 +23,7 @@
 
 
 class CeleryConfig(object):
-    imports = ['kallithea.lib.celerylib.tasks']
+    imports = ['kallithea.model.async_tasks']
     task_always_eager = False
 
 # map from Kallithea .ini Celery 3 config names to Celery 4 config names
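The imports list is what makes the Celery worker load the task module at startup and register its @celerylib.task functions, so it has to follow the module to its new location. Stand-alone sketch of the equivalent plain Celery configuration (the broker URL is an assumption; Kallithea derives this configuration from the .ini file instead):

import celery

app = celery.Celery('kallithea')
app.conf.update(
    broker_url='amqp://localhost//',  # assumption, not configured by this changeset
    # the worker imports this module on startup, registering every task defined in it
    imports=['kallithea.model.async_tasks'],
    task_always_eager=False,  # run tasks on the worker, not inline in the web process
)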
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/kallithea/model/async_tasks.py	Sat Nov 07 18:42:58 2020 +0100
@@ -0,0 +1,520 @@
+# -*- coding: utf-8 -*-
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+"""
+kallithea.model.async_tasks
+~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Kallithea task modules, containing all task that suppose to be run
+by celery daemon
+
+This file was forked by the Kallithea project in July 2014.
+Original author and date, and relevant copyright and licensing information is below:
+:created_on: Oct 6, 2010
+:author: marcink
+:copyright: (c) 2013 RhodeCode GmbH, and others.
+:license: GPLv3, see LICENSE.md for more details.
+"""
+
+import email.message
+import email.utils
+import os
+import smtplib
+import time
+import traceback
+from collections import OrderedDict
+from operator import itemgetter
+from time import mktime
+
+import celery.utils.log
+from tg import config
+
+import kallithea
+import kallithea.lib.helpers as h
+from kallithea.lib import celerylib, conf, ext_json, hooks
+from kallithea.lib.indexers.daemon import WhooshIndexingDaemon
+from kallithea.lib.utils2 import asbool, ascii_bytes
+from kallithea.lib.vcs.utils import author_email
+from kallithea.model import db, repo, userlog
+
+
+__all__ = ['whoosh_index', 'get_commits_stats', 'send_email']
+
+
+log = celery.utils.log.get_task_logger(__name__)
+
+
+@celerylib.task
+@celerylib.locked_task
+@celerylib.dbsession
+def whoosh_index(repo_location, full_index):
+    celerylib.get_session() # initialize database connection
+
+    index_location = config['index_dir']
+    WhooshIndexingDaemon(index_location=index_location,
+                         repo_location=repo_location) \
+                         .run(full_index=full_index)
+
+
+# for js data compatibility cleans the key for person from '
+def akc(k):
+    return h.person(k).replace('"', '')
+
+
+@celerylib.task
+@celerylib.dbsession
+def get_commits_stats(repo_name, ts_min_y, ts_max_y, recurse_limit=100):
+    DBS = celerylib.get_session()
+    lockkey = celerylib.__get_lockkey('get_commits_stats', repo_name, ts_min_y,
+                            ts_max_y)
+    lockkey_path = config.get('cache_dir') or config['app_conf']['cache_dir']  # Backward compatibility for TurboGears < 2.4
+
+    log.info('running task with lockkey %s', lockkey)
+
+    try:
+        lock = celerylib.DaemonLock(os.path.join(lockkey_path, lockkey))
+
+        co_day_auth_aggr = {}
+        commits_by_day_aggregate = {}
+        db_repo = db.Repository.get_by_repo_name(repo_name)
+        if db_repo is None:
+            return True
+
+        scm_repo = db_repo.scm_instance
+        repo_size = scm_repo.count()
+        # return if repo have no revisions
+        if repo_size < 1:
+            lock.release()
+            return True
+
+        skip_date_limit = True
+        parse_limit = int(config.get('commit_parse_limit'))
+        last_rev = None
+        last_cs = None
+        timegetter = itemgetter('time')
+
+        dbrepo = DBS.query(db.Repository) \
+            .filter(db.Repository.repo_name == repo_name).scalar()
+        cur_stats = DBS.query(db.Statistics) \
+            .filter(db.Statistics.repository == dbrepo).scalar()
+
+        if cur_stats is not None:
+            last_rev = cur_stats.stat_on_revision
+
+        if last_rev == scm_repo.get_changeset().revision and repo_size > 1:
+            # pass silently without any work if we're not on first revision or
+            # current state of parsing revision(from db marker) is the
+            # last revision
+            lock.release()
+            return True
+
+        if cur_stats:
+            commits_by_day_aggregate = OrderedDict(ext_json.loads(
+                                        cur_stats.commit_activity_combined))
+            co_day_auth_aggr = ext_json.loads(cur_stats.commit_activity)
+
+        log.debug('starting parsing %s', parse_limit)
+
+        last_rev = last_rev + 1 if last_rev and last_rev >= 0 else 0
+        log.debug('Getting revisions from %s to %s',
+             last_rev, last_rev + parse_limit
+        )
+        for cs in scm_repo[last_rev:last_rev + parse_limit]:
+            log.debug('parsing %s', cs)
+            last_cs = cs  # remember last parsed changeset
+            tt = cs.date.timetuple()
+            k = mktime(tt[:3] + (0, 0, 0, 0, 0, 0))
+
+            if akc(cs.author) in co_day_auth_aggr:
+                try:
+                    l = [timegetter(x) for x in
+                         co_day_auth_aggr[akc(cs.author)]['data']]
+                    time_pos = l.index(k)
+                except ValueError:
+                    time_pos = None
+
+                if time_pos is not None and time_pos >= 0:
+                    datadict = \
+                        co_day_auth_aggr[akc(cs.author)]['data'][time_pos]
+
+                    datadict["commits"] += 1
+                    datadict["added"] += len(cs.added)
+                    datadict["changed"] += len(cs.changed)
+                    datadict["removed"] += len(cs.removed)
+
+                else:
+                    if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
+
+                        datadict = {"time": k,
+                                    "commits": 1,
+                                    "added": len(cs.added),
+                                    "changed": len(cs.changed),
+                                    "removed": len(cs.removed),
+                                   }
+                        co_day_auth_aggr[akc(cs.author)]['data'] \
+                            .append(datadict)
+
+            else:
+                if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
+                    co_day_auth_aggr[akc(cs.author)] = {
+                                        "label": akc(cs.author),
+                                        "data": [{"time": k,
+                                                 "commits": 1,
+                                                 "added": len(cs.added),
+                                                 "changed": len(cs.changed),
+                                                 "removed": len(cs.removed),
+                                                 }],
+                                        "schema": ["commits"],
+                                        }
+
+            # gather all data by day
+            if k in commits_by_day_aggregate:
+                commits_by_day_aggregate[k] += 1
+            else:
+                commits_by_day_aggregate[k] = 1
+
+        overview_data = sorted(commits_by_day_aggregate.items(),
+                               key=itemgetter(0))
+
+        if not co_day_auth_aggr:
+            co_day_auth_aggr[akc(scm_repo.contact)] = {
+                "label": akc(scm_repo.contact),
+                "data": [0, 1],
+                "schema": ["commits"],
+            }
+
+        stats = cur_stats if cur_stats else db.Statistics()
+        stats.commit_activity = ascii_bytes(ext_json.dumps(co_day_auth_aggr))
+        stats.commit_activity_combined = ascii_bytes(ext_json.dumps(overview_data))
+
+        log.debug('last revision %s', last_rev)
+        leftovers = len(scm_repo.revisions[last_rev:])
+        log.debug('revisions to parse %s', leftovers)
+
+        if last_rev == 0 or leftovers < parse_limit:
+            log.debug('getting code trending stats')
+            stats.languages = ascii_bytes(ext_json.dumps(__get_codes_stats(repo_name)))
+
+        try:
+            stats.repository = dbrepo
+            stats.stat_on_revision = last_cs.revision if last_cs else 0
+            DBS.add(stats)
+            DBS.commit()
+        except:
+            log.error(traceback.format_exc())
+            DBS.rollback()
+            lock.release()
+            return False
+
+        # final release
+        lock.release()
+
+        # execute another task if celery is enabled
+        if len(scm_repo.revisions) > 1 and kallithea.CELERY_APP and recurse_limit > 0:
+            get_commits_stats(repo_name, ts_min_y, ts_max_y, recurse_limit - 1)
+        elif recurse_limit <= 0:
+            log.debug('Not recursing - limit has been reached')
+        else:
+            log.debug('Not recursing')
+    except celerylib.LockHeld:
+        log.info('Task with key %s already running', lockkey)
+        return 'Task with key %s already running' % lockkey
+
+
+@celerylib.task
+@celerylib.dbsession
+def send_email(recipients, subject, body='', html_body='', headers=None, from_name=None):
+    """
+    Sends an email with defined parameters from the .ini files.
+
+    :param recipients: list of recipients, if this is None, the defined email
+        address from field 'email_to' and all admins is used instead
+    :param subject: subject of the mail
+    :param body: plain text body of the mail
+    :param html_body: html version of body
+    :param headers: dictionary of prepopulated e-mail headers
+    :param from_name: full name to be used as sender of this mail - often a
+    .full_name_or_username value
+    """
+    assert isinstance(recipients, list), recipients
+    if headers is None:
+        headers = {}
+    else:
+        # do not modify the original headers object passed by the caller
+        headers = headers.copy()
+
+    email_config = config
+    email_prefix = email_config.get('email_prefix', '')
+    if email_prefix:
+        subject = "%s %s" % (email_prefix, subject)
+
+    if not recipients:
+        # if recipients are not defined we send to email_config + all admins
+        recipients = [u.email for u in db.User.query()
+                      .filter(db.User.admin == True).all()]
+        if email_config.get('email_to') is not None:
+            recipients += email_config.get('email_to').split(',')
+
+        # If there are still no recipients, there are no admins and no address
+        # configured in email_to, so return.
+        if not recipients:
+            log.error("No recipients specified and no fallback available.")
+            return False
+
+        log.warning("No recipients specified for '%s' - sending to admins %s", subject, ' '.join(recipients))
+
+    # SMTP sender
+    app_email_from = email_config.get('app_email_from', 'Kallithea')
+    # 'From' header
+    if from_name is not None:
+        # set From header based on from_name but with a generic e-mail address
+        # In case app_email_from is in "Some Name <e-mail>" format, we first
+        # extract the e-mail address.
+        envelope_addr = author_email(app_email_from)
+        headers['From'] = '"%s" <%s>' % (
+            email.utils.quote('%s (no-reply)' % from_name),
+            envelope_addr)
+
+    smtp_server = email_config.get('smtp_server')
+    smtp_port = email_config.get('smtp_port')
+    smtp_use_tls = asbool(email_config.get('smtp_use_tls'))
+    smtp_use_ssl = asbool(email_config.get('smtp_use_ssl'))
+    smtp_auth = email_config.get('smtp_auth')  # undocumented - overrule automatic choice of auth mechanism
+    smtp_username = email_config.get('smtp_username')
+    smtp_password = email_config.get('smtp_password')
+
+    logmsg = ("Mail details:\n"
+              "recipients: %s\n"
+              "headers: %s\n"
+              "subject: %s\n"
+              "body:\n%s\n"
+              "html:\n%s\n"
+              % (' '.join(recipients), headers, subject, body, html_body))
+
+    if smtp_server:
+        log.debug("Sending e-mail. " + logmsg)
+    else:
+        log.error("SMTP mail server not configured - cannot send e-mail.")
+        log.warning(logmsg)
+        return False
+
+    msg = email.message.EmailMessage()
+    msg['Subject'] = subject
+    msg['From'] = app_email_from  # fallback - might be overridden by a header
+    msg['To'] = ', '.join(recipients)
+    msg['Date'] = email.utils.formatdate(time.time())
+
+    for key, value in headers.items():
+        del msg[key]  # Delete key first to make sure add_header will replace header (if any), no matter the casing
+        msg.add_header(key, value)
+
+    msg.set_content(body)
+    msg.add_alternative(html_body, subtype='html')
+
+    try:
+        if smtp_use_ssl:
+            smtp_serv = smtplib.SMTP_SSL(smtp_server, smtp_port)
+        else:
+            smtp_serv = smtplib.SMTP(smtp_server, smtp_port)
+
+        if smtp_use_tls:
+            smtp_serv.starttls()
+
+        if smtp_auth:
+            smtp_serv.ehlo()  # populate esmtp_features
+            smtp_serv.esmtp_features["auth"] = smtp_auth
+
+        if smtp_username and smtp_password is not None:
+            smtp_serv.login(smtp_username, smtp_password)
+
+        smtp_serv.sendmail(app_email_from, recipients, msg.as_string())
+        smtp_serv.quit()
+
+        log.info('Mail was sent to: %s' % recipients)
+    except:
+        log.error('Mail sending failed')
+        log.error(traceback.format_exc())
+        return False
+    return True
+
+
+@celerylib.task
+@celerylib.dbsession
+def create_repo(form_data, cur_user):
+    DBS = celerylib.get_session()
+
+    cur_user = db.User.guess_instance(cur_user)
+
+    owner = cur_user
+    repo_name = form_data['repo_name']
+    repo_name_full = form_data['repo_name_full']
+    repo_type = form_data['repo_type']
+    description = form_data['repo_description']
+    private = form_data['repo_private']
+    clone_uri = form_data.get('clone_uri')
+    repo_group = form_data['repo_group']
+    landing_rev = form_data['repo_landing_rev']
+    copy_fork_permissions = form_data.get('copy_permissions')
+    copy_group_permissions = form_data.get('repo_copy_permissions')
+    fork_of = form_data.get('fork_parent_id')
+    state = form_data.get('repo_state', db.Repository.STATE_PENDING)
+
+    # repo creation defaults, private and repo_type are filled in form
+    defs = db.Setting.get_default_repo_settings(strip_prefix=True)
+    enable_statistics = defs.get('repo_enable_statistics')
+    enable_downloads = defs.get('repo_enable_downloads')
+
+    try:
+        db_repo = repo.RepoModel()._create_repo(
+            repo_name=repo_name_full,
+            repo_type=repo_type,
+            description=description,
+            owner=owner,
+            private=private,
+            clone_uri=clone_uri,
+            repo_group=repo_group,
+            landing_rev=landing_rev,
+            fork_of=fork_of,
+            copy_fork_permissions=copy_fork_permissions,
+            copy_group_permissions=copy_group_permissions,
+            enable_statistics=enable_statistics,
+            enable_downloads=enable_downloads,
+            state=state
+        )
+
+        userlog.action_logger(cur_user, 'user_created_repo',
+                      form_data['repo_name_full'], '')
+
+        DBS.commit()
+        # now create this repo on Filesystem
+        repo.RepoModel()._create_filesystem_repo(
+            repo_name=repo_name,
+            repo_type=repo_type,
+            repo_group=db.RepoGroup.guess_instance(repo_group),
+            clone_uri=clone_uri,
+        )
+        db_repo = db.Repository.get_by_repo_name(repo_name_full)
+        hooks.log_create_repository(db_repo.get_dict(), created_by=owner.username)
+
+        # update repo changeset caches initially
+        db_repo.update_changeset_cache()
+
+        # set new created state
+        db_repo.set_state(db.Repository.STATE_CREATED)
+        DBS.commit()
+    except Exception as e:
+        log.warning('Exception %s occurred when forking repository, '
+                    'doing cleanup...' % e)
+        # rollback things manually !
+        db_repo = db.Repository.get_by_repo_name(repo_name_full)
+        if db_repo:
+            db.Repository.delete(db_repo.repo_id)
+            DBS.commit()
+            repo.RepoModel()._delete_filesystem_repo(db_repo)
+        raise
+
+    return True
+
+
+@celerylib.task
+@celerylib.dbsession
+def create_repo_fork(form_data, cur_user):
+    """
+    Creates a fork of repository using interval VCS methods
+
+    :param form_data:
+    :param cur_user:
+    """
+    DBS = celerylib.get_session()
+
+    base_path = kallithea.CONFIG['base_path']
+    cur_user = db.User.guess_instance(cur_user)
+
+    repo_name = form_data['repo_name']  # fork in this case
+    repo_name_full = form_data['repo_name_full']
+
+    repo_type = form_data['repo_type']
+    owner = cur_user
+    private = form_data['private']
+    clone_uri = form_data.get('clone_uri')
+    repo_group = form_data['repo_group']
+    landing_rev = form_data['landing_rev']
+    copy_fork_permissions = form_data.get('copy_permissions')
+
+    try:
+        fork_of = db.Repository.guess_instance(form_data.get('fork_parent_id'))
+
+        repo.RepoModel()._create_repo(
+            repo_name=repo_name_full,
+            repo_type=repo_type,
+            description=form_data['description'],
+            owner=owner,
+            private=private,
+            clone_uri=clone_uri,
+            repo_group=repo_group,
+            landing_rev=landing_rev,
+            fork_of=fork_of,
+            copy_fork_permissions=copy_fork_permissions
+        )
+        userlog.action_logger(cur_user, 'user_forked_repo:%s' % repo_name_full,
+                      fork_of.repo_name, '')
+        DBS.commit()
+
+        source_repo_path = os.path.join(base_path, fork_of.repo_name)
+
+        # now create this repo on Filesystem
+        repo.RepoModel()._create_filesystem_repo(
+            repo_name=repo_name,
+            repo_type=repo_type,
+            repo_group=db.RepoGroup.guess_instance(repo_group),
+            clone_uri=source_repo_path,
+        )
+        db_repo = db.Repository.get_by_repo_name(repo_name_full)
+        hooks.log_create_repository(db_repo.get_dict(), created_by=owner.username)
+
+        # update repo changeset caches initially
+        db_repo.update_changeset_cache()
+
+        # set new created state
+        db_repo.set_state(db.Repository.STATE_CREATED)
+        DBS.commit()
+    except Exception as e:
+        log.warning('Exception %s occurred when forking repository, '
+                    'doing cleanup...' % e)
+        # rollback things manually !
+        db_repo = db.Repository.get_by_repo_name(repo_name_full)
+        if db_repo:
+            db.Repository.delete(db_repo.repo_id)
+            DBS.commit()
+            repo.RepoModel()._delete_filesystem_repo(db_repo)
+        raise
+
+    return True
+
+
+def __get_codes_stats(repo_name):
+    scm_repo = db.Repository.get_by_repo_name(repo_name).scm_instance
+
+    tip = scm_repo.get_changeset()
+    code_stats = {}
+
+    for _topnode, _dirnodes, filenodes in tip.walk('/'):
+        for filenode in filenodes:
+            ext = filenode.extension.lower()
+            if ext in conf.LANGUAGES_EXTENSIONS_MAP and not filenode.is_binary:
+                if ext in code_stats:
+                    code_stats[ext] += 1
+                else:
+                    code_stats[ext] = 1
+
+    return code_stats or {}
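Any further task added to the relocated module would follow the same decorator stack as the functions above; the log_ping task below is purely hypothetical and only illustrates the pattern:

import celery.utils.log

from kallithea.lib import celerylib

log = celery.utils.log.get_task_logger(__name__)


@celerylib.task
@celerylib.dbsession
def log_ping(message):
    # same pattern as whoosh_index: make sure the database session is
    # initialized before doing any work
    celerylib.get_session()
    log.info('ping: %s', message)
    return True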
--- a/kallithea/model/notification.py	Sat Dec 12 23:09:44 2020 +0100
+++ b/kallithea/model/notification.py	Sat Nov 07 18:42:58 2020 +0100
@@ -34,7 +34,7 @@
 from tg.i18n import ugettext as _
 
 from kallithea.lib.utils2 import fmt_date
-from kallithea.model import db
+from kallithea.model import async_tasks, db
 
 
 log = logging.getLogger(__name__)
@@ -66,7 +66,6 @@
         :param email_kwargs: additional dict to pass as args to email template
         """
         import kallithea.lib.helpers as h
-        from kallithea.lib.celerylib import tasks
         email_kwargs = email_kwargs or {}
         if recipients and not getattr(recipients, '__iter__', False):
             raise Exception('recipients must be a list or iterable')
@@ -135,7 +134,7 @@
 
         # send email with notification to participants
         for rec_mail in sorted(rec_mails):
-            tasks.send_email([rec_mail], email_subject, email_txt_body,
+            async_tasks.send_email([rec_mail], email_subject, email_txt_body,
                      email_html_body, headers,
                      from_name=created_by_obj.full_name_or_username)
 
--- a/kallithea/model/repo.py	Sat Dec 12 23:09:44 2020 +0100
+++ b/kallithea/model/repo.py	Sat Nov 07 18:42:58 2020 +0100
@@ -405,8 +405,8 @@
         :param form_data:
         :param cur_user:
         """
-        from kallithea.lib.celerylib import tasks
-        return tasks.create_repo(form_data, cur_user)
+        from kallithea.model import async_tasks
+        return async_tasks.create_repo(form_data, cur_user)
 
     def _update_permissions(self, repo, perms_new=None, perms_updates=None,
                             check_perms=True):
@@ -448,8 +448,8 @@
         :param form_data:
         :param cur_user:
         """
-        from kallithea.lib.celerylib import tasks
-        return tasks.create_repo_fork(form_data, cur_user)
+        from kallithea.model import async_tasks
+        return async_tasks.create_repo_fork(form_data, cur_user)
 
     def delete(self, repo, forks=None, fs_remove=True, cur_user=None):
         """
--- a/kallithea/model/user.py	Sat Dec 12 23:09:44 2020 +0100
+++ b/kallithea/model/user.py	Sat Nov 07 18:42:58 2020 +0100
@@ -298,8 +298,7 @@
         allowing users to copy-paste or manually enter the token from the
         email.
         """
-        from kallithea.lib.celerylib import tasks
-        from kallithea.model import notification
+        from kallithea.model import async_tasks, notification
 
         user_email = data['email']
         user = db.User.get_by_email(user_email)
@@ -332,7 +331,7 @@
                 reset_token=token,
                 reset_url=link)
             log.debug('sending email')
-            tasks.send_email([user_email], _("Password reset link"), body, html_body)
+            async_tasks.send_email([user_email], _("Password reset link"), body, html_body)
             log.info('send new password mail to %s', user_email)
         else:
             log.debug("password reset email %s not found", user_email)
@@ -365,7 +364,7 @@
         return expected_token == token
 
     def reset_password(self, user_email, new_passwd):
-        from kallithea.lib.celerylib import tasks
+        from kallithea.model import async_tasks
         user = db.User.get_by_email(user_email)
         if user is not None:
             if not self.can_change_password(user):
@@ -376,7 +375,7 @@
         if new_passwd is None:
             raise Exception('unable to set new password')
 
-        tasks.send_email([user_email],
+        async_tasks.send_email([user_email],
                  _('Password reset notification'),
                  _('The password to your account %s has been changed using password reset form.') % (user.username,))
         log.info('send password reset mail to %s', user_email)
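Unlike notification.py, repo.py and user.py keep importing the task module at call time rather than at module level; for repo.py this avoids an import cycle, since async_tasks itself imports kallithea.model.repo. Hypothetical sketch of the pattern (names illustrative):

class SomeModel(object):
    def schedule_repo_creation(self, form_data, cur_user):
        # deferred import: kallithea.model.async_tasks imports
        # kallithea.model.repo at module level, so importing it here at
        # call time avoids a circular import
        from kallithea.model import async_tasks
        return async_tasks.create_repo(form_data, cur_user)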
--- a/kallithea/tests/functional/test_login.py	Sat Dec 12 23:09:44 2020 +0100
+++ b/kallithea/tests/functional/test_login.py	Sat Nov 07 18:42:58 2020 +0100
@@ -6,7 +6,7 @@
 import mock
 from tg.util.webtest import test_context
 
-import kallithea.lib.celerylib.tasks
+import kallithea.model.async_tasks
 from kallithea.lib import webutils
 from kallithea.lib.utils2 import check_password, generate_api_key
 from kallithea.model import db, meta, validators
@@ -410,7 +410,7 @@
         def mock_send_email(recipients, subject, body='', html_body='', headers=None, from_name=None):
             collected.append((recipients, subject, body, html_body))
 
-        with mock.patch.object(kallithea.lib.celerylib.tasks, 'send_email', mock_send_email), \
+        with mock.patch.object(kallithea.model.async_tasks, 'send_email', mock_send_email), \
                 mock.patch.object(time, 'time', lambda: timestamp):
             response = self.app.post(base.url(controller='login',
                                          action='password_reset'),
--- a/kallithea/tests/models/test_notifications.py	Sat Dec 12 23:09:44 2020 +0100
+++ b/kallithea/tests/models/test_notifications.py	Sat Nov 07 18:42:58 2020 +0100
@@ -5,8 +5,8 @@
 from tg.util.webtest import test_context
 
 import kallithea.lib.celerylib
-import kallithea.lib.celerylib.tasks
 import kallithea.lib.helpers as h
+import kallithea.model.async_tasks
 from kallithea.model import db, meta
 from kallithea.model.notification import EmailNotificationModel, NotificationModel
 from kallithea.model.user import UserModel
@@ -48,7 +48,7 @@
                 assert body == "hi there"
                 assert '>hi there<' in html_body
                 assert from_name == 'u1 u1'
-            with mock.patch.object(kallithea.lib.celerylib.tasks, 'send_email', send_email):
+            with mock.patch.object(kallithea.model.async_tasks, 'send_email', send_email):
                 NotificationModel().create(created_by=self.u1,
                                                    body='hi there',
                                                    recipients=usrs)
@@ -73,7 +73,7 @@
             l.append('<hr/>\n')
 
         with test_context(self.app):
-            with mock.patch.object(kallithea.lib.celerylib.tasks, 'send_email', send_email):
+            with mock.patch.object(kallithea.model.async_tasks, 'send_email', send_email):
                 pr_kwargs = dict(
                     pr_nice_id='#7',
                     pr_title='The Title',
@@ -155,7 +155,7 @@
                 # Email type TYPE_PASSWORD_RESET has no corresponding notification type - test it directly:
                 desc = 'TYPE_PASSWORD_RESET'
                 kwargs = dict(user='John Doe', reset_token='decbf64715098db5b0bd23eab44bd792670ab746', reset_url='http://reset.com/decbf64715098db5b0bd23eab44bd792670ab746')
-                kallithea.lib.celerylib.tasks.send_email(['john@doe.com'],
+                kallithea.model.async_tasks.send_email(['john@doe.com'],
                     "Password reset link",
                     EmailNotificationModel().get_email_tmpl(EmailNotificationModel.TYPE_PASSWORD_RESET, 'txt', **kwargs),
                     EmailNotificationModel().get_email_tmpl(EmailNotificationModel.TYPE_PASSWORD_RESET, 'html', **kwargs),
--- a/kallithea/tests/other/test_mail.py	Sat Dec 12 23:09:44 2020 +0100
+++ b/kallithea/tests/other/test_mail.py	Sat Nov 07 18:42:58 2020 +0100
@@ -25,7 +25,7 @@
         smtplib_mock.lastmsg = msg
 
 
-@mock.patch('kallithea.lib.celerylib.tasks.smtplib', smtplib_mock)
+@mock.patch('kallithea.model.async_tasks.smtplib', smtplib_mock)
 class TestMail(base.TestController):
 
     def test_send_mail_trivial(self):
@@ -40,8 +40,8 @@
             'smtp_server': mailserver,
             'app_email_from': envelope_from,
         }
-        with mock.patch('kallithea.lib.celerylib.tasks.config', config_mock):
-            kallithea.lib.celerylib.tasks.send_email(recipients, subject, body, html_body)
+        with mock.patch('kallithea.model.async_tasks.config', config_mock):
+            kallithea.model.async_tasks.send_email(recipients, subject, body, html_body)
 
         assert smtplib_mock.lastdest == set(recipients)
         assert smtplib_mock.lastsender == envelope_from
@@ -64,8 +64,8 @@
             'app_email_from': envelope_from,
             'email_to': email_to,
         }
-        with mock.patch('kallithea.lib.celerylib.tasks.config', config_mock):
-            kallithea.lib.celerylib.tasks.send_email(recipients, subject, body, html_body)
+        with mock.patch('kallithea.model.async_tasks.config', config_mock):
+            kallithea.model.async_tasks.send_email(recipients, subject, body, html_body)
 
         assert smtplib_mock.lastdest == set([base.TEST_USER_ADMIN_EMAIL, email_to])
         assert smtplib_mock.lastsender == envelope_from
@@ -88,8 +88,8 @@
             'app_email_from': envelope_from,
             'email_to': email_to,
         }
-        with mock.patch('kallithea.lib.celerylib.tasks.config', config_mock):
-            kallithea.lib.celerylib.tasks.send_email(recipients, subject, body, html_body)
+        with mock.patch('kallithea.model.async_tasks.config', config_mock):
+            kallithea.model.async_tasks.send_email(recipients, subject, body, html_body)
 
         assert smtplib_mock.lastdest == set([base.TEST_USER_ADMIN_EMAIL] + email_to.split(','))
         assert smtplib_mock.lastsender == envelope_from
@@ -110,8 +110,8 @@
             'smtp_server': mailserver,
             'app_email_from': envelope_from,
         }
-        with mock.patch('kallithea.lib.celerylib.tasks.config', config_mock):
-            kallithea.lib.celerylib.tasks.send_email(recipients, subject, body, html_body)
+        with mock.patch('kallithea.model.async_tasks.config', config_mock):
+            kallithea.model.async_tasks.send_email(recipients, subject, body, html_body)
 
         assert smtplib_mock.lastdest == set([base.TEST_USER_ADMIN_EMAIL])
         assert smtplib_mock.lastsender == envelope_from
@@ -133,8 +133,8 @@
             'smtp_server': mailserver,
             'app_email_from': envelope_from,
         }
-        with mock.patch('kallithea.lib.celerylib.tasks.config', config_mock):
-            kallithea.lib.celerylib.tasks.send_email(recipients, subject, body, html_body, from_name=author.full_name_or_username)
+        with mock.patch('kallithea.model.async_tasks.config', config_mock):
+            kallithea.model.async_tasks.send_email(recipients, subject, body, html_body, from_name=author.full_name_or_username)
 
         assert smtplib_mock.lastdest == set(recipients)
         assert smtplib_mock.lastsender == envelope_from
@@ -157,8 +157,8 @@
             'smtp_server': mailserver,
             'app_email_from': envelope_from,
         }
-        with mock.patch('kallithea.lib.celerylib.tasks.config', config_mock):
-            kallithea.lib.celerylib.tasks.send_email(recipients, subject, body, html_body, from_name=author.full_name_or_username)
+        with mock.patch('kallithea.model.async_tasks.config', config_mock):
+            kallithea.model.async_tasks.send_email(recipients, subject, body, html_body, from_name=author.full_name_or_username)
 
         assert smtplib_mock.lastdest == set(recipients)
         assert smtplib_mock.lastsender == envelope_from
@@ -181,8 +181,8 @@
             'smtp_server': mailserver,
             'app_email_from': envelope_from,
         }
-        with mock.patch('kallithea.lib.celerylib.tasks.config', config_mock):
-            kallithea.lib.celerylib.tasks.send_email(recipients, subject, body, html_body,
+        with mock.patch('kallithea.model.async_tasks.config', config_mock):
+            kallithea.model.async_tasks.send_email(recipients, subject, body, html_body,
                                                      from_name=author.full_name_or_username, headers=headers)
 
         assert smtplib_mock.lastdest == set(recipients)
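Because the tests patch by dotted path, every mock target has to follow the module as well. Illustrative sketch (helper name and captured payload hypothetical) of the pattern used in test_login.py and test_notifications.py above:

import mock

import kallithea.model.async_tasks


def run_with_captured_mail(action):
    sent = []

    def fake_send_email(recipients, subject, body='', html_body='',
                        headers=None, from_name=None):
        sent.append((recipients, subject, body, html_body))

    # patch the function where it now lives; patching the old
    # kallithea.lib.celerylib.tasks path would no longer intercept anything
    with mock.patch.object(kallithea.model.async_tasks, 'send_email', fake_send_email):
        action()
    return sent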
--- a/scripts/deps.py	Sat Dec 12 23:09:44 2020 +0100
+++ b/scripts/deps.py	Sat Nov 07 18:42:58 2020 +0100
@@ -131,7 +131,6 @@
 
 normal_modules = set('''
 kallithea
-kallithea.lib.celerylib.tasks
 kallithea.lib
 kallithea.lib.auth
 kallithea.lib.auth_modules
@@ -146,6 +145,7 @@
 kallithea.lib.vcs
 kallithea.lib.webutils
 kallithea.model
+kallithea.model.async_tasks
 kallithea.model.scm
 kallithea.templates.py
 '''.split())
@@ -158,10 +158,10 @@
 ('kallithea.lib.utils', 'kallithea.model'),  # clean up utils
 ('kallithea.lib.utils', 'kallithea.model.db'),
 ('kallithea.lib.utils', 'kallithea.model.scm'),
-('kallithea.lib.celerylib.tasks', 'kallithea.lib.helpers'),
-('kallithea.lib.celerylib.tasks', 'kallithea.lib.hooks'),
-('kallithea.lib.celerylib.tasks', 'kallithea.lib.indexers'),
-('kallithea.lib.celerylib.tasks', 'kallithea.model'),
+('kallithea.model.async_tasks', 'kallithea.lib.helpers'),
+('kallithea.model.async_tasks', 'kallithea.lib.hooks'),
+('kallithea.model.async_tasks', 'kallithea.lib.indexers'),
+('kallithea.model.async_tasks', 'kallithea.model'),
 ('kallithea.model', 'kallithea.lib.auth'),  # auth.HasXXX
 ('kallithea.model', 'kallithea.lib.auth_modules'),  # validators
 ('kallithea.model', 'kallithea.lib.helpers'),