Created
October 27, 2020 20:26
-
-
Save frennkie/2d66bd8383efa102a27e16c9a09b7701 to your computer and use it in GitHub Desktop.
Revisions
-
frennkie renamed this gist
Oct 27, 2020 . 1 changed file with 0 additions and 0 deletions.There are no files selected for viewing
File renamed without changes. -
frennkie created this gist
Oct 27, 2020 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,152 @@ import logging import ssl import uuid from urllib.parse import urlparse from cryptography import x509 from cryptography.hazmat.backends import default_backend from django.db import models from django.utils.functional import cached_property from django.utils.translation import gettext_lazy as _ from ldap3 import Connection, Tls, Server from ldap3.core.exceptions import LDAPBindError logger = logging.getLogger(__name__) class LdapCertBackend(models.Model): id = models.UUIDField( primary_key=True, editable=False, default=uuid.uuid4, help_text='Unique ID for backend', verbose_name=_('id') ) name = models.CharField( help_text=_('The name of this backend'), max_length=140, unique=True, verbose_name=_('name') ) uri = models.CharField( help_text='URI (e.g. ldap://server1.example.com:389)', max_length=2100, verbose_name=_('uri') ) ca_cert = models.ForeignKey( "CaCertificate", blank=True, null=True, on_delete=models.SET_NULL, ) use_starttls = models.BooleanField( default=False, help_text='Use STARTTLS after connection start', verbose_name=_('use_starttls') ) username = models.CharField( blank=True, default="", help_text='Bind DN (leave empty for anonymous).', max_length=100, verbose_name=_('username') ) password = models.CharField( blank=True, default="", help_text='Bind Password (hidden/masked).', max_length=100, verbose_name=_('password') ) is_ad = models.BooleanField( default=False, help_text='Use NTLM auth for Microsoft Active Directory', verbose_name=_('is_ad') ) base = models.CharField( blank=True, default="", help_text='Search Base', max_length=100, verbose_name=_('search base') ) field = models.CharField( default='userCertificate;binary', help_text=_('LDAP field of certificate (default: userCertificate;binary)'), max_length=1000, verbose_name=_('field') ) class Meta: verbose_name = _('LDAP Backend') verbose_name_plural = _('LDAP Backends') def __str__(self): if self.uri: return '%s (%s)' % (self.name, self.uri) return '%s (ldap://...)' % self.name @cached_property def parsed_uri(self): return urlparse(self.uri) @property def ldap_hostname(self): return self.parsed_uri.hostname @property def ldap_port(self): return self.parsed_uri.hostname @property def ldap_scheme(self): return self.parsed_uri.scheme def get_conn(self): """This will raise exceptions""" tls_configuration = Tls(validate=ssl.CERT_REQUIRED, version=ssl.PROTOCOL_TLSv1) server = Server(self.uri, tls=tls_configuration) conn = Connection(server, self.username, self.password, auto_bind=False) if self.use_starttls: conn.start_tls() if not conn.bind(): logger.error(f'error on ldap bind: {conn.result}') raise LDAPBindError(conn.result.get('message')) return conn def fetch(self, value=None, conn=None): if conn is None: conn = self.get_conn() if value is None: print('not specified what to fetch.. check DB and fetch all for backend') values = [] else: values = [value] x509_certs = [] for val in values: if conn.search(self.base, f'(mail={val})', attributes=[self.field]): for entry in conn.entries: binary_cert = entry[self.field].values[0] x509_cert = x509.load_der_x509_certificate(binary_cert, backend=default_backend()) x509_certs.append(x509_cert) conn.unbind() return x509_certs