Pylint updates
This commit is contained in:
@@ -1,16 +1,18 @@
|
||||
import platform
|
||||
import re
|
||||
import sys
|
||||
import platform
|
||||
|
||||
from django.contrib.auth.models import User
|
||||
from django.core.management.base import BaseCommand
|
||||
from django.db.models import Q
|
||||
from django.contrib.auth.models import User
|
||||
from horilla_ldap.models import LDAPSettings
|
||||
|
||||
from employee.models import Employee
|
||||
from horilla_ldap.models import LDAPSettings
|
||||
|
||||
if platform.system() == "Linux":
|
||||
import ldap # Use python-ldap for Linux
|
||||
else:
|
||||
from ldap3 import Server, Connection, ALL # Use ldap3 for Windows
|
||||
from ldap3 import ALL, Connection, Server # Use ldap3 for Windows
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
@@ -33,7 +35,11 @@ class Command(BaseCommand):
|
||||
base_dn = settings.base_dn
|
||||
|
||||
if not all([ldap_server, bind_dn, bind_password, base_dn]):
|
||||
self.stdout.write(self.style.ERROR("LDAP settings are incomplete. Please check your configuration."))
|
||||
self.stdout.write(
|
||||
self.style.ERROR(
|
||||
"LDAP settings are incomplete. Please check your configuration."
|
||||
)
|
||||
)
|
||||
return
|
||||
|
||||
try:
|
||||
@@ -42,7 +48,9 @@ class Command(BaseCommand):
|
||||
connection = ldap.initialize(ldap_server)
|
||||
connection.simple_bind_s(bind_dn, bind_password)
|
||||
search_filter = "(objectClass=inetOrgPerson)"
|
||||
results = connection.search_s(base_dn, ldap.SCOPE_SUBTREE, search_filter)
|
||||
results = connection.search_s(
|
||||
base_dn, ldap.SCOPE_SUBTREE, search_filter
|
||||
)
|
||||
|
||||
for dn, entry in results:
|
||||
user_id = entry.get("uid", [b""])[0].decode("utf-8")
|
||||
@@ -53,13 +61,17 @@ class Command(BaseCommand):
|
||||
phone = entry.get("telephoneNumber", [b""])[0].decode("utf-8")
|
||||
|
||||
# Get the password from LDAP
|
||||
ldap_password = entry.get("telephoneNumber", [b""])[0].decode("utf-8")
|
||||
ldap_password = entry.get("telephoneNumber", [b""])[0].decode(
|
||||
"utf-8"
|
||||
)
|
||||
|
||||
# Remove non-numeric characters but keep numbers
|
||||
clean_phone = re.sub(r"[^\d]", "", phone)
|
||||
clean_phone = re.sub(r"[^\d]", "", phone)
|
||||
ldap_password = clean_phone
|
||||
|
||||
self.create_or_update_employee(user_id, email, first_name, last_name, phone, ldap_password)
|
||||
self.create_or_update_employee(
|
||||
user_id, email, first_name, last_name, phone, ldap_password
|
||||
)
|
||||
|
||||
connection.unbind_s()
|
||||
|
||||
@@ -68,11 +80,27 @@ class Command(BaseCommand):
|
||||
server = Server(ldap_server, get_info=ALL)
|
||||
connection = Connection(server, user=bind_dn, password=bind_password)
|
||||
if not connection.bind():
|
||||
self.stdout.write(self.style.ERROR(f"Failed to bind to LDAP server: {connection.last_error}"))
|
||||
self.stdout.write(
|
||||
self.style.ERROR(
|
||||
f"Failed to bind to LDAP server: {connection.last_error}"
|
||||
)
|
||||
)
|
||||
return
|
||||
|
||||
search_filter = "(objectClass=inetOrgPerson)"
|
||||
connection.search(base_dn, search_filter, attributes=['uid', 'mail', 'givenName', 'sn', 'cn', 'telephoneNumber', 'userPassword'])
|
||||
connection.search(
|
||||
base_dn,
|
||||
search_filter,
|
||||
attributes=[
|
||||
"uid",
|
||||
"mail",
|
||||
"givenName",
|
||||
"sn",
|
||||
"cn",
|
||||
"telephoneNumber",
|
||||
"userPassword",
|
||||
],
|
||||
)
|
||||
|
||||
for entry in connection.entries:
|
||||
user_id = entry.uid.value if entry.uid else ""
|
||||
@@ -86,32 +114,44 @@ class Command(BaseCommand):
|
||||
clean_phone = re.sub(r"[^\d]", "", phone)
|
||||
ldap_password = clean_phone
|
||||
|
||||
self.create_or_update_employee(user_id, email, first_name, last_name, phone, ldap_password)
|
||||
self.create_or_update_employee(
|
||||
user_id, email, first_name, last_name, phone, ldap_password
|
||||
)
|
||||
|
||||
connection.unbind()
|
||||
|
||||
except Exception as e:
|
||||
self.stderr.write(self.style.ERROR(f"Error: {e}"))
|
||||
|
||||
def create_or_update_employee(self, user_id, email, first_name, last_name, phone, ldap_password):
|
||||
def create_or_update_employee(
|
||||
self, user_id, email, first_name, last_name, phone, ldap_password
|
||||
):
|
||||
employee, created = Employee.objects.update_or_create(
|
||||
email=email,
|
||||
defaults={
|
||||
"employee_first_name": first_name or "",
|
||||
"employee_last_name": last_name or "",
|
||||
"phone": phone or "",
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
try:
|
||||
user = User.objects.get(Q(username=email) | Q(username=user_id) | Q(email=email))
|
||||
user = User.objects.get(
|
||||
Q(username=email) | Q(username=user_id) | Q(email=email)
|
||||
)
|
||||
user.username = user_id
|
||||
user.set_password(ldap_password) # Hash and store password securely
|
||||
user.save()
|
||||
action = "Updated"
|
||||
except User.DoesNotExist:
|
||||
self.stdout.write(self.style.WARNING(f"User for employee {first_name} {last_name} does not exist."))
|
||||
self.stdout.write(
|
||||
self.style.WARNING(
|
||||
f"User for employee {first_name} {last_name} does not exist."
|
||||
)
|
||||
)
|
||||
return
|
||||
|
||||
action = "Created" if created else "Updated"
|
||||
self.stdout.write(self.style.SUCCESS(f"{action} employee {first_name} {last_name}."))
|
||||
self.stdout.write(
|
||||
self.style.SUCCESS(f"{action} employee {first_name} {last_name}.")
|
||||
)
|
||||
|
||||
@@ -1,13 +1,15 @@
|
||||
import hashlib
|
||||
import base64
|
||||
import hashlib
|
||||
|
||||
from django.core.management.base import BaseCommand
|
||||
from ldap3 import Server, Connection, ALL, ALL_ATTRIBUTES
|
||||
from horilla_ldap.models import LDAPSettings
|
||||
from ldap3 import ALL, ALL_ATTRIBUTES, Connection, Server
|
||||
|
||||
from employee.models import Employee
|
||||
from horilla_ldap.models import LDAPSettings
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
help = 'Import users from Django to LDAP using LDAP settings from the database'
|
||||
help = "Import users from Django to LDAP using LDAP settings from the database"
|
||||
|
||||
def handle(self, *args, **kwargs):
|
||||
# Get LDAP settings from the database
|
||||
@@ -23,7 +25,11 @@ class Command(BaseCommand):
|
||||
base_dn = settings.base_dn
|
||||
|
||||
if not all([ldap_server, bind_dn, bind_password, base_dn]):
|
||||
self.stdout.write(self.style.ERROR("LDAP settings are incomplete. Please check your configuration."))
|
||||
self.stdout.write(
|
||||
self.style.ERROR(
|
||||
"LDAP settings are incomplete. Please check your configuration."
|
||||
)
|
||||
)
|
||||
return
|
||||
|
||||
# Connect to the LDAP server
|
||||
@@ -37,40 +43,65 @@ class Command(BaseCommand):
|
||||
|
||||
for user in users:
|
||||
if not user.employee_user_id:
|
||||
self.stdout.write(self.style.WARNING(f"Skipping user {user} due to missing employee_user_id"))
|
||||
self.stdout.write(
|
||||
self.style.WARNING(
|
||||
f"Skipping user {user} due to missing employee_user_id"
|
||||
)
|
||||
)
|
||||
continue
|
||||
|
||||
dn = f"uid={user.employee_user_id.username},{base_dn}"
|
||||
|
||||
# Securely hash the password using SHA
|
||||
hashed_password = "{SHA}" + base64.b64encode(hashlib.sha1(user.phone.encode()).digest()).decode()
|
||||
hashed_password = (
|
||||
"{SHA}"
|
||||
+ base64.b64encode(
|
||||
hashlib.sha1(user.phone.encode()).digest()
|
||||
).decode()
|
||||
)
|
||||
|
||||
if user.employee_last_name is None:
|
||||
user.employee_last_name = " "
|
||||
|
||||
attributes = {
|
||||
'objectClass': ['inetOrgPerson'],
|
||||
'givenName': user.employee_first_name or "",
|
||||
'sn': user.employee_last_name or "",
|
||||
'cn': f"{user.employee_first_name} {user.employee_last_name}",
|
||||
'uid': user.email or "",
|
||||
'mail': user.email or "",
|
||||
"objectClass": ["inetOrgPerson"],
|
||||
"givenName": user.employee_first_name or "",
|
||||
"sn": user.employee_last_name or "",
|
||||
"cn": f"{user.employee_first_name} {user.employee_last_name}",
|
||||
"uid": user.email or "",
|
||||
"mail": user.email or "",
|
||||
"telephoneNumber": user.phone or "",
|
||||
'userPassword': hashed_password, # Securely store password
|
||||
"userPassword": hashed_password, # Securely store password
|
||||
}
|
||||
|
||||
# Check if the user already exists in LDAP
|
||||
conn.search(base_dn, f'(uid={user.employee_user_id.username})', attributes=ALL_ATTRIBUTES)
|
||||
conn.search(
|
||||
base_dn,
|
||||
f"(uid={user.employee_user_id.username})",
|
||||
attributes=ALL_ATTRIBUTES,
|
||||
)
|
||||
|
||||
if conn.entries:
|
||||
self.stdout.write(self.style.WARNING(f'{user.employee_first_name} {user.employee_last_name} already exists in LDAP. Skipping...'))
|
||||
self.stdout.write(
|
||||
self.style.WARNING(
|
||||
f"{user.employee_first_name} {user.employee_last_name} already exists in LDAP. Skipping..."
|
||||
)
|
||||
)
|
||||
else:
|
||||
# Add user to LDAP
|
||||
if not conn.add(dn, attributes=attributes):
|
||||
self.stdout.write(self.style.ERROR(f'Failed to add {user.employee_first_name} {user.employee_last_name}: {conn.result}'))
|
||||
self.stdout.write(
|
||||
self.style.ERROR(
|
||||
f"Failed to add {user.employee_first_name} {user.employee_last_name}: {conn.result}"
|
||||
)
|
||||
)
|
||||
else:
|
||||
self.stdout.write(self.style.SUCCESS(f'Successfully added {user.employee_first_name} {user.employee_last_name} to LDAP.'))
|
||||
self.stdout.write(
|
||||
self.style.SUCCESS(
|
||||
f"Successfully added {user.employee_first_name} {user.employee_last_name} to LDAP."
|
||||
)
|
||||
)
|
||||
|
||||
conn.unbind()
|
||||
except Exception as e:
|
||||
self.stdout.write(self.style.ERROR(f'An error occurred: {e}'))
|
||||
self.stdout.write(self.style.ERROR(f"An error occurred: {e}"))
|
||||
|
||||
Reference in New Issue
Block a user