Mercurial > kallithea
view rhodecode/controllers/admin/admin.py @ 3062:a08624dd675e beta
Implemented filtering of admin journal based on Whoosh Query language
ref #210
author | Marcin Kuzminski <marcin@python-works.com> |
---|---|
date | Wed, 05 Dec 2012 21:14:31 +0100 |
parents | 6b176c679896 |
children | ca2b21819dfd |
line wrap: on
line source
# -*- coding: utf-8 -*- """ rhodecode.controllers.admin.admin ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Controller for Admin panel of Rhodecode :created_on: Apr 7, 2010 :author: marcink :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com> :license: GPLv3, see COPYING for more details. """ # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import logging from pylons import request, tmpl_context as c, url from sqlalchemy.orm import joinedload from webhelpers.paginate import Page from whoosh.qparser.default import QueryParser from whoosh import query from sqlalchemy.sql.expression import or_ from rhodecode.lib.auth import LoginRequired, HasPermissionAllDecorator from rhodecode.lib.base import BaseController, render from rhodecode.model.db import UserLog, User from rhodecode.lib.utils2 import safe_int, remove_prefix from rhodecode.lib.indexers import JOURNAL_SCHEMA log = logging.getLogger(__name__) def _filter(user_log, search_term): """ Filters sqlalchemy user_log based on search_term with whoosh Query language http://packages.python.org/Whoosh/querylang.html :param user_log: :param search_term: """ qry = None if search_term: qp = QueryParser('repository', schema=JOURNAL_SCHEMA) qry = qp.parse(unicode(search_term)) log.debug('Filtering using query %r' % qry) def get_filterion(field, val, term): if field == 'repository': field = getattr(UserLog, 'repository_name') elif field == 'ip': field = getattr(UserLog, 'user_ip') elif field == 'date': field = getattr(UserLog, 'action_date') elif field == 'username': ##special case for username if isinstance(term, query.Wildcard): #only support wildcards with * at beggining val = remove_prefix(val, prefix='*') return getattr(UserLog, 'user_id').in_( [x.user_id for x in User.query().filter(User.username.endswith(val))]) elif isinstance(term, query.Prefix): return getattr(UserLog, 'user_id').in_( [x.user_id for x in User.query().filter(User.username.startswith(val))]) # term == exact match, case insensitive field = getattr(UserLog, 'user') val = User.get_by_username(val, case_insensitive=True) else: field = getattr(UserLog, field) #sql filtering if isinstance(term, query.Wildcard): return field.endsswith(val) elif isinstance(term, query.Prefix): return field.startswith(val) return field == val if isinstance(qry, (query.And, query.Term, query.Prefix, query.Wildcard)): if not isinstance(qry, query.And): qry = [qry] for term in qry: field = term.fieldname val = term.text user_log = user_log.filter(get_filterion(field, val, term)) elif isinstance(qry, query.Or): filters = [] for term in qry: field = term.fieldname val = term.text if isinstance(term, query.Term): filters.append(get_filterion(field, val, term)) user_log = user_log.filter(or_(*filters)) return user_log class AdminController(BaseController): @LoginRequired() def __before__(self): super(AdminController, self).__before__() @HasPermissionAllDecorator('hg.admin') def index(self): users_log = UserLog.query()\ .options(joinedload(UserLog.user))\ .options(joinedload(UserLog.repository)) #FILTERING c.search_term = request.GET.get('filter') try: users_log = _filter(users_log, c.search_term) except: # we want this to crash for now raise users_log = users_log.order_by(UserLog.action_date.desc()) p = safe_int(request.params.get('page', 1), 1) def url_generator(**kw): return url.current(filter=c.search_term, **kw) c.users_log = Page(users_log, page=p, items_per_page=10, url=url_generator) c.log_data = render('admin/admin_log.html') if request.environ.get('HTTP_X_PARTIAL_XHR'): return c.log_data return render('admin/admin.html')