changeset 7773:3b147c38b674

ssh: error checking for ssh key management Based on work by Ilya Beda <ir4y.ix@gmail.com> on https://bitbucket.org/ir4y/rhodecode/commits/branch/ssh_server_support , and also heavily modified by Mads Kiilerich.
author Christian Oyarzun <oyarzun@gmail.com>
date Mon, 17 Nov 2014 14:42:45 -0500
parents 66c208bf56fe
children e53f1db2b839
files kallithea/controllers/admin/my_account.py kallithea/controllers/admin/users.py kallithea/lib/ssh.py kallithea/model/ssh_key.py
diffstat 4 files changed, 142 insertions(+), 16 deletions(-) [+]
line wrap: on
line diff
--- a/kallithea/controllers/admin/my_account.py	Mon Nov 17 14:40:35 2014 -0500
+++ b/kallithea/controllers/admin/my_account.py	Mon Nov 17 14:42:45 2014 -0500
@@ -46,7 +46,7 @@
 from kallithea.model.user import UserModel
 from kallithea.model.repo import RepoModel
 from kallithea.model.api_key import ApiKeyModel
-from kallithea.model.ssh_key import SshKeyModel
+from kallithea.model.ssh_key import SshKeyModel, SshKeyModelException
 from kallithea.model.meta import Session
 
 log = logging.getLogger(__name__)
@@ -272,16 +272,22 @@
     def my_account_ssh_keys_add(self):
         description = request.POST.get('description')
         public_key = request.POST.get('public_key')
-        new_ssh_key = SshKeyModel().create(request.authuser.user_id,
-                                           description, public_key)
-        Session().commit()
-        h.flash(_("SSH key %s successfully added") % new_ssh_key.fingerprint, category='success')
+        try:
+            new_ssh_key = SshKeyModel().create(request.authuser.user_id,
+                                               description, public_key)
+            Session().commit()
+            h.flash(_("SSH key %s successfully added") % new_ssh_key.fingerprint, category='success')
+        except SshKeyModelException as errors:
+            h.flash(errors.message, category='error')
         raise HTTPFound(location=url('my_account_ssh_keys'))
 
     @IfSshEnabled
     def my_account_ssh_keys_delete(self):
         public_key = request.POST.get('del_public_key')
-        SshKeyModel().delete(public_key, request.authuser.user_id)
-        Session().commit()
-        h.flash(_("SSH key successfully deleted"), category='success')
+        try:
+            SshKeyModel().delete(public_key, request.authuser.user_id)
+            Session().commit()
+            h.flash(_("SSH key successfully deleted"), category='success')
+        except SshKeyModelException as errors:
+            h.flash(errors.message, category='error')
         raise HTTPFound(location=url('my_account_ssh_keys'))
--- a/kallithea/controllers/admin/users.py	Mon Nov 17 14:40:35 2014 -0500
+++ b/kallithea/controllers/admin/users.py	Mon Nov 17 14:42:45 2014 -0500
@@ -45,7 +45,7 @@
 from kallithea.lib import auth_modules
 from kallithea.lib.base import BaseController, render, IfSshEnabled
 from kallithea.model.api_key import ApiKeyModel
-from kallithea.model.ssh_key import SshKeyModel
+from kallithea.model.ssh_key import SshKeyModel, SshKeyModelException
 from kallithea.model.db import User, UserEmailMap, UserIpMap, UserToPerm
 from kallithea.model.forms import UserForm, CustomDefaultPermissionsForm
 from kallithea.model.user import UserModel
@@ -448,10 +448,13 @@
 
         description = request.POST.get('description')
         public_key = request.POST.get('public_key')
-        new_ssh_key = SshKeyModel().create(c.user.user_id,
-                                       description, public_key)
-        Session().commit()
-        h.flash(_("SSH key %s successfully added") % new_ssh_key.fingerprint, category='success')
+        try:
+            new_ssh_key = SshKeyModel().create(c.user.user_id,
+                                               description, public_key)
+            Session().commit()
+            h.flash(_("SSH key %s successfully added") % new_ssh_key.fingerprint, category='success')
+        except SshKeyModelException as errors:
+            h.flash(errors.message, category='error')
         raise HTTPFound(location=url('edit_user_ssh_keys', id=c.user.user_id))
 
     @IfSshEnabled
@@ -459,7 +462,10 @@
         c.user = self._get_user_or_raise_if_default(id)
 
         public_key = request.POST.get('del_public_key')
-        SshKeyModel().delete(public_key, c.user.user_id)
-        Session().commit()
-        h.flash(_("SSH key successfully deleted"), category='success')
+        try:
+            SshKeyModel().delete(public_key, c.user.user_id)
+            Session().commit()
+            h.flash(_("SSH key successfully deleted"), category='success')
+        except SshKeyModelException as errors:
+            h.flash(errors.message, category='error')
         raise HTTPFound(location=url('edit_user_ssh_keys', id=c.user.user_id))
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/kallithea/lib/ssh.py	Mon Nov 17 14:42:45 2014 -0500
@@ -0,0 +1,91 @@
+# -*- coding: utf-8 -*-
+"""
+    kallithea.lib.ssh
+    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+    :created_on: Dec 10, 2012
+    :author: ir4y
+    :copyright: (C) 2012 Ilya Beda <ir4y.ix@gmail.com>
+    :license: GPLv3, see COPYING for more details.
+"""
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+import logging
+import binascii
+import re
+
+from tg.i18n import ugettext as _
+
+log = logging.getLogger(__name__)
+
+
+class SshKeyParseError(Exception):
+    """Exception raised by parse_pub_key"""
+
+
+def parse_pub_key(ssh_key):
+    r"""Parse SSH public key string, raise SshKeyParseError or return decoded keytype, data and comment
+
+    >>> getfixture('doctest_mock_ugettext')
+    >>> parse_pub_key('')
+    Traceback (most recent call last):
+    ...
+    SshKeyParseError: SSH key is missing
+    >>> parse_pub_key('''AAAAB3NzaC1yc2EAAAALVGhpcyBpcyBmYWtlIQ''')
+    Traceback (most recent call last):
+    ...
+    SshKeyParseError: Incorrect SSH key - it must have both a key type and a base64 part
+    >>> parse_pub_key('''abc AAAAB3NzaC1yc2EAAAALVGhpcyBpcyBmYWtlIQ''')
+    Traceback (most recent call last):
+    ...
+    SshKeyParseError: Incorrect SSH key - it must start with 'ssh-(rsa|dss|ed25519)'
+    >>> parse_pub_key('''ssh-rsa  AAAAB3NzaC1yc2EAAAALVGhpcyBpcyBmYWtlIQ''')
+    Traceback (most recent call last):
+    ...
+    SshKeyParseError: Incorrect SSH key - failed to decode base64 part 'AAAAB3NzaC1yc2EAAAALVGhpcyBpcyBmYWtlIQ'
+    >>> parse_pub_key('''ssh-rsa  AAAAB2NzaC1yc2EAAAALVGhpcyBpcyBmYWtlIQ==''')
+    Traceback (most recent call last):
+    ...
+    SshKeyParseError: Incorrect SSH key - base64 part is not 'ssh-rsa' as claimed but 'csh-rsa'
+    >>> parse_pub_key('''ssh-rsa  AAAAB3NzaC1yc2EAAAA'LVGhpcyBpcyBmYWtlIQ''')
+    Traceback (most recent call last):
+    ...
+    SshKeyParseError: Incorrect SSH key - unexpected characters in base64 part "AAAAB3NzaC1yc2EAAAA'LVGhpcyBpcyBmYWtlIQ"
+    >>> parse_pub_key(''' ssh-rsa  AAAAB3NzaC1yc2EAAAALVGhpcyBpcyBmYWtlIQ== and a comment
+    ... ''')
+    ('ssh-rsa', '\x00\x00\x00\x07ssh-rsa\x00\x00\x00\x0bThis is fake!', 'and a comment\n')
+    """
+    if not ssh_key:
+        raise SshKeyParseError(_("SSH key is missing"))
+
+    parts = ssh_key.split(None, 2)
+    if len(parts) < 2:
+        raise SshKeyParseError(_("Incorrect SSH key - it must have both a key type and a base64 part"))
+
+    keytype, keyvalue, comment = (parts + [''])[:3]
+    if keytype not in ('ssh-rsa', 'ssh-dss', 'ssh-ed25519'):
+        raise SshKeyParseError(_("Incorrect SSH key - it must start with 'ssh-(rsa|dss|ed25519)'"))
+
+    if re.search(r'[^a-zA-Z0-9+/=]', keyvalue):
+        raise SshKeyParseError(_("Incorrect SSH key - unexpected characters in base64 part %r") % keyvalue)
+
+    try:
+        decoded = keyvalue.decode('base64')
+    except binascii.Error:
+        raise SshKeyParseError(_("Incorrect SSH key - failed to decode base64 part %r") % keyvalue)
+
+    if not decoded.startswith('\x00\x00\x00\x07' + str(keytype) + '\x00'):
+        raise SshKeyParseError(_("Incorrect SSH key - base64 part is not %r as claimed but %r") % (str(keytype), str(decoded[4:].split('\0', 1)[0])))
+
+    return keytype, decoded, comment
--- a/kallithea/model/ssh_key.py	Mon Nov 17 14:40:35 2014 -0500
+++ b/kallithea/model/ssh_key.py	Mon Nov 17 14:42:45 2014 -0500
@@ -21,11 +21,20 @@
 
 import logging
 
+from tg.i18n import ugettext as _
+
+from kallithea.lib.utils2 import safe_str
 from kallithea.model.db import UserSshKeys, User
 from kallithea.model.meta import Session
+from kallithea.lib import ssh
 
 log = logging.getLogger(__name__)
 
+
+class SshKeyModelException(Exception):
+    """Exception raised by SshKeyModel methods to report errors"""
+
+
 class SshKeyModel(object):
 
     def create(self, user, description, public_key):
@@ -33,13 +42,24 @@
         :param user: user or user_id
         :param description: description of SshKey
         :param publickey: public key text
+        Will raise SshKeyModelException on errors
         """
+        try:
+            ssh.parse_pub_key(public_key)
+        except ssh.SshKeyParseError as e:
+            raise SshKeyModelException(_('SSH key %r is invalid: %s') % (safe_str(public_key), e.message))
+
         user = User.guess_instance(user)
 
         new_ssh_key = UserSshKeys()
         new_ssh_key.user_id = user.user_id
         new_ssh_key.description = description
         new_ssh_key.public_key = public_key
+
+        for ssh_key in UserSshKeys.query().filter(UserSshKeys.fingerprint == new_ssh_key.fingerprint).all():
+            raise SshKeyModelException(_('SSH key %s is already used by %s') %
+                                       (new_ssh_key.fingerprint, ssh_key.user.username))
+
         Session().add(new_ssh_key)
 
         return new_ssh_key
@@ -48,6 +68,7 @@
         """
         Deletes given public_key, if user is set it also filters the object for
         deletion by given user.
+        Will raise SshKeyModelException on errors
         """
         ssh_key = UserSshKeys.query().filter(UserSshKeys._public_key == public_key)
 
@@ -56,6 +77,8 @@
             ssh_key = ssh_key.filter(UserSshKeys.user_id == user.user_id)
 
         ssh_key = ssh_key.scalar()
+        if ssh_key is None:
+            raise SshKeyModelException(_('SSH key %r not found') % safe_str(public_key))
         Session().delete(ssh_key)
 
     def get_ssh_keys(self, user):