Mercurial > kallithea
view pylons_app/lib/indexers/multiprocessing_indexer.py @ 483:a9e50dce3081 celery
Removed config names from whoosh and celery,
celery is now configured based on the config name it's using
on celeryconfig. And whoosh uses it's own logger configured just for whoosh
Test creates a fresh whoosh index now, for more accurate checks
fixed tests for searching
author | Marcin Kuzminski <marcin@python-works.com> |
---|---|
date | Fri, 17 Sep 2010 22:54:30 +0200 |
parents | b153a51b1d3b |
children |
line wrap: on
line source
from multiprocessing import Process, Queue, cpu_count, Lock import socket, sys import time import os import sys from os.path import dirname as dn from multiprocessing.dummy import current_process from shutil import rmtree sys.path.append(dn(dn(dn(os.path.realpath(__file__))))) from pylons_app.model.hg_model import HgModel from whoosh.analysis import RegexTokenizer, LowercaseFilter, StopFilter from whoosh.fields import TEXT, ID, STORED, Schema from whoosh.index import create_in, open_dir from datetime import datetime from multiprocessing.process import current_process from multiprocessing import Array, Value root = dn(dn(os.path.dirname(os.path.abspath(__file__)))) idx_location = os.path.join(root, 'data', 'index') root_path = '/home/marcink/python_workspace_dirty/*' exclude_extensions = ['pyc', 'mo', 'png', 'jpg', 'jpeg', 'gif', 'swf', 'dll', 'ttf', 'psd', 'svg', 'pdf', 'bmp', 'dll'] my_analyzer = RegexTokenizer() | LowercaseFilter() def scan_paths(root_location): return HgModel.repo_scan('/', root_location, None, True) def index_paths(root_dir): index_paths_ = set() for path, dirs, files in os.walk(root_dir): if path.find('.hg') == -1: #if path.find('.hg') == -1 and path.find('bel-epa') != -1: for f in files: index_paths_.add(os.path.join(path, f)) return index_paths_ def get_schema(): return Schema(owner=TEXT(), repository=TEXT(stored=True), path=ID(stored=True, unique=True), content=TEXT(stored=True, analyzer=my_analyzer), modtime=STORED()) def add_doc(writer, path, repo_name, contact): """ Adding doc to writer @param writer: @param path: @param repo: @param fname: """ #we don't won't to read excluded file extensions just index them if path.split('/')[-1].split('.')[-1].lower() not in exclude_extensions: fobj = open(path, 'rb') content = fobj.read() fobj.close() try: u_content = unicode(content) except UnicodeDecodeError: #incase we have a decode error just represent as byte string u_content = unicode(str(content).encode('string_escape')) else: u_content = u'' writer.add_document(repository=u"%s" % repo_name, owner=unicode(contact), path=u"%s" % path, content=u_content, modtime=os.path.getmtime(path)) class MultiProcessIndexer(object): """ multiprocessing whoosh indexer """ def __init__(self, idx, work_set=set(), nr_processes=cpu_count()): q = Queue() l = Lock() work_set = work_set writer = None #writer = idx.writer() for q_task in work_set: q.put(q_task) q.put('COMMIT') #to stop all processes we have to put STOP to queue and #break the loop for each process for _ in xrange(nr_processes): q.put('STOP') for _ in xrange(nr_processes): p = Process(target=self.work_func, args=(q, l, idx, writer)) p.start() def work_func(self, q, l, idx, writer): """ worker class invoked by process """ writer = idx.writer() while True: q_task = q.get() proc = current_process() # if q_task == 'COMMIT': # l.acquire() # sys.stdout.write('%s commiting and STOP\n' % proc._name) # writer.commit(merge=False) # l.release() # break # l.acquire() # writer = idx.writer() # l.release() if q_task == 'STOP': sys.stdout.write('%s STOP\n' % proc._name) break if q_task != 'COMMIT': l.acquire() sys.stdout.write(' >> %s %s %s @ ' % q_task) sys.stdout.write(' %s \n' % proc._name) l.release() add_doc(writer, q_task[0], q_task[1], q_task[2]) l.acquire() writer.commit(merge=True) l.release() if __name__ == "__main__": #build queue do = True if len(sys.argv) > 1 else False q_tasks = [] if os.path.exists(idx_location): rmtree(idx_location) if not os.path.exists(idx_location): os.mkdir(idx_location) idx = create_in(idx_location, get_schema() , indexname='HG_INDEX') if do: sys.stdout.write('Building queue...') for cnt, repo in enumerate(scan_paths(root_path).values()): if repo.name != 'evoice_py': continue q_tasks.extend([(idx_path, repo.name, repo.contact) for idx_path in index_paths(repo.path)]) if cnt == 4: break sys.stdout.write('done\n') mpi = MultiProcessIndexer(idx, q_tasks) else: print 'checking index' reader = idx.reader() all = reader.all_stored_fields() #print all for fields in all: print fields['path']