changeset 560:3072935bdeed

Rewrote whoosh indexing to use the internal repository.walk() instead of walking the filesystem. Disabled the default hg update hook (it is not needed, since whoosh no longer depends on files on the file system to build its index).
author Marcin Kuzminski <marcin@python-works.com>
date Sat, 09 Oct 2010 00:22:19 +0200
parents bc4633a41967
children 5f3b967d9d10
files rhodecode/lib/db_manage.py rhodecode/lib/indexers/daemon.py rhodecode/lib/indexers/multiprocessing_indexer.py
diffstat 3 files changed, 35 insertions(+), 211 deletions(-) [+]
line wrap: on
line diff
--- a/rhodecode/lib/db_manage.py	Thu Oct 07 22:01:51 2010 +0200
+++ b/rhodecode/lib/db_manage.py	Sat Oct 09 00:22:19 2010 +0200
@@ -115,6 +115,7 @@
         hooks1.ui_section = 'hooks'
         hooks1.ui_key = 'changegroup.update'
         hooks1.ui_value = 'hg update >&2'
+        hooks1.ui_active = False
         
         hooks2 = RhodeCodeUi()
         hooks2.ui_section = 'hooks'
--- a/rhodecode/lib/indexers/daemon.py	Thu Oct 07 22:01:51 2010 +0200
+++ b/rhodecode/lib/indexers/daemon.py	Sat Oct 09 00:22:19 2010 +0200
@@ -39,6 +39,9 @@
 from shutil import rmtree
 from rhodecode.lib.indexers import INDEX_EXTENSIONS, IDX_LOCATION, SCHEMA, IDX_NAME
 
+from time import mktime
+from vcs.backends import hg
+
 import logging
 
 log = logging.getLogger('whooshIndexer')
@@ -62,7 +65,9 @@
     return HgModel.repo_scan('/', root_location, None, True)
 
 class WhooshIndexingDaemon(object):
-    """Deamon for atomic jobs"""
+    """
+    Deamon for atomic jobs
+    """
 
     def __init__(self, indexname='HG_INDEX', repo_location=None):
         self.indexname = indexname
@@ -73,55 +78,49 @@
             log.info('Cannot run incremental index since it does not'
                      ' yet exist running full build')
             self.initial = True
-    
+        
     def get_paths(self, root_dir):
-        """recursive walk in root dir and return a set of all path in that dir
-        excluding files in .hg dir"""
+        """
+        recursive walk in root dir and return a set of all path in that dir
+        based on repository walk function
+        """
+        repo = hg.MercurialRepository(root_dir)
         index_paths_ = set()
-        for path, dirs, files in os.walk(root_dir):
-            if path.find('.hg') == -1:
+        for topnode, dirs, files in repo.walk('/', 'tip'):
+            for f in files:
+                index_paths_.add(jn(root_dir, f.path))
+            for dir in dirs:
                 for f in files:
-                    index_paths_.add(jn(path, f))
-    
-        return index_paths_
-    
+                    index_paths_.add(jn(root_dir, f.path))
+            
+        return index_paths_        
+
+
     def add_doc(self, writer, path, repo):
         """Adding doc to writer"""
-        
-        ext = unicode(path.split('/')[-1].split('.')[-1].lower())
-        #we just index the content of choosen files
-        if ext in INDEX_EXTENSIONS:
+        n_path = path[len(repo.path) + 1:]
+        node = repo.get_changeset().get_node(n_path)
+
+        #we just index the content of chosen files
+        if node.extension in INDEX_EXTENSIONS:
             log.debug('    >> %s [WITH CONTENT]' % path)
-            fobj = open(path, 'rb')
-            content = fobj.read()
-            fobj.close()
-            u_content = safe_unicode(content)
+            u_content = node.content
         else:
             log.debug('    >> %s' % path)
             #just index file name without it's content
             u_content = u''
         
-        
-        
-        try:
-            os.stat(path)
-            writer.add_document(owner=unicode(repo.contact),
-                            repository=safe_unicode(repo.name),
-                            path=safe_unicode(path),
-                            content=u_content,
-                            modtime=os.path.getmtime(path),
-                            extension=ext)             
-        except OSError, e:
-            import errno
-            if e.errno == errno.ENOENT:
-                log.debug('path %s does not exist or is a broken symlink' % path)
-            else:
-                raise e                 
+        writer.add_document(owner=unicode(repo.contact),
+                        repository=safe_unicode(repo.name),
+                        path=safe_unicode(path),
+                        content=u_content,
+                        modtime=mktime(node.last_changeset.date.timetuple()),
+                        extension=node.extension)             
 
     
     def build_index(self):
         if os.path.exists(IDX_LOCATION):
-            log.debug('removing previos index')
+            log.debug('removing previous index')
             rmtree(IDX_LOCATION)
             
         if not os.path.exists(IDX_LOCATION):
--- a/rhodecode/lib/indexers/multiprocessing_indexer.py	Thu Oct 07 22:01:51 2010 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,176 +0,0 @@
-from multiprocessing import Process, Queue, cpu_count, Lock
-import socket, sys
-import time
-import os
-import sys
-from os.path import dirname as dn
-from multiprocessing.dummy import current_process
-from shutil import rmtree
-
-sys.path.append(dn(dn(dn(os.path.realpath(__file__)))))
-
-from rhodecode.model.hg_model import HgModel
-from whoosh.analysis import RegexTokenizer, LowercaseFilter, StopFilter
-from whoosh.fields import TEXT, ID, STORED, Schema
-from whoosh.index import create_in, open_dir
-from datetime import datetime
-from multiprocessing.process import current_process
-from multiprocessing import Array, Value
-
-root = dn(dn(os.path.dirname(os.path.abspath(__file__))))
-idx_location = os.path.join(root, 'data', 'index')
-root_path = '/home/marcink/python_workspace_dirty/*'
-
-exclude_extensions = ['pyc', 'mo', 'png', 'jpg', 'jpeg', 'gif', 'swf',
-                       'dll', 'ttf', 'psd', 'svg', 'pdf', 'bmp', 'dll']
-
-my_analyzer = RegexTokenizer() | LowercaseFilter()
-def scan_paths(root_location):
-    return HgModel.repo_scan('/', root_location, None, True)
-
-def index_paths(root_dir):
-    index_paths_ = set()
-    for path, dirs, files in os.walk(root_dir):
-        if path.find('.hg') == -1:
-        #if path.find('.hg') == -1 and path.find('bel-epa') != -1:    
-            for f in files:
-                index_paths_.add(os.path.join(path, f))
-
-    return index_paths_
-                    
-def get_schema():
-    return Schema(owner=TEXT(),
-                repository=TEXT(stored=True),
-                path=ID(stored=True, unique=True),
-                content=TEXT(stored=True, analyzer=my_analyzer),
-                modtime=STORED())
-
-def add_doc(writer, path, repo_name, contact):
-    """
-    Adding doc to writer
-    @param writer:
-    @param path:
-    @param repo:
-    @param fname:
-    """
-    
-    #we don't won't to read excluded file extensions just index them
-    if path.split('/')[-1].split('.')[-1].lower() not in exclude_extensions:
-        fobj = open(path, 'rb')
-        content = fobj.read()
-        fobj.close()
-        try:
-            u_content = unicode(content)
-        except UnicodeDecodeError:
-            #incase we have a decode error just represent as byte string
-            u_content = unicode(str(content).encode('string_escape'))
-    else:
-        u_content = u''    
-    writer.add_document(repository=u"%s" % repo_name,
-                        owner=unicode(contact),
-                        path=u"%s" % path,
-                        content=u_content,
-                        modtime=os.path.getmtime(path)) 
-
-
-class MultiProcessIndexer(object):
-    """ multiprocessing whoosh indexer """
-
-    def __init__(self, idx, work_set=set(), nr_processes=cpu_count()):
-        q = Queue()
-        l = Lock()
-        work_set = work_set
-        writer = None
-        #writer = idx.writer()
-        
-        for q_task in work_set:
-            q.put(q_task)
-
-        q.put('COMMIT')
-        
-        #to stop all processes we have to put STOP to queue and 
-        #break the loop for each process
-        for _ in xrange(nr_processes):
-            q.put('STOP')
-
-        
-        for _ in xrange(nr_processes):
-            p = Process(target=self.work_func, args=(q, l, idx, writer))
-            p.start()
-        
-        
-
-    def work_func(self, q, l, idx, writer):
-        """ worker class invoked by process """
-        
-
-        writer = idx.writer()
-
-        while True:
-            q_task = q.get()
-            proc = current_process()
-            
-#            if q_task == 'COMMIT':
-#                l.acquire()
-#                sys.stdout.write('%s commiting and STOP\n' % proc._name)
-#                writer.commit(merge=False)
-#                l.release()               
-#                break
-#            l.acquire()
-#            writer = idx.writer()
-#            l.release() 
-                        
-            if q_task == 'STOP':
-                sys.stdout.write('%s STOP\n' % proc._name)  
-                break
-            
-            if q_task != 'COMMIT':
-                l.acquire()
-                
-                sys.stdout.write('    >> %s %s %s @ ' % q_task)
-                sys.stdout.write(' %s \n' % proc._name)
-                
-                l.release()
-                add_doc(writer, q_task[0], q_task[1], q_task[2])
-                
-            l.acquire()
-            writer.commit(merge=True)
-            l.release()
-            
-
-if __name__ == "__main__":
-    #build queue
-    do = True if len(sys.argv) > 1 else False
-    q_tasks = []
-    
-    if os.path.exists(idx_location):
-        rmtree(idx_location)
-        
-    if not os.path.exists(idx_location):
-        os.mkdir(idx_location)
-                    
-    idx = create_in(idx_location, get_schema() , indexname='HG_INDEX')    
-    
-    
-    if do:
-        sys.stdout.write('Building queue...')
-        for cnt, repo in enumerate(scan_paths(root_path).values()):
-            if repo.name != 'evoice_py':
-                continue            
-            q_tasks.extend([(idx_path, repo.name, repo.contact) for idx_path in index_paths(repo.path)])
-            if cnt == 4:
-                break
-            
-        sys.stdout.write('done\n')
-        
-        mpi = MultiProcessIndexer(idx, q_tasks)
-
-    
-    else:
-        print 'checking index'
-        reader = idx.reader()
-        all = reader.all_stored_fields()
-        #print all
-        for fields in all:
-            print fields['path']
-