Mercurial > kallithea
changeset 1264:0c43c6671815 beta
Moved locking of commit stats into the task itself to remove race conditions when the lock was not removed before starting another task.
author | Marcin Kuzminski <marcin@python-works.com> |
---|---|
date | Sun, 17 Apr 2011 14:45:16 +0200 |
parents | 83714588272c |
children | 08ac2c3ae810 |
files | rhodecode/lib/celerylib/__init__.py rhodecode/lib/celerylib/tasks.py |
diffstat | 2 files changed, 147 insertions(+), 122 deletions(-) [+] |
line wrap: on
line diff
--- a/rhodecode/lib/celerylib/__init__.py Sat Apr 16 22:03:19 2011 +0200 +++ b/rhodecode/lib/celerylib/__init__.py Sun Apr 17 14:45:16 2011 +0200 @@ -48,6 +48,7 @@ except KeyError: CELERY_ON = False + class ResultWrapper(object): def __init__(self, task): self.task = task @@ -56,12 +57,14 @@ def result(self): return self.task + def run_task(task, *args, **kwargs): if CELERY_ON: try: t = task.apply_async(args=args, kwargs=kwargs) log.info('running task %s:%s', t.task_id, task) return t + except socket.error, e: if e.errno == 111: log.debug('Unable to connect to celeryd. Sync execution') @@ -76,14 +79,20 @@ return ResultWrapper(task(*args, **kwargs)) +def __get_lockkey(func, *fargs, **fkwargs): + params = list(fargs) + params.extend(['%s-%s' % ar for ar in fkwargs.items()]) + + func_name = str(func.__name__) if hasattr(func, '__name__') else str(func) + + lockkey = 'task_%s' % \ + md5(func_name + '-' + '-'.join(map(str, params))).hexdigest() + return lockkey + + def locked_task(func): def __wrapper(func, *fargs, **fkwargs): - params = list(fargs) - params.extend(['%s-%s' % ar for ar in fkwargs.items()]) - - lockkey = 'task_%s' % \ - md5(str(func.__name__) + '-' + \ - '-'.join(map(str, params))).hexdigest() + lockkey = __get_lockkey(func, *fargs, **fkwargs) log.info('running task with lockkey %s', lockkey) try: l = DaemonLock(lockkey)
--- a/rhodecode/lib/celerylib/tasks.py Sat Apr 16 22:03:19 2011 +0200 +++ b/rhodecode/lib/celerylib/tasks.py Sun Apr 17 14:45:16 2011 +0200 @@ -37,13 +37,14 @@ from pylons import config from pylons.i18n.translation import _ -from rhodecode.lib.celerylib import run_task, locked_task, str2bool +from rhodecode.lib.celerylib import run_task, locked_task, str2bool, \ + __get_lockkey, LockHeld, DaemonLock from rhodecode.lib.helpers import person from rhodecode.lib.smtp_mailer import SmtpMailer from rhodecode.lib.utils import OrderedDict, add_cache from rhodecode.model import init_model from rhodecode.model import meta -from rhodecode.model.db import RhodeCodeUi +from rhodecode.model.db import RhodeCodeUi, Statistics, Repository from vcs.backends import get_repo @@ -125,146 +126,162 @@ @task(ignore_result=True) -@locked_task def get_commits_stats(repo_name, ts_min_y, ts_max_y): try: log = get_commits_stats.get_logger() except: log = logging.getLogger(__name__) - from rhodecode.model.db import Statistics, Repository + lockkey = __get_lockkey('get_commits_stats', repo_name, ts_min_y, + ts_max_y) + log.info('running task with lockkey %s', lockkey) + try: + lock = DaemonLock(lockkey) - #for js data compatibilty - akc = lambda k: person(k).replace('"', "") + #for js data compatibilty cleans the key for person from ' + akc = lambda k: person(k).replace('"', "") - co_day_auth_aggr = {} - commits_by_day_aggregate = {} - repos_path = get_repos_path() - p = os.path.join(repos_path, repo_name) - repo = get_repo(p) + co_day_auth_aggr = {} + commits_by_day_aggregate = {} + repos_path = get_repos_path() + p = os.path.join(repos_path, repo_name) + repo = get_repo(p) + repo_size = len(repo.revisions) + #return if repo have no revisions + if repo_size < 1: + lock.release() + return True - skip_date_limit = True - parse_limit = int(config['app_conf'].get('commit_parse_limit')) - last_rev = 0 - last_cs = None - timegetter = itemgetter('time') + skip_date_limit = True + parse_limit = 
int(config['app_conf'].get('commit_parse_limit')) + last_rev = 0 + last_cs = None + timegetter = itemgetter('time') - sa = get_session() + sa = get_session() - dbrepo = sa.query(Repository)\ - .filter(Repository.repo_name == repo_name).scalar() - cur_stats = sa.query(Statistics)\ - .filter(Statistics.repository == dbrepo).scalar() + dbrepo = sa.query(Repository)\ + .filter(Repository.repo_name == repo_name).scalar() + cur_stats = sa.query(Statistics)\ + .filter(Statistics.repository == dbrepo).scalar() - if cur_stats is not None: - last_rev = cur_stats.stat_on_revision + if cur_stats is not None: + last_rev = cur_stats.stat_on_revision - #return if repo is empty - if not repo.revisions: - return True + if last_rev == repo.get_changeset().revision and repo_size > 1: + #pass silently without any work if we're not on first revision or + #current state of parsing revision(from db marker) is the + #last revision + lock.release() + return True - if last_rev == repo.get_changeset().revision and len(repo.revisions) > 1: - #pass silently without any work if we're not on first revision or - #current state of parsing revision(from db marker) is the last revision - return True + if cur_stats: + commits_by_day_aggregate = OrderedDict(json.loads( + cur_stats.commit_activity_combined)) + co_day_auth_aggr = json.loads(cur_stats.commit_activity) - if cur_stats: - commits_by_day_aggregate = OrderedDict( - json.loads( - cur_stats.commit_activity_combined)) - co_day_auth_aggr = json.loads(cur_stats.commit_activity) + log.debug('starting parsing %s', parse_limit) + lmktime = mktime + + last_rev = last_rev + 1 if last_rev > 0 else last_rev - log.debug('starting parsing %s', parse_limit) - lmktime = mktime - - last_rev = last_rev + 1 if last_rev > 0 else last_rev + for cs in repo[last_rev:last_rev + parse_limit]: + last_cs = cs # remember last parsed changeset + k = lmktime([cs.date.timetuple()[0], cs.date.timetuple()[1], + cs.date.timetuple()[2], 0, 0, 0, 0, 0, 0]) - for cs in 
repo[last_rev:last_rev + parse_limit]: - last_cs = cs # remember last parsed changeset - k = lmktime([cs.date.timetuple()[0], cs.date.timetuple()[1], - cs.date.timetuple()[2], 0, 0, 0, 0, 0, 0]) + if akc(cs.author) in co_day_auth_aggr: + try: + l = [timegetter(x) for x in + co_day_auth_aggr[akc(cs.author)]['data']] + time_pos = l.index(k) + except ValueError: + time_pos = False + + if time_pos >= 0 and time_pos is not False: + + datadict = \ + co_day_auth_aggr[akc(cs.author)]['data'][time_pos] - if akc(cs.author) in co_day_auth_aggr: - try: - l = [timegetter(x) for x in - co_day_auth_aggr[akc(cs.author)]['data']] - time_pos = l.index(k) - except ValueError: - time_pos = False + datadict["commits"] += 1 + datadict["added"] += len(cs.added) + datadict["changed"] += len(cs.changed) + datadict["removed"] += len(cs.removed) + + else: + if k >= ts_min_y and k <= ts_max_y or skip_date_limit: - if time_pos >= 0 and time_pos is not False: - - datadict = co_day_auth_aggr[akc(cs.author)]['data'][time_pos] - - datadict["commits"] += 1 - datadict["added"] += len(cs.added) - datadict["changed"] += len(cs.changed) - datadict["removed"] += len(cs.removed) + datadict = {"time": k, + "commits": 1, + "added": len(cs.added), + "changed": len(cs.changed), + "removed": len(cs.removed), + } + co_day_auth_aggr[akc(cs.author)]['data']\ + .append(datadict) else: if k >= ts_min_y and k <= ts_max_y or skip_date_limit: - - datadict = {"time": k, - "commits": 1, - "added": len(cs.added), - "changed": len(cs.changed), - "removed": len(cs.removed), - } - co_day_auth_aggr[akc(cs.author)]['data']\ - .append(datadict) + co_day_auth_aggr[akc(cs.author)] = { + "label": akc(cs.author), + "data": [{"time":k, + "commits":1, + "added":len(cs.added), + "changed":len(cs.changed), + "removed":len(cs.removed), + }], + "schema": ["commits"], + } - else: - if k >= ts_min_y and k <= ts_max_y or skip_date_limit: - co_day_auth_aggr[akc(cs.author)] = { - "label": akc(cs.author), - "data": [{"time":k, - "commits":1, 
- "added":len(cs.added), - "changed":len(cs.changed), - "removed":len(cs.removed), - }], - "schema": ["commits"], - } + #gather all data by day + if k in commits_by_day_aggregate: + commits_by_day_aggregate[k] += 1 + else: + commits_by_day_aggregate[k] = 1 - #gather all data by day - if k in commits_by_day_aggregate: - commits_by_day_aggregate[k] += 1 - else: - commits_by_day_aggregate[k] = 1 + overview_data = sorted(commits_by_day_aggregate.items(), + key=itemgetter(0)) + + if not co_day_auth_aggr: + co_day_auth_aggr[akc(repo.contact)] = { + "label": akc(repo.contact), + "data": [0, 1], + "schema": ["commits"], + } - overview_data = sorted(commits_by_day_aggregate.items(), key=itemgetter(0)) - if not co_day_auth_aggr: - co_day_auth_aggr[akc(repo.contact)] = { - "label": akc(repo.contact), - "data": [0, 1], - "schema": ["commits"], - } + stats = cur_stats if cur_stats else Statistics() + stats.commit_activity = json.dumps(co_day_auth_aggr) + stats.commit_activity_combined = json.dumps(overview_data) - stats = cur_stats if cur_stats else Statistics() - stats.commit_activity = json.dumps(co_day_auth_aggr) - stats.commit_activity_combined = json.dumps(overview_data) + log.debug('last revison %s', last_rev) + leftovers = len(repo.revisions[last_rev:]) + log.debug('revisions to parse %s', leftovers) - log.debug('last revison %s', last_rev) - leftovers = len(repo.revisions[last_rev:]) - log.debug('revisions to parse %s', leftovers) + if last_rev == 0 or leftovers < parse_limit: + log.debug('getting code trending stats') + stats.languages = json.dumps(__get_codes_stats(repo_name)) - if last_rev == 0 or leftovers < parse_limit: - log.debug('getting code trending stats') - stats.languages = json.dumps(__get_codes_stats(repo_name)) + try: + stats.repository = dbrepo + stats.stat_on_revision = last_cs.revision if last_cs else 0 + sa.add(stats) + sa.commit() + except: + log.error(traceback.format_exc()) + sa.rollback() + lock.release() + return False - try: - stats.repository 
= dbrepo - stats.stat_on_revision = last_cs.revision if last_cs else 0 - sa.add(stats) - sa.commit() - except: - log.error(traceback.format_exc()) - sa.rollback() - return False - if len(repo.revisions) > 1: - run_task(get_commits_stats, repo_name, ts_min_y, ts_max_y) + #final release + lock.release() - return True + #execute another task if celery is enabled + if len(repo.revisions) > 1 and CELERY_ON: + run_task(get_commits_stats, repo_name, ts_min_y, ts_max_y) + return True + except LockHeld: + log.info('LockHeld') + return 'Task with key %s already running' % lockkey @task(ignore_result=True) @@ -313,7 +330,6 @@ """ Sends an email with defined parameters from the .ini files. - :param recipients: list of recipients, it this is empty the defined email address from field 'email_to' is used instead :param subject: subject of the mail @@ -351,14 +367,14 @@ @task(ignore_result=True) def create_repo_fork(form_data, cur_user): + from rhodecode.model.repo import RepoModel + from vcs import get_backend + try: log = create_repo_fork.get_logger() except: log = logging.getLogger(__name__) - from rhodecode.model.repo import RepoModel - from vcs import get_backend - repo_model = RepoModel(get_session()) repo_model.create(form_data, cur_user, just_db=True, fork=True) repo_name = form_data['repo_name']