comparison rhodecode/lib/indexers/daemon.py @ 561:5f3b967d9d10

fixed reindexing, and made some optimizations to reuse repo instances from repo scan list.
author Marcin Kuzminski <marcin@python-works.com>
date Sat, 09 Oct 2010 01:11:44 +0200
parents 3072935bdeed
children 80dc0a23edf7
comparison
equal deleted inserted replaced
560:3072935bdeed 561:5f3b967d9d10
38 from whoosh.index import create_in, open_dir 38 from whoosh.index import create_in, open_dir
39 from shutil import rmtree 39 from shutil import rmtree
40 from rhodecode.lib.indexers import INDEX_EXTENSIONS, IDX_LOCATION, SCHEMA, IDX_NAME 40 from rhodecode.lib.indexers import INDEX_EXTENSIONS, IDX_LOCATION, SCHEMA, IDX_NAME
41 41
42 from time import mktime 42 from time import mktime
43 from vcs.backends import hg 43 from vcs.exceptions import ChangesetError
44 44
45 import logging 45 import logging
46 46
47 log = logging.getLogger('whooshIndexer') 47 log = logging.getLogger('whooshIndexer')
48 # create logger 48 # create logger
70 """ 70 """
71 71
72 def __init__(self, indexname='HG_INDEX', repo_location=None): 72 def __init__(self, indexname='HG_INDEX', repo_location=None):
73 self.indexname = indexname 73 self.indexname = indexname
74 self.repo_location = repo_location 74 self.repo_location = repo_location
75 self.repo_paths = scan_paths(self.repo_location)
75 self.initial = False 76 self.initial = False
76 if not os.path.isdir(IDX_LOCATION): 77 if not os.path.isdir(IDX_LOCATION):
77 os.mkdir(IDX_LOCATION) 78 os.mkdir(IDX_LOCATION)
78 log.info('Cannot run incremental index since it does not' 79 log.info('Cannot run incremental index since it does not'
79 ' yet exist running full build') 80 ' yet exist running full build')
80 self.initial = True 81 self.initial = True
81 82
82 def get_paths(self, root_dir): 83 def get_paths(self, repo):
83 """ 84 """
84 recursive walk in root dir and return a set of all path in that dir 85 recursive walk in root dir and return a set of all path in that dir
85 based on repository walk function 86 based on repository walk function
86 """ 87 """
87 repo = hg.MercurialRepository(root_dir)
88 index_paths_ = set() 88 index_paths_ = set()
89 for topnode, dirs, files in repo.walk('/', 'tip'): 89 for topnode, dirs, files in repo.walk('/', 'tip'):
90 for f in files: 90 for f in files:
91 index_paths_.add(jn(root_dir, f.path)) 91 index_paths_.add(jn(repo.path, f.path))
92 for dir in dirs: 92 for dir in dirs:
93 for f in files: 93 for f in files:
94 index_paths_.add(jn(root_dir, f.path)) 94 index_paths_.add(jn(repo.path, f.path))
95 95
96 return index_paths_ 96 return index_paths_
97 97
98 98 def get_node(self, repo, path):
99 n_path = path[len(repo.path) + 1:]
100 node = repo.get_changeset().get_node(n_path)
101 return node
102
103 def get_node_mtime(self, node):
104 return mktime(node.last_changeset.date.timetuple())
105
99 def add_doc(self, writer, path, repo): 106 def add_doc(self, writer, path, repo):
100 """Adding doc to writer""" 107 """Adding doc to writer"""
101 n_path = path[len(repo.path) + 1:] 108 node = self.get_node(repo, path)
102 node = repo.get_changeset().get_node(n_path)
103 109
104 #we just index the content of chosen files 110 #we just index the content of chosen files
105 if node.extension in INDEX_EXTENSIONS: 111 if node.extension in INDEX_EXTENSIONS:
106 log.debug(' >> %s [WITH CONTENT]' % path) 112 log.debug(' >> %s [WITH CONTENT]' % path)
107 u_content = node.content 113 u_content = node.content
112 118
113 writer.add_document(owner=unicode(repo.contact), 119 writer.add_document(owner=unicode(repo.contact),
114 repository=safe_unicode(repo.name), 120 repository=safe_unicode(repo.name),
115 path=safe_unicode(path), 121 path=safe_unicode(path),
116 content=u_content, 122 content=u_content,
117 modtime=mktime(node.last_changeset.date.timetuple()), 123 modtime=self.get_node_mtime(node),
118 extension=node.extension) 124 extension=node.extension)
119 125
120 126
121 def build_index(self): 127 def build_index(self):
122 if os.path.exists(IDX_LOCATION): 128 if os.path.exists(IDX_LOCATION):
127 os.mkdir(IDX_LOCATION) 133 os.mkdir(IDX_LOCATION)
128 134
129 idx = create_in(IDX_LOCATION, SCHEMA, indexname=IDX_NAME) 135 idx = create_in(IDX_LOCATION, SCHEMA, indexname=IDX_NAME)
130 writer = idx.writer() 136 writer = idx.writer()
131 137
132 for cnt, repo in enumerate(scan_paths(self.repo_location).values()): 138 for cnt, repo in enumerate(self.repo_paths.values()):
133 log.debug('building index @ %s' % repo.path) 139 log.debug('building index @ %s' % repo.path)
134 140
135 for idx_path in self.get_paths(repo.path): 141 for idx_path in self.get_paths(repo):
136 self.add_doc(writer, idx_path, repo) 142 self.add_doc(writer, idx_path, repo)
143
144 log.debug('>> COMMITING CHANGES <<')
137 writer.commit(merge=True) 145 writer.commit(merge=True)
138
139 log.debug('>>> FINISHED BUILDING INDEX <<<') 146 log.debug('>>> FINISHED BUILDING INDEX <<<')
140 147
141 148
142 def update_index(self): 149 def update_index(self):
143 log.debug('STARTING INCREMENTAL INDEXING UPDATE') 150 log.debug('STARTING INCREMENTAL INDEXING UPDATE')
153 160
154 # Loop over the stored fields in the index 161 # Loop over the stored fields in the index
155 for fields in reader.all_stored_fields(): 162 for fields in reader.all_stored_fields():
156 indexed_path = fields['path'] 163 indexed_path = fields['path']
157 indexed_paths.add(indexed_path) 164 indexed_paths.add(indexed_path)
158 165
159 if not os.path.exists(indexed_path): 166 repo = self.repo_paths[fields['repository']]
167
168 try:
169 node = self.get_node(repo, indexed_path)
170 except ChangesetError:
160 # This file was deleted since it was indexed 171 # This file was deleted since it was indexed
161 log.debug('removing from index %s' % indexed_path) 172 log.debug('removing from index %s' % indexed_path)
162 writer.delete_by_term('path', indexed_path) 173 writer.delete_by_term('path', indexed_path)
163 174
164 else: 175 else:
165 # Check if this file was changed since it 176 # Check if this file was changed since it was indexed
166 # was indexed
167 indexed_time = fields['modtime'] 177 indexed_time = fields['modtime']
168 178 mtime = self.get_node_mtime(node)
169 mtime = os.path.getmtime(indexed_path)
170
171 if mtime > indexed_time: 179 if mtime > indexed_time:
172
173 # The file has changed, delete it and add it to the list of 180 # The file has changed, delete it and add it to the list of
174 # files to reindex 181 # files to reindex
175 log.debug('adding to reindex list %s' % indexed_path) 182 log.debug('adding to reindex list %s' % indexed_path)
176 writer.delete_by_term('path', indexed_path) 183 writer.delete_by_term('path', indexed_path)
177 to_index.add(indexed_path) 184 to_index.add(indexed_path)
178 #writer.commit()
179 185
180 # Loop over the files in the filesystem 186 # Loop over the files in the filesystem
181 # Assume we have a function that gathers the filenames of the 187 # Assume we have a function that gathers the filenames of the
182 # documents to be indexed 188 # documents to be indexed
183 for repo in scan_paths(self.repo_location).values(): 189 for repo in self.repo_paths.values():
184 for path in self.get_paths(repo.path): 190 for path in self.get_paths(repo):
185 if path in to_index or path not in indexed_paths: 191 if path in to_index or path not in indexed_paths:
186 # This is either a file that's changed, or a new file 192 # This is either a file that's changed, or a new file
187 # that wasn't indexed before. So index it! 193 # that wasn't indexed before. So index it!
188 self.add_doc(writer, path, repo) 194 self.add_doc(writer, path, repo)
189 log.debug('reindexing %s' % path) 195 log.debug('re indexing %s' % path)
190 196
197 log.debug('>> COMMITING CHANGES <<')
191 writer.commit(merge=True) 198 writer.commit(merge=True)
192 #idx.optimize() 199 log.debug('>>> FINISHED REBUILDING INDEX <<<')
193 log.debug('>>> FINISHED <<<')
194 200
195 def run(self, full_index=False): 201 def run(self, full_index=False):
196 """Run daemon""" 202 """Run daemon"""
197 if full_index or self.initial: 203 if full_index or self.initial:
198 self.build_index() 204 self.build_index()