[ADD] HORILLA LDAP: New app for handling LDAP authentication in Horilla

This commit is contained in:
Horilla
2025-03-05 14:57:05 +05:30
parent 4004fe730e
commit 34e7c1bb63
14 changed files with 367 additions and 10 deletions

View File

@@ -0,0 +1,117 @@
import re
import sys
import platform
from django.core.management.base import BaseCommand
from django.db.models import Q
from django.contrib.auth.models import User
from horilla_ldap.models import LDAPSettings
from employee.models import Employee
if platform.system() == "Linux":
import ldap # Use python-ldap for Linux
else:
from ldap3 import Server, Connection, ALL # Use ldap3 for Windows
class Command(BaseCommand):
help = "Imports employees from LDAP into the Django database using LDAP settings from the database"
def handle(self, *args, **kwargs):
# Detect OS
os_name = platform.system()
# self.stdout.write(self.style.NOTICE(f"Running on {os_name}"))
# Fetch LDAP settings from the database
settings = LDAPSettings.objects.first()
if not settings:
self.stdout.write(self.style.ERROR("LDAP settings are not configured."))
return
ldap_server = settings.ldap_server
bind_dn = settings.bind_dn
bind_password = settings.bind_password
base_dn = settings.base_dn
if not all([ldap_server, bind_dn, bind_password, base_dn]):
self.stdout.write(self.style.ERROR("LDAP settings are incomplete. Please check your configuration."))
return
try:
if os_name == "Linux":
# LDAP connection for Linux (python-ldap)
connection = ldap.initialize(ldap_server)
connection.simple_bind_s(bind_dn, bind_password)
search_filter = "(objectClass=inetOrgPerson)"
results = connection.search_s(base_dn, ldap.SCOPE_SUBTREE, search_filter)
for dn, entry in results:
user_id = entry.get("uid", [b""])[0].decode("utf-8")
email = entry.get("mail", [b""])[0].decode("utf-8")
first_name = entry.get("givenName", [b""])[0].decode("utf-8")
last_name = entry.get("sn", [b""])[0].decode("utf-8")
name = entry.get("cn", [b""])[0].decode("utf-8")
phone = entry.get("telephoneNumber", [b""])[0].decode("utf-8")
# Get the password from LDAP
ldap_password = entry.get("telephoneNumber", [b""])[0].decode("utf-8")
# Remove non-numeric characters but keep numbers
clean_phone = re.sub(r"[^\d]", "", phone)
ldap_password = clean_phone
self.create_or_update_employee(user_id, email, first_name, last_name, phone, ldap_password)
connection.unbind_s()
else:
# LDAP connection for Windows (ldap3)
server = Server(ldap_server, get_info=ALL)
connection = Connection(server, user=bind_dn, password=bind_password)
if not connection.bind():
self.stdout.write(self.style.ERROR(f"Failed to bind to LDAP server: {connection.last_error}"))
return
search_filter = "(objectClass=inetOrgPerson)"
connection.search(base_dn, search_filter, attributes=['uid', 'mail', 'givenName', 'sn', 'cn', 'telephoneNumber', 'userPassword'])
for entry in connection.entries:
user_id = entry.uid.value if entry.uid else ""
email = entry.mail.value if entry.mail else ""
first_name = entry.givenName.value if entry.givenName else ""
last_name = entry.sn.value if entry.sn else ""
name = entry.cn.value if entry.cn else ""
phone = entry.telephoneNumber.value if entry.telephoneNumber else ""
# Get the password from LDAP
clean_phone = re.sub(r"[^\d]", "", phone)
ldap_password = clean_phone
self.create_or_update_employee(user_id, email, first_name, last_name, phone, ldap_password)
connection.unbind()
except Exception as e:
self.stderr.write(self.style.ERROR(f"Error: {e}"))
def create_or_update_employee(self, user_id, email, first_name, last_name, phone, ldap_password):
employee, created = Employee.objects.update_or_create(
email=email,
defaults={
"employee_first_name": first_name or "",
"employee_last_name": last_name or "",
"phone": phone or "",
}
)
try:
user = User.objects.get(Q(username=email) | Q(username=user_id) | Q(email=email))
user.username = user_id
user.set_password(ldap_password) # Hash and store password securely
user.save()
action = "Updated"
except User.DoesNotExist:
self.stdout.write(self.style.WARNING(f"User for employee {first_name} {last_name} does not exist."))
return
action = "Created" if created else "Updated"
self.stdout.write(self.style.SUCCESS(f"{action} employee {first_name} {last_name}."))

View File

@@ -0,0 +1,76 @@
import hashlib
import base64
from django.core.management.base import BaseCommand
from ldap3 import Server, Connection, ALL, ALL_ATTRIBUTES
from horilla_ldap.models import LDAPSettings
from employee.models import Employee
class Command(BaseCommand):
help = 'Import users from Django to LDAP using LDAP settings from the database'
def handle(self, *args, **kwargs):
# Get LDAP settings from the database
settings = LDAPSettings.objects.first()
if not settings:
self.stdout.write(self.style.ERROR("LDAP settings are not configured."))
return
# Fetch LDAP server details from settings
ldap_server = settings.ldap_server
bind_dn = settings.bind_dn
bind_password = settings.bind_password
base_dn = settings.base_dn
if not all([ldap_server, bind_dn, bind_password, base_dn]):
self.stdout.write(self.style.ERROR("LDAP settings are incomplete. Please check your configuration."))
return
# Connect to the LDAP server
server = Server(ldap_server, get_info=ALL)
try:
conn = Connection(server, bind_dn, bind_password, auto_bind=True)
# Fetch all users from Django
users = Employee.objects.all()
for user in users:
if not user.employee_user_id:
self.stdout.write(self.style.WARNING(f"Skipping user {user} due to missing employee_user_id"))
continue
dn = f"uid={user.employee_user_id.username},{base_dn}"
# Securely hash the password using SHA
hashed_password = "{SHA}" + base64.b64encode(hashlib.sha1(user.phone.encode()).digest()).decode()
if user.employee_last_name is None:
user.employee_last_name = " "
attributes = {
'objectClass': ['inetOrgPerson'],
'givenName': user.employee_first_name or "",
'sn': user.employee_last_name or "",
'cn': f"{user.employee_first_name} {user.employee_last_name}",
'uid': user.email or "",
'mail': user.email or "",
"telephoneNumber": user.phone or "",
'userPassword': hashed_password, # Securely store password
}
# Check if the user already exists in LDAP
conn.search(base_dn, f'(uid={user.employee_user_id.username})', attributes=ALL_ATTRIBUTES)
if conn.entries:
self.stdout.write(self.style.WARNING(f'{user.employee_first_name} {user.employee_last_name} already exists in LDAP. Skipping...'))
else:
# Add user to LDAP
if not conn.add(dn, attributes=attributes):
self.stdout.write(self.style.ERROR(f'Failed to add {user.employee_first_name} {user.employee_last_name}: {conn.result}'))
else:
self.stdout.write(self.style.SUCCESS(f'Successfully added {user.employee_first_name} {user.employee_last_name} to LDAP.'))
conn.unbind()
except Exception as e:
self.stdout.write(self.style.ERROR(f'An error occurred: {e}'))