Mercurial > kallithea
view kallithea/lib/auth_modules/auth_ldap.py @ 8998:a80ac2033a8a draft i18n tip
i18n: updated translation for Greek
Currently translated at 100.0% (1082 of 1082 strings)
author | Aristotelis Stageiritis <aristotelis79@gmail.com> |
---|---|
date | Sun, 07 Apr 2024 08:07:49 +0200 |
parents | 6f9970a36190 |
children |
line wrap: on
line source
# -*- coding: utf-8 -*-
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
kallithea.lib.auth_modules.auth_ldap
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Kallithea authentication plugin for LDAP

This file was forked by the Kallithea project in July 2014.
Original author and date, and relevant copyright and licensing information is below:
:created_on: Created on Nov 17, 2010
:author: marcink
:copyright: (c) 2013 RhodeCode GmbH, and others.
:license: GPLv3, see LICENSE.md for more details.
"""

import logging

from kallithea.lib import auth_modules
from kallithea.lib.compat import hybrid_property
from kallithea.lib.exceptions import LdapConnectionError, LdapImportError, LdapPasswordError, LdapUsernameError
from kallithea.lib.utils2 import safe_str


log = logging.getLogger(__name__)

try:
    import ldap
    import ldap.filter
except ImportError:
    # means that python-ldap is not installed
    ldap = None


class AuthLdap(object):
    """
    Holds the connection parameters for one LDAP setup and performs the
    search-then-bind authentication of a single user against it.
    """

    def __init__(self, server, base_dn, port=None, bind_dn='', bind_pass='',
                 tls_kind='LDAPS', tls_reqcert='DEMAND', cacertdir=None, ldap_version=3,
                 ldap_filter='(&(objectClass=user)(!(objectClass=computer)))',
                 search_scope='SUBTREE', attr_login='uid'):
        """
        Store the LDAP connection settings.

        :param server: host name, or several host names separated by ','
        :param base_dn: DN used as base of the user search
        :param port: port used for every host; when false, 636 is used for
            tls_kind 'LDAPS' and 389 otherwise
        :param bind_dn: DN used for the initial bind before searching
        :param bind_pass: password used for the initial bind
        :param tls_kind: 'LDAPS' selects the ldaps:// scheme, 'START_TLS'
            issues STARTTLS after connecting, 'PLAIN' skips the certificate
            requirement option
        :param tls_reqcert: suffix of the ldap.OPT_X_TLS_* constant to use
            as certificate requirement
        :param cacertdir: optional CA certificate directory
        :param ldap_version: 2 selects LDAPv2, anything else LDAPv3
        :param ldap_filter: filter that is AND-ed with the login attribute match
        :param search_scope: suffix of the ldap.SCOPE_* constant to use
        :param attr_login: name of the attribute that holds the user name
        :raises LdapImportError: if python-ldap could not be imported
        """
        if ldap is None:
            raise LdapImportError

        self.ldap_version = ldap_version

        self.TLS_KIND = tls_kind
        # Fallback value used if this python-ldap lacks the requested constant.
        OPT_X_TLS_DEMAND = 2
        self.TLS_REQCERT = getattr(ldap, 'OPT_X_TLS_%s' % tls_reqcert,
                                   OPT_X_TLS_DEMAND)
        self.cacertdir = cacertdir

        protocol = 'ldaps' if self.TLS_KIND == 'LDAPS' else 'ldap'
        if not port:
            port = 636 if self.TLS_KIND == 'LDAPS' else 389
        # One URI per configured host, all sharing scheme and port.
        self.LDAP_SERVER = str(', '.join(
            "%s://%s:%s" % (protocol,
                            host.strip(),
                            port)
            for host in server.split(',')))

        self.LDAP_BIND_DN = bind_dn
        self.LDAP_BIND_PASS = bind_pass

        self.BASE_DN = base_dn
        self.LDAP_FILTER = ldap_filter
        self.SEARCH_SCOPE = getattr(ldap, 'SCOPE_%s' % search_scope)
        self.attr_login = attr_login

    def authenticate_ldap(self, username, password):
        """
        Authenticate a user via LDAP and return his/her LDAP properties.

        The user is looked up with a search below the base DN; a bind with
        the supplied password is then attempted on each matching entry.

        :param username: username
        :param password: password
        :returns: tuple (dn, attrs) of the first entry that accepted the
            password
        :raises LdapPasswordError: if the password is blank, or no matching
            entry accepted it
        :raises LdapUsernameError: if the username contains ',' or LDAP
            reports that no such user exists
        :raises LdapConnectionError: if the LDAP server can't be reached
        """

        if not password:
            log.debug("Attempt to authenticate LDAP user "
                      "with blank password rejected.")
            raise LdapPasswordError()
        if "," in username:
            raise LdapUsernameError("invalid character in username: ,")
        try:
            if self.cacertdir:
                if hasattr(ldap, 'OPT_X_TLS_CACERTDIR'):
                    ldap.set_option(ldap.OPT_X_TLS_CACERTDIR, self.cacertdir)
                else:
                    log.debug("OPT_X_TLS_CACERTDIR is not available - can't set %s", self.cacertdir)
            ldap.set_option(ldap.OPT_REFERRALS, ldap.OPT_OFF)
            ldap.set_option(ldap.OPT_RESTART, ldap.OPT_ON)
            ldap.set_option(ldap.OPT_TIMEOUT, 20)
            ldap.set_option(ldap.OPT_NETWORK_TIMEOUT, 10)
            ldap.set_option(ldap.OPT_TIMELIMIT, 15)
            if self.TLS_KIND != 'PLAIN':
                ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, self.TLS_REQCERT)
            server = ldap.initialize(self.LDAP_SERVER)
            # python-ldap exposes the protocol version option as the
            # 'protocol_version' attribute; assigning to any other attribute
            # name would only set a plain instance attribute with no effect.
            if self.ldap_version == 2:
                server.protocol_version = ldap.VERSION2
            else:
                server.protocol_version = ldap.VERSION3

            if self.TLS_KIND == 'START_TLS':
                server.start_tls_s()

            if self.LDAP_BIND_DN and self.LDAP_BIND_PASS:
                log.debug('Trying simple_bind with password and given DN: %s',
                          self.LDAP_BIND_DN)
                server.simple_bind_s(self.LDAP_BIND_DN, self.LDAP_BIND_PASS)

            # Both the attribute name and the user name are escaped before
            # they are placed in the search filter.
            filter_ = '(&%s(%s=%s))' % (self.LDAP_FILTER,
                                        ldap.filter.escape_filter_chars(self.attr_login),
                                        ldap.filter.escape_filter_chars(username))
            log.debug("Authenticating %r filter %s at %s", self.BASE_DN,
                      filter_, self.LDAP_SERVER)
            lobjects = server.search_ext_s(self.BASE_DN, self.SEARCH_SCOPE,
                                           filter_)

            if not lobjects:
                raise ldap.NO_SUCH_OBJECT()

            for (dn, _attrs) in lobjects:
                if dn is None:
                    continue

                try:
                    log.debug('Trying simple bind with %s', dn)
                    server.simple_bind_s(dn, password)
                    # Re-read the entry while bound as the user itself.
                    results = server.search_ext_s(dn, ldap.SCOPE_BASE,
                                                  '(objectClass=*)')
                    if len(results) == 1:
                        dn_, attrs = results[0]
                        assert dn_ == dn
                        return dn, attrs

                except ldap.INVALID_CREDENTIALS:
                    log.debug("LDAP rejected password for user '%s': %s",
                              username, dn)
                    continue  # accept authentication as another ldap user with same username

            log.debug("No matching LDAP objects for authentication "
                      "of '%s'", username)
            raise LdapPasswordError()

        except ldap.NO_SUCH_OBJECT:
            log.debug("LDAP says no such user '%s'", username)
            raise LdapUsernameError()
        except ldap.SERVER_DOWN:
            # [0] might be {'info': "TLS error -8179:Peer's Certificate issuer is not recognized.", 'desc': "Can't contact LDAP server"}
            raise LdapConnectionError("LDAP can't connect to authentication server")


class KallitheaAuthPlugin(auth_modules.KallitheaExternalAuthPlugin):
    """
    Authentication plugin named "ldap" that validates logins through AuthLdap.
    """

    def __init__(self):
        self._logger = logging.getLogger(__name__)
        # Allowed values of the select-type settings declared in settings().
        self._tls_kind_values = ["PLAIN", "LDAPS", "START_TLS"]
        self._tls_reqcert_values = ["NEVER", "ALLOW", "TRY", "DEMAND", "HARD"]
        self._search_scopes = ["BASE", "ONELEVEL", "SUBTREE"]

    @hybrid_property
    def name(self):
        return "ldap"

    def settings(self):
        """
        Return the list of setting definitions of this plugin; each entry is
        a dict with name, validator, type, description and form label.
        """
        settings = [
            {
                "name": "host",
                "validator": self.validators.UnicodeString(strip=True),
                "type": "string",
                "description": "Host of the LDAP Server",
                "formname": "LDAP Host"
            },
            {
                "name": "port",
                "validator": self.validators.Number(strip=True),
                "type": "string",
                "description": "Port that the LDAP server is listening on. Defaults to 389 for PLAIN/START_TLS and 636 for LDAPS.",
                "default": "",
                "formname": "Custom LDAP Port"
            },
            {
                "name": "dn_user",
                "validator": self.validators.UnicodeString(strip=True),
                "type": "string",
                "description": "User to connect to LDAP",
                "formname": "Account"
            },
            {
                "name": "dn_pass",
                "validator": self.validators.UnicodeString(strip=True),
                "type": "password",
                "description": "Password to connect to LDAP",
                "formname": "Password"
            },
            {
                "name": "tls_kind",
                "validator": self.validators.OneOf(self._tls_kind_values),
                "type": "select",
                "values": self._tls_kind_values,
                "description": "TLS Type",
                "default": 'LDAPS',
                "formname": "Connection Security"
            },
            {
                "name": "tls_reqcert",
                "validator": self.validators.OneOf(self._tls_reqcert_values),
                "type": "select",
                "values": self._tls_reqcert_values,
                "description": "Require Cert over TLS?",
                "formname": "Certificate Checks"
            },
            {
                "name": "cacertdir",
                "validator": self.validators.UnicodeString(strip=True),
                "type": "string",
                "description": "Optional: Custom CA certificate directory for validating LDAPS",
                "formname": "Custom CA Certificates"
            },
            {
                "name": "base_dn",
                "validator": self.validators.UnicodeString(strip=True),
                "type": "string",
                "description": "Base DN to search (e.g., dc=mydomain,dc=com)",
                "formname": "Base DN"
            },
            {
                "name": "filter",
                "validator": self.validators.UnicodeString(strip=True),
                "type": "string",
                "description": "Filter to narrow results (e.g., ou=Users, etc)",
                "formname": "LDAP Search Filter"
            },
            {
                "name": "search_scope",
                "validator": self.validators.OneOf(self._search_scopes),
                "type": "select",
                "values": self._search_scopes,
                "description": "How deep to search LDAP",
                "formname": "LDAP Search Scope"
            },
            {
                "name": "attr_login",
                "validator": self.validators.AttrLoginValidator(not_empty=True, strip=True),
                "type": "string",
                "description": "LDAP Attribute to map to user name",
                "formname": "Login Attribute"
            },
            {
                "name": "attr_firstname",
                "validator": self.validators.UnicodeString(strip=True),
                "type": "string",
                "description": "LDAP Attribute to map to first name",
                "formname": "First Name Attribute"
            },
            {
                "name": "attr_lastname",
                "validator": self.validators.UnicodeString(strip=True),
                "type": "string",
                "description": "LDAP Attribute to map to last name",
                "formname": "Last Name Attribute"
            },
            {
                "name": "attr_email",
                "validator": self.validators.UnicodeString(strip=True),
                "type": "string",
                "description": "LDAP Attribute to map to email address",
                "formname": "Email Attribute"
            }
        ]
        return settings

    def use_fake_password(self):
        return True

    def auth(self, userobj, username, password, settings, **kwargs):
        """
        Given a user object (which may be null), username, a plaintext password,
        and a settings object (containing all the keys needed as listed in settings()),
        authenticate this user's login attempt.

        Return None on failure. On success, return a dictionary of the form:

            see: KallitheaAuthPluginBase.auth_func_attrs
        This is later validated for correctness

        Unknown user, rejected password and missing python-ldap are logged
        and reported as None. LdapConnectionError raised by AuthLdap is not
        handled here and propagates to the caller.
        """

        if not username or not password:
            log.debug('Empty username or password skipping...')
            return None

        # Named ldap_kwargs so the **kwargs parameter is not shadowed.
        ldap_kwargs = {
            'server': settings.get('host', ''),
            'base_dn': settings.get('base_dn', ''),
            'port': settings.get('port'),
            'bind_dn': settings.get('dn_user'),
            'bind_pass': settings.get('dn_pass'),
            'tls_kind': settings.get('tls_kind'),
            'tls_reqcert': settings.get('tls_reqcert'),
            'cacertdir': settings.get('cacertdir'),
            'ldap_filter': settings.get('filter'),
            'search_scope': settings.get('search_scope'),
            'attr_login': settings.get('attr_login'),
            'ldap_version': 3,
        }

        # A bind DN without a bind password means: bind as the user logging
        # in, substituting the user name for '$login' in the configured DN.
        if ldap_kwargs['bind_dn'] and not ldap_kwargs['bind_pass']:
            log.debug('Using dynamic binding.')
            ldap_kwargs['bind_dn'] = ldap_kwargs['bind_dn'].replace('$login', username)
            ldap_kwargs['bind_pass'] = password
        log.debug('Checking for ldap authentication')

        try:
            aldap = AuthLdap(**ldap_kwargs)
            (user_dn, ldap_attrs) = aldap.authenticate_ldap(username, password)
            log.debug('Got ldap DN response %s', user_dn)

            def get_ldap_attr(k):
                # First value of the LDAP attribute configured under setting
                # k; empty string if the attribute is missing or has no values.
                values = ldap_attrs.get(settings.get(k)) or [b'']
                return safe_str(values[0])

            # old attrs fetched from Kallithea database
            admin = getattr(userobj, 'admin', False)
            email = getattr(userobj, 'email', '')
            firstname = getattr(userobj, 'firstname', '')
            lastname = getattr(userobj, 'lastname', '')

            # Values from LDAP win; the stored values are only a fallback.
            user_data = {
                'username': username,
                'firstname': get_ldap_attr('attr_firstname') or firstname,
                'lastname': get_ldap_attr('attr_lastname') or lastname,
                'groups': [],
                'email': get_ldap_attr('attr_email') or email,
                'admin': admin,
                'extern_name': user_dn,
            }
            log.info('user %s authenticated correctly', user_data['username'])
            return user_data

        except LdapUsernameError:
            log.info('Error authenticating %s with LDAP: User not found', username)
        except LdapPasswordError:
            log.info('Error authenticating %s with LDAP: Password error', username)
        except LdapImportError:
            log.error('Error authenticating %s with LDAP: LDAP not available', username)
        return None

    def get_managed_fields(self):
        return ['username', 'firstname', 'lastname', 'email', 'password']