Mercurial > kallithea
changeset 7773:3b147c38b674
ssh: error checking for ssh key management
Based on work by Ilya Beda <ir4y.ix@gmail.com> on
https://bitbucket.org/ir4y/rhodecode/commits/branch/ssh_server_support , and
also heavily modified by Mads Kiilerich.
author | Christian Oyarzun <oyarzun@gmail.com> |
---|---|
date | Mon, 17 Nov 2014 14:42:45 -0500 |
parents | 66c208bf56fe |
children | e53f1db2b839 |
files | kallithea/controllers/admin/my_account.py kallithea/controllers/admin/users.py kallithea/lib/ssh.py kallithea/model/ssh_key.py |
diffstat | 4 files changed, 142 insertions(+), 16 deletions(-) [+] |
line wrap: on
line diff
--- a/kallithea/controllers/admin/my_account.py Mon Nov 17 14:40:35 2014 -0500 +++ b/kallithea/controllers/admin/my_account.py Mon Nov 17 14:42:45 2014 -0500 @@ -46,7 +46,7 @@ from kallithea.model.user import UserModel from kallithea.model.repo import RepoModel from kallithea.model.api_key import ApiKeyModel -from kallithea.model.ssh_key import SshKeyModel +from kallithea.model.ssh_key import SshKeyModel, SshKeyModelException from kallithea.model.meta import Session log = logging.getLogger(__name__) @@ -272,16 +272,22 @@ def my_account_ssh_keys_add(self): description = request.POST.get('description') public_key = request.POST.get('public_key') - new_ssh_key = SshKeyModel().create(request.authuser.user_id, - description, public_key) - Session().commit() - h.flash(_("SSH key %s successfully added") % new_ssh_key.fingerprint, category='success') + try: + new_ssh_key = SshKeyModel().create(request.authuser.user_id, + description, public_key) + Session().commit() + h.flash(_("SSH key %s successfully added") % new_ssh_key.fingerprint, category='success') + except SshKeyModelException as errors: + h.flash(errors.message, category='error') raise HTTPFound(location=url('my_account_ssh_keys')) @IfSshEnabled def my_account_ssh_keys_delete(self): public_key = request.POST.get('del_public_key') - SshKeyModel().delete(public_key, request.authuser.user_id) - Session().commit() - h.flash(_("SSH key successfully deleted"), category='success') + try: + SshKeyModel().delete(public_key, request.authuser.user_id) + Session().commit() + h.flash(_("SSH key successfully deleted"), category='success') + except SshKeyModelException as errors: + h.flash(errors.message, category='error') raise HTTPFound(location=url('my_account_ssh_keys'))
--- a/kallithea/controllers/admin/users.py Mon Nov 17 14:40:35 2014 -0500 +++ b/kallithea/controllers/admin/users.py Mon Nov 17 14:42:45 2014 -0500 @@ -45,7 +45,7 @@ from kallithea.lib import auth_modules from kallithea.lib.base import BaseController, render, IfSshEnabled from kallithea.model.api_key import ApiKeyModel -from kallithea.model.ssh_key import SshKeyModel +from kallithea.model.ssh_key import SshKeyModel, SshKeyModelException from kallithea.model.db import User, UserEmailMap, UserIpMap, UserToPerm from kallithea.model.forms import UserForm, CustomDefaultPermissionsForm from kallithea.model.user import UserModel @@ -448,10 +448,13 @@ description = request.POST.get('description') public_key = request.POST.get('public_key') - new_ssh_key = SshKeyModel().create(c.user.user_id, - description, public_key) - Session().commit() - h.flash(_("SSH key %s successfully added") % new_ssh_key.fingerprint, category='success') + try: + new_ssh_key = SshKeyModel().create(c.user.user_id, + description, public_key) + Session().commit() + h.flash(_("SSH key %s successfully added") % new_ssh_key.fingerprint, category='success') + except SshKeyModelException as errors: + h.flash(errors.message, category='error') raise HTTPFound(location=url('edit_user_ssh_keys', id=c.user.user_id)) @IfSshEnabled @@ -459,7 +462,10 @@ c.user = self._get_user_or_raise_if_default(id) public_key = request.POST.get('del_public_key') - SshKeyModel().delete(public_key, c.user.user_id) - Session().commit() - h.flash(_("SSH key successfully deleted"), category='success') + try: + SshKeyModel().delete(public_key, c.user.user_id) + Session().commit() + h.flash(_("SSH key successfully deleted"), category='success') + except SshKeyModelException as errors: + h.flash(errors.message, category='error') raise HTTPFound(location=url('edit_user_ssh_keys', id=c.user.user_id))
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/kallithea/lib/ssh.py Mon Nov 17 14:42:45 2014 -0500 @@ -0,0 +1,91 @@ +# -*- coding: utf-8 -*- +""" + kallithea.lib.ssh + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + :created_on: Dec 10, 2012 + :author: ir4y + :copyright: (C) 2012 Ilya Beda <ir4y.ix@gmail.com> + :license: GPLv3, see COPYING for more details. +""" +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import logging +import binascii +import re + +from tg.i18n import ugettext as _ + +log = logging.getLogger(__name__) + + +class SshKeyParseError(Exception): + """Exception raised by parse_pub_key""" + + +def parse_pub_key(ssh_key): + r"""Parse SSH public key string, raise SshKeyParseError or return decoded keytype, data and comment + + >>> getfixture('doctest_mock_ugettext') + >>> parse_pub_key('') + Traceback (most recent call last): + ... + SshKeyParseError: SSH key is missing + >>> parse_pub_key('''AAAAB3NzaC1yc2EAAAALVGhpcyBpcyBmYWtlIQ''') + Traceback (most recent call last): + ... + SshKeyParseError: Incorrect SSH key - it must have both a key type and a base64 part + >>> parse_pub_key('''abc AAAAB3NzaC1yc2EAAAALVGhpcyBpcyBmYWtlIQ''') + Traceback (most recent call last): + ... + SshKeyParseError: Incorrect SSH key - it must start with 'ssh-(rsa|dss|ed25519)' + >>> parse_pub_key('''ssh-rsa AAAAB3NzaC1yc2EAAAALVGhpcyBpcyBmYWtlIQ''') + Traceback (most recent call last): + ... + SshKeyParseError: Incorrect SSH key - failed to decode base64 part 'AAAAB3NzaC1yc2EAAAALVGhpcyBpcyBmYWtlIQ' + >>> parse_pub_key('''ssh-rsa AAAAB2NzaC1yc2EAAAALVGhpcyBpcyBmYWtlIQ==''') + Traceback (most recent call last): + ... + SshKeyParseError: Incorrect SSH key - base64 part is not 'ssh-rsa' as claimed but 'csh-rsa' + >>> parse_pub_key('''ssh-rsa AAAAB3NzaC1yc2EAAAA'LVGhpcyBpcyBmYWtlIQ''') + Traceback (most recent call last): + ... + SshKeyParseError: Incorrect SSH key - unexpected characters in base64 part "AAAAB3NzaC1yc2EAAAA'LVGhpcyBpcyBmYWtlIQ" + >>> parse_pub_key(''' ssh-rsa AAAAB3NzaC1yc2EAAAALVGhpcyBpcyBmYWtlIQ== and a comment + ... ''') + ('ssh-rsa', '\x00\x00\x00\x07ssh-rsa\x00\x00\x00\x0bThis is fake!', 'and a comment\n') + """ + if not ssh_key: + raise SshKeyParseError(_("SSH key is missing")) + + parts = ssh_key.split(None, 2) + if len(parts) < 2: + raise SshKeyParseError(_("Incorrect SSH key - it must have both a key type and a base64 part")) + + keytype, keyvalue, comment = (parts + [''])[:3] + if keytype not in ('ssh-rsa', 'ssh-dss', 'ssh-ed25519'): + raise SshKeyParseError(_("Incorrect SSH key - it must start with 'ssh-(rsa|dss|ed25519)'")) + + if re.search(r'[^a-zA-Z0-9+/=]', keyvalue): + raise SshKeyParseError(_("Incorrect SSH key - unexpected characters in base64 part %r") % keyvalue) + + try: + decoded = keyvalue.decode('base64') + except binascii.Error: + raise SshKeyParseError(_("Incorrect SSH key - failed to decode base64 part %r") % keyvalue) + + if not decoded.startswith('\x00\x00\x00\x07' + str(keytype) + '\x00'): + raise SshKeyParseError(_("Incorrect SSH key - base64 part is not %r as claimed but %r") % (str(keytype), str(decoded[4:].split('\0', 1)[0]))) + + return keytype, decoded, comment
--- a/kallithea/model/ssh_key.py Mon Nov 17 14:40:35 2014 -0500 +++ b/kallithea/model/ssh_key.py Mon Nov 17 14:42:45 2014 -0500 @@ -21,11 +21,20 @@ import logging +from tg.i18n import ugettext as _ + +from kallithea.lib.utils2 import safe_str from kallithea.model.db import UserSshKeys, User from kallithea.model.meta import Session +from kallithea.lib import ssh log = logging.getLogger(__name__) + +class SshKeyModelException(Exception): + """Exception raised by SshKeyModel methods to report errors""" + + class SshKeyModel(object): def create(self, user, description, public_key): @@ -33,13 +42,24 @@ :param user: user or user_id :param description: description of SshKey :param publickey: public key text + Will raise SshKeyModelException on errors """ + try: + ssh.parse_pub_key(public_key) + except ssh.SshKeyParseError as e: + raise SshKeyModelException(_('SSH key %r is invalid: %s') % (safe_str(public_key), e.message)) + user = User.guess_instance(user) new_ssh_key = UserSshKeys() new_ssh_key.user_id = user.user_id new_ssh_key.description = description new_ssh_key.public_key = public_key + + for ssh_key in UserSshKeys.query().filter(UserSshKeys.fingerprint == new_ssh_key.fingerprint).all(): + raise SshKeyModelException(_('SSH key %s is already used by %s') % + (new_ssh_key.fingerprint, ssh_key.user.username)) + Session().add(new_ssh_key) return new_ssh_key @@ -48,6 +68,7 @@ """ Deletes given public_key, if user is set it also filters the object for deletion by given user. + Will raise SshKeyModelException on errors """ ssh_key = UserSshKeys.query().filter(UserSshKeys._public_key == public_key) @@ -56,6 +77,8 @@ ssh_key = ssh_key.filter(UserSshKeys.user_id == user.user_id) ssh_key = ssh_key.scalar() + if ssh_key is None: + raise SshKeyModelException(_('SSH key %r not found') % safe_str(public_key)) Session().delete(ssh_key) def get_ssh_keys(self, user):