# HG changeset patch # User Marcin Kuzminski # Date 1327122558 -7200 # Node ID afe8cfa32a0f9138e2ab2f6ef7bc41ba58dac778 # Parent ba445351cf5e9a16c699074f192090e02c0d2876 backported #340 session cleanup for celery tasks diff -r ba445351cf5e -r afe8cfa32a0f rhodecode/__init__.py --- a/rhodecode/__init__.py Sat Jan 21 07:08:43 2012 +0200 +++ b/rhodecode/__init__.py Sat Jan 21 07:09:18 2012 +0200 @@ -25,7 +25,7 @@ # along with this program. If not, see . import platform -VERSION = (1, 2, 4) +VERSION = (1, 2, 5) __version__ = '.'.join((str(each) for each in VERSION[:4])) __dbversion__ = 3 # defines current db version for migrations __platform__ = platform.system() diff -r ba445351cf5e -r afe8cfa32a0f rhodecode/lib/celerylib/__init__.py --- a/rhodecode/lib/celerylib/__init__.py Sat Jan 21 07:08:43 2012 +0200 +++ b/rhodecode/lib/celerylib/__init__.py Sat Jan 21 07:09:18 2012 +0200 @@ -7,7 +7,7 @@ :created_on: Nov 27, 2010 :author: marcink - :copyright: (C) 2009-2011 Marcin Kuzminski + :copyright: (C) 2010-2012 Marcin Kuzminski :license: GPLv3, see COPYING for more details. """ # This program is free software: you can redistribute it and/or modify @@ -29,19 +29,23 @@ import traceback import logging from os.path import dirname as dn, join as jn +from pylons import config from hashlib import md5 from decorator import decorator -from pylons import config from vcs.utils.lazy import LazyProperty from rhodecode.lib import str2bool, safe_str from rhodecode.lib.pidlock import DaemonLock, LockHeld +from rhodecode.model import init_model +from rhodecode.model import meta +from rhodecode.model.db import Statistics, Repository, User + +from sqlalchemy import engine_from_config from celery.messaging import establish_connection - log = logging.getLogger(__name__) try: @@ -107,3 +111,23 @@ return 'Task with key %s already running' % lockkey return decorator(__wrapper, func) + + +def get_session(): + if CELERY_ON: + engine = engine_from_config(config, 'sqlalchemy.db1.') + init_model(engine) + sa = meta.Session + return sa + + +def dbsession(func): + def __wrapper(func, *fargs, **fkwargs): + try: + ret = func(*fargs, **fkwargs) + return ret + finally: + if CELERY_ON: + meta.Session.remove() + + return decorator(__wrapper, func) diff -r ba445351cf5e -r afe8cfa32a0f rhodecode/lib/celerylib/tasks.py --- a/rhodecode/lib/celerylib/tasks.py Sat Jan 21 07:08:43 2012 +0200 +++ b/rhodecode/lib/celerylib/tasks.py Sat Jan 21 07:09:18 2012 +0200 @@ -8,7 +8,7 @@ :created_on: Oct 6, 2010 :author: marcink - :copyright: (C) 2009-2011 Marcin Kuzminski + :copyright: (C) 2010-2012 Marcin Kuzminski :license: GPLv3, see COPYING for more details. """ # This program is free software: you can redistribute it and/or modify @@ -39,19 +39,16 @@ from rhodecode.lib import LANGUAGES_EXTENSIONS_MAP, safe_str from rhodecode.lib.celerylib import run_task, locked_task, str2bool, \ - __get_lockkey, LockHeld, DaemonLock + __get_lockkey, LockHeld, DaemonLock, get_session, dbsession from rhodecode.lib.helpers import person from rhodecode.lib.smtp_mailer import SmtpMailer from rhodecode.lib.utils import add_cache from rhodecode.lib.compat import json, OrderedDict -from rhodecode.model import init_model -from rhodecode.model import meta from rhodecode.model.db import RhodeCodeUi, Statistics, Repository, User from vcs.backends import get_repo - -from sqlalchemy import engine_from_config +from vcs import get_backend add_cache(config) @@ -61,15 +58,6 @@ CELERY_ON = str2bool(config['app_conf'].get('use_celery')) -def get_session(): - if CELERY_ON: - engine = engine_from_config(config, 'sqlalchemy.db1.') - init_model(engine) - - sa = meta.Session() - return sa - - def get_repos_path(): sa = get_session() q = sa.query(RhodeCodeUi).filter(RhodeCodeUi.ui_key == '/').one() @@ -78,6 +66,7 @@ @task(ignore_result=True) @locked_task +@dbsession def whoosh_index(repo_location, full_index): #log = whoosh_index.get_logger() from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon @@ -88,6 +77,7 @@ @task(ignore_result=True) +@dbsession def get_commits_stats(repo_name, ts_min_y, ts_max_y): try: log = get_commits_stats.get_logger() @@ -248,6 +238,7 @@ return 'Task with key %s already running' % lockkey @task(ignore_result=True) +@dbsession def send_password_link(user_email): try: log = reset_user_password.get_logger() @@ -255,7 +246,6 @@ log = logging.getLogger(__name__) from rhodecode.lib import auth - from rhodecode.model.db import User try: sa = get_session() @@ -288,6 +278,7 @@ return True @task(ignore_result=True) +@dbsession def reset_user_password(user_email): try: log = reset_user_password.get_logger() @@ -295,7 +286,6 @@ log = logging.getLogger(__name__) from rhodecode.lib import auth - from rhodecode.model.db import User try: try: @@ -329,6 +319,7 @@ @task(ignore_result=True) +@dbsession def send_email(recipients, subject, body): """ Sends an email with defined parameters from the .ini files. @@ -375,9 +366,9 @@ @task(ignore_result=True) +@dbsession def create_repo_fork(form_data, cur_user): from rhodecode.model.repo import RepoModel - from vcs import get_backend try: log = create_repo_fork.get_logger()