Skip to content

Instantly share code, notes, and snippets.

@frennkie
Created October 27, 2020 20:26
Show Gist options
  • Select an option

  • Save frennkie/2d66bd8383efa102a27e16c9a09b7701 to your computer and use it in GitHub Desktop.

Select an option

Save frennkie/2d66bd8383efa102a27e16c9a09b7701 to your computer and use it in GitHub Desktop.

Revisions

  1. frennkie renamed this gist Oct 27, 2020. 1 changed file with 0 additions and 0 deletions.
    File renamed without changes.
  2. frennkie created this gist Oct 27, 2020.
    152 changes: 152 additions & 0 deletions gistfile1.txt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,152 @@
    import logging
    import ssl
    import uuid
    from urllib.parse import urlparse

    from cryptography import x509
    from cryptography.hazmat.backends import default_backend

    from django.db import models
    from django.utils.functional import cached_property
    from django.utils.translation import gettext_lazy as _

    from ldap3 import Connection, Tls, Server
    from ldap3.core.exceptions import LDAPBindError


    logger = logging.getLogger(__name__)


    class LdapCertBackend(models.Model):
    id = models.UUIDField(
    primary_key=True,
    editable=False,
    default=uuid.uuid4,
    help_text='Unique ID for backend',
    verbose_name=_('id')
    )

    name = models.CharField(
    help_text=_('The name of this backend'),
    max_length=140,
    unique=True,
    verbose_name=_('name')
    )

    uri = models.CharField(
    help_text='URI (e.g. ldap://server1.example.com:389)',
    max_length=2100,
    verbose_name=_('uri')
    )

    ca_cert = models.ForeignKey(
    "CaCertificate",
    blank=True,
    null=True,
    on_delete=models.SET_NULL,
    )

    use_starttls = models.BooleanField(
    default=False,
    help_text='Use STARTTLS after connection start',
    verbose_name=_('use_starttls')
    )

    username = models.CharField(
    blank=True,
    default="",
    help_text='Bind DN (leave empty for anonymous).',
    max_length=100,
    verbose_name=_('username')
    )

    password = models.CharField(
    blank=True,
    default="",
    help_text='Bind Password (hidden/masked).',
    max_length=100,
    verbose_name=_('password')
    )

    is_ad = models.BooleanField(
    default=False,
    help_text='Use NTLM auth for Microsoft Active Directory',
    verbose_name=_('is_ad')
    )

    base = models.CharField(
    blank=True,
    default="",
    help_text='Search Base',
    max_length=100,
    verbose_name=_('search base')
    )

    field = models.CharField(
    default='userCertificate;binary',
    help_text=_('LDAP field of certificate (default: userCertificate;binary)'),
    max_length=1000,
    verbose_name=_('field')
    )

    class Meta:
    verbose_name = _('LDAP Backend')
    verbose_name_plural = _('LDAP Backends')

    def __str__(self):
    if self.uri:
    return '%s (%s)' % (self.name, self.uri)
    return '%s (ldap://...)' % self.name

    @cached_property
    def parsed_uri(self):
    return urlparse(self.uri)

    @property
    def ldap_hostname(self):
    return self.parsed_uri.hostname

    @property
    def ldap_port(self):
    return self.parsed_uri.hostname

    @property
    def ldap_scheme(self):
    return self.parsed_uri.scheme

    def get_conn(self):
    """This will raise exceptions"""

    tls_configuration = Tls(validate=ssl.CERT_REQUIRED, version=ssl.PROTOCOL_TLSv1)
    server = Server(self.uri, tls=tls_configuration)
    conn = Connection(server, self.username, self.password, auto_bind=False)

    if self.use_starttls:
    conn.start_tls()

    if not conn.bind():
    logger.error(f'error on ldap bind: {conn.result}')
    raise LDAPBindError(conn.result.get('message'))

    return conn

    def fetch(self, value=None, conn=None):
    if conn is None:
    conn = self.get_conn()

    if value is None:
    print('not specified what to fetch.. check DB and fetch all for backend')
    values = []
    else:
    values = [value]

    x509_certs = []
    for val in values:
    if conn.search(self.base, f'(mail={val})', attributes=[self.field]):
    for entry in conn.entries:
    binary_cert = entry[self.field].values[0]
    x509_cert = x509.load_der_x509_certificate(binary_cert, backend=default_backend())
    x509_certs.append(x509_cert)

    conn.unbind()
    return x509_certs