view kallithea/controllers/admin/gists.py @ 6071:32b674ab4878

routing: drop unused PUT handlers
author Mads Kiilerich <madski@unity3d.com>
date Thu, 04 Aug 2016 14:23:36 +0200
parents c0a3519e7d2c
children af3539a458f6
line wrap: on
line source

# -*- coding: utf-8 -*-
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""
kallithea.controllers.admin.gists
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

gist controller for Kallithea

This file was forked by the Kallithea project in July 2014.
Original author and date, and relevant copyright and licensing information is below:
:created_on: May 9, 2013
:author: marcink
:copyright: (c) 2013 RhodeCode GmbH, and others.
:license: GPLv3, see LICENSE.md for more details.
"""

import time
import logging
import traceback
import formencode.htmlfill

from pylons import request, response, tmpl_context as c, url
from pylons.i18n.translation import _
from webob.exc import HTTPFound, HTTPNotFound, HTTPForbidden

from kallithea.model.forms import GistForm
from kallithea.model.gist import GistModel
from kallithea.model.meta import Session
from kallithea.model.db import Gist, User
from kallithea.lib import helpers as h
from kallithea.lib.base import BaseController, render
from kallithea.lib.auth import LoginRequired, NotAnonymous
from kallithea.lib.utils import jsonify
from kallithea.lib.utils2 import safe_int, safe_unicode, time_to_datetime
from kallithea.lib.helpers import Page
from sqlalchemy.sql.expression import or_
from kallithea.lib.vcs.exceptions import VCSError, NodeNotChangedError

log = logging.getLogger(__name__)


class GistsController(BaseController):
    """REST Controller styled on the Atom Publishing Protocol"""

    def __load_defaults(self, extra_values=None):
        c.lifetime_values = [
            (str(-1), _('Forever')),
            (str(5), _('5 minutes')),
            (str(60), _('1 hour')),
            (str(60 * 24), _('1 day')),
            (str(60 * 24 * 30), _('1 month')),
        ]
        if extra_values:
            c.lifetime_values.append(extra_values)
        c.lifetime_options = [(c.lifetime_values, _("Lifetime"))]

    @LoginRequired()
    def index(self):
        not_default_user = not c.authuser.is_default_user
        c.show_private = request.GET.get('private') and not_default_user
        c.show_public = request.GET.get('public') and not_default_user

        gists = Gist().query() \
            .filter(or_(Gist.gist_expires == -1, Gist.gist_expires >= time.time())) \
            .order_by(Gist.created_on.desc())

        # MY private
        if c.show_private and not c.show_public:
            gists = gists.filter(Gist.gist_type == Gist.GIST_PRIVATE) \
                             .filter(Gist.gist_owner == c.authuser.user_id)
        # MY public
        elif c.show_public and not c.show_private:
            gists = gists.filter(Gist.gist_type == Gist.GIST_PUBLIC) \
                             .filter(Gist.gist_owner == c.authuser.user_id)

        # MY public+private
        elif c.show_private and c.show_public:
            gists = gists.filter(or_(Gist.gist_type == Gist.GIST_PUBLIC,
                                     Gist.gist_type == Gist.GIST_PRIVATE)) \
                             .filter(Gist.gist_owner == c.authuser.user_id)

        # default show ALL public gists
        if not c.show_public and not c.show_private:
            gists = gists.filter(Gist.gist_type == Gist.GIST_PUBLIC)

        c.gists = gists
        p = safe_int(request.GET.get('page'), 1)
        c.gists_pager = Page(c.gists, page=p, items_per_page=10)
        return render('admin/gists/index.html')

    @LoginRequired()
    @NotAnonymous()
    def create(self):
        self.__load_defaults()
        gist_form = GistForm([x[0] for x in c.lifetime_values])()
        try:
            form_result = gist_form.to_python(dict(request.POST))
            #TODO: multiple files support, from the form
            filename = form_result['filename'] or Gist.DEFAULT_FILENAME
            nodes = {
                filename: {
                    'content': form_result['content'],
                    'lexer': form_result['mimetype']  # None is autodetect
                }
            }
            _public = form_result['public']
            gist_type = Gist.GIST_PUBLIC if _public else Gist.GIST_PRIVATE
            gist = GistModel().create(
                description=form_result['description'],
                owner=c.authuser.user_id,
                gist_mapping=nodes,
                gist_type=gist_type,
                lifetime=form_result['lifetime']
            )
            Session().commit()
            new_gist_id = gist.gist_access_id
        except formencode.Invalid as errors:
            defaults = errors.value

            return formencode.htmlfill.render(
                render('admin/gists/new.html'),
                defaults=defaults,
                errors=errors.error_dict or {},
                prefix_error=False,
                encoding="UTF-8",
                force_defaults=False)

        except Exception as e:
            log.error(traceback.format_exc())
            h.flash(_('Error occurred during gist creation'), category='error')
            raise HTTPFound(location=url('new_gist'))
        raise HTTPFound(location=url('gist', gist_id=new_gist_id))

    @LoginRequired()
    @NotAnonymous()
    def new(self, format='html'):
        self.__load_defaults()
        return render('admin/gists/new.html')

    @LoginRequired()
    @NotAnonymous()
    def delete(self, gist_id):
        gist = GistModel().get_gist(gist_id)
        owner = gist.gist_owner == c.authuser.user_id
        if h.HasPermissionAny('hg.admin')() or owner:
            GistModel().delete(gist)
            Session().commit()
            h.flash(_('Deleted gist %s') % gist.gist_access_id, category='success')
        else:
            raise HTTPForbidden()

        raise HTTPFound(location=url('gists'))

    @LoginRequired()
    def show(self, gist_id, revision='tip', format='html', f_path=None):
        c.gist = Gist.get_or_404(gist_id)

        #check if this gist is not expired
        if c.gist.gist_expires != -1:
            if time.time() > c.gist.gist_expires:
                log.error('Gist expired at %s',
                          time_to_datetime(c.gist.gist_expires))
                raise HTTPNotFound()
        try:
            c.file_changeset, c.files = GistModel().get_gist_files(gist_id,
                                                            revision=revision)
        except VCSError:
            log.error(traceback.format_exc())
            raise HTTPNotFound()
        if format == 'raw':
            content = '\n\n'.join([f.content for f in c.files if (f_path is None or safe_unicode(f.path) == f_path)])
            response.content_type = 'text/plain'
            return content
        return render('admin/gists/show.html')

    @LoginRequired()
    @NotAnonymous()
    def edit(self, gist_id, format='html'):
        c.gist = Gist.get_or_404(gist_id)

        #check if this gist is not expired
        if c.gist.gist_expires != -1:
            if time.time() > c.gist.gist_expires:
                log.error('Gist expired at %s',
                          time_to_datetime(c.gist.gist_expires))
                raise HTTPNotFound()
        try:
            c.file_changeset, c.files = GistModel().get_gist_files(gist_id)
        except VCSError:
            log.error(traceback.format_exc())
            raise HTTPNotFound()

        self.__load_defaults(extra_values=('0', _('Unmodified')))
        rendered = render('admin/gists/edit.html')

        if request.POST:
            rpost = request.POST
            nodes = {}
            for org_filename, filename, mimetype, content in zip(
                                                    rpost.getall('org_files'),
                                                    rpost.getall('files'),
                                                    rpost.getall('mimetypes'),
                                                    rpost.getall('contents')):

                nodes[org_filename] = {
                    'org_filename': org_filename,
                    'filename': filename,
                    'content': content,
                    'lexer': mimetype,
                }
            try:
                GistModel().update(
                    gist=c.gist,
                    description=rpost['description'],
                    owner=c.gist.owner,
                    gist_mapping=nodes,
                    gist_type=c.gist.gist_type,
                    lifetime=rpost['lifetime']
                )

                Session().commit()
                h.flash(_('Successfully updated gist content'), category='success')
            except NodeNotChangedError:
                # raised if nothing was changed in repo itself. We anyway then
                # store only DB stuff for gist
                Session().commit()
                h.flash(_('Successfully updated gist data'), category='success')
            except Exception:
                log.error(traceback.format_exc())
                h.flash(_('Error occurred during update of gist %s') % gist_id,
                        category='error')

            raise HTTPFound(location=url('gist', gist_id=gist_id))

        return rendered

    @LoginRequired()
    @NotAnonymous()
    @jsonify
    def check_revision(self, gist_id):
        c.gist = Gist.get_or_404(gist_id)
        last_rev = c.gist.scm_instance.get_changeset()
        success = True
        revision = request.POST.get('revision')

        ##TODO: maybe move this to model ?
        if revision != last_rev.raw_id:
            log.error('Last revision %s is different than submitted %s',
                      revision, last_rev)
            # our gist has newer version than we
            success = False

        return {'success': success}