Mercurial > kallithea
comparison pylons_app/lib/indexers/multiprocessing_indexer.py @ 406:b153a51b1d3b
Implemented search using whoosh. Still an experimental option.
author | Marcin Kuzminski <marcin@python-works.com> |
---|---|
date | Tue, 17 Aug 2010 23:15:36 +0200 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
405:bec06654d67b | 406:b153a51b1d3b |
---|---|
1 from multiprocessing import Process, Queue, cpu_count, Lock | |
2 import socket, sys | |
3 import time | |
4 import os | |
5 import sys | |
6 from os.path import dirname as dn | |
7 from multiprocessing.dummy import current_process | |
8 from shutil import rmtree | |
9 | |
10 sys.path.append(dn(dn(dn(os.path.realpath(__file__))))) | |
11 | |
12 from pylons_app.model.hg_model import HgModel | |
13 from whoosh.analysis import RegexTokenizer, LowercaseFilter, StopFilter | |
14 from whoosh.fields import TEXT, ID, STORED, Schema | |
15 from whoosh.index import create_in, open_dir | |
16 from datetime import datetime | |
17 from multiprocessing.process import current_process | |
18 from multiprocessing import Array, Value | |
19 | |
#location of this package root and of the index data directory
root = dn(dn(os.path.dirname(os.path.abspath(__file__))))
idx_location = os.path.join(root, 'data', 'index')
#NOTE(review): hard-coded developer path - glob of repositories to index,
#should be made configurable before this leaves the experimental stage
root_path = '/home/marcink/python_workspace_dirty/*'

#binary-ish extensions: such files are indexed by path only, their content
#is never read (duplicate 'dll' entry removed)
exclude_extensions = ['pyc', 'mo', 'png', 'jpg', 'jpeg', 'gif', 'swf',
                      'dll', 'ttf', 'psd', 'svg', 'pdf', 'bmp']

#tokenize on word characters and lowercase every token
my_analyzer = RegexTokenizer() | LowercaseFilter()
def scan_paths(root_location):
    """Discover repositories under *root_location*.

    Delegates to HgModel.repo_scan; presumably returns a mapping of
    repo name -> repo object (iterated via .values() by the caller)
    -- TODO confirm against HgModel.
    """
    found_repos = HgModel.repo_scan('/', root_location, None, True)
    return found_repos
30 | |
def index_paths(root_dir):
    """
    Recursively collect every file path under *root_dir*, skipping
    anything whose path contains mercurial metadata ('.hg').

    @param root_dir: directory to walk
    @return: set of file paths
    """
    index_paths_ = set()
    for path, dirs, files in os.walk(root_dir):
        #prune '.hg' directories in place so os.walk never descends into
        #repository metadata at all - a pure speedup: any path below such
        #a directory would have been excluded by the check anyway
        dirs[:] = [d for d in dirs if '.hg' not in d]
        if '.hg' not in path:
            for f in files:
                index_paths_.add(os.path.join(path, f))

    return index_paths_
40 | |
def get_schema():
    """Build the whoosh schema for the repository file index.

    Stored fields (repository, path, content, modtime) can be read back
    from search results; 'path' is the unique document key and 'content'
    is analyzed with the module-level lowercasing analyzer.
    """
    schema = Schema(
        owner=TEXT(),
        repository=TEXT(stored=True),
        path=ID(stored=True, unique=True),
        content=TEXT(stored=True, analyzer=my_analyzer),
        modtime=STORED(),
    )
    return schema
47 | |
def add_doc(writer, path, repo_name, contact):
    """
    Add a single file as a document to the given index writer.

    @param writer: whoosh IndexWriter to add the document to
    @param path: path of the file to index
    @param repo_name: repository name, stored with the document
    @param contact: repository owner contact, indexed as 'owner'
    """

    #we don't want to read excluded file extensions, just index their path
    if path.split('/')[-1].split('.')[-1].lower() not in exclude_extensions:
        fobj = open(path, 'rb')
        try:
            #ensure the file handle is closed even if read() raises
            content = fobj.read()
        finally:
            fobj.close()
        try:
            u_content = unicode(content)
        except UnicodeDecodeError:
            #in case of a decode error represent content as an escaped
            #byte string so indexing can still proceed
            u_content = unicode(str(content).encode('string_escape'))
    else:
        u_content = u''
    writer.add_document(repository=u"%s" % repo_name,
                        owner=unicode(contact),
                        path=u"%s" % path,
                        content=u_content,
                        modtime=os.path.getmtime(path))
74 | |
75 | |
class MultiProcessIndexer(object):
    """Multiprocessing whoosh indexer.

    Fills a shared queue with indexing tasks and spawns *nr_processes*
    worker processes that drain it; each worker commits its own writer
    when it sees the STOP marker. The constructor does not join the
    workers - they keep running after it returns.
    """

    def __init__(self, idx, work_set=None, nr_processes=cpu_count()):
        """
        @param idx: open whoosh index; each worker opens its own writer on it
        @param work_set: iterable of (path, repo_name, contact) tuples
        @param nr_processes: number of worker processes to start
        """
        q = Queue()
        l = Lock()
        #writer is created per-process inside work_func; passing None here
        #just keeps the work_func signature explicit
        writer = None

        #mutable default argument replaced by None-sentinel to avoid
        #sharing one set instance across calls
        for q_task in (work_set if work_set is not None else ()):
            q.put(q_task)

        q.put('COMMIT')

        #to stop all processes we have to put one STOP marker per process
        #into the queue; each worker breaks its loop when it reads one
        for _ in xrange(nr_processes):
            q.put('STOP')

        for _ in xrange(nr_processes):
            p = Process(target=self.work_func, args=(q, l, idx, writer))
            p.start()

    def work_func(self, q, l, idx, writer):
        """Worker body invoked in each spawned process.

        Drains the queue, indexing every (path, repo_name, contact)
        task, and commits this process' writer on STOP.
        """
        #each process needs its own writer on the shared index
        writer = idx.writer()

        while True:
            q_task = q.get()
            proc = current_process()

            if q_task == 'STOP':
                sys.stdout.write('%s STOP\n' % proc._name)
                break

            if q_task != 'COMMIT':
                #lock only guards interleaved stdout progress output
                l.acquire()
                sys.stdout.write(' >> %s %s %s @ ' % q_task)
                sys.stdout.write(' %s \n' % proc._name)
                l.release()
                add_doc(writer, q_task[0], q_task[1], q_task[2])

        #NOTE(review): lock around commit kept from the original; whoosh
        #writers coordinate their own locking - confirm whether needed
        l.acquire()
        writer.commit(merge=True)
        l.release()
139 | |
140 | |
141 if __name__ == "__main__": | |
142 #build queue | |
143 do = True if len(sys.argv) > 1 else False | |
144 q_tasks = [] | |
145 | |
146 if os.path.exists(idx_location): | |
147 rmtree(idx_location) | |
148 | |
149 if not os.path.exists(idx_location): | |
150 os.mkdir(idx_location) | |
151 | |
152 idx = create_in(idx_location, get_schema() , indexname='HG_INDEX') | |
153 | |
154 | |
155 if do: | |
156 sys.stdout.write('Building queue...') | |
157 for cnt, repo in enumerate(scan_paths(root_path).values()): | |
158 if repo.name != 'evoice_py': | |
159 continue | |
160 q_tasks.extend([(idx_path, repo.name, repo.contact) for idx_path in index_paths(repo.path)]) | |
161 if cnt == 4: | |
162 break | |
163 | |
164 sys.stdout.write('done\n') | |
165 | |
166 mpi = MultiProcessIndexer(idx, q_tasks) | |
167 | |
168 | |
169 else: | |
170 print 'checking index' | |
171 reader = idx.reader() | |
172 all = reader.all_stored_fields() | |
173 #print all | |
174 for fields in all: | |
175 print fields['path'] | |
176 |