# HG changeset patch
# User Marcin Kuzminski
# Date 1286576539 -7200
# Node ID 3072935bdeed71f42ac0e411a68c2c675513a9b2
# Parent  bc4633a41967536202c3bc3f96d35e22d7640efd
rewrote whoosh indexing to run the internal repository.walk() instead of
walking the filesystem. Disabled the default hg update hook (not needed,
since whoosh indexing no longer depends on filesystem files).

diff -r bc4633a41967 -r 3072935bdeed rhodecode/lib/db_manage.py
--- a/rhodecode/lib/db_manage.py	Thu Oct 07 22:01:51 2010 +0200
+++ b/rhodecode/lib/db_manage.py	Sat Oct 09 00:22:19 2010 +0200
@@ -115,6 +115,7 @@
         hooks1.ui_section = 'hooks'
         hooks1.ui_key = 'changegroup.update'
         hooks1.ui_value = 'hg update >&2'
+        hooks1.ui_active = False
 
         hooks2 = RhodeCodeUi()
         hooks2.ui_section = 'hooks'
diff -r bc4633a41967 -r 3072935bdeed rhodecode/lib/indexers/daemon.py
--- a/rhodecode/lib/indexers/daemon.py	Thu Oct 07 22:01:51 2010 +0200
+++ b/rhodecode/lib/indexers/daemon.py	Sat Oct 09 00:22:19 2010 +0200
@@ -39,6 +39,9 @@
 from shutil import rmtree
 from rhodecode.lib.indexers import INDEX_EXTENSIONS, IDX_LOCATION, SCHEMA, IDX_NAME
+from time import mktime
+from vcs.backends import hg
+
 import logging
 log = logging.getLogger('whooshIndexer')
 
@@ -62,7 +65,9 @@
     return HgModel.repo_scan('/', root_location, None, True)
 
 class WhooshIndexingDaemon(object):
-    """Deamon for atomic jobs"""
+    """
+    Daemon for atomic jobs
+    """
 
     def __init__(self, indexname='HG_INDEX', repo_location=None):
         self.indexname = indexname
@@ -73,55 +78,49 @@
             log.info('Cannot run incremental index since it does not'
                      ' yet exist running full build')
             self.initial = True
-        
+
     def get_paths(self, root_dir):
-        """recursive walk in root dir and return a set of all path in that dir
-        excluding files in .hg dir"""
+        """
+        Recursive walk in root dir; returns a set of all paths in that dir,
+        based on the repository walk() function
+        """
+        repo = hg.MercurialRepository(root_dir)
         index_paths_ = set()
-        for path, dirs, files in os.walk(root_dir):
-            if path.find('.hg') == -1:
+        for topnode, dirs, files in repo.walk('/', 'tip'):
+            for f in files:
+                index_paths_.add(jn(root_dir, f.path))
+            for dir in dirs:
                 for f in files:
-                    index_paths_.add(jn(path, f))
-
-        return index_paths_
-
+                    index_paths_.add(jn(root_dir, f.path))
+
+        return index_paths_
+
+
     def add_doc(self, writer, path, repo):
         """Adding doc to writer"""
-
-        ext = unicode(path.split('/')[-1].split('.')[-1].lower())
-        #we just index the content of choosen files
-        if ext in INDEX_EXTENSIONS:
+        n_path = path[len(repo.path) + 1:]
+        node = repo.get_changeset().get_node(n_path)
+
+        #we just index the content of chosen files
+        if node.extension in INDEX_EXTENSIONS:
             log.debug('    >> %s [WITH CONTENT]' % path)
-            fobj = open(path, 'rb')
-            content = fobj.read()
-            fobj.close()
-            u_content = safe_unicode(content)
+            u_content = node.content
         else:
             log.debug('    >> %s' % path)
             #just index file name without it's content
             u_content = u''
-
-
-        try:
-            os.stat(path)
-            writer.add_document(owner=unicode(repo.contact),
-                            repository=safe_unicode(repo.name),
-                            path=safe_unicode(path),
-                            content=u_content,
-                            modtime=os.path.getmtime(path),
-                            extension=ext)
-        except OSError, e:
-            import errno
-            if e.errno == errno.ENOENT:
-                log.debug('path %s does not exist or is a broken symlink' % path)
-            else:
-                raise e
+        writer.add_document(owner=unicode(repo.contact),
+                            repository=safe_unicode(repo.name),
+                            path=safe_unicode(path),
+                            content=u_content,
+                            modtime=mktime(node.last_changeset.date.timetuple()),
+                            extension=node.extension)
 
     def build_index(self):
         if os.path.exists(IDX_LOCATION):
-            log.debug('removing previos index')
log.debug('removing previos index') + log.debug('removing previous index') rmtree(IDX_LOCATION) if not os.path.exists(IDX_LOCATION): diff -r bc4633a41967 -r 3072935bdeed rhodecode/lib/indexers/multiprocessing_indexer.py --- a/rhodecode/lib/indexers/multiprocessing_indexer.py Thu Oct 07 22:01:51 2010 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,176 +0,0 @@ -from multiprocessing import Process, Queue, cpu_count, Lock -import socket, sys -import time -import os -import sys -from os.path import dirname as dn -from multiprocessing.dummy import current_process -from shutil import rmtree - -sys.path.append(dn(dn(dn(os.path.realpath(__file__))))) - -from rhodecode.model.hg_model import HgModel -from whoosh.analysis import RegexTokenizer, LowercaseFilter, StopFilter -from whoosh.fields import TEXT, ID, STORED, Schema -from whoosh.index import create_in, open_dir -from datetime import datetime -from multiprocessing.process import current_process -from multiprocessing import Array, Value - -root = dn(dn(os.path.dirname(os.path.abspath(__file__)))) -idx_location = os.path.join(root, 'data', 'index') -root_path = '/home/marcink/python_workspace_dirty/*' - -exclude_extensions = ['pyc', 'mo', 'png', 'jpg', 'jpeg', 'gif', 'swf', - 'dll', 'ttf', 'psd', 'svg', 'pdf', 'bmp', 'dll'] - -my_analyzer = RegexTokenizer() | LowercaseFilter() -def scan_paths(root_location): - return HgModel.repo_scan('/', root_location, None, True) - -def index_paths(root_dir): - index_paths_ = set() - for path, dirs, files in os.walk(root_dir): - if path.find('.hg') == -1: - #if path.find('.hg') == -1 and path.find('bel-epa') != -1: - for f in files: - index_paths_.add(os.path.join(path, f)) - - return index_paths_ - -def get_schema(): - return Schema(owner=TEXT(), - repository=TEXT(stored=True), - path=ID(stored=True, unique=True), - content=TEXT(stored=True, analyzer=my_analyzer), - modtime=STORED()) - -def add_doc(writer, path, repo_name, contact): - """ - Adding doc to writer - @param writer: - @param path: - @param repo: - @param fname: - """ - - #we don't won't to read excluded file extensions just index them - if path.split('/')[-1].split('.')[-1].lower() not in exclude_extensions: - fobj = open(path, 'rb') - content = fobj.read() - fobj.close() - try: - u_content = unicode(content) - except UnicodeDecodeError: - #incase we have a decode error just represent as byte string - u_content = unicode(str(content).encode('string_escape')) - else: - u_content = u'' - writer.add_document(repository=u"%s" % repo_name, - owner=unicode(contact), - path=u"%s" % path, - content=u_content, - modtime=os.path.getmtime(path)) - - -class MultiProcessIndexer(object): - """ multiprocessing whoosh indexer """ - - def __init__(self, idx, work_set=set(), nr_processes=cpu_count()): - q = Queue() - l = Lock() - work_set = work_set - writer = None - #writer = idx.writer() - - for q_task in work_set: - q.put(q_task) - - q.put('COMMIT') - - #to stop all processes we have to put STOP to queue and - #break the loop for each process - for _ in xrange(nr_processes): - q.put('STOP') - - - for _ in xrange(nr_processes): - p = Process(target=self.work_func, args=(q, l, idx, writer)) - p.start() - - - - def work_func(self, q, l, idx, writer): - """ worker class invoked by process """ - - - writer = idx.writer() - - while True: - q_task = q.get() - proc = current_process() - -# if q_task == 'COMMIT': -# l.acquire() -# sys.stdout.write('%s commiting and STOP\n' % proc._name) -# writer.commit(merge=False) -# l.release() -# break -# l.acquire() -# 
-#            writer = idx.writer()
-#            l.release()
-
-            if q_task == 'STOP':
-                sys.stdout.write('%s STOP\n' % proc._name)
-                break
-
-            if q_task != 'COMMIT':
-                l.acquire()
-
-                sys.stdout.write('    >> %s %s %s @ ' % q_task)
-                sys.stdout.write(' %s \n' % proc._name)
-
-                l.release()
-                add_doc(writer, q_task[0], q_task[1], q_task[2])
-
-        l.acquire()
-        writer.commit(merge=True)
-        l.release()
-
-
-if __name__ == "__main__":
-    #build queue
-    do = True if len(sys.argv) > 1 else False
-    q_tasks = []
-
-    if os.path.exists(idx_location):
-        rmtree(idx_location)
-
-    if not os.path.exists(idx_location):
-        os.mkdir(idx_location)
-
-    idx = create_in(idx_location, get_schema() , indexname='HG_INDEX')
-
-
-    if do:
-        sys.stdout.write('Building queue...')
-        for cnt, repo in enumerate(scan_paths(root_path).values()):
-            if repo.name != 'evoice_py':
-                continue
-            q_tasks.extend([(idx_path, repo.name, repo.contact) for idx_path in index_paths(repo.path)])
-            if cnt == 4:
-                break
-
-        sys.stdout.write('done\n')
-
-        mpi = MultiProcessIndexer(idx, q_tasks)
-
-
-    else:
-        print 'checking index'
-        reader = idx.reader()
-        all = reader.all_stored_fields()
-        #print all
-        for fields in all:
-            print fields['path']
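
For orientation, the daemon.py hunks above add up to the following flow: paths
now come from the repository's own walk() at the 'tip' changeset, and file data
comes from changeset nodes, so nothing is read from the working copy anymore
(which is also why the 'hg update' hook can stay disabled). Below is a
condensed sketch of that flow, assuming the vcs MercurialRepository API exactly
as the patch uses it (walk(), get_changeset(), node.content / node.extension /
node.last_changeset); index_repo and the inlined safe_unicode stand-in are
illustrative names, not RhodeCode code. Note that since walk() is itself
recursive, collecting the files of each topnode already covers the whole tree;
the patch's extra loop over dirs only re-adds the same paths to the set, so the
sketch omits it.

import os
from time import mktime

from vcs.backends import hg
from rhodecode.lib.indexers import INDEX_EXTENSIONS


def safe_unicode(s):
    #minimal stand-in for the safe_unicode helper that daemon.py imports
    if isinstance(s, unicode):
        return s
    return unicode(s, 'utf-8', 'replace')


def index_repo(writer, repo_path, repo_name, contact):
    """Walk a repository at tip and feed every file node to a whoosh writer."""
    repo = hg.MercurialRepository(repo_path)
    tip = repo.get_changeset()

    for topnode, dirs, files in repo.walk('/', 'tip'):
        for f in files:
            node = tip.get_node(f.path)
            #content is indexed only for whitelisted extensions,
            #everything else stays searchable by file name alone
            if node.extension in INDEX_EXTENSIONS:
                u_content = node.content
            else:
                u_content = u''
            writer.add_document(owner=unicode(contact),
                                repository=safe_unicode(repo_name),
                                path=safe_unicode(os.path.join(repo_path, f.path)),
                                content=u_content,
                                #commit date of the node, not filesystem mtime
                                modtime=mktime(node.last_changeset.date.timetuple()),
                                extension=node.extension)
    writer.commit()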
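
The db_manage.py hunk only affects fresh installations: the changegroup.update
row is still created, just with ui_active = False. On an existing database the
same effect can be had by flipping that flag. A sketch, assuming RhodeCode's
RhodeCodeUi model and an SQLAlchemy session (the rhodecode.model.meta.Session
import path and the helper name are assumptions):

from rhodecode.model.db import RhodeCodeUi
from rhodecode.model.meta import Session


def disable_default_update_hook():
    sa = Session()
    hook = sa.query(RhodeCodeUi)\
        .filter(RhodeCodeUi.ui_key == 'changegroup.update').scalar()
    if hook is not None:
        hook.ui_active = False  #keep the row, just deactivate the hook
        sa.add(hook)
        sa.commit()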