Commit ed46a9f2 authored by Daniel Klaffenbach's avatar Daniel Klaffenbach 🐍

tests: Use ldap3 mock tests instead of ldaptor

The twisted-based ldaptor prevents the test suite from running on Python 3
so we'll use the mock functions provided by ldap3 instead.
parent 3f18d0ae
......@@ -2,7 +2,7 @@
from __future__ import unicode_literals
import ssl
import ldap3
from ldap3 import Server, Connection, DSA, SYNC
from ldap3 import Server, Connection, SYNC, OFFLINE_SLAPD_2_4, MOCK_SYNC
from ldap3.core.exceptions import LDAPException
from ldap3.core.tls import Tls
from ldap3.utils.uri import parse_uri
......@@ -31,6 +31,10 @@ class Ldap(object):
self.LDAP_CA_CERT = getattr(settings, 'LDAP_CA_CERT', None)
self.LDAP_TIMEOUT = getattr(settings, 'LDAP_TIMEOUT', DEFAULT_LDAP_TIMEOUT)
# For testing with fake LDAP attributes we can use the following settings
self._LDAP_TESTING_FAKE_DATA = getattr(settings, 'LDAP_TESTING_FAKE_DATA', None)
self._LDAP_TESTING_FAKE_SCHEMA = getattr(settings, 'LDAP_TESTING_FAKE_SCHEMA', OFFLINE_SLAPD_2_4)
# Get the `max_length` of synced attributes from the installed User model.
# This is used in `get_attributes` to cut off values which are too long for
# the database schema.
......@@ -42,32 +46,34 @@ class Ldap(object):
@cached_property
def connection(self):
server_kwargs = {
'host': self.LDAP_PARAMS['host'],
'use_ssl': False,
'port': self.LDAP_PARAMS['port'],
'connect_timeout': self.LDAP_TIMEOUT,
}
if self.LDAP_PARAMS['ssl']:
server_kwargs['use_ssl'] = True
server_kwargs['tls'] = Tls(ca_certs_file=self.LDAP_CA_CERT, validate=ssl.CERT_REQUIRED)
# In case the LDAP server is the local host we assume a testing server.
# "SCHEMA" queries might not be available, so use "DSA" instead.
if self.LDAP_PARAMS['host'] == 'localhost':
server_kwargs['get_info'] = DSA
# This can be used for testing in order to load random test data
if self._LDAP_TESTING_FAKE_DATA:
fake_server = Server('fake_testing_server', get_info=self._LDAP_TESTING_FAKE_SCHEMA)
fake_conn = Connection(fake_server, user='', password='', client_strategy=MOCK_SYNC)
fake_conn.strategy.entries_from_json(self._LDAP_TESTING_FAKE_DATA)
fake_conn.bind()
return fake_conn
else:
server_kwargs = {
'host': self.LDAP_PARAMS['host'],
'use_ssl': False,
'port': self.LDAP_PARAMS['port'],
'connect_timeout': self.LDAP_TIMEOUT,
}
if self.LDAP_PARAMS['ssl']:
server_kwargs['use_ssl'] = True
server_kwargs['tls'] = Tls(ca_certs_file=self.LDAP_CA_CERT, validate=ssl.CERT_REQUIRED)
s=Server(**server_kwargs)
connection_kwargs = {
'server': s,
'auto_bind': True,
'user': self.LDAP_SYNC_BASE_USER,
'password': self.LDAP_SYNC_BASE_PASS,
'client_strategy': SYNC,
}
s=Server(**server_kwargs)
connection_kwargs = {
'server': s,
'auto_bind': True,
'user': self.LDAP_SYNC_BASE_USER,
'password': self.LDAP_SYNC_BASE_PASS,
'client_strategy': SYNC,
}
return Connection(**connection_kwargs)
return Connection(**connection_kwargs)
def get_attributes(self, username):
"""
......
# Common requirements for running the test suite
ldaptor==16.0.1
ldap3
coverage==4.3.4
# -*- coding: utf-8 -*-
from __future__ import print_function, unicode_literals
from twisted.internet.protocol import ServerFactory
from twisted.python.components import registerAdapter
from ldaptor.inmemory import fromLDIFFile
from ldaptor.interfaces import IConnectedLDAPEntry
from ldaptor.protocols.ldap.ldapserver import LDAPServer
from cStringIO import StringIO
"""
This is a pure Python implementation of a very simple LDAP
server, based on "ldaptor" and the example from the "ldaptor"
documentation.
"""
# Dummy LDAP data. This needs to be valid LDIF, so the
# indentation and newlines are important here.
LDIF = """\
dn: dc=org
dc: org
objectClass: dcObject
dn: dc=example,dc=org
dc: example
objectClass: dcObject
objectClass: organization
dn: ou=Users,dc=example,dc=org
objectClass: organizationalUnit
ou: Users
dn: uid=test,ou=Users,dc=example,dc=org
uid: test
givenName: Test
mail: test@example.org
objectclass: top
objectclass: person
objectClass: inetOrgPerson
sn: User
userPassword: t3st
dn: cn=Alice,ou=Users,dc=example,dc=org
uid: alice
givenName: Alice
mail: alice@example.org
objectclass: top
objectclass: person
objectClass: inetOrgPerson
sn: Surname
userPassword: al1ce
dn: cn=Long Name,ou=Users,dc=example,dc=org
uid: long
givenName: Long Name
mail: longname@example.org
objectclass: top
objectclass: person
objectClass: inetOrgPerson
sn: This User Has A Very Long Surname Which Exceeds The Maximum Allowed Length
userPassword: l0ng
"""
class Tree(object):
def __init__(self):
global LDIF
self.f = StringIO(LDIF)
d = fromLDIFFile(self.f)
d.addCallback(self.ldifRead)
def ldifRead(self, result):
self.f.close()
self.db = result
class LDAPServerFactory(ServerFactory):
protocol = LDAPServer
def __init__(self, root):
self.root = root
def buildProtocol(self, addr):
proto = self.protocol()
proto.debug = self.debug
proto.factory = self
return proto
def get_reactor():
"""
:returns: the twisted reactor and the port number for the LDAP server
:rtype: tuple
"""
from twisted.internet import reactor
# We initialize our tree
tree = Tree()
# When the LDAP Server protocol wants to manipulate the DIT, it invokes
# `root = interfaces.IConnectedLDAPEntry(self.factory)` to get the root
# of the DIT. The factory that creates the protocol must therefore
# be adapted to the IConnectedLDAPEntry interface.
registerAdapter(
lambda x: x.root,
LDAPServerFactory,
IConnectedLDAPEntry)
factory = LDAPServerFactory(tree.db)
factory.debug = False
listener = reactor.listenTCP(0, factory)
port = listener.getHost().port
return reactor, port
if __name__ == '__main__':
reactor, port = get_reactor()
print("Running on port %s" %port)
reactor.run()
{
"entries": [
{
"attributes": {
"dc": [
"example"
],
"objectClass": [
"dcObject",
"organization"
]
},
"dn": "dc=example,dc=org",
"raw": {
"dc": [
"example"
],
"objectClass": [
"dcObject",
"organization"
]
}
},
{
"attributes": {
"objectClass": [
"organizationalUnit"
],
"ou": [
"Users"
]
},
"dn": "ou=Users,dc=example,dc=org",
"raw": {
"objectClass": [
"organizationalUnit"
],
"ou": [
"Users"
]
}
},
{
"attributes": {
"givenName": [
"Alice"
],
"mail": [
"alice@example.org"
],
"objectClass": [
"inetOrgPerson",
"person",
"top"
],
"sn": [
"Surname"
],
"uid": [
"alice"
],
"userPassword": [
"al1ce"
]
},
"dn": "cn=Alice,ou=Users,dc=example,dc=org",
"raw": {
"givenName": [
"Alice"
],
"mail": [
"alice@example.org"
],
"objectClass": [
"inetOrgPerson",
"person",
"top"
],
"sn": [
"Surname"
],
"uid": [
"alice"
],
"userPassword": [
"al1ce"
]
}
},
{
"attributes": {
"givenName": [
"Test"
],
"mail": [
"test@example.org"
],
"objectClass": [
"inetOrgPerson",
"person",
"top"
],
"sn": [
"User"
],
"uid": [
"test"
],
"userPassword": [
"t3st"
]
},
"dn": "uid=test,ou=Users,dc=example,dc=org",
"raw": {
"givenName": [
"Test"
],
"mail": [
"test@example.org"
],
"objectClass": [
"inetOrgPerson",
"person",
"top"
],
"sn": [
"User"
],
"uid": [
"test"
],
"userPassword": [
"t3st"
]
}
},
{
"attributes": {
"givenName": [
"Long Name"
],
"mail": [
"longname@example.org"
],
"objectClass": [
"inetOrgPerson",
"person",
"top"
],
"sn": [
"This User Has A Very Long Surname Which Exceeds The Maximum Allowed Length"
],
"uid": [
"long"
],
"userPassword": [
"l0ng"
]
},
"dn": "cn=Long Name,ou=Users,dc=example,dc=org",
"raw": {
"givenName": [
"Long Name"
],
"mail": [
"longname@example.org"
],
"objectClass": [
"inetOrgPerson",
"person",
"top"
],
"sn": [
"This User Has A Very Long Surname Which Exceeds The Maximum Allowed Length"
],
"uid": [
"long"
],
"userPassword": [
"l0ng"
]
}
}
]
}
\ No newline at end of file
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
import os
import StringIO
import threading
from django.core import management
from django.contrib.auth import get_user_model
from django.test import Client, TestCase
from .server import get_reactor
from django.test.utils import override_settings
_CURRENT_DIRECTORY = os.path.dirname(os.path.realpath(__file__))
_LDAP_FAKE_DATA_LOCATION = os.path.join(_CURRENT_DIRECTORY, 'test_ldap_data.json')
@override_settings(
LDAP_SYNC_URI='ldap://localhost/ou=Users,dc=example,dc=org',
LDAP_TESTING_FAKE_DATA=_LDAP_FAKE_DATA_LOCATION
)
class LdapTestCase(TestCase):
def setUp(self):
self.USER_MODEL = get_user_model()
self.client = Client(REMOTE_USER='test')
@classmethod
def setUpClass(cls):
"""
Starts the Twisted-based "ldaptor" LDAP server in the
background with some dummy data.
"""
cls.reactor, cls.port = get_reactor()
threading.Thread(target=cls.reactor.run, args=(False,)).start()
@classmethod
def tearDownClass(cls):
"""
Stops the "ldaptor" LDAP server.
"""
cls.reactor.callFromThread(cls.reactor.stop)
def settings(self, **kwargs):
"""
Adds the `LDAP_SYNC_URI` setting for running the tests.
"""
if 'LDAP_SYNC_URI' not in kwargs:
kwargs['LDAP_SYNC_URI'] = 'ldap://localhost:%s/ou=Users,dc=example,dc=org' %self.port
return super(LdapTestCase, self).settings(**kwargs)
def test_sync_attributes(self):
"""
Checks if the attributes of the user "test" are automatically
synced from LDAP when creating the user.
"""
with self.settings():
user = self.USER_MODEL.objects.create(username='test')
self.assertEquals(user.first_name, 'Test')
self.assertEquals(user.last_name, 'User')
self.assertEquals(user.email, 'test@example.org')
user = self.USER_MODEL.objects.get(username='test')
self.assertEquals(user.first_name, 'Test')
self.assertEquals(user.last_name, 'User')
self.assertEquals(user.email, 'test@example.org')
user = self.USER_MODEL.objects.create(username='test')
self.assertEquals(user.first_name, 'Test')
self.assertEquals(user.last_name, 'User')
self.assertEquals(user.email, 'test@example.org')
user = self.USER_MODEL.objects.get(username='test')
self.assertEquals(user.first_name, 'Test')
self.assertEquals(user.last_name, 'User')
self.assertEquals(user.email, 'test@example.org')
def test_autocreate(self):
with self.settings():
response = self.client.get('/admin/')
self.assertEquals(response.status_code, 302)
user = self.USER_MODEL.objects.get(username='test')
self.assertEquals(user.first_name, 'Test')
self.assertEquals(user.last_name, 'User')
self.assertNotEqual(user.email, '')
response = self.client.get('/admin/')
self.assertEquals(response.status_code, 302)
user = self.USER_MODEL.objects.get(username='test')
self.assertEquals(user.first_name, 'Test')
self.assertEquals(user.last_name, 'User')
self.assertNotEqual(user.email, '')
def test_management_command(self):
with self.settings():
self.USER_MODEL.objects.create(username='test')
# Clear user attributes for this test
self.USER_MODEL.objects.all().update(first_name='', last_name='', email='')
user = self.USER_MODEL.objects.get(username='test')
self.assertEquals(user.first_name, '')
self.assertEquals(user.last_name, '')
self.assertEquals(user.email, '')
self.USER_MODEL.objects.create(username='test')
# Now run management command to test if sync is working
management.call_command('ldap_sync')
user = self.USER_MODEL.objects.get(username='test')
self.assertEquals(user.first_name, 'Test')
self.assertEquals(user.last_name, 'User')
self.assertNotEqual(user.email, '')
# Clear user attributes for this test
self.USER_MODEL.objects.all().update(first_name='', last_name='', email='')
user = self.USER_MODEL.objects.get(username='test')
self.assertEquals(user.first_name, '')
self.assertEquals(user.last_name, '')
self.assertEquals(user.email, '')
# Now run management command to test if sync is working
management.call_command('ldap_sync')
user = self.USER_MODEL.objects.get(username='test')
self.assertEquals(user.first_name, 'Test')
self.assertEquals(user.last_name, 'User')
self.assertNotEqual(user.email, '')
def test_management_command_is_active(self):
"""
......@@ -151,12 +131,11 @@ class LdapTestCase(TestCase):
self.assertIn("Ignoring alice", output_lines)
def test_invalid_user(self):
with self.settings():
self.USER_MODEL.objects.create(username='unknown')
user = self.USER_MODEL.objects.get(username='unknown')
self.assertEquals(user.first_name, '')
self.assertEquals(user.last_name, '')
self.assertEquals(user.email, '')
self.USER_MODEL.objects.create(username='unknown')
user = self.USER_MODEL.objects.get(username='unknown')
self.assertEquals(user.first_name, '')
self.assertEquals(user.last_name, '')
self.assertEquals(user.email, '')
def test_attribute_max_lenth(self):
"""
......@@ -167,7 +146,6 @@ class LdapTestCase(TestCase):
if last_name_field.max_length >= 74:
self.skipTest("The `max_length` of the `last_name` field is too large for this test.")
with self.settings():
user = self.USER_MODEL.objects.create(username='long')
self.assertEquals(user.first_name, 'Long Name')
self.assertEquals(len(user.last_name), last_name_field.max_length)
user = self.USER_MODEL.objects.create(username='long')
self.assertEquals(user.first_name, 'Long Name')
self.assertEquals(len(user.last_name), last_name_field.max_length)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment