Mercurial > kallithea
view kallithea/controllers/admin/admin.py @ 6864:7691290837d2
codingstyle: trivial whitespace fixes
Reported by flake8.
author | Lars Kruse <devel@sumpfralle.de> |
---|---|
date | Fri, 25 Aug 2017 14:32:50 +0200 |
parents | 4517e212f09a |
children | aa25ef34ebab |
line wrap: on
line source
# -*- coding: utf-8 -*-
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""
kallithea.controllers.admin.admin
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Controller for Admin panel of Kallithea

This file was forked by the Kallithea project in July 2014.
Original author and date, and relevant copyright and licensing information is below:
:created_on: Apr 7, 2010
:author: marcink
:copyright: (c) 2013 RhodeCode GmbH, and others.
:license: GPLv3, see LICENSE.md for more details.
"""

import logging

from tg import request, tmpl_context as c
from sqlalchemy.orm import joinedload
from whoosh.qparser.default import QueryParser
from whoosh.qparser.dateparse import DateParserPlugin
from whoosh import query
from sqlalchemy.sql.expression import or_, and_, func

from kallithea.config.routing import url
from kallithea.model.db import UserLog
from kallithea.lib.auth import LoginRequired, HasPermissionAnyDecorator
from kallithea.lib.base import BaseController, render
from kallithea.lib.utils2 import safe_int, remove_prefix, remove_suffix
from kallithea.lib.indexers import JOURNAL_SCHEMA
from kallithea.lib.page import Page

log = logging.getLogger(__name__)


def _journal_filter(user_log, search_term):
    """
    Filters sqlalchemy user_log based on search_term with whoosh Query language
    http://packages.python.org/Whoosh/querylang.html

    The search term is parsed with a whoosh ``QueryParser`` (default field
    ``repository``, schema ``JOURNAL_SCHEMA``, date parsing enabled) and the
    resulting query object is translated into SQLAlchemy filter expressions:

    * a single ``Term``/``Prefix``/``Wildcard``/``DateRange``, or an ``And``
      of them, adds one ``.filter()`` per term;
    * an ``Or`` adds a single ``.filter(or_(...))`` over its terms;
    * any other parsed query type leaves ``user_log`` unfiltered.

    :param user_log: query over ``UserLog`` to narrow down
    :param search_term: raw search string; when falsy nothing is parsed and
        ``user_log`` is returned unchanged
    :returns: ``user_log`` with the translated filters applied
    """
    log.debug('Initial search term: %r', search_term)
    qry = None
    if search_term:
        qp = QueryParser('repository', schema=JOURNAL_SCHEMA)
        qp.add_plugin(DateParserPlugin())
        qry = qp.parse(unicode(search_term))
        log.debug('Filtering using parsed query %r', qry)

    # Search field names that differ from the UserLog attribute they filter
    # on; any other field name is looked up on UserLog unchanged.
    field_attrs = {
        'repository': 'repository_name',
        'ip': 'user_ip',
        'date': 'action_date',
        'username': 'username',
    }

    # Escape character for the generated LIKE patterns. A forward slash is
    # used rather than a backslash so the literal does not depend on how the
    # database treats backslashes inside strings.
    like_escape = '/'

    def like_pattern(wc_term):
        """
        Translate a glob (``*`` = any run of characters, ``?`` = exactly one
        character) into a SQL LIKE pattern, escaping characters that are
        special to LIKE. Other glob syntax is passed through literally.
        """
        translated = []
        for ch in wc_term:
            if ch == '*':
                translated.append('%')
            elif ch == '?':
                translated.append('_')
            elif ch in ('%', '_', like_escape):
                translated.append(like_escape + ch)
            else:
                translated.append(ch)
        return ''.join(translated)

    def wildcard_handler(col, wc_term):
        """
        Build a case-insensitive filter on ``col`` for a wildcard term.

        Always returns a filter expression; previously patterns that did not
        start with ``*`` fell through and returned ``None``.
        """
        if wc_term.startswith('*') and not wc_term.endswith('*'):
            # postfix == endswith
            wc_term = remove_prefix(wc_term, prefix='*')
            return func.lower(col).endswith(func.lower(wc_term))
        elif wc_term.startswith('*') and wc_term.endswith('*'):
            # wildcard == ilike
            wc_term = remove_prefix(wc_term, prefix='*')
            wc_term = remove_suffix(wc_term, suffix='*')
            return func.lower(col).contains(func.lower(wc_term))
        # Everything else (e.g. 'f*o', 'fo?', '?foo') is expressed as a
        # LIKE pattern instead of silently producing no expression at all.
        return func.lower(col).like(func.lower(like_pattern(wc_term)),
                                    escape=like_escape)

    def get_filterion(field, val, term):
        """
        Return the SQLAlchemy expression for one parsed term.

        :param field: field name as it appears in the parsed query
        :param val: term text, or ``[startdate, enddate]`` for a DateRange
        :param term: the whoosh query object, used to pick the comparison
        """
        field = getattr(UserLog, field_attrs.get(field, field))
        log.debug('filter field: %s val=>%s', field, val)

        # sql filtering
        if isinstance(term, query.Wildcard):
            return wildcard_handler(field, val)
        elif isinstance(term, query.Prefix):
            return func.lower(field).startswith(func.lower(val))
        elif isinstance(term, query.DateRange):
            return and_(field >= val[0], field <= val[1])
        return func.lower(field) == func.lower(val)

    def term_filter(term):
        """Extract field and value from a parsed term and build its filter."""
        if isinstance(term, query.DateRange):
            val = [term.startdate, term.enddate]
        else:
            val = term.text
        return get_filterion(term.fieldname, val, term)

    if isinstance(qry, (query.And, query.Term, query.Prefix, query.Wildcard,
                        query.DateRange)):
        # A lone term is handled like an And with a single member.
        terms = qry if isinstance(qry, query.And) else [qry]
        for term in terms:
            user_log = user_log.filter(term_filter(term))
    elif isinstance(qry, query.Or):
        user_log = user_log.filter(or_(*[term_filter(term) for term in qry]))

    return user_log


class AdminController(BaseController):
    """Controller behind the admin journal (user action log) page."""

    @LoginRequired()
    def _before(self, *args, **kwargs):
        # Only delegates to the base class; the method exists so that the
        # LoginRequired decorator can be attached to it.
        super(AdminController, self)._before(*args, **kwargs)

    @HasPermissionAnyDecorator('hg.admin')
    def index(self):
        """
        Render the journal, newest entries first.

        Reads the ``filter`` and ``page`` GET parameters, stores the search
        term and a 10-items-per-page ``Page`` of matching ``UserLog`` entries
        on the template context, and renders either the full page or, when
        the request environ carries ``HTTP_X_PARTIAL_XHR``, only the log
        fragment.
        """
        users_log = UserLog.query() \
            .options(joinedload(UserLog.user)) \
            .options(joinedload(UserLog.repository))

        # FILTERING
        c.search_term = request.GET.get('filter')
        users_log = _journal_filter(users_log, c.search_term)

        users_log = users_log.order_by(UserLog.action_date.desc())

        p = safe_int(request.GET.get('page'), 1)

        def url_generator(**kw):
            # Keep the active filter in the pagination links.
            return url.current(filter=c.search_term, **kw)

        c.users_log = Page(users_log, page=p, items_per_page=10,
                           url=url_generator)

        if request.environ.get('HTTP_X_PARTIAL_XHR'):
            return render('admin/admin_log.html')

        return render('admin/admin.html')