comparison rhodecode/lib/indexers/daemon.py @ 547:1e757ac98988

renamed project to rhodecode
author Marcin Kuzminski <marcin@python-works.com>
date Wed, 06 Oct 2010 03:18:16 +0200
parents pylons_app/lib/indexers/daemon.py@fb0c3af6031b
children f99075170eb4
#!/usr/bin/env python
# encoding: utf-8
# whoosh indexer daemon for hg-app
# Copyright (C) 2009-2010 Marcin Kuzminski <marcin@python-works.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; version 2
# of the License or (at your option) any later version of the license.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301, USA.
"""
Created on Jan 26, 2010

@author: marcink
A daemon that builds and incrementally updates the Whoosh full-text index
of the configured repositories.
"""
import sys
import os
from os.path import dirname as dn
from os.path import join as jn

# to get the rhodecode import
project_path = dn(dn(dn(dn(os.path.realpath(__file__)))))
sys.path.append(project_path)

from rhodecode.lib.pidlock import LockHeld, DaemonLock
from rhodecode.model.hg_model import HgModel
from rhodecode.lib.helpers import safe_unicode
from whoosh.index import create_in, open_dir
from shutil import rmtree
from rhodecode.lib.indexers import INDEX_EXTENSIONS, IDX_LOCATION, SCHEMA, IDX_NAME

import logging

log = logging.getLogger('whooshIndexer')
# create logger
log.setLevel(logging.DEBUG)
log.propagate = False
# create console handler and set level to debug
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)

# create formatter
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")

# add formatter to ch
ch.setFormatter(formatter)

# add ch to logger
log.addHandler(ch)

def scan_paths(root_location):
    return HgModel.repo_scan('/', root_location, None, True)

class WhooshIndexingDaemon(object):
    """Daemon for atomic indexing jobs"""

    def __init__(self, indexname='HG_INDEX', repo_location=None):
        self.indexname = indexname
        self.repo_location = repo_location
        self.initial = False
        if not os.path.isdir(IDX_LOCATION):
            os.mkdir(IDX_LOCATION)
            log.info('Cannot run incremental index since it does not'
                     ' exist yet; running full build')
            self.initial = True

    def get_paths(self, root_dir):
        """Recursively walk the root dir and return a set of all paths in
        that dir, excluding files inside the .hg dir"""
        index_paths_ = set()
        for path, dirs, files in os.walk(root_dir):
            if path.find('.hg') == -1:
                for f in files:
                    index_paths_.add(jn(path, f))

        return index_paths_

    def add_doc(self, writer, path, repo):
        """Add a doc to the writer"""

        ext = unicode(path.split('/')[-1].split('.')[-1].lower())
        # we only index the content of chosen file extensions
        if ext in INDEX_EXTENSIONS:
            log.debug(' >> %s [WITH CONTENT]' % path)
            fobj = open(path, 'rb')
            content = fobj.read()
            fobj.close()
            u_content = safe_unicode(content)
        else:
            log.debug(' >> %s' % path)
            # just index the file name, without its content
            u_content = u''

        try:
            os.stat(path)
            writer.add_document(owner=unicode(repo.contact),
                                repository=u"%s" % repo.name,
                                path=u"%s" % path,
                                content=u_content,
                                modtime=os.path.getmtime(path),
                                extension=ext)
        except OSError, e:
            import errno
            if e.errno == errno.ENOENT:
                log.debug('path %s does not exist or is a broken symlink' % path)
            else:
                raise

    def build_index(self):
        if os.path.exists(IDX_LOCATION):
            log.debug('removing previous index')
            rmtree(IDX_LOCATION)

        if not os.path.exists(IDX_LOCATION):
            os.mkdir(IDX_LOCATION)

        idx = create_in(IDX_LOCATION, SCHEMA, indexname=IDX_NAME)
        writer = idx.writer()

        for cnt, repo in enumerate(scan_paths(self.repo_location).values()):
            log.debug('building index @ %s' % repo.path)

            for idx_path in self.get_paths(repo.path):
                self.add_doc(writer, idx_path, repo)
        writer.commit(merge=True)

        log.debug('>>> FINISHED BUILDING INDEX <<<')

    def update_index(self):
        log.debug('STARTING INCREMENTAL INDEXING UPDATE')

        idx = open_dir(IDX_LOCATION, indexname=self.indexname)
        # The set of all paths in the index
        indexed_paths = set()
        # The set of all paths we need to re-index
        to_index = set()

        reader = idx.reader()
        writer = idx.writer()

        # Loop over the stored fields in the index
        for fields in reader.all_stored_fields():
            indexed_path = fields['path']
            indexed_paths.add(indexed_path)

            if not os.path.exists(indexed_path):
                # This file was deleted since it was indexed
                log.debug('removing from index %s' % indexed_path)
                writer.delete_by_term('path', indexed_path)

            else:
                # Check if this file was changed since it
                # was indexed
                indexed_time = fields['modtime']

                mtime = os.path.getmtime(indexed_path)

                if mtime > indexed_time:

                    # The file has changed, delete it and add it to the list of
                    # files to reindex
                    log.debug('adding to reindex list %s' % indexed_path)
                    writer.delete_by_term('path', indexed_path)
                    to_index.add(indexed_path)
                    #writer.commit()

        # Loop over the files in the filesystem
        # Assume we have a function that gathers the filenames of the
        # documents to be indexed
        for repo in scan_paths(self.repo_location).values():
            for path in self.get_paths(repo.path):
                if path in to_index or path not in indexed_paths:
                    # This is either a file that's changed, or a new file
                    # that wasn't indexed before. So index it!
                    self.add_doc(writer, path, repo)
                    log.debug('reindexing %s' % path)

        writer.commit(merge=True)
        #idx.optimize()
        log.debug('>>> FINISHED <<<')

    def run(self, full_index=False):
        """Run daemon"""
        if full_index or self.initial:
            self.build_index()
        else:
            self.update_index()

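# A minimal sketch of driving the indexer programmatically instead of through
# the CLI block below; the repository path is illustrative, and the trailing
# '/*' mirrors what the CLI builds for repo_location:
#
#   WhooshIndexingDaemon(repo_location='/home/user/repos/*').run(full_index=True)
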
if __name__ == "__main__":
    arg = sys.argv[1:]
    if len(arg) != 2:
        sys.stderr.write('Please specify indexing type [full|incremental]'
                         ' and path to repositories as script args \n')
        sys.exit()

    if arg[0] == 'full':
        full_index = True
    elif arg[0] == 'incremental':
        # False means looking just for changes
        full_index = False
    else:
        sys.stderr.write('Please use [full|incremental]'
                         ' as script first arg \n')
        sys.exit()

    if not os.path.isdir(arg[1]):
        sys.stderr.write('%s is not a valid path \n' % arg[1])
        sys.exit()
    else:
        if arg[1].endswith('/'):
            repo_location = arg[1] + '*'
        else:
            repo_location = arg[1] + '/*'

    try:
        l = DaemonLock()
        WhooshIndexingDaemon(repo_location=repo_location)\
            .run(full_index=full_index)
        l.release()
        reload(logging)
    except LockHeld:
        sys.exit(1)