diff rhodecode/lib/indexers/daemon.py @ 547:1e757ac98988

renamed project to rhodecode
author Marcin Kuzminski <marcin@python-works.com>
date Wed, 06 Oct 2010 03:18:16 +0200
parents pylons_app/lib/indexers/daemon.py@fb0c3af6031b
children f99075170eb4
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rhodecode/lib/indexers/daemon.py	Wed Oct 06 03:18:16 2010 +0200
@@ -0,0 +1,238 @@
+#!/usr/bin/env python
+# encoding: utf-8
+# whoosh indexer daemon for hg-app
+# Copyright (C) 2009-2010 Marcin Kuzminski <marcin@python-works.com>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; version 2
+# of the License or (at your opinion) any later version of the license.
+# 
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+# 
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+# MA  02110-1301, USA.
+"""
+Created on Jan 26, 2010
+
+@author: marcink
+A deamon will read from task table and run tasks
+"""
+import sys
+import os
+from os.path import dirname as dn
+from os.path import join as jn
+
+#to get the rhodecode import
+project_path = dn(dn(dn(dn(os.path.realpath(__file__)))))
+sys.path.append(project_path)
+
+from rhodecode.lib.pidlock import LockHeld, DaemonLock
+from rhodecode.model.hg_model import HgModel
+from rhodecode.lib.helpers import safe_unicode
+from whoosh.index import create_in, open_dir
+from shutil import rmtree
+from rhodecode.lib.indexers import INDEX_EXTENSIONS, IDX_LOCATION, SCHEMA, IDX_NAME
+
+import logging
+
+log = logging.getLogger('whooshIndexer')
+# create logger
+log.setLevel(logging.DEBUG)
+log.propagate = False
+# create console handler and set level to debug
+ch = logging.StreamHandler()
+ch.setLevel(logging.DEBUG)
+
+# create formatter
+formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
+
+# add formatter to ch
+ch.setFormatter(formatter)
+
+# add ch to logger
+log.addHandler(ch)
+
+def scan_paths(root_location):
+    return HgModel.repo_scan('/', root_location, None, True)
+
+class WhooshIndexingDaemon(object):
+    """Deamon for atomic jobs"""
+
+    def __init__(self, indexname='HG_INDEX', repo_location=None):
+        self.indexname = indexname
+        self.repo_location = repo_location
+        self.initial = False
+        if not os.path.isdir(IDX_LOCATION):
+            os.mkdir(IDX_LOCATION)
+            log.info('Cannot run incremental index since it does not'
+                     ' yet exist running full build')
+            self.initial = True
+    
+    def get_paths(self, root_dir):
+        """recursive walk in root dir and return a set of all path in that dir
+        excluding files in .hg dir"""
+        index_paths_ = set()
+        for path, dirs, files in os.walk(root_dir):
+            if path.find('.hg') == -1:
+                for f in files:
+                    index_paths_.add(jn(path, f))
+    
+        return index_paths_
+    
+    def add_doc(self, writer, path, repo):
+        """Adding doc to writer"""
+        
+        ext = unicode(path.split('/')[-1].split('.')[-1].lower())
+        #we just index the content of choosen files
+        if ext in INDEX_EXTENSIONS:
+            log.debug('    >> %s [WITH CONTENT]' % path)
+            fobj = open(path, 'rb')
+            content = fobj.read()
+            fobj.close()
+            u_content = safe_unicode(content)
+        else:
+            log.debug('    >> %s' % path)
+            #just index file name without it's content
+            u_content = u''
+        
+        
+        
+        try:
+            os.stat(path)
+            writer.add_document(owner=unicode(repo.contact),
+                            repository=u"%s" % repo.name,
+                            path=u"%s" % path,
+                            content=u_content,
+                            modtime=os.path.getmtime(path),
+                            extension=ext)             
+        except OSError, e:
+            import errno
+            if e.errno == errno.ENOENT:
+                log.debug('path %s does not exist or is a broken symlink' % path)
+            else:
+                raise e                 
+
+    
+    def build_index(self):
+        if os.path.exists(IDX_LOCATION):
+            log.debug('removing previos index')
+            rmtree(IDX_LOCATION)
+            
+        if not os.path.exists(IDX_LOCATION):
+            os.mkdir(IDX_LOCATION)
+        
+        idx = create_in(IDX_LOCATION, SCHEMA, indexname=IDX_NAME)
+        writer = idx.writer()
+        
+        for cnt, repo in enumerate(scan_paths(self.repo_location).values()):
+            log.debug('building index @ %s' % repo.path)
+        
+            for idx_path in self.get_paths(repo.path):
+                self.add_doc(writer, idx_path, repo)
+        writer.commit(merge=True)
+                
+        log.debug('>>> FINISHED BUILDING INDEX <<<')
+            
+    
+    def update_index(self):
+        log.debug('STARTING INCREMENTAL INDEXING UPDATE')
+            
+        idx = open_dir(IDX_LOCATION, indexname=self.indexname)
+        # The set of all paths in the index
+        indexed_paths = set()
+        # The set of all paths we need to re-index
+        to_index = set()
+        
+        reader = idx.reader()
+        writer = idx.writer()
+    
+        # Loop over the stored fields in the index
+        for fields in reader.all_stored_fields():
+            indexed_path = fields['path']
+            indexed_paths.add(indexed_path)
+    
+            if not os.path.exists(indexed_path):
+                # This file was deleted since it was indexed
+                log.debug('removing from index %s' % indexed_path)
+                writer.delete_by_term('path', indexed_path)
+    
+            else:
+                # Check if this file was changed since it
+                # was indexed
+                indexed_time = fields['modtime']
+                
+                mtime = os.path.getmtime(indexed_path)
+    
+                if mtime > indexed_time:
+    
+                    # The file has changed, delete it and add it to the list of
+                    # files to reindex
+                    log.debug('adding to reindex list %s' % indexed_path)
+                    writer.delete_by_term('path', indexed_path)
+                    to_index.add(indexed_path)
+                    #writer.commit()
+    
+        # Loop over the files in the filesystem
+        # Assume we have a function that gathers the filenames of the
+        # documents to be indexed
+        for repo in scan_paths(self.repo_location).values():
+            for path in self.get_paths(repo.path):
+                if path in to_index or path not in indexed_paths:
+                    # This is either a file that's changed, or a new file
+                    # that wasn't indexed before. So index it!
+                    self.add_doc(writer, path, repo)
+                    log.debug('reindexing %s' % path)
+    
+        writer.commit(merge=True)
+        #idx.optimize()
+        log.debug('>>> FINISHED <<<')
+        
+    def run(self, full_index=False):
+        """Run daemon"""
+        if full_index or self.initial:
+            self.build_index()
+        else:
+            self.update_index()
+        
+if __name__ == "__main__":
+    arg = sys.argv[1:]
+    if len(arg) != 2:
+        sys.stderr.write('Please specify indexing type [full|incremental]' 
+                         'and path to repositories as script args \n')
+        sys.exit()
+    
+    
+    if arg[0] == 'full':
+        full_index = True
+    elif arg[0] == 'incremental':
+        # False means looking just for changes
+        full_index = False
+    else:
+        sys.stdout.write('Please use [full|incremental]' 
+                         ' as script first arg \n')
+        sys.exit()
+    
+    if not os.path.isdir(arg[1]):
+        sys.stderr.write('%s is not a valid path \n' % arg[1])
+        sys.exit()
+    else:
+        if arg[1].endswith('/'):
+            repo_location = arg[1] + '*'
+        else:
+            repo_location = arg[1] + '/*'
+    
+    try:
+        l = DaemonLock()
+        WhooshIndexingDaemon(repo_location=repo_location)\
+            .run(full_index=full_index)
+        l.release()
+        reload(logging)
+    except LockHeld:
+        sys.exit(1)
+