Commit c143b1a5 authored by Daniel Klaffenbach's avatar Daniel Klaffenbach 🐍

tests: Use `ldaptor` LDAP server for testing with dummy data

parent 100c2d6d
# Common requirements for running the test suite
ldaptor==16.0.1
ldap3
# -*- coding: utf-8 -*-
from __future__ import print_function, unicode_literals
from twisted.internet.protocol import ServerFactory
from twisted.python.components import registerAdapter
from ldaptor.inmemory import fromLDIFFile
from ldaptor.interfaces import IConnectedLDAPEntry
from ldaptor.protocols.ldap.ldapserver import LDAPServer
from cStringIO import StringIO
"""
This is a pure Python implementation of a very simple LDAP
server, based on "ldaptor" and the example from the "ldaptor"
documentation.
"""
# Dummy LDAP data. This needs to be valid LDIF, so the
# indentation and newlines are important here.
LDIF = """\
dn: dc=org
dc: org
objectClass: dcObject
dn: dc=example,dc=org
dc: example
objectClass: dcObject
objectClass: organization
dn: ou=Users,dc=example,dc=org
objectClass: organizationalUnit
ou: Users
dn: uid=test,ou=Users,dc=example,dc=org
uid: test
givenName: Test
mail: test@example.org
objectclass: top
objectclass: person
objectClass: inetOrgPerson
sn: User
userPassword: t3st
dn: cn=Alice,ou=Users,dc=example,dc=org
uid: alice
givenName: Alice
mail: alice@example.org
objectclass: top
objectclass: person
objectClass: inetOrgPerson
sn: Surname
userPassword: al1ce
"""
class Tree(object):
def __init__(self):
global LDIF
self.f = StringIO(LDIF)
d = fromLDIFFile(self.f)
d.addCallback(self.ldifRead)
def ldifRead(self, result):
self.f.close()
self.db = result
class LDAPServerFactory(ServerFactory):
protocol = LDAPServer
def __init__(self, root):
self.root = root
def buildProtocol(self, addr):
proto = self.protocol()
proto.debug = self.debug
proto.factory = self
return proto
def get_reactor():
"""
:returns: the twisted reactor and the port number for the LDAP server
:rtype: tuple
"""
from twisted.internet import reactor
# We initialize our tree
tree = Tree()
# When the LDAP Server protocol wants to manipulate the DIT, it invokes
# `root = interfaces.IConnectedLDAPEntry(self.factory)` to get the root
# of the DIT. The factory that creates the protocol must therefore
# be adapted to the IConnectedLDAPEntry interface.
registerAdapter(
lambda x: x.root,
LDAPServerFactory,
IConnectedLDAPEntry)
factory = LDAPServerFactory(tree.db)
factory.debug = False
listener = reactor.listenTCP(0, factory)
port = listener.getHost().port
return reactor, port
if __name__ == '__main__':
reactor, port = get_reactor()
print("Running on port %s" %port)
reactor.run()
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
import threading
from django.core import management
from django.contrib.auth import get_user_model
from django.test import Client, TestCase
from .server import get_reactor
class LdapTestCase(TestCase):
def setUp(self):
self.USER_MODEL = get_user_model()
self.client = Client(REMOTE_USER='otto')
self.client = Client(REMOTE_USER='test')
def test_sync_attributes(self):
@classmethod
def setUpClass(cls):
"""
Testet, ob die Attribute des Nuters "otto" beim Anlegen
aus dem LDAP gelesen werden.
@attention: TUC-specific
"""
user = self.USER_MODEL.objects.create(username='otto')
self.assertEquals(user.first_name, 'Otto')
self.assertEquals(user.last_name, 'Normalverbraucher')
self.assertNotEqual(user.email, '')
Starts the Twisted-based "ldaptor" LDAP server in the
background with some dummy data.
"""
cls.reactor, cls.port = get_reactor()
threading.Thread(target=cls.reactor.run, args=(False,)).start()
@classmethod
def tearDownClass(cls):
"""
Stops the "ldaptor" LDAP server.
"""
cls.reactor.callFromThread(cls.reactor.stop)
user = self.USER_MODEL.objects.get(username='otto')
self.assertEquals(user.first_name, 'Otto')
self.assertEquals(user.last_name, 'Normalverbraucher')
self.assertNotEqual(user.email, '')
def settings(self, **kwargs):
"""
Adds the `LDAP_SYNC_URI` setting for running the tests.
"""
if 'LDAP_SYNC_URI' not in kwargs:
kwargs['LDAP_SYNC_URI'] = 'ldap://localhost:%s/ou=Users,dc=example,dc=org' %self.port
return super(LdapTestCase, self).settings(**kwargs)
def test_sync_attributes(self):
"""
Checks if the attributes of the user "test" are automatically
synced from LDAP when creating the user.
"""
with self.settings():
user = self.USER_MODEL.objects.create(username='test')
self.assertEquals(user.first_name, 'Test')
self.assertEquals(user.last_name, 'User')
self.assertEquals(user.email, 'test@example.org')
user = self.USER_MODEL.objects.get(username='test')
self.assertEquals(user.first_name, 'Test')
self.assertEquals(user.last_name, 'User')
self.assertEquals(user.email, 'test@example.org')
def test_autocreate(self):
response = self.client.get('/admin/')
self.assertEquals(response.status_code, 302)
user = self.USER_MODEL.objects.get(username='otto')
self.assertEquals(user.first_name, 'Otto')
self.assertEquals(user.last_name, 'Normalverbraucher')
self.assertNotEqual(user.email, '')
with self.settings():
response = self.client.get('/admin/')
self.assertEquals(response.status_code, 302)
user = self.USER_MODEL.objects.get(username='test')
self.assertEquals(user.first_name, 'Test')
self.assertEquals(user.last_name, 'User')
self.assertNotEqual(user.email, '')
def test_management_command(self):
self.USER_MODEL.objects.create(username='otto')
# Clear user attributes for this test
self.USER_MODEL.objects.all().update(first_name='', last_name='', email='')
user = self.USER_MODEL.objects.get(username='otto')
self.assertEquals(user.first_name, '')
self.assertEquals(user.last_name, '')
self.assertEquals(user.email, '')
with self.settings():
self.USER_MODEL.objects.create(username='test')
# Clear user attributes for this test
self.USER_MODEL.objects.all().update(first_name='', last_name='', email='')
user = self.USER_MODEL.objects.get(username='test')
self.assertEquals(user.first_name, '')
self.assertEquals(user.last_name, '')
self.assertEquals(user.email, '')
# Now run management command to test if sync is working
management.call_command('ldap_sync')
user = self.USER_MODEL.objects.get(username='otto')
self.assertEquals(user.first_name, 'Otto')
self.assertEquals(user.last_name, 'Normalverbraucher')
self.assertNotEqual(user.email, '')
\ No newline at end of file
# Now run management command to test if sync is working
management.call_command('ldap_sync')
user = self.USER_MODEL.objects.get(username='test')
self.assertEquals(user.first_name, 'Test')
self.assertEquals(user.last_name, 'User')
self.assertNotEqual(user.email, '')
def test_invalid_user(self):
with self.settings():
self.USER_MODEL.objects.create(username='unknown')
user = self.USER_MODEL.objects.get(username='unknown')
self.assertEquals(user.first_name, '')
self.assertEquals(user.last_name, '')
self.assertEquals(user.email, '')
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment