comparison pylons_app/lib/indexers/multiprocessing_indexer.py @ 406:b153a51b1d3b

Implemented search using whoosh. Still an experimental option.
author Marcin Kuzminski <marcin@python-works.com>
date Tue, 17 Aug 2010 23:15:36 +0200
parents
children
comparison
equal deleted inserted replaced
405:bec06654d67b 406:b153a51b1d3b
1 from multiprocessing import Process, Queue, cpu_count, Lock
2 import socket, sys
3 import time
4 import os
5 import sys
6 from os.path import dirname as dn
7 from multiprocessing.dummy import current_process
8 from shutil import rmtree
9
10 sys.path.append(dn(dn(dn(os.path.realpath(__file__)))))
11
12 from pylons_app.model.hg_model import HgModel
13 from whoosh.analysis import RegexTokenizer, LowercaseFilter, StopFilter
14 from whoosh.fields import TEXT, ID, STORED, Schema
15 from whoosh.index import create_in, open_dir
16 from datetime import datetime
17 from multiprocessing.process import current_process
18 from multiprocessing import Array, Value
19
# project root directory: three levels up from this file
root = dn(dn(os.path.dirname(os.path.abspath(__file__))))
# location on disk where the whoosh index files are kept
idx_location = os.path.join(root, 'data', 'index')
# hardcoded location of the repositories to scan
# NOTE(review): developer-specific path — presumably meant to be configurable
root_path = '/home/marcink/python_workspace_dirty/*'

# binary-ish file extensions whose content is not read; such files are
# indexed by path only with empty content
# NOTE(review): 'dll' is listed twice — harmless for membership tests
exclude_extensions = ['pyc', 'mo', 'png', 'jpg', 'jpeg', 'gif', 'swf',
'dll', 'ttf', 'psd', 'svg', 'pdf', 'bmp', 'dll']

# tokenize on word boundaries, then lowercase the tokens
my_analyzer = RegexTokenizer() | LowercaseFilter()
def scan_paths(root_location):
    """Discover repositories under *root_location* via HgModel.repo_scan."""
    repositories = HgModel.repo_scan('/', root_location, None, True)
    return repositories
30
def index_paths(root_dir):
    """Walk *root_dir* and return the set of full paths of files to index.

    Any directory whose path contains '.hg' (the Mercurial metadata
    directory) is skipped entirely.

    @param root_dir: directory to walk recursively
    """
    found = set()
    for path, _dirs, files in os.walk(root_dir):
        # skip Mercurial internals ('.find(...) == -1' replaced with 'not in')
        if '.hg' in path:
            continue
        found.update(os.path.join(path, fname) for fname in files)
    return found
40
def get_schema():
    """Build the whoosh schema used for the repository file index."""
    schema = Schema(
        owner=TEXT(),
        repository=TEXT(stored=True),
        path=ID(stored=True, unique=True),
        content=TEXT(stored=True, analyzer=my_analyzer),
        modtime=STORED(),
    )
    return schema
47
def add_doc(writer, path, repo_name, contact):
    """Add one file as a document to the given whoosh index writer.

    Files with an excluded (binary) extension are indexed with empty
    content; everything else is read and stored as unicode.

    @param writer: whoosh index writer
    @param path: full filesystem path of the file to index
    @param repo_name: name of the repository the file belongs to
    @param contact: owner/contact string of the repository
    """
    #we don't want to read excluded file extensions, just index their paths
    if path.split('/')[-1].split('.')[-1].lower() not in exclude_extensions:
        fobj = open(path, 'rb')
        try:
            #ensure the handle is closed even if read() raises
            content = fobj.read()
        finally:
            fobj.close()
        try:
            u_content = unicode(content)
        except UnicodeDecodeError:
            #in case we have a decode error just represent it as a byte string
            u_content = unicode(str(content).encode('string_escape'))
    else:
        u_content = u''
    writer.add_document(repository=u"%s" % repo_name,
                        owner=unicode(contact),
                        path=u"%s" % path,
                        content=u_content,
                        modtime=os.path.getmtime(path))
74
75
class MultiProcessIndexer(object):
    """Multiprocessing whoosh indexer.

    Feeds (path, repo_name, contact) tasks from *work_set* into a shared
    queue and starts *nr_processes* worker processes that consume it and
    write documents into the given whoosh index.
    """

    def __init__(self, idx, work_set=None, nr_processes=cpu_count()):
        """
        @param idx: open whoosh index to write into
        @param work_set: iterable of (path, repo_name, contact) tuples;
            defaults to an empty set (mutable default argument removed)
        @param nr_processes: number of worker processes to start
        """
        if work_set is None:
            work_set = set()
        q = Queue()
        l = Lock()
        writer = None

        for q_task in work_set:
            q.put(q_task)

        q.put('COMMIT')

        #to stop all processes we have to put STOP to queue and
        #break the loop for each process
        for _ in xrange(nr_processes):
            q.put('STOP')

        for _ in xrange(nr_processes):
            p = Process(target=self.work_func, args=(q, l, idx, writer))
            p.start()

    def work_func(self, q, l, idx, writer):
        """Worker run in each child process: consume queued tasks and
        index them, committing the writer once the queue signals STOP."""
        writer = idx.writer()

        while True:
            q_task = q.get()
            proc = current_process()

            if q_task == 'STOP':
                sys.stdout.write('%s STOP\n' % proc._name)
                break

            if q_task != 'COMMIT':
                #lock guards interleaved stdout writes from multiple workers
                l.acquire()
                sys.stdout.write(' >> %s %s %s @ ' % q_task)
                sys.stdout.write(' %s \n' % proc._name)
                l.release()
                add_doc(writer, q_task[0], q_task[1], q_task[2])

        l.acquire()
        writer.commit(merge=True)
        l.release()
139
140
141 if __name__ == "__main__":
142 #build queue
143 do = True if len(sys.argv) > 1 else False
144 q_tasks = []
145
146 if os.path.exists(idx_location):
147 rmtree(idx_location)
148
149 if not os.path.exists(idx_location):
150 os.mkdir(idx_location)
151
152 idx = create_in(idx_location, get_schema() , indexname='HG_INDEX')
153
154
155 if do:
156 sys.stdout.write('Building queue...')
157 for cnt, repo in enumerate(scan_paths(root_path).values()):
158 if repo.name != 'evoice_py':
159 continue
160 q_tasks.extend([(idx_path, repo.name, repo.contact) for idx_path in index_paths(repo.path)])
161 if cnt == 4:
162 break
163
164 sys.stdout.write('done\n')
165
166 mpi = MultiProcessIndexer(idx, q_tasks)
167
168
169 else:
170 print 'checking index'
171 reader = idx.reader()
172 all = reader.all_stored_fields()
173 #print all
174 for fields in all:
175 print fields['path']
176