changeset 1264:0c43c6671815 beta

moved locking of commit stats into the task itself to remove race conditions when the lock was not removed before starting another task.
author Marcin Kuzminski <marcin@python-works.com>
date Sun, 17 Apr 2011 14:45:16 +0200
parents 83714588272c
children 08ac2c3ae810
files rhodecode/lib/celerylib/__init__.py rhodecode/lib/celerylib/tasks.py
diffstat 2 files changed, 147 insertions(+), 122 deletions(-)
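In short: get_commits_stats used to be wrapped in @locked_task, which only released the lock after the function returned; since the task re-queues itself near the end of its body, the follow-up task could start while the previous lock was still held. The lock is now taken inside the task, keyed on the task's own arguments, and released before the task re-queues itself. A minimal sketch of the resulting pattern (illustrative only; do_work is a hypothetical stand-in, and the imported names are the ones the diff below shows living in rhodecode.lib.celerylib):

from rhodecode.lib.celerylib import (run_task, __get_lockkey,
                                     DaemonLock, LockHeld, CELERY_ON)

def do_work(repo_name):
    # stand-in for one chunk of the real stats gathering; returns
    # whether more chunks remain
    return False

def some_stats_task(repo_name):
    lockkey = __get_lockkey('some_stats_task', repo_name)
    try:
        lock = DaemonLock(lockkey)  # raises LockHeld if a run is in progress
    except LockHeld:
        # report the collision instead of blocking or crashing
        return 'Task with key %s already running' % lockkey
    try:
        more_to_do = do_work(repo_name)
    finally:
        lock.release()  # free the lock before any follow-up task starts
    if more_to_do and CELERY_ON:
        run_task(some_stats_task, repo_name)
    return True

The real task below releases explicitly before each return instead of using try/finally; the guarantee is the same as long as every exit path is covered.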
--- a/rhodecode/lib/celerylib/__init__.py	Sat Apr 16 22:03:19 2011 +0200
+++ b/rhodecode/lib/celerylib/__init__.py	Sun Apr 17 14:45:16 2011 +0200
@@ -48,6 +48,7 @@
 except KeyError:
     CELERY_ON = False
 
+
 class ResultWrapper(object):
     def __init__(self, task):
         self.task = task
@@ -56,12 +57,14 @@
     def result(self):
         return self.task
 
+
 def run_task(task, *args, **kwargs):
     if CELERY_ON:
         try:
             t = task.apply_async(args=args, kwargs=kwargs)
             log.info('running task %s:%s', t.task_id, task)
             return t
+
         except socket.error, e:
            if e.errno == 111:
                 log.debug('Unable to connect to celeryd. Sync execution')
@@ -76,14 +79,20 @@
     return ResultWrapper(task(*args, **kwargs))
 
 
+def __get_lockkey(func, *fargs, **fkwargs):
+    params = list(fargs)
+    params.extend(['%s-%s' % ar for ar in sorted(fkwargs.items())])  # sorted for a stable key
+
+    func_name = str(func.__name__) if hasattr(func, '__name__') else str(func)
+
+    lockkey = 'task_%s' % \
+        md5(func_name + '-' + '-'.join(map(str, params))).hexdigest()
+    return lockkey
+
+
 def locked_task(func):
     def __wrapper(func, *fargs, **fkwargs):
-        params = list(fargs)
-        params.extend(['%s-%s' % ar for ar in fkwargs.items()])
-
-        lockkey = 'task_%s' % \
-            md5(str(func.__name__) + '-' + \
-                '-'.join(map(str, params))).hexdigest()
+        lockkey = __get_lockkey(func, *fargs, **fkwargs)
         log.info('running task with lockkey %s', lockkey)
         try:
             l = DaemonLock(lockkey)
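The extracted __get_lockkey helper means code outside the decorator (like the rewritten get_commits_stats in the next file) can compute exactly the same lock key that @locked_task uses. A roughly equivalent standalone version (a sketch, not the module itself):

from hashlib import md5

def get_lockkey(func, *fargs, **fkwargs):
    # tokens from the positional args plus 'key-value' pairs; kwargs
    # are sorted so the key is stable regardless of dict order
    params = list(fargs)
    params.extend(['%s-%s' % ar for ar in sorted(fkwargs.items())])
    # accept either a callable or a plain task-name string
    func_name = str(func.__name__) if hasattr(func, '__name__') else str(func)
    # md5 yields a short hex string that is safe to use as a lock-file name
    return 'task_%s' % md5(func_name + '-' +
                           '-'.join(map(str, params))).hexdigest()

# identical arguments yield the identical key, so a second invocation
# of the same task contends for the same lock and hits LockHeld
assert get_lockkey('get_commits_stats', 'myrepo', 1, 2) == \
       get_lockkey('get_commits_stats', 'myrepo', 1, 2)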
--- a/rhodecode/lib/celerylib/tasks.py	Sat Apr 16 22:03:19 2011 +0200
+++ b/rhodecode/lib/celerylib/tasks.py	Sun Apr 17 14:45:16 2011 +0200
@@ -37,13 +37,14 @@
 from pylons import config
 from pylons.i18n.translation import _
 
-from rhodecode.lib.celerylib import run_task, locked_task, str2bool
+from rhodecode.lib.celerylib import run_task, locked_task, str2bool, \
+    __get_lockkey, LockHeld, DaemonLock
 from rhodecode.lib.helpers import person
 from rhodecode.lib.smtp_mailer import SmtpMailer
 from rhodecode.lib.utils import OrderedDict, add_cache
 from rhodecode.model import init_model
 from rhodecode.model import meta
-from rhodecode.model.db import RhodeCodeUi
+from rhodecode.model.db import RhodeCodeUi, Statistics, Repository
 
 from vcs.backends import get_repo
 
@@ -125,146 +126,162 @@
 
 
 @task(ignore_result=True)
-@locked_task
 def get_commits_stats(repo_name, ts_min_y, ts_max_y):
     try:
         log = get_commits_stats.get_logger()
     except:
         log = logging.getLogger(__name__)
 
-    from rhodecode.model.db import Statistics, Repository
+    lockkey = __get_lockkey('get_commits_stats', repo_name, ts_min_y,
+                            ts_max_y)
+    log.info('running task with lockkey %s', lockkey)
+    try:
+        lock = DaemonLock(lockkey)
 
-    #for js data compatibilty
-    akc = lambda k: person(k).replace('"', "")
+        #for js data compatibility: cleans quotes from the person key
+        akc = lambda k: person(k).replace('"', "")
 
-    co_day_auth_aggr = {}
-    commits_by_day_aggregate = {}
-    repos_path = get_repos_path()
-    p = os.path.join(repos_path, repo_name)
-    repo = get_repo(p)
+        co_day_auth_aggr = {}
+        commits_by_day_aggregate = {}
+        repos_path = get_repos_path()
+        p = os.path.join(repos_path, repo_name)
+        repo = get_repo(p)
+        repo_size = len(repo.revisions)
+        #return if the repo has no revisions
+        if repo_size < 1:
+            lock.release()
+            return True
 
-    skip_date_limit = True
-    parse_limit = int(config['app_conf'].get('commit_parse_limit'))
-    last_rev = 0
-    last_cs = None
-    timegetter = itemgetter('time')
+        skip_date_limit = True
+        parse_limit = int(config['app_conf'].get('commit_parse_limit'))
+        last_rev = 0
+        last_cs = None
+        timegetter = itemgetter('time')
 
-    sa = get_session()
+        sa = get_session()
 
-    dbrepo = sa.query(Repository)\
-        .filter(Repository.repo_name == repo_name).scalar()
-    cur_stats = sa.query(Statistics)\
-        .filter(Statistics.repository == dbrepo).scalar()
+        dbrepo = sa.query(Repository)\
+            .filter(Repository.repo_name == repo_name).scalar()
+        cur_stats = sa.query(Statistics)\
+            .filter(Statistics.repository == dbrepo).scalar()
 
-    if cur_stats is not None:
-        last_rev = cur_stats.stat_on_revision
+        if cur_stats is not None:
+            last_rev = cur_stats.stat_on_revision
 
-    #return if repo is empty
-    if not repo.revisions:
-        return True
+        if last_rev == repo.get_changeset().revision and repo_size > 1:
+            #pass silently without any work if the repo has more than one
+            #revision and the current parsing marker from the db already
+            #points at the last revision
+            lock.release()
+            return True
 
-    if last_rev == repo.get_changeset().revision and len(repo.revisions) > 1:
-        #pass silently without any work if we're not on first revision or
-        #current state of parsing revision(from db marker) is the last revision
-        return True
+        if cur_stats:
+            commits_by_day_aggregate = OrderedDict(json.loads(
+                                        cur_stats.commit_activity_combined))
+            co_day_auth_aggr = json.loads(cur_stats.commit_activity)
 
-    if cur_stats:
-        commits_by_day_aggregate = OrderedDict(
-                                       json.loads(
-                                        cur_stats.commit_activity_combined))
-        co_day_auth_aggr = json.loads(cur_stats.commit_activity)
+        log.debug('starting parsing %s', parse_limit)
+        lmktime = mktime
+
+        last_rev = last_rev + 1 if last_rev > 0 else last_rev
 
-    log.debug('starting parsing %s', parse_limit)
-    lmktime = mktime
-
-    last_rev = last_rev + 1 if last_rev > 0 else last_rev
+        for cs in repo[last_rev:last_rev + parse_limit]:
+            last_cs = cs  # remember last parsed changeset
+            k = lmktime([cs.date.timetuple()[0], cs.date.timetuple()[1],
+                          cs.date.timetuple()[2], 0, 0, 0, 0, 0, 0])
 
-    for cs in repo[last_rev:last_rev + parse_limit]:
-        last_cs = cs  # remember last parsed changeset
-        k = lmktime([cs.date.timetuple()[0], cs.date.timetuple()[1],
-                      cs.date.timetuple()[2], 0, 0, 0, 0, 0, 0])
+            if akc(cs.author) in co_day_auth_aggr:
+                try:
+                    l = [timegetter(x) for x in
+                         co_day_auth_aggr[akc(cs.author)]['data']]
+                    time_pos = l.index(k)
+                except ValueError:
+                    time_pos = False
+
+                if time_pos >= 0 and time_pos is not False:
+
+                    datadict = \
+                        co_day_auth_aggr[akc(cs.author)]['data'][time_pos]
 
-        if akc(cs.author) in co_day_auth_aggr:
-            try:
-                l = [timegetter(x) for x in
-                     co_day_auth_aggr[akc(cs.author)]['data']]
-                time_pos = l.index(k)
-            except ValueError:
-                time_pos = False
+                    datadict["commits"] += 1
+                    datadict["added"] += len(cs.added)
+                    datadict["changed"] += len(cs.changed)
+                    datadict["removed"] += len(cs.removed)
+
+                else:
+                    if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
 
-            if time_pos >= 0 and time_pos is not False:
-
-                datadict = co_day_auth_aggr[akc(cs.author)]['data'][time_pos]
-
-                datadict["commits"] += 1
-                datadict["added"] += len(cs.added)
-                datadict["changed"] += len(cs.changed)
-                datadict["removed"] += len(cs.removed)
+                        datadict = {"time": k,
+                                    "commits": 1,
+                                    "added": len(cs.added),
+                                    "changed": len(cs.changed),
+                                    "removed": len(cs.removed),
+                                   }
+                        co_day_auth_aggr[akc(cs.author)]['data']\
+                            .append(datadict)
 
             else:
                 if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
-
-                    datadict = {"time": k,
-                                "commits": 1,
-                                "added": len(cs.added),
-                                "changed": len(cs.changed),
-                                "removed": len(cs.removed),
-                               }
-                    co_day_auth_aggr[akc(cs.author)]['data']\
-                        .append(datadict)
+                    co_day_auth_aggr[akc(cs.author)] = {
+                                        "label": akc(cs.author),
+                                        "data": [{"time":k,
+                                                 "commits":1,
+                                                 "added":len(cs.added),
+                                                 "changed":len(cs.changed),
+                                                 "removed":len(cs.removed),
+                                                 }],
+                                        "schema": ["commits"],
+                                        }
 
-        else:
-            if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
-                co_day_auth_aggr[akc(cs.author)] = {
-                                    "label": akc(cs.author),
-                                    "data": [{"time":k,
-                                             "commits":1,
-                                             "added":len(cs.added),
-                                             "changed":len(cs.changed),
-                                             "removed":len(cs.removed),
-                                             }],
-                                    "schema": ["commits"],
-                                    }
+            #gather all data by day
+            if k in commits_by_day_aggregate:
+                commits_by_day_aggregate[k] += 1
+            else:
+                commits_by_day_aggregate[k] = 1
 
-        #gather all data by day
-        if k in commits_by_day_aggregate:
-            commits_by_day_aggregate[k] += 1
-        else:
-            commits_by_day_aggregate[k] = 1
+        overview_data = sorted(commits_by_day_aggregate.items(),
+                               key=itemgetter(0))
+
+        if not co_day_auth_aggr:
+            co_day_auth_aggr[akc(repo.contact)] = {
+                "label": akc(repo.contact),
+                "data": [0, 1],
+                "schema": ["commits"],
+            }
 
-    overview_data = sorted(commits_by_day_aggregate.items(), key=itemgetter(0))
-    if not co_day_auth_aggr:
-        co_day_auth_aggr[akc(repo.contact)] = {
-            "label": akc(repo.contact),
-            "data": [0, 1],
-            "schema": ["commits"],
-        }
+        stats = cur_stats if cur_stats else Statistics()
+        stats.commit_activity = json.dumps(co_day_auth_aggr)
+        stats.commit_activity_combined = json.dumps(overview_data)
 
-    stats = cur_stats if cur_stats else Statistics()
-    stats.commit_activity = json.dumps(co_day_auth_aggr)
-    stats.commit_activity_combined = json.dumps(overview_data)
+        log.debug('last revision %s', last_rev)
+        leftovers = len(repo.revisions[last_rev:])
+        log.debug('revisions to parse %s', leftovers)
 
-    log.debug('last revison %s', last_rev)
-    leftovers = len(repo.revisions[last_rev:])
-    log.debug('revisions to parse %s', leftovers)
+        if last_rev == 0 or leftovers < parse_limit:
+            log.debug('getting code trending stats')
+            stats.languages = json.dumps(__get_codes_stats(repo_name))
 
-    if last_rev == 0 or leftovers < parse_limit:
-        log.debug('getting code trending stats')
-        stats.languages = json.dumps(__get_codes_stats(repo_name))
+        try:
+            stats.repository = dbrepo
+            stats.stat_on_revision = last_cs.revision if last_cs else 0
+            sa.add(stats)
+            sa.commit()
+        except:
+            log.error(traceback.format_exc())
+            sa.rollback()
+            lock.release()
+            return False
 
-    try:
-        stats.repository = dbrepo
-        stats.stat_on_revision = last_cs.revision if last_cs else 0
-        sa.add(stats)
-        sa.commit()
-    except:
-        log.error(traceback.format_exc())
-        sa.rollback()
-        return False
-    if len(repo.revisions) > 1:
-        run_task(get_commits_stats, repo_name, ts_min_y, ts_max_y)
+        #final release
+        lock.release()
 
-    return True
+        #execute another task if celery is enabled
+        if len(repo.revisions) > 1 and CELERY_ON:
+            run_task(get_commits_stats, repo_name, ts_min_y, ts_max_y)
+        return True
+    except LockHeld:
+        log.info('LockHeld')
+        return 'Task with key %s already running' % lockkey
 
 
 @task(ignore_result=True)
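Beyond the locking, the rewritten body keeps the incremental-parsing scheme intact: at most commit_parse_limit changesets are handled per run, the last parsed revision is persisted as Statistics.stat_on_revision, and the task re-queues itself until it reaches the tip. Reduced to its essentials (a sketch; revs stands in for the repo as a plain list of changesets):

def parse_chunk(revs, last_rev, parse_limit):
    # resume one past the stored marker, except on the very first
    # run, which starts at revision 0
    start = last_rev + 1 if last_rev > 0 else last_rev
    last_cs = None
    for cs in revs[start:start + parse_limit]:
        last_cs = cs  # becomes the new stat_on_revision marker
        # ... per-changeset aggregation happens here in the real task ...
    leftovers = len(revs[start:])
    # the caller persists last_cs and re-queues while work remains
    return last_cs, leftovers

For example, with ten revisions and a parse_limit of 4, successive runs advance the marker 0 -> 3 -> 7 -> 9; runs where last_rev == 0 or fewer than parse_limit revisions remain also refresh the code-trending stats.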
@@ -313,7 +330,6 @@
     """
     Sends an email with defined parameters from the .ini files.
 
-
    :param recipients: list of recipients; if this is empty, the defined email
         address from field 'email_to' is used instead
     :param subject: subject of the mail
@@ -351,14 +367,14 @@
 
 @task(ignore_result=True)
 def create_repo_fork(form_data, cur_user):
+    from rhodecode.model.repo import RepoModel
+    from vcs import get_backend
+
     try:
         log = create_repo_fork.get_logger()
     except:
         log = logging.getLogger(__name__)
 
-    from rhodecode.model.repo import RepoModel
-    from vcs import get_backend
-
     repo_model = RepoModel(get_session())
     repo_model.create(form_data, cur_user, just_db=True, fork=True)
     repo_name = form_data['repo_name']
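The last hunk simply hoists create_repo_fork's function-local imports above the logger setup; keeping them inside the function at all (rather than at module level) is presumably to avoid circular imports between the task module and the model layer. Both tasks also share the fallback-logger idiom, preferring celery's per-task logger and dropping back to a module logger when that is unavailable; as a standalone helper it would look roughly like this (hypothetical, the changeset keeps it inline):

import logging

def get_task_logger(task):
    # celery tasks expose get_logger(); when that fails (e.g. when the
    # task runs synchronously), fall back to a plain module logger
    try:
        return task.get_logger()
    except Exception:
        return logging.getLogger(__name__)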