view kallithea/tests/models/test_notifications.py @ 6170:82662f9faaf4

celeryd: annotate tasks so they can be run directly without run_task This also makes the system less forgiving about celery configuration problems and thus easier to debug. I like that.
author Mads Kiilerich <madski@unity3d.com>
date Tue, 06 Sep 2016 00:51:18 +0200
parents 8b75085c2c02
children 8d98924c58b1
line wrap: on
line source

import os
import re

import mock
import routes.util

from kallithea.tests import *
from kallithea.lib import helpers as h
from kallithea.model.db import User, Notification, UserNotification
from kallithea.model.user import UserModel
from kallithea.model.meta import Session
from kallithea.model.notification import NotificationModel, EmailNotificationModel

import kallithea.lib.celerylib
import kallithea.lib.celerylib.tasks


class TestNotifications(TestController):

    def setup_method(self, method):
        Session.remove()
        u1 = UserModel().create_or_update(username=u'u1',
                                        password=u'qweqwe',
                                        email=u'u1@example.com',
                                        firstname=u'u1', lastname=u'u1')
        Session().commit()
        self.u1 = u1.user_id

        u2 = UserModel().create_or_update(username=u'u2',
                                        password=u'qweqwe',
                                        email=u'u2@example.com',
                                        firstname=u'u2', lastname=u'u3')
        Session().commit()
        self.u2 = u2.user_id

        u3 = UserModel().create_or_update(username=u'u3',
                                        password=u'qweqwe',
                                        email=u'u3@example.com',
                                        firstname=u'u3', lastname=u'u3')
        Session().commit()
        self.u3 = u3.user_id

        self.remove_all_notifications()
        assert [] == Notification.query().all()
        assert [] == UserNotification.query().all()

    def test_create_notification(self):
        usrs = [self.u1, self.u2]
        def send_email(recipients, subject, body='', html_body='', headers=None, author=None):
            assert recipients == ['u2@example.com']
            assert subject == 'Test Message'
            assert body == u"hi there"
            assert '>hi there<' in html_body
            assert author.username == 'u1'
        with mock.patch.object(kallithea.lib.celerylib.tasks, 'send_email', send_email):
            notification = NotificationModel().create(created_by=self.u1,
                                               subject=u'subj', body=u'hi there',
                                               recipients=usrs)
        Session().commit()
        u1 = User.get(self.u1)
        u2 = User.get(self.u2)
        u3 = User.get(self.u3)
        notifications = Notification.query().all()
        assert len(notifications) == 1

        assert notifications[0].recipients == [u1, u2]
        assert notification.notification_id == notifications[0].notification_id

        unotification = UserNotification.query() \
            .filter(UserNotification.notification == notification).all()

        assert len(unotification) == len(usrs)
        assert set([x.user.user_id for x in unotification]) == set(usrs)

    def test_user_notifications(self):
        notification1 = NotificationModel().create(created_by=self.u1,
                                            subject=u'subj', body=u'hi there1',
                                            recipients=[self.u3])
        Session().commit()
        notification2 = NotificationModel().create(created_by=self.u1,
                                            subject=u'subj', body=u'hi there2',
                                            recipients=[self.u3])
        Session().commit()
        u3 = Session().query(User).get(self.u3)

        assert sorted([x.notification for x in u3.notifications]) == sorted([notification2, notification1])

    def test_delete_notifications(self):
        notification = NotificationModel().create(created_by=self.u1,
                                           subject=u'title', body=u'hi there3',
                                    recipients=[self.u3, self.u1, self.u2])
        Session().commit()
        notifications = Notification.query().all()
        assert notification in notifications

        Notification.delete(notification.notification_id)
        Session().commit()

        notifications = Notification.query().all()
        assert not notification in notifications

        un = UserNotification.query().filter(UserNotification.notification
                                             == notification).all()
        assert un == []

    def test_delete_association(self):
        notification = NotificationModel().create(created_by=self.u1,
                                           subject=u'title', body=u'hi there3',
                                    recipients=[self.u3, self.u1, self.u2])
        Session().commit()

        unotification = UserNotification.query() \
                            .filter(UserNotification.notification ==
                                    notification) \
                            .filter(UserNotification.user_id == self.u3) \
                            .scalar()

        assert unotification.user_id == self.u3

        NotificationModel().delete(self.u3,
                                   notification.notification_id)
        Session().commit()

        u3notification = UserNotification.query() \
                            .filter(UserNotification.notification ==
                                    notification) \
                            .filter(UserNotification.user_id == self.u3) \
                            .scalar()

        assert u3notification == None

        # notification object is still there
        assert Notification.query().all() == [notification]

        #u1 and u2 still have assignments
        u1notification = UserNotification.query() \
                            .filter(UserNotification.notification ==
                                    notification) \
                            .filter(UserNotification.user_id == self.u1) \
                            .scalar()
        assert u1notification != None
        u2notification = UserNotification.query() \
                            .filter(UserNotification.notification ==
                                    notification) \
                            .filter(UserNotification.user_id == self.u2) \
                            .scalar()
        assert u2notification != None

    def test_notification_counter(self):
        NotificationModel().create(created_by=self.u1,
                            subject=u'title', body=u'hi there_delete',
                            recipients=[self.u3, self.u1])
        Session().commit()

        assert NotificationModel().get_unread_cnt_for_user(self.u1) == 0
        assert NotificationModel().get_unread_cnt_for_user(self.u2) == 0
        assert NotificationModel().get_unread_cnt_for_user(self.u3) == 1

        notification = NotificationModel().create(created_by=self.u1,
                                           subject=u'title', body=u'hi there3',
                                    recipients=[self.u3, self.u1, self.u2])
        Session().commit()

        assert NotificationModel().get_unread_cnt_for_user(self.u1) == 0
        assert NotificationModel().get_unread_cnt_for_user(self.u2) == 1
        assert NotificationModel().get_unread_cnt_for_user(self.u3) == 2

    @mock.patch.object(h, 'canonical_url', (lambda arg, **kwargs: 'http://%s/?%s' % (arg, '&'.join('%s=%s' % (k, v) for (k, v) in sorted(kwargs.items())))))
    def test_dump_html_mails(self):
        # Exercise all notification types and dump them to one big html file
        l = []

        def send_email(recipients, subject, body='', html_body='', headers=None, author=None):
            l.append('<hr/>\n')
            l.append('<h1>%s</h1>\n' % desc) # desc is from outer scope
            l.append('<pre>\n')
            l.append('From: %s\n' % author.username)
            l.append('To: %s\n' % ' '.join(recipients))
            l.append('Subject: %s\n' % subject)
            l.append('</pre>\n')
            l.append('<hr/>\n')
            l.append('<pre>%s</pre>\n' % body)
            l.append('<hr/>\n')
            l.append(html_body)
            l.append('<hr/>\n')

        with mock.patch.object(kallithea.lib.celerylib.tasks, 'send_email', send_email):
            pr_kwargs = dict(
                pr_nice_id='#7',
                pr_title='The Title',
                pr_title_short='The Title',
                pr_url='http://pr.org/7',
                pr_target_repo='http://mainline.com/repo',
                pr_target_branch='trunk',
                pr_source_repo='https://dev.org/repo',
                pr_source_branch='devbranch',
                pr_owner=User.get(self.u2),
                pr_owner_username='u2'
                )

            for type_, body, kwargs in [
                (Notification.TYPE_CHANGESET_COMMENT,
                 u'This is the new comment.\n\n - and here it ends indented.',
                 dict(
                    short_id='cafe1234',
                    raw_id='cafe1234c0ffeecafe',
                    branch='brunch',
                    cs_comment_user='Opinionated User (jsmith)',
                    cs_comment_url='http://comment.org',
                    is_mention=[False, True],
                    message='This changeset did something clever which is hard to explain',
                    message_short='This changeset did something cl...',
                    status_change=[None, 'Approved'],
                    cs_target_repo='repo_target',
                    cs_url='http://changeset.com',
                    cs_author=User.get(self.u2))),
                (Notification.TYPE_MESSAGE,
                 u'This is the body of the test message\n - nothing interesting here except indentation.',
                 dict()),
                #(Notification.TYPE_MENTION, '$body', None), # not used
                (Notification.TYPE_REGISTRATION,
                 u'Registration body',
                 dict(
                    new_username='newbie',
                    registered_user_url='http://newbie.org',
                    new_email='new@email.com',
                    new_full_name='New Full Name')),
                (Notification.TYPE_PULL_REQUEST,
                 u'This PR is awesome because it does stuff\n - please approve indented!',
                 dict(
                    pr_user_created='Requesting User (root)', # pr_owner should perhaps be used for @mention in description ...
                    is_mention=[False, True],
                    pr_revisions=[('123abc'*7, "Introduce one and two\n\nand that's it"), ('567fed'*7, 'Make one plus two equal tree')],
                    org_repo_name='repo_org',
                    **pr_kwargs)),
                (Notification.TYPE_PULL_REQUEST_COMMENT,
                 u'Me too!\n\n - and indented on second line',
                 dict(
                    closing_pr=[False, True],
                    is_mention=[False, True],
                    pr_comment_user='Opinionated User (jsmith)',
                    pr_comment_url='http://pr.org/comment',
                    status_change=[None, 'Under Review'],
                    **pr_kwargs)),
                ]:
                kwargs['repo_name'] = u'repo/name'
                params = [(type_, type_, body, kwargs)]
                for param_name in ['is_mention', 'status_change', 'closing_pr']: # TODO: inline/general
                    if not isinstance(kwargs.get(param_name), list):
                        continue
                    new_params = []
                    for v in kwargs[param_name]:
                        for desc, type_, body, kwargs in params:
                            kwargs = dict(kwargs)
                            kwargs[param_name] = v
                            new_params.append(('%s, %s=%r' % (desc, param_name, v), type_, body, kwargs))
                    params = new_params

                for desc, type_, body, kwargs in params:
                    # desc is used as "global" variable
                    notification = NotificationModel().create(created_by=self.u1,
                                                       subject=u'unused', body=body, email_kwargs=kwargs,
                                                       recipients=[self.u2], type_=type_)

            # Email type TYPE_PASSWORD_RESET has no corresponding notification type - test it directly:
            desc = 'TYPE_PASSWORD_RESET'
            kwargs = dict(user='John Doe', reset_token='decbf64715098db5b0bd23eab44bd792670ab746', reset_url='http://reset.com/decbf64715098db5b0bd23eab44bd792670ab746')
            kallithea.lib.celerylib.tasks.send_email(['john@doe.com'],
                "Password reset link",
                EmailNotificationModel().get_email_tmpl(EmailNotificationModel.TYPE_PASSWORD_RESET, 'txt', **kwargs),
                EmailNotificationModel().get_email_tmpl(EmailNotificationModel.TYPE_PASSWORD_RESET, 'html', **kwargs),
                author=User.get(self.u1))

        out = '<!doctype html>\n<html lang="en">\n<head><title>Notifications</title><meta http-equiv="Content-Type" content="text/html; charset=UTF-8"></head>\n<body>\n%s\n</body>\n</html>\n' % \
            re.sub(r'<(/?(?:!doctype|html|head|title|meta|body)\b[^>]*)>', r'<!--\1-->', ''.join(l))

        outfn = os.path.join(os.path.dirname(__file__), 'test_dump_html_mails.out.html')
        reffn = os.path.join(os.path.dirname(__file__), 'test_dump_html_mails.ref.html')
        with file(outfn, 'w') as f:
            f.write(out)
        with file(reffn) as f:
            ref = f.read()
        assert ref == out # copy test_dump_html_mails.out.html to test_dump_html_mails.ref.html to update expectations
        os.unlink(outfn)