Mercurial > kallithea
view kallithea/controllers/admin/repos.py @ 8998:a80ac2033a8a draft i18n tip
i18n: updated translation for Greek
Currently translated at 100.0% (1082 of 1082 strings)
author | Aristotelis Stageiritis <aristotelis79@gmail.com> |
---|---|
date | Sun, 07 Apr 2024 08:07:49 +0200 |
parents | e3d033042fca |
children |
line wrap: on
line source
# -*- coding: utf-8 -*-
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""
kallithea.controllers.admin.repos
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Repositories controller for Kallithea

This file was forked by the Kallithea project in July 2014.
Original author and date, and relevant copyright and licensing information is below:
:created_on: Apr 7, 2010
:author: marcink
:copyright: (c) 2013 RhodeCode GmbH, and others.
:license: GPLv3, see LICENSE.md for more details.
"""

import logging
import traceback

import formencode
from formencode import htmlfill
from tg import request
from tg import tmpl_context as c
from tg.i18n import ugettext as _
from webob.exc import HTTPForbidden, HTTPFound, HTTPInternalServerError, HTTPNotFound

import kallithea
from kallithea.controllers import base
from kallithea.lib import webutils
from kallithea.lib.auth import HasRepoPermissionLevelDecorator, LoginRequired, NotAnonymous
from kallithea.lib.exceptions import AttachedForksError
from kallithea.lib.utils2 import safe_int
from kallithea.lib.vcs import RepositoryError
from kallithea.lib.webutils import url
from kallithea.model import db, meta, userlog
from kallithea.model.forms import RepoFieldForm, RepoForm, RepoPermsForm
from kallithea.model.repo import RepoModel
from kallithea.model.scm import AvailableRepoGroupChoices, RepoList, ScmModel


log = logging.getLogger(__name__)


class ReposController(base.BaseRepoController):
    """Controller for creating, editing and deleting repositories.

    Most actions are guarded by ``HasRepoPermissionLevelDecorator('admin')``;
    the creation actions only require a non-anonymous user.
    """

    @LoginRequired(allow_default_user=True)
    def _before(self, *args, **kwargs):
        """Delegate to the base controller's ``_before`` hook."""
        super(ReposController, self)._before(*args, **kwargs)

    def _load_repo(self):
        """Return ``c.db_repo``.

        :raises HTTPNotFound: if ``c.db_repo`` is ``None``.
        """
        repo_obj = c.db_repo

        if repo_obj is None:
            raise HTTPNotFound()

        return repo_obj

    def __load_defaults(self, repo=None):
        """Populate the template context with the form choice lists.

        Sets ``c.repo_groups``, ``c.landing_revs_choices`` and
        ``c.landing_revs``.

        :param repo: optional repository; when given, its current group is
            passed as an extra entry to ``AvailableRepoGroupChoices``.
        """
        extras = [] if repo is None else [repo.group]

        c.repo_groups = AvailableRepoGroupChoices('write', extras)

        c.landing_revs_choices, c.landing_revs = ScmModel().get_repo_landing_revs(repo)

    def __load_data(self):
        """
        Load defaults settings for edit, and update

        Sets ``c.repo_info`` (raising ``HTTPNotFound`` via ``_load_repo`` when
        there is no repository) and returns the form defaults dict.
        """
        c.repo_info = self._load_repo()
        self.__load_defaults(c.repo_info)

        defaults = RepoModel()._get_defaults(c.repo_name)
        defaults['clone_uri'] = c.repo_info.clone_uri_hidden  # don't show password
        defaults['permanent_url'] = c.repo_info.clone_url(clone_uri_tmpl=c.clone_uri_tmpl, with_id=True)

        return defaults

    def index(self, format='html'):
        """Render the admin repository grid (``admin/repos/repos.html``)."""
        # RepoList is asked for perm_level='admin'; presumably this limits the
        # listing to repositories the current user may administer.
        repos_list = RepoList(db.Repository.query(sorted=True).all(), perm_level='admin')
        repos_data = RepoModel().get_repos_as_dict(repos_list, admin=True)
        # data used to render the grid
        c.data = repos_data

        return base.render('admin/repos/repos.html')

    @NotAnonymous()
    def create(self):
        """Validate the POSTed form and create a repository.

        On validation failure the add form is re-rendered with the errors.
        On any other failure an error is flashed and the user is redirected
        to ``home``.  On success the user is redirected to
        ``repo_creating_home``.

        :raises HTTPFound: redirect in both the error and the success case.
        """
        self.__load_defaults()
        try:
            # CanWriteGroup validators checks permissions of this POST
            form_result = RepoForm(repo_groups=c.repo_groups,
                                   landing_revs=c.landing_revs_choices)() \
                            .to_python(dict(request.POST))
        except formencode.Invalid as errors:
            log.info(errors)
            return htmlfill.render(
                base.render('admin/repos/repo_add.html'),
                defaults=errors.value,
                errors=errors.error_dict or {},
                prefix_error=False,
                force_defaults=False,
                encoding="UTF-8")

        try:
            # create is done sometimes async on celery, db transaction
            # management is handled there.
            RepoModel().create(form_result, request.authuser.user_id)
        except Exception:
            log.error(traceback.format_exc())
            msg = (_('Error creating repository %s')
                   % form_result.get('repo_name'))
            webutils.flash(msg, category='error')
            raise HTTPFound(location=url('home'))

        raise HTTPFound(location=webutils.url('repo_creating_home',
                              repo_name=form_result['repo_name_full'],
                              ))

    @NotAnonymous()
    def create_repository(self):
        """Render the "add repository" form.

        The optional ``parent_group`` GET parameter preselects a repository
        group; it must be one of the choices in ``c.repo_groups``.

        :raises HTTPForbidden: if no repository group choice is available, or
            ``parent_group`` does not resolve to one of the available choices.
        """
        self.__load_defaults()
        if not c.repo_groups:
            raise HTTPForbidden()
        parent_group = request.GET.get('parent_group')

        ## apply the defaults from defaults page
        defaults = db.Setting.get_default_repo_settings(strip_prefix=True)
        if parent_group:
            prg = db.RepoGroup.get(parent_group)
            if prg is None or not any(rgc[0] == prg.group_id
                                      for rgc in c.repo_groups):
                raise HTTPForbidden()
        else:
            parent_group = '-1'
        defaults['repo_group'] = parent_group

        return htmlfill.render(
            base.render('admin/repos/repo_add.html'),
            defaults=defaults,
            errors={},
            prefix_error=False,
            encoding="UTF-8",
            force_defaults=False)

    @LoginRequired()
    def repo_creating(self, repo_name):
        """Render the "repository is being created" page.

        :raises HTTPNotFound: if ``repo_name`` is empty.
        """
        c.repo = repo_name
        if not c.repo:
            raise HTTPNotFound()
        return base.render('admin/repos/repo_creating.html')

    @LoginRequired()
    @base.jsonify
    def repo_check(self, repo_name):
        """Report whether the repository has reached ``STATE_CREATED``.

        When it has, a success message is flashed (worded differently for
        clones from a remote URI, forks and plain new repositories).

        :returns: ``{'result': True}`` once created, else ``{'result': False}``.
        """
        c.repo = repo_name
        repo = db.Repository.get_by_repo_name(repo_name)
        if repo and repo.repo_state == db.Repository.STATE_CREATED:
            if repo.clone_uri:
                webutils.flash(_('Created repository %s from %s')
                        % (repo.repo_name, repo.clone_uri_hidden), category='success')
            else:
                repo_url = webutils.link_to(repo.repo_name,
                                     webutils.url('summary_home',
                                           repo_name=repo.repo_name))
                fork = repo.fork
                if fork is not None:
                    fork_name = fork.repo_name
                    webutils.flash(webutils.HTML(_('Forked repository %s as %s'))
                            % (fork_name, repo_url), category='success')
                else:
                    webutils.flash(webutils.HTML(_('Created repository %s')) % repo_url,
                            category='success')
            return {'result': True}
        return {'result': False}

    @HasRepoPermissionLevelDecorator('admin')
    def update(self, repo_name):
        """Apply the POSTed settings form to the repository.

        On validation failure the edit form is re-rendered with the errors.
        Any other exception is logged and flashed.  In every non-validation
        case the user is redirected to ``edit_repo`` for the (possibly
        renamed) repository.

        :raises HTTPFound: redirect after a successful or failed update.
        """
        c.repo_info = self._load_repo()
        self.__load_defaults(c.repo_info)
        c.active = 'settings'
        c.repo_fields = db.RepositoryField.query() \
            .filter(db.RepositoryField.repository == c.repo_info).all()

        repo_model = RepoModel()
        # Redirect target; only replaced once the update has succeeded.
        changed_name = repo_name
        repo = db.Repository.get_by_repo_name(repo_name)
        old_data = {
            'repo_name': repo_name,
            'repo_group': repo.group.get_dict() if repo.group else {},
            'repo_type': repo.repo_type,
        }
        _form = RepoForm(edit=True, old_data=old_data,
                         repo_groups=c.repo_groups,
                         landing_revs=c.landing_revs_choices)()

        try:
            form_result = _form.to_python(dict(request.POST))
            repo = repo_model.update(repo_name, **form_result)
            ScmModel().mark_for_invalidation(repo_name)
            webutils.flash(_('Repository %s updated successfully') % repo_name,
                    category='success')
            changed_name = repo.repo_name
            userlog.action_logger(request.authuser, 'admin_updated_repo',
                changed_name, request.ip_addr)
            meta.Session().commit()
        except formencode.Invalid as errors:
            log.info(errors)
            defaults = self.__load_data()
            defaults.update(errors.value)
            return htmlfill.render(
                base.render('admin/repos/repo_edit.html'),
                defaults=defaults,
                errors=errors.error_dict or {},
                prefix_error=False,
                encoding="UTF-8",
                force_defaults=False)

        except Exception:
            log.error(traceback.format_exc())
            webutils.flash(_('Error occurred during update of repository %s')
                    % repo_name, category='error')
        raise HTTPFound(location=url('edit_repo', repo_name=changed_name))

    @HasRepoPermissionLevelDecorator('admin')
    def delete(self, repo_name):
        """Delete the repository, optionally detaching or deleting its forks.

        The POST field ``forks`` may be ``detach_forks`` or ``delete_forks``.
        Failures are flashed rather than propagated.

        :raises HTTPNotFound: if the repository does not exist.
        :raises HTTPFound: redirect to the repository's group page when it has
            a group, otherwise to ``repos``.
        """
        repo_model = RepoModel()
        repo = repo_model.get_by_repo_name(repo_name)
        if not repo:
            raise HTTPNotFound()
        try:
            _forks = repo.forks.count()
            handle_forks = None
            if _forks and request.POST.get('forks'):
                do = request.POST['forks']
                if do == 'detach_forks':
                    handle_forks = 'detach'
                    webutils.flash(_('Detached %s forks') % _forks, category='success')
                elif do == 'delete_forks':
                    handle_forks = 'delete'
                    webutils.flash(_('Deleted %s forks') % _forks, category='success')
            repo_model.delete(repo, forks=handle_forks)
            userlog.action_logger(request.authuser, 'admin_deleted_repo',
                repo_name, request.ip_addr)
            ScmModel().mark_for_invalidation(repo_name)
            webutils.flash(_('Deleted repository %s') % repo_name, category='success')
            meta.Session().commit()
        except AttachedForksError:
            webutils.flash(_('Cannot delete repository %s which still has forks')
                        % repo_name, category='warning')

        except Exception:
            log.error(traceback.format_exc())
            webutils.flash(_('An error occurred during deletion of %s') % repo_name,
                    category='error')

        if repo.group:
            raise HTTPFound(location=url('repos_group_home', group_name=repo.group.group_name))
        raise HTTPFound(location=url('repos'))

    @HasRepoPermissionLevelDecorator('admin')
    def edit(self, repo_name):
        """Render the repository settings form filled with current values."""
        defaults = self.__load_data()
        c.repo_fields = db.RepositoryField.query() \
            .filter(db.RepositoryField.repository == c.repo_info).all()
        c.active = 'settings'
        return htmlfill.render(
            base.render('admin/repos/repo_edit.html'),
            defaults=defaults,
            encoding="UTF-8",
            force_defaults=False)

    @HasRepoPermissionLevelDecorator('admin')
    def edit_permissions(self, repo_name):
        """Render the repository permissions tab."""
        c.repo_info = self._load_repo()
        c.active = 'permissions'
        defaults = RepoModel()._get_defaults(repo_name)

        return htmlfill.render(
            base.render('admin/repos/repo_edit.html'),
            defaults=defaults,
            encoding="UTF-8",
            force_defaults=False)

    @HasRepoPermissionLevelDecorator('admin')
    def edit_permissions_update(self, repo_name):
        """Apply the POSTed permission form and redirect back to the tab.

        :raises HTTPFound: redirect to ``edit_repo_perms``.
        """
        form = RepoPermsForm()().to_python(request.POST)
        RepoModel()._update_permissions(repo_name, form['perms_new'],
                                        form['perms_updates'])
        # TODO: implement this
        #action_logger(request.authuser, 'admin_changed_repo_permissions',
        #              repo_name, request.ip_addr)
        meta.Session().commit()
        webutils.flash(_('Repository permissions updated'), category='success')
        raise HTTPFound(location=url('edit_repo_perms', repo_name=repo_name))

    @HasRepoPermissionLevelDecorator('admin')
    def edit_permissions_revoke(self, repo_name):
        """Revoke one user's or user group's permission on the repository.

        The POST field ``obj_type`` selects ``user`` (with ``user_id``) or
        ``user_group`` (with ``user_group_id``).

        :returns: an empty list on success.
        :raises HTTPInternalServerError: on any failure, including an
            unsupported ``obj_type``.
        """
        try:
            obj_type = request.POST.get('obj_type')
            if obj_type == 'user':
                obj_id = safe_int(request.POST.get('user_id'))
                RepoModel().revoke_user_permission(repo=repo_name, user=obj_id)
            elif obj_type == 'user_group':
                obj_id = safe_int(request.POST.get('user_group_id'))
                RepoModel().revoke_user_group_permission(
                    repo=repo_name, group_name=obj_id
                )
            else:
                # Raise explicitly rather than ``assert False``: asserts are
                # stripped under ``python -O``, which would turn an invalid
                # request into a silent no-op reported as success.
                raise ValueError('unsupported obj_type %r' % (obj_type,))
            # TODO: implement this
            #action_logger(request.authuser, 'admin_revoked_repo_permissions',
            #              repo_name, request.ip_addr)
            meta.Session().commit()
        except Exception:
            log.error(traceback.format_exc())
            webutils.flash(_('An error occurred during revoking of permission'),
                    category='error')
            raise HTTPInternalServerError()
        return []

    @HasRepoPermissionLevelDecorator('admin')
    def edit_fields(self, repo_name):
        """Render the extra-fields tab of the repository settings.

        :raises HTTPFound: redirect back to this tab when called with POST
            data.
        """
        c.repo_info = self._load_repo()
        c.repo_fields = db.RepositoryField.query() \
            .filter(db.RepositoryField.repository == c.repo_info).all()
        c.active = 'fields'
        if request.POST:
            # Same route name and argument as the redirects issued by
            # create_repo_field / delete_repo_field.
            raise HTTPFound(location=url('edit_repo_fields', repo_name=repo_name))
        return base.render('admin/repos/repo_edit.html')

    @HasRepoPermissionLevelDecorator('admin')
    def create_repo_field(self, repo_name):
        """Create a new extra field on the repository from the POSTed form.

        Validation and other errors are flashed, not propagated.

        :raises HTTPFound: redirect to ``edit_repo_fields``.
        """
        try:
            form_result = RepoFieldForm()().to_python(dict(request.POST))
            new_field = db.RepositoryField()
            new_field.repository = db.Repository.get_by_repo_name(repo_name)
            new_field.field_key = form_result['new_field_key']
            new_field.field_type = form_result['new_field_type']  # python type
            new_field.field_value = form_result['new_field_value']  # set initial blank value
            new_field.field_desc = form_result['new_field_desc']
            new_field.field_label = form_result['new_field_label']
            meta.Session().add(new_field)
            meta.Session().commit()
        except formencode.Invalid as e:
            webutils.flash(_('Field validation error: %s') % e.msg, category='error')
        except Exception as e:
            log.error(traceback.format_exc())
            webutils.flash(_('An error occurred during creation of field: %r') % e, category='error')
        raise HTTPFound(location=url('edit_repo_fields', repo_name=repo_name))

    @HasRepoPermissionLevelDecorator('admin')
    def delete_repo_field(self, repo_name, field_id):
        """Delete the extra field ``field_id`` (looked up with ``get_or_404``).

        :raises HTTPFound: redirect to ``edit_repo_fields``.
        """
        field = db.RepositoryField.get_or_404(field_id)
        try:
            meta.Session().delete(field)
            meta.Session().commit()
        except Exception:
            log.error(traceback.format_exc())
            msg = _('An error occurred during removal of field')
            webutils.flash(msg, category='error')
        raise HTTPFound(location=url('edit_repo_fields', repo_name=repo_name))

    @HasRepoPermissionLevelDecorator('admin')
    def edit_advanced(self, repo_name):
        """Render the advanced tab (public journal flag and fork parent).

        :raises HTTPFound: redirect back to this tab when called with POST
            data.
        """
        c.repo_info = self._load_repo()
        c.default_user_id = kallithea.DEFAULT_USER_ID
        c.in_public_journal = db.UserFollowing.query() \
            .filter(db.UserFollowing.user_id == c.default_user_id) \
            .filter(db.UserFollowing.follows_repository == c.repo_info).scalar()

        _repos = db.Repository.query(sorted=True).all()
        read_access_repos = RepoList(_repos, perm_level='read')
        # Candidate fork parents: every other repository of the same type.
        c.repos_list = [(None, _('-- Not a fork --'))]
        c.repos_list += [(x.repo_id, x.repo_name)
                         for x in read_access_repos
                         if x.repo_id != c.repo_info.repo_id
                         and x.repo_type == c.repo_info.repo_type]

        defaults = {
            'id_fork_of': c.repo_info.fork_id if c.repo_info.fork_id else ''
        }

        c.active = 'advanced'
        if request.POST:
            # Same route name and argument as the redirects issued by
            # edit_advanced_journal / edit_advanced_fork.
            raise HTTPFound(location=url('edit_repo_advanced', repo_name=repo_name))
        return htmlfill.render(
            base.render('admin/repos/repo_edit.html'),
            defaults=defaults,
            encoding="UTF-8",
            force_defaults=False)

    @HasRepoPermissionLevelDecorator('admin')
    def edit_advanced_journal(self, repo_name):
        """
        Sets this repository to be visible in public journal,
        in other words asking default user to follow this repo

        :param repo_name:
        :raises HTTPFound: redirect to ``edit_repo_advanced``.
        """

        try:
            repo_id = db.Repository.get_by_repo_name(repo_name).repo_id
            user_id = kallithea.DEFAULT_USER_ID
            self.scm_model.toggle_following_repo(repo_id, user_id)
            webutils.flash(_('Updated repository visibility in public journal'),
                    category='success')
            meta.Session().commit()
        except Exception:
            # Log the traceback like every other handler in this controller
            # so the failure is not lost.
            log.error(traceback.format_exc())
            webutils.flash(_('An error occurred during setting this'
                      ' repository in public journal'),
                    category='error')
        raise HTTPFound(location=url('edit_repo_advanced', repo_name=repo_name))

    @HasRepoPermissionLevelDecorator('admin')
    def edit_advanced_fork(self, repo_name):
        """
        Mark given repository as a fork of another

        :param repo_name:
        :raises HTTPFound: redirect to ``edit_repo_advanced``.
        """
        try:
            fork_id = request.POST.get('id_fork_of')
            repo = ScmModel().mark_as_fork(repo_name, fork_id,
                                           request.authuser.username)
            fork = repo.fork.repo_name if repo.fork else _('Nothing')
            meta.Session().commit()
            webutils.flash(_('Marked repository %s as fork of %s') % (repo_name, fork),
                    category='success')
        except RepositoryError as e:
            log.error(traceback.format_exc())
            webutils.flash(e, category='error')
        except Exception:
            log.error(traceback.format_exc())
            webutils.flash(_('An error occurred during this operation'),
                    category='error')

        raise HTTPFound(location=url('edit_repo_advanced', repo_name=repo_name))

    @HasRepoPermissionLevelDecorator('admin')
    def edit_remote(self, repo_name):
        """Render the remote tab; with POST data, pull from the remote first.

        :raises HTTPFound: redirect to ``edit_repo_remote`` after a pull
            attempt, whether or not it succeeded.
        """
        c.repo_info = self._load_repo()
        c.active = 'remote'
        if request.POST:
            try:
                ScmModel().pull_changes(repo_name, request.authuser.username, request.ip_addr)
                webutils.flash(_('Pulled from remote location'), category='success')
            except Exception:
                log.error(traceback.format_exc())
                webutils.flash(_('An error occurred during pull from remote location'),
                        category='error')
            raise HTTPFound(location=url('edit_repo_remote', repo_name=c.repo_name))
        return base.render('admin/repos/repo_edit.html')

    @HasRepoPermissionLevelDecorator('admin')
    def edit_statistics(self, repo_name):
        """Render the statistics tab; with POST data, delete the stats first.

        Sets ``c.stats_revision``, ``c.repo_last_rev`` and
        ``c.stats_percentage`` for the template.

        :raises HTTPFound: redirect to ``edit_repo_statistics`` after a
            deletion attempt, whether or not it succeeded.
        """
        c.repo_info = self._load_repo()
        repo = c.repo_info.scm_instance

        if c.repo_info.stats:
            # this is on what revision we ended up so we add +1 for count
            last_rev = c.repo_info.stats.stat_on_revision + 1
        else:
            last_rev = 0
        c.stats_revision = last_rev

        c.repo_last_rev = repo.count() if repo.revisions else 0

        # Guard against dividing by zero when either count is zero.
        if last_rev == 0 or c.repo_last_rev == 0:
            c.stats_percentage = 0
        else:
            c.stats_percentage = '%.2f' % (float(last_rev) / c.repo_last_rev * 100)

        c.active = 'statistics'
        if request.POST:
            try:
                RepoModel().delete_stats(repo_name)
                meta.Session().commit()
            except Exception:
                log.error(traceback.format_exc())
                webutils.flash(_('An error occurred during deletion of repository stats'),
                        category='error')
            raise HTTPFound(location=url('edit_repo_statistics', repo_name=c.repo_name))

        return base.render('admin/repos/repo_edit.html')