[RMV] EMPLOYEE: Removd old management commands for LDAP-Django data transfer
This commit is contained in:
@@ -1,78 +0,0 @@
|
||||
import ldap
|
||||
from django.conf import settings
|
||||
from django.contrib.auth.models import User
|
||||
from django.core.management.base import BaseCommand
|
||||
from django.db.models import Q
|
||||
|
||||
from employee.models import Employee
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
help = "Imports employees from LDAP into the Django database"
|
||||
|
||||
def handle(self, *args, **kwargs):
|
||||
try:
|
||||
connection = ldap.initialize(settings.AUTH_LDAP_SERVER_URI)
|
||||
connection.simple_bind_s(
|
||||
settings.AUTH_LDAP_BIND_DN, settings.AUTH_LDAP_BIND_PASSWORD
|
||||
)
|
||||
|
||||
search_base = (
|
||||
"ou=users,dc=test,dc=com" # Replace with your actual search base
|
||||
)
|
||||
search_filter = "(objectClass=inetOrgPerson)"
|
||||
|
||||
results = connection.search_s(
|
||||
search_base, ldap.SCOPE_SUBTREE, search_filter
|
||||
)
|
||||
|
||||
for dn, entry in results:
|
||||
|
||||
user_id = entry.get("uid", [b""])[0].decode("utf-8")
|
||||
email = entry.get("mail", [b""])[0].decode("utf-8")
|
||||
first_name = entry.get("givenName", [b""])[0].decode("utf-8")
|
||||
last_name = entry.get("sn", [b""])[0].decode("utf-8")
|
||||
name = entry.get("cn", [b""])[0].decode("utf-8")
|
||||
phone = entry.get("telephoneNumber", [b""])[0].decode("utf-8")
|
||||
|
||||
# Get the password from LDAP
|
||||
ldap_password = entry.get("userPassword", [b""])[0].decode("utf-8")
|
||||
|
||||
# Create or update the Employee record, storing the LDAP password
|
||||
employee, created = Employee.objects.update_or_create(
|
||||
email=email,
|
||||
defaults={
|
||||
"employee_first_name": first_name,
|
||||
"employee_last_name": last_name,
|
||||
"email": email,
|
||||
"phone": phone,
|
||||
},
|
||||
)
|
||||
|
||||
# Retrieve the associated User if it exists
|
||||
try:
|
||||
user = User.objects.get(
|
||||
Q(username=email) | Q(username=user_id) | Q(email=email)
|
||||
)
|
||||
user.username = user_id
|
||||
user.set_password(
|
||||
ldap_password
|
||||
) # Hash and set the password securely
|
||||
user.save() # Save the changes to the User instance
|
||||
action = "Updated"
|
||||
except User.DoesNotExist:
|
||||
# If the user does not exist, handle it accordingly (e.g., log a message or create a new user)
|
||||
self.stdout.write(
|
||||
self.style.WARNING(f"User for employee {name} does not exist.")
|
||||
)
|
||||
continue
|
||||
|
||||
action = "Created" if created else "Updated"
|
||||
self.stdout.write(
|
||||
self.style.SUCCESS(f"{action} employee {name} with LDAP password")
|
||||
)
|
||||
|
||||
connection.unbind_s()
|
||||
|
||||
except ldap.LDAPError as e:
|
||||
self.stderr.write(self.style.ERROR(f"LDAP Error: {e}"))
|
||||
@@ -1,73 +0,0 @@
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.core.management.base import BaseCommand
|
||||
from ldap3 import ALL, ALL_ATTRIBUTES, Connection, Server
|
||||
|
||||
from employee.models import Employee
|
||||
|
||||
User = get_user_model()
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
help = "Import users from Django to LDAP"
|
||||
|
||||
def handle(self, *args, **kwargs):
|
||||
# LDAP server details
|
||||
ldap_server = "localhost"
|
||||
bind_dn = "cn=admin,dc=test,dc=com" # Replace with your bind DN
|
||||
bind_password = "cool" # Change to your LDAP admin password
|
||||
|
||||
# Connect to the LDAP server
|
||||
server = Server(ldap_server, get_info=ALL)
|
||||
|
||||
try:
|
||||
conn = Connection(server, bind_dn, bind_password, auto_bind=True)
|
||||
|
||||
# Fetch all users from Django
|
||||
users = Employee.objects.all()
|
||||
|
||||
for user in users:
|
||||
|
||||
# Prepare user data for LDAPclear
|
||||
dn = f"uid={user.employee_user_id.username},ou=users,dc=test,dc=com"
|
||||
attributes = {
|
||||
"objectClass": ["inetOrgPerson"],
|
||||
"givenName": user.employee_first_name,
|
||||
"sn": user.employee_last_name,
|
||||
"cn": f"{user.employee_first_name} {user.employee_last_name}",
|
||||
"uid": user.email,
|
||||
"mail": user.email,
|
||||
"telephoneNumber": user.phone,
|
||||
"userPassword": user.phone,
|
||||
}
|
||||
|
||||
# Check if the user already exists in LDAP
|
||||
conn.search(
|
||||
"ou=users,dc=test,dc=com",
|
||||
f"(uid={user.employee_user_id.username})",
|
||||
attributes=ALL_ATTRIBUTES,
|
||||
)
|
||||
|
||||
if conn.entries:
|
||||
self.stdout.write(
|
||||
self.style.WARNING(
|
||||
f"{user.employee_first_name} {user.employee_last_name} already exists in LDAP. Skipping..."
|
||||
)
|
||||
)
|
||||
else:
|
||||
# Add user to LDAP
|
||||
if not conn.add(dn, attributes=attributes):
|
||||
self.stdout.write(
|
||||
self.style.ERROR(
|
||||
f"Failed to add {user.employee_first_name} {user.employee_last_name}: {conn.result}"
|
||||
)
|
||||
)
|
||||
else:
|
||||
self.stdout.write(
|
||||
self.style.SUCCESS(
|
||||
f"Successfully added {user.employee_first_name} {user.employee_last_name} to LDAP."
|
||||
)
|
||||
)
|
||||
|
||||
conn.unbind()
|
||||
except Exception as e:
|
||||
self.stdout.write(self.style.ERROR(f"An error occurred: {e}"))
|
||||
Reference in New Issue
Block a user