changeset 1937:afe8cfa32a0f

backported #340 session cleanup for celery tasks
author Marcin Kuzminski <marcin@python-works.com>
date Sat, 21 Jan 2012 07:09:18 +0200
parents ba445351cf5e
children bab80d1436fb
files rhodecode/__init__.py rhodecode/lib/celerylib/__init__.py rhodecode/lib/celerylib/tasks.py
diffstat 3 files changed, 37 insertions(+), 22 deletions(-) [+]
line wrap: on
line diff
--- a/rhodecode/__init__.py	Sat Jan 21 07:08:43 2012 +0200
+++ b/rhodecode/__init__.py	Sat Jan 21 07:09:18 2012 +0200
@@ -25,7 +25,7 @@
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 import platform
 
-VERSION = (1, 2, 4)
+VERSION = (1, 2, 5)
 __version__ = '.'.join((str(each) for each in VERSION[:4]))
 __dbversion__ = 3  # defines current db version for migrations
 __platform__ = platform.system()
--- a/rhodecode/lib/celerylib/__init__.py	Sat Jan 21 07:08:43 2012 +0200
+++ b/rhodecode/lib/celerylib/__init__.py	Sat Jan 21 07:09:18 2012 +0200
@@ -7,7 +7,7 @@
 
     :created_on: Nov 27, 2010
     :author: marcink
-    :copyright: (C) 2009-2011 Marcin Kuzminski <marcin@python-works.com>
+    :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
     :license: GPLv3, see COPYING for more details.
 """
 # This program is free software: you can redistribute it and/or modify
@@ -29,19 +29,23 @@
 import traceback
 import logging
 from os.path import dirname as dn, join as jn
+from pylons import config
 
 from hashlib import md5
 from decorator import decorator
-from pylons import  config
 
 from vcs.utils.lazy import LazyProperty
 
 from rhodecode.lib import str2bool, safe_str
 from rhodecode.lib.pidlock import DaemonLock, LockHeld
+from rhodecode.model import init_model
+from rhodecode.model import meta
+from rhodecode.model.db import Statistics, Repository, User
+
+from sqlalchemy import engine_from_config
 
 from celery.messaging import establish_connection
 
-
 log = logging.getLogger(__name__)
 
 try:
@@ -107,3 +111,23 @@
             return 'Task with key %s already running' % lockkey
 
     return decorator(__wrapper, func)
+
+
+def get_session():
+    if CELERY_ON:
+        engine = engine_from_config(config, 'sqlalchemy.db1.')
+        init_model(engine)
+    sa = meta.Session
+    return sa
+
+
+def dbsession(func):
+    def __wrapper(func, *fargs, **fkwargs):
+        try:
+            ret = func(*fargs, **fkwargs)
+            return ret
+        finally:
+            if CELERY_ON:
+                meta.Session.remove()
+
+    return decorator(__wrapper, func)
--- a/rhodecode/lib/celerylib/tasks.py	Sat Jan 21 07:08:43 2012 +0200
+++ b/rhodecode/lib/celerylib/tasks.py	Sat Jan 21 07:09:18 2012 +0200
@@ -8,7 +8,7 @@
 
     :created_on: Oct 6, 2010
     :author: marcink
-    :copyright: (C) 2009-2011 Marcin Kuzminski <marcin@python-works.com>
+    :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
     :license: GPLv3, see COPYING for more details.
 """
 # This program is free software: you can redistribute it and/or modify
@@ -39,19 +39,16 @@
 
 from rhodecode.lib import LANGUAGES_EXTENSIONS_MAP, safe_str
 from rhodecode.lib.celerylib import run_task, locked_task, str2bool, \
-    __get_lockkey, LockHeld, DaemonLock
+    __get_lockkey, LockHeld, DaemonLock, get_session, dbsession
 from rhodecode.lib.helpers import person
 from rhodecode.lib.smtp_mailer import SmtpMailer
 from rhodecode.lib.utils import add_cache
 from rhodecode.lib.compat import json, OrderedDict
 
-from rhodecode.model import init_model
-from rhodecode.model import meta
 from rhodecode.model.db import RhodeCodeUi, Statistics, Repository, User
 
 from vcs.backends import get_repo
-
-from sqlalchemy import engine_from_config
+from vcs import get_backend
 
 add_cache(config)
 
@@ -61,15 +58,6 @@
 CELERY_ON = str2bool(config['app_conf'].get('use_celery'))
 
 
-def get_session():
-    if CELERY_ON:
-        engine = engine_from_config(config, 'sqlalchemy.db1.')
-        init_model(engine)
-
-    sa = meta.Session()
-    return sa
-
-
 def get_repos_path():
     sa = get_session()
     q = sa.query(RhodeCodeUi).filter(RhodeCodeUi.ui_key == '/').one()
@@ -78,6 +66,7 @@
 
 @task(ignore_result=True)
 @locked_task
+@dbsession
 def whoosh_index(repo_location, full_index):
     #log = whoosh_index.get_logger()
     from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon
@@ -88,6 +77,7 @@
 
 
 @task(ignore_result=True)
+@dbsession
 def get_commits_stats(repo_name, ts_min_y, ts_max_y):
     try:
         log = get_commits_stats.get_logger()
@@ -248,6 +238,7 @@
         return 'Task with key %s already running' % lockkey
 
 @task(ignore_result=True)
+@dbsession
 def send_password_link(user_email):
     try:
         log = reset_user_password.get_logger()
@@ -255,7 +246,6 @@
         log = logging.getLogger(__name__)
 
     from rhodecode.lib import auth
-    from rhodecode.model.db import User
 
     try:
         sa = get_session()
@@ -288,6 +278,7 @@
     return True
 
 @task(ignore_result=True)
+@dbsession
 def reset_user_password(user_email):
     try:
         log = reset_user_password.get_logger()
@@ -295,7 +286,6 @@
         log = logging.getLogger(__name__)
 
     from rhodecode.lib import auth
-    from rhodecode.model.db import User
 
     try:
         try:
@@ -329,6 +319,7 @@
 
 
 @task(ignore_result=True)
+@dbsession
 def send_email(recipients, subject, body):
     """
     Sends an email with defined parameters from the .ini files.
@@ -375,9 +366,9 @@
 
 
 @task(ignore_result=True)
+@dbsession
 def create_repo_fork(form_data, cur_user):
     from rhodecode.model.repo import RepoModel
-    from vcs import get_backend
 
     try:
         log = create_repo_fork.get_logger()