view kallithea/model/changeset_status.py @ 8685:dff9658bdd98

model: don't import Session from db - import meta and get it from the real source
author Mads Kiilerich <mads@kiilerich.com>
date Mon, 12 Oct 2020 01:44:10 +0200
parents 0a277465fddf
children 5e46f73f0d1c
line wrap: on
line source

# -*- coding: utf-8 -*-
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""
kallithea.model.changeset_status
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Changeset status controller

This file was forked by the Kallithea project in July 2014.
Original author and date, and relevant copyright and licensing information is below:
:created_on: Apr 30, 2012
:author: marcink
:copyright: (c) 2013 RhodeCode GmbH, and others.
:license: GPLv3, see LICENSE.md for more details.
"""

import logging

from sqlalchemy.orm import joinedload

from kallithea.model import meta
from kallithea.model.db import ChangesetStatus, PullRequest, Repository, User


log = logging.getLogger(__name__)


class ChangesetStatusModel(object):

    def _get_status_query(self, repo, revision, pull_request,
                          with_revisions=False):
        repo = Repository.guess_instance(repo)

        q = ChangesetStatus.query() \
            .filter(ChangesetStatus.repo == repo)
        if not with_revisions:
            # only report the latest vote across all users! TODO: be smarter!
            q = q.filter(ChangesetStatus.version == 0)

        if revision:
            q = q.filter(ChangesetStatus.revision == revision)
        elif pull_request:
            pull_request = PullRequest.guess_instance(pull_request)
            q = q.filter(ChangesetStatus.pull_request == pull_request)
        else:
            raise Exception('Please specify revision or pull_request')
        q = q.order_by(ChangesetStatus.version.asc())
        return q

    def _calculate_status(self, statuses):
        """
        Given a list of statuses, calculate the resulting status, according to
        the policy: approve if consensus, reject when at least one reject.
        """

        if not statuses:
            return ChangesetStatus.STATUS_UNDER_REVIEW

        if all(st and st.status == ChangesetStatus.STATUS_APPROVED for st in statuses):
            return ChangesetStatus.STATUS_APPROVED

        if any(st and st.status == ChangesetStatus.STATUS_REJECTED for st in statuses):
            return ChangesetStatus.STATUS_REJECTED

        return ChangesetStatus.STATUS_UNDER_REVIEW

    def calculate_pull_request_result(self, pull_request):
        """
        Return a tuple (reviewers, pending reviewers, pull request status)
        Only approve and reject counts as valid votes.
        """

        # collect latest votes from all voters
        cs_statuses = dict()
        for st in reversed(self.get_statuses(pull_request.org_repo,
                                             pull_request=pull_request,
                                             with_revisions=True)):
            cs_statuses[st.author.username] = st

        # collect votes from official reviewers
        pull_request_reviewers = []
        pull_request_pending_reviewers = []
        relevant_statuses = []
        for user in pull_request.get_reviewer_users():
            st = cs_statuses.get(user.username)
            relevant_statuses.append(st)
            status = ChangesetStatus.STATUS_NOT_REVIEWED if st is None else st.status
            if status in (ChangesetStatus.STATUS_NOT_REVIEWED,
                          ChangesetStatus.STATUS_UNDER_REVIEW):
                pull_request_pending_reviewers.append(user)
            pull_request_reviewers.append((user, status))

        result = self._calculate_status(relevant_statuses)

        return (pull_request_reviewers,
                pull_request_pending_reviewers,
                result)

    def get_statuses(self, repo, revision=None, pull_request=None,
                     with_revisions=False):
        q = self._get_status_query(repo, revision, pull_request,
                                   with_revisions)
        q = q.options(joinedload('author'))
        return q.all()

    def get_status(self, repo, revision=None, pull_request=None, as_str=True):
        """
        Returns latest status of changeset for given revision or for given
        pull request. Statuses are versioned inside a table itself and
        version == 0 is always the current one

        :param repo:
        :param revision: 40char hash or None
        :param pull_request: pull_request reference
        :param as_str: return status as string not object
        """
        q = self._get_status_query(repo, revision, pull_request)

        # need to use first here since there can be multiple statuses
        # returned from pull_request
        status = q.first()
        if as_str:
            return str(status.status) if status else ChangesetStatus.DEFAULT
        return status

    def set_status(self, repo, status, user, comment, revision=None,
                   pull_request=None):
        """
        Creates new status for changeset or updates the old ones bumping their
        version, leaving the current status at the value of 'status'.

        :param repo:
        :param status:
        :param user:
        :param comment:
        :param revision:
        :param pull_request:
        """
        repo = Repository.guess_instance(repo)

        q = ChangesetStatus.query()
        if revision is not None:
            assert pull_request is None
            q = q.filter(ChangesetStatus.repo == repo)
            q = q.filter(ChangesetStatus.revision == revision)
            revisions = [revision]
        else:
            assert pull_request is not None
            pull_request = PullRequest.guess_instance(pull_request)
            repo = pull_request.org_repo
            q = q.filter(ChangesetStatus.repo == repo)
            q = q.filter(ChangesetStatus.revision.in_(pull_request.revisions))
            revisions = pull_request.revisions
        cur_statuses = q.all()

        # update all current statuses with older version
        for st in cur_statuses:
            st.version += 1

        new_statuses = []
        for rev in revisions:
            new_status = ChangesetStatus()
            new_status.version = 0 # default
            new_status.author = User.guess_instance(user)
            new_status.repo = Repository.guess_instance(repo)
            new_status.status = status
            new_status.comment = comment
            new_status.revision = rev
            new_status.pull_request = pull_request
            new_statuses.append(new_status)
            meta.Session().add(new_status)
        return new_statuses