view kallithea/tests/functional/test_login.py @ 8687:5e46f73f0d1c

model: always import the whole db module - drop "from" imports
author Mads Kiilerich <mads@kiilerich.com>
date Mon, 12 Oct 2020 11:12:37 +0200
parents b095e2fbba44
children 67e5b90801aa
line wrap: on
line source

# -*- coding: utf-8 -*-
import re
import time
import urllib.parse

import mock
from tg.util.webtest import test_context

import kallithea.lib.celerylib.tasks
from kallithea.lib import helpers as h
from kallithea.lib.auth import check_password
from kallithea.lib.utils2 import generate_api_key
from kallithea.model import db, meta, validators
from kallithea.model.api_key import ApiKeyModel
from kallithea.model.user import UserModel
from kallithea.tests import base
from kallithea.tests.fixture import Fixture


# Module-level fixture helper; tests below use its anon_access() context manager.
fixture = Fixture()


class TestLoginController(base.TestController):

    def test_index(self):
        """A GET of the login page answers with '200 OK'."""
        login_url = base.url(controller='login', action='index')
        response = self.app.get(login_url)
        assert response.status == '200 OK'

    def test_login_admin_ok(self):
        """Post the admin test credentials and follow the resulting redirect."""
        login_url = base.url(controller='login', action='index')
        credentials = {
            'username': base.TEST_USER_ADMIN_LOGIN,
            'password': base.TEST_USER_ADMIN_PASS,
            '_session_csrf_secret_token': self.session_csrf_secret_token(),
        }
        response = self.app.post(login_url, credentials)
        assert response.status == '302 Found'
        self.assert_authenticated_user(response, base.TEST_USER_ADMIN_LOGIN)

        # The page behind the redirect must contain a '/<HG_REPO>' path.
        landing_page = response.follow()
        landing_page.mustcontain('/%s' % base.HG_REPO)

    def test_login_regular_ok(self):
        """Post the regular test user's login name and password, then follow the redirect."""
        login_url = base.url(controller='login', action='index')
        credentials = {
            'username': base.TEST_USER_REGULAR_LOGIN,
            'password': base.TEST_USER_REGULAR_PASS,
            '_session_csrf_secret_token': self.session_csrf_secret_token(),
        }
        response = self.app.post(login_url, credentials)

        assert response.status == '302 Found'
        self.assert_authenticated_user(response, base.TEST_USER_REGULAR_LOGIN)

        # The page behind the redirect must contain a '/<HG_REPO>' path.
        landing_page = response.follow()
        landing_page.mustcontain('/%s' % base.HG_REPO)

    def test_login_regular_email_ok(self):
        """Log in with the regular user's email address in the username field.

        The authenticated user is still expected to be TEST_USER_REGULAR_LOGIN.
        """
        login_url = base.url(controller='login', action='index')
        credentials = {
            'username': base.TEST_USER_REGULAR_EMAIL,
            'password': base.TEST_USER_REGULAR_PASS,
            '_session_csrf_secret_token': self.session_csrf_secret_token(),
        }
        response = self.app.post(login_url, credentials)

        assert response.status == '302 Found'
        self.assert_authenticated_user(response, base.TEST_USER_REGULAR_LOGIN)

        landing_page = response.follow()
        landing_page.mustcontain('/%s' % base.HG_REPO)

    def test_login_ok_came_from(self):
        """Log in as admin with came_from set and check the page reached after the redirect."""
        target = '/_admin/users'
        login_url = base.url(controller='login', action='index',
                             came_from=target)
        credentials = {
            'username': base.TEST_USER_ADMIN_LOGIN,
            'password': base.TEST_USER_ADMIN_PASS,
            '_session_csrf_secret_token': self.session_csrf_secret_token(),
        }
        response = self.app.post(login_url, credentials)
        assert response.status == '302 Found'

        followed = response.follow()
        assert followed.status == '200 OK'
        followed.mustcontain('Users Administration')

    def test_login_do_not_remember(self):
        """With remember=False no Set-Cookie header may carry Max-Age or Expires."""
        login_url = base.url(controller='login', action='index')
        credentials = {
            'username': base.TEST_USER_REGULAR_LOGIN,
            'password': base.TEST_USER_REGULAR_PASS,
            'remember': False,
            '_session_csrf_secret_token': self.session_csrf_secret_token(),
        }
        response = self.app.post(login_url, credentials)

        assert 'Set-Cookie' in response.headers
        expiry_attribute = re.compile(r';\s+(Max-Age|Expires)=', re.IGNORECASE)
        for cookie in response.headers.getall('Set-Cookie'):
            found = expiry_attribute.search(cookie)
            assert not found, 'Cookie %r has expiration date, but should be a session cookie' % cookie

    def test_login_remember(self):
        """With remember=True every Set-Cookie header must carry Max-Age or Expires."""
        login_url = base.url(controller='login', action='index')
        credentials = {
            'username': base.TEST_USER_REGULAR_LOGIN,
            'password': base.TEST_USER_REGULAR_PASS,
            'remember': True,
            '_session_csrf_secret_token': self.session_csrf_secret_token(),
        }
        response = self.app.post(login_url, credentials)

        assert 'Set-Cookie' in response.headers
        expiry_attribute = re.compile(r';\s+(Max-Age|Expires)=', re.IGNORECASE)
        for cookie in response.headers.getall('Set-Cookie'):
            found = expiry_attribute.search(cookie)
            assert found, 'Cookie %r should have expiration date, but is a session cookie' % cookie

    def test_logout(self):
        """Log in, check 'authuser' is in the session, click 'Log Out', check it is gone."""
        login_url = base.url(controller='login', action='index')
        credentials = {
            'username': base.TEST_USER_REGULAR_LOGIN,
            'password': base.TEST_USER_REGULAR_PASS,
            '_session_csrf_secret_token': self.session_csrf_secret_token(),
        }
        self.app.post(login_url, credentials)

        # A login session must now exist: the login page redirects and the
        # followed response has 'authuser' in its session.
        redirect = self.app.get(base.url(controller='login', action='index'))
        logged_in_page = redirect.follow()
        assert 'authuser' in logged_in_page.session

        logged_in_page.click('Log Out')

        # After logging out, the session must no longer hold 'authuser'.
        login_page = self.app.get(base.url(controller='login', action='index'))
        assert 'authuser' not in login_page.session

    @base.parametrize('url_came_from', [
          ('data:text/html,<script>window.alert("xss")</script>',),
          ('mailto:test@example.com',),
          ('file:///etc/passwd',),
          ('ftp://ftp.example.com',),
          ('http://other.example.com/bl%C3%A5b%C3%A6rgr%C3%B8d',),
          ('//evil.example.com/',),
          ('/\r\nX-Header-Injection: boo',),
          ('/invälid_url_bytes',),
          ('non-absolute-path',),
    ])
    def test_login_bad_came_froms(self, url_came_from):
        """Post valid admin credentials with each listed came_from value, expecting status 400."""
        # NOTE(review): every parameter above is a 1-tuple, so with a single
        # argument name url_came_from presumably arrives here as a tuple
        # rather than a str - confirm this is intended.
        login_url = base.url(controller='login', action='index',
                             came_from=url_came_from)
        credentials = {
            'username': base.TEST_USER_ADMIN_LOGIN,
            'password': base.TEST_USER_ADMIN_PASS,
            '_session_csrf_secret_token': self.session_csrf_secret_token(),
        }
        # The expected status is passed to the test app; the response itself
        # is not inspected any further.
        self.app.post(login_url, credentials, status=400)

    def test_login_short_password(self):
        """A two-character password re-renders the form with a length error."""
        login_url = base.url(controller='login', action='index')
        credentials = {
            'username': base.TEST_USER_ADMIN_LOGIN,
            'password': 'as',
            '_session_csrf_secret_token': self.session_csrf_secret_token(),
        }
        response = self.app.post(login_url, credentials)

        assert response.status == '200 OK'
        response.mustcontain('Enter 3 characters or more')

    def test_login_wrong_username_password(self):
        """Unknown credentials produce the 'Invalid username or password' message."""
        login_url = base.url(controller='login', action='index')
        credentials = {
            'username': 'error',
            'password': 'test12',
            '_session_csrf_secret_token': self.session_csrf_secret_token(),
        }
        response = self.app.post(login_url, credentials)

        response.mustcontain('Invalid username or password')

    def test_login_non_ascii(self):
        """A non-ASCII password for the regular user yields the invalid-login message."""
        login_url = base.url(controller='login', action='index')
        credentials = {
            'username': base.TEST_USER_REGULAR_LOGIN,
            'password': 'blåbærgrød',
            '_session_csrf_secret_token': self.session_csrf_secret_token(),
        }
        response = self.app.post(login_url, credentials)

        response.mustcontain('>Invalid username or password<')

    # verify that get arguments are correctly passed along login redirection

    @base.parametrize('args', [
        {'foo':'one', 'bar':'two'},
        {'blue': 'blå', 'green': 'grøn'},
    ])
    def test_redirection_to_login_form_preserves_get_args(self, args):
        """With anon access off, the redirect's came_from keeps the original query args."""
        with fixture.anon_access(False):
            summary_url = base.url(controller='summary', action='index',
                                   repo_name=base.HG_REPO,
                                   **args)
            response = self.app.get(summary_url)
            assert response.status == '302 Found'

            # Pull came_from out of the redirect target, then parse its own query string.
            redirect_query = urllib.parse.urlparse(response.location).query
            came_from = urllib.parse.parse_qs(redirect_query)['came_from'][0]
            came_from_query = urllib.parse.urlparse(came_from).query
            preserved_pairs = urllib.parse.parse_qsl(came_from_query)
            assert sorted(preserved_pairs) == sorted(args.items())

    @base.parametrize('args,args_encoded', [
        ({'foo':'one', 'bar':'two'}, ('foo=one', 'bar=two')),
        ({'blue': 'blå', 'green':'grøn'},
             ('blue=bl%C3%A5', 'green=gr%C3%B8n')),
    ])
    def test_login_form_preserves_get_args(self, args, args_encoded):
        """The login form's action URL keeps the encoded args inside came_from."""
        target = base.url('/_admin/users', **args)
        login_url = base.url(controller='login', action='index',
                             came_from=target)
        response = self.app.get(login_url)

        action_query = urllib.parse.urlparse(response.form.action).query
        came_from = urllib.parse.parse_qs(action_query)['came_from'][0]
        for expected_fragment in args_encoded:
            assert expected_fragment in came_from

    @base.parametrize('args,args_encoded', [
        ({'foo':'one', 'bar':'two'}, ('foo=one', 'bar=two')),
        ({'blue': 'blå', 'green':'grøn'},
             ('blue=bl%C3%A5', 'green=gr%C3%B8n')),
    ])
    def test_redirection_after_successful_login_preserves_get_args(self, args, args_encoded):
        """After a successful admin login the redirect location contains the encoded args."""
        target = base.url('/_admin/users', **args)
        login_url = base.url(controller='login', action='index',
                             came_from=target)
        credentials = {
            'username': base.TEST_USER_ADMIN_LOGIN,
            'password': base.TEST_USER_ADMIN_PASS,
            '_session_csrf_secret_token': self.session_csrf_secret_token(),
        }
        response = self.app.post(login_url, credentials)

        assert response.status == '302 Found'
        for expected_fragment in args_encoded:
            assert expected_fragment in response.location

    @base.parametrize('args,args_encoded', [
        ({'foo':'one', 'bar':'two'}, ('foo=one', 'bar=two')),
        ({'blue': 'blå', 'green':'grøn'},
             ('blue=bl%C3%A5', 'green=gr%C3%B8n')),
    ])
    def test_login_form_after_incorrect_login_preserves_get_args(self, args, args_encoded):
        """After a failed login the re-rendered form still carries the encoded args in came_from."""
        target = base.url('/_admin/users', **args)
        login_url = base.url(controller='login', action='index',
                             came_from=target)
        credentials = {
            'username': 'error',
            'password': 'test12',
            '_session_csrf_secret_token': self.session_csrf_secret_token(),
        }
        response = self.app.post(login_url, credentials)

        response.mustcontain('Invalid username or password')
        action_query = urllib.parse.urlparse(response.form.action).query
        came_from = urllib.parse.parse_qs(action_query)['came_from'][0]
        for expected_fragment in args_encoded:
            assert expected_fragment in came_from

    #==========================================================================
    # REGISTRATIONS
    #==========================================================================
    def test_register(self):
        """The registration page contains the text 'Sign Up'."""
        register_url = base.url(controller='login', action='register')
        page = self.app.get(register_url)
        page.mustcontain('Sign Up')

    def test_register_err_same_username(self):
        """Registering with the admin's login name shows the 'username_exists' message."""
        taken_name = base.TEST_USER_ADMIN_LOGIN
        register_url = base.url(controller='login', action='register')
        form = {
            'username': taken_name,
            'password': 'test12',
            'password_confirmation': 'test12',
            'email': 'goodmail@example.com',
            'firstname': 'test',
            'lastname': 'test',
            '_session_csrf_secret_token': self.session_csrf_secret_token(),
        }
        response = self.app.post(register_url, form)

        # The validator message is looked up inside a test context.
        with test_context(self.app):
            template = validators.ValidUsername()._messages['username_exists']
        expected = h.html_escape(template % {'username': taken_name})
        response.mustcontain(expected)

    def test_register_err_same_email(self):
        """Registering with the admin's email address shows the 'email_taken' message."""
        register_url = base.url(controller='login', action='register')
        form = {
            'username': 'test_admin_0',
            'password': 'test12',
            'password_confirmation': 'test12',
            'email': base.TEST_USER_ADMIN_EMAIL,
            'firstname': 'test',
            'lastname': 'test',
            '_session_csrf_secret_token': self.session_csrf_secret_token(),
        }
        response = self.app.post(register_url, form)

        # The validator message is looked up inside a test context.
        with test_context(self.app):
            expected = validators.UniqSystemEmail()()._messages['email_taken']
        response.mustcontain(expected)

    def test_register_err_same_email_case_sensitive(self):
        """The admin's email in title case is still reported as 'email_taken'."""
        register_url = base.url(controller='login', action='register')
        form = {
            'username': 'test_admin_1',
            'password': 'test12',
            'password_confirmation': 'test12',
            'email': base.TEST_USER_ADMIN_EMAIL.title(),
            'firstname': 'test',
            'lastname': 'test',
            '_session_csrf_secret_token': self.session_csrf_secret_token(),
        }
        response = self.app.post(register_url, form)

        # The validator message is looked up inside a test context.
        with test_context(self.app):
            expected = validators.UniqSystemEmail()()._messages['email_taken']
        response.mustcontain(expected)

    def test_register_err_wrong_data(self):
        """A bad email and a four-character password both produce validation errors."""
        register_url = base.url(controller='login', action='register')
        form = {
            'username': 'xs',
            'password': 'test',
            'password_confirmation': 'test',
            'email': 'goodmailm',
            'firstname': 'test',
            'lastname': 'test',
            '_session_csrf_secret_token': self.session_csrf_secret_token(),
        }
        response = self.app.post(register_url, form)

        assert response.status == '200 OK'
        for expected in ('An email address must contain a single @',
                         'Enter a value 6 characters long or more'):
            response.mustcontain(expected)

    def test_register_err_username(self):
        """A username containing a space and a bad email both produce validation errors."""
        register_url = base.url(controller='login', action='register')
        form = {
            'username': 'error user',
            'password': 'test12',
            'password_confirmation': 'test12',
            'email': 'goodmailm',
            'firstname': 'test',
            'lastname': 'test',
            '_session_csrf_secret_token': self.session_csrf_secret_token(),
        }
        response = self.app.post(register_url, form)

        response.mustcontain('An email address must contain a single @')
        username_error = ('Username may only contain '
                          'alphanumeric characters underscores, '
                          'periods or dashes and must begin with an '
                          'alphanumeric character')
        response.mustcontain(username_error)

    def test_register_err_case_sensitive(self):
        """The admin's login name in title case is still reported as 'username_exists'."""
        title_cased = base.TEST_USER_ADMIN_LOGIN.title()
        register_url = base.url(controller='login', action='register')
        form = {
            'username': title_cased,
            'password': 'test12',
            'password_confirmation': 'test12',
            'email': 'goodmailm',
            'firstname': 'test',
            'lastname': 'test',
            '_session_csrf_secret_token': self.session_csrf_secret_token(),
        }
        response = self.app.post(register_url, form)

        response.mustcontain('An email address must contain a single @')
        # The validator message is looked up inside a test context.
        with test_context(self.app):
            template = validators.ValidUsername()._messages['username_exists']
        expected = h.html_escape(template % {'username': title_cased})
        response.mustcontain(expected)

    def test_register_special_chars(self):
        """A password of non-ASCII characters produces the 'invalid_password' message."""
        special_password = 'ąćźżąśśśś'
        register_url = base.url(controller='login', action='register')
        form = {
            'username': 'xxxaxn',
            'password': special_password,
            'password_confirmation': special_password,
            'email': 'goodmailm@test.plx',
            'firstname': 'test',
            'lastname': 'test',
            '_session_csrf_secret_token': self.session_csrf_secret_token(),
        }
        response = self.app.post(register_url, form)

        # The validator message is looked up inside a test context.
        with test_context(self.app):
            expected = validators.ValidPassword()._messages['invalid_password']
        response.mustcontain(expected)

    def test_register_password_mismatch(self):
        """Different password and confirmation values produce the 'password_mismatch' message."""
        register_url = base.url(controller='login', action='register')
        form = {
            'username': 'xs',
            'password': '123qwe',
            'password_confirmation': 'qwe123',
            'email': 'goodmailm@test.plxa',
            'firstname': 'test',
            'lastname': 'test',
            '_session_csrf_secret_token': self.session_csrf_secret_token(),
        }
        response = self.app.post(register_url, form)

        # The validator message is looked up inside a test context.
        with test_context(self.app):
            matcher = validators.ValidPasswordsMatch('password', 'password_confirmation')
            expected = matcher._messages['password_mismatch']
        response.mustcontain(expected)

    def test_register_ok(self):
        """Register a new user and verify the account stored in the database.

        The form also submits ``admin=True``; the final assertion checks that
        the created user is nevertheless not an admin.
        """
        username = 'test_regular4'
        password = 'qweqwe'
        email = 'user4@example.com'
        name = 'testname'
        lastname = 'testlastname'

        registration_form = {
            'username': username,
            'password': password,
            'password_confirmation': password,
            'email': email,
            'firstname': name,
            'lastname': lastname,
            'admin': True,  # This should be overridden
            '_session_csrf_secret_token': self.session_csrf_secret_token(),
        }
        response = self.app.post(base.url(controller='login', action='register'),
                                 registration_form)
        assert response.status == '302 Found'
        self.checkSessionFlash(response, 'You have successfully registered with Kallithea')

        # Look the user up by the same name that was submitted, rather than
        # repeating the literal.
        ret = meta.Session().query(db.User).filter(db.User.username == username).one()
        assert ret.username == username
        assert check_password(password, ret.password)
        assert ret.email == email
        assert ret.name == name
        assert ret.lastname == lastname
        assert ret.api_key is not None
        assert not ret.admin

    #==========================================================================
    # PASSWORD RESET
    #==========================================================================

    def test_forgot_password_wrong_mail(self):
        """An address without '@' posted to password_reset produces an email validation error."""
        malformed = 'username%wrongmail.org'
        reset_url = base.url(controller='login', action='password_reset')
        form = {
            'email': malformed,
            '_session_csrf_secret_token': self.session_csrf_secret_token(),
        }
        response = self.app.post(reset_url, form)

        response.mustcontain('An email address must contain a single @')

    def test_forgot_password(self):
        """Walk through the password reset flow end to end.

        Steps: load the reset form, create a user, request a reset while
        capturing the outgoing email, check the email contents, submit the
        confirmation form with a bad token, then with the good token.
        """
        response = self.app.get(base.url(controller='login',
                                    action='password_reset'))
        assert response.status == '200 OK'

        username = 'test_password_reset_1'
        password = 'qweqwe'
        email = 'username@example.com'
        name = 'passwd'
        lastname = 'reset'
        # Fixed timestamp; time.time is patched to return it during the reset
        # request below, presumably so the application derives the same token.
        timestamp = int(time.time())

        # Create the user the reset is requested for directly in the database.
        new = db.User()
        new.username = username
        new.password = password
        new.email = email
        new.name = name
        new.lastname = lastname
        new.api_key = generate_api_key()
        meta.Session().add(new)
        meta.Session().commit()

        # Token the test expects to find in the email and confirmation URL.
        token = UserModel().get_reset_password_token(
            db.User.get_by_username(username), timestamp, self.session_csrf_secret_token())

        # Replacement for send_email that records its arguments instead of sending.
        collected = []
        def mock_send_email(recipients, subject, body='', html_body='', headers=None, from_name=None):
            collected.append((recipients, subject, body, html_body))

        # Request the reset with send_email and time.time patched.
        with mock.patch.object(kallithea.lib.celerylib.tasks, 'send_email', mock_send_email), \
                mock.patch.object(time, 'time', lambda: timestamp):
            response = self.app.post(base.url(controller='login',
                                         action='password_reset'),
                                     {'email': email,
                                      '_session_csrf_secret_token': self.session_csrf_secret_token()})

        self.checkSessionFlash(response, 'A password reset confirmation code has been sent')

        # The unpacking fails unless exactly one email was collected.
        ((recipients, subject, body, html_body),) = collected
        assert recipients == ['username@example.com']
        assert subject == 'Password reset link'
        # The token must appear on a line of its own in the plain text body.
        assert '\n%s\n' % token in body
        # The unpacking fails unless exactly one body line starts with 'http://'.
        (confirmation_url,) = (line for line in body.splitlines() if line.startswith('http://'))
        # The HTML body must link to the same URL, with '&' and '@' escaped.
        assert ' href="%s"' % confirmation_url.replace('&', '&amp;').replace('@', '%40') in html_body

        # The confirmation URL carries token, timestamp and email as query arguments.
        d = urllib.parse.parse_qs(urllib.parse.urlparse(confirmation_url).query)
        assert d['token'] == [token]
        assert d['timestamp'] == [str(timestamp)]
        assert d['email'] == [email]

        response = response.follow()

        # BAD TOKEN

        bad_token = "bad"

        response = self.app.post(base.url(controller='login',
                                     action='password_reset_confirmation'),
                                 {'email': email,
                                  'timestamp': timestamp,
                                  'password': "p@ssw0rd",
                                  'password_confirm': "p@ssw0rd",
                                  'token': bad_token,
                                  '_session_csrf_secret_token': self.session_csrf_secret_token(),
                                 })
        # A bad token re-renders the page with an error instead of redirecting.
        assert response.status == '200 OK'
        response.mustcontain('Invalid password reset token')

        # GOOD TOKEN

        # The confirmation page must show a form pre-filled with the reset data.
        response = self.app.get(confirmation_url)
        assert response.status == '200 OK'
        response.mustcontain("You are about to set a new password for the email address %s" % email)
        response.mustcontain('<form action="%s" method="post">' % base.url(controller='login', action='password_reset_confirmation'))
        response.mustcontain('value="%s"' % self.session_csrf_secret_token())
        response.mustcontain('value="%s"' % token)
        response.mustcontain('value="%s"' % timestamp)
        response.mustcontain('value="username@example.com"')

        # fake a submit of that form
        response = self.app.post(base.url(controller='login',
                                     action='password_reset_confirmation'),
                                 {'email': email,
                                  'timestamp': timestamp,
                                  'password': "p@ssw0rd",
                                  'password_confirm': "p@ssw0rd",
                                  'token': token,
                                  '_session_csrf_secret_token': self.session_csrf_secret_token(),
                                 })
        assert response.status == '302 Found'
        self.checkSessionFlash(response, 'Successfully updated password')

        response = response.follow()

    #==========================================================================
    # API
    #==========================================================================

    def _api_key_test(self, api_key, status):
        """Request an auth-requiring page with an API key and check the HTTP status.

        The page is requested twice with anonymous access disabled: once with
        the key as the ``api_key`` URL parameter and once with the key in a
        bearer ``Authorization`` header. Both requests must give ``status``.

        ``api_key=None`` sends no key at all; ``api_key=True`` uses the first
        admin user's real API key.
        """
        with fixture.anon_access(False):
            params = {}
            headers = {}
            if api_key is not None:
                if api_key is True:
                    # Swap the placeholder for a real, working key.
                    api_key = db.User.get_first_admin().api_key
                params = {'api_key': api_key}
                headers = {'Authorization': 'Bearer ' + str(api_key)}

            # Key passed as URL parameter.
            url_with_key = base.url(controller='changeset', action='changeset_raw',
                                    repo_name=base.HG_REPO, revision='tip', **params)
            self.app.get(url_with_key, status=status)

            # Key passed as bearer token in the request headers.
            plain_url = base.url(controller='changeset', action='changeset_raw',
                                 repo_name=base.HG_REPO, revision='tip')
            self.app.get(plain_url, headers=headers, status=status)

    @base.parametrize('test_name,api_key,code', [
        ('none', None, 302),
        ('empty_string', '', 403),
        ('fake_number', '123456', 403),
        ('fake_not_alnum', 'a-z', 403),
        ('fake_api_key', '0123456789abcdef0123456789ABCDEF01234567', 403),
        ('proper_api_key', True, 200)
    ])
    def test_access_page_via_api_key(self, test_name, api_key, code):
        """Run _api_key_test for each parametrized key and expected status code.

        ``test_name`` only labels the case; it is not used in the body.
        """
        self._api_key_test(api_key, status=code)

    def test_access_page_via_extra_api_key(self):
        """A newly created API key for the admin user gives status 200."""
        extra_key = ApiKeyModel().create(base.TEST_USER_ADMIN_LOGIN, 'test')
        meta.Session().commit()
        self._api_key_test(extra_key.api_key, status=200)

    def test_access_page_via_expired_api_key(self):
        """An API key whose ``expires`` is set to 0 gives status 403."""
        expired_key = ApiKeyModel().create(base.TEST_USER_ADMIN_LOGIN, 'test')
        meta.Session().commit()
        # Overwrite the expiry on the stored key so that it counts as expired.
        expired_key.expires = 0
        meta.Session().commit()
        self._api_key_test(expired_key.api_key, status=403)