[UPDT] EMPLOYEE: Optimized employee import method for faster execution

This commit is contained in:
Horilla
2025-05-26 14:22:47 +05:30
parent 34c4d5ce61
commit c7311f5471
2 changed files with 276 additions and 195 deletions

View File

@@ -6,12 +6,12 @@ import logging
import re
import threading
from datetime import date, datetime
from itertools import groupby
from itertools import chain, groupby
import pandas as pd
from django.apps import apps
from django.contrib.auth.models import User
from django.db import models
from django.db import connection, models, transaction
from django.utils.translation import gettext as _
from base.context_processors import get_initial_prefix
@@ -28,6 +28,8 @@ from employee.models import Employee, EmployeeWorkInformation
logger = logging.getLogger(__name__)
is_postgres = connection.vendor == "postgresql"
error_data_template = {
field: []
for field in [
@@ -66,6 +68,11 @@ error_data_template = {
}
def chunked(iterable, size):
    """Yield successive slices of *iterable* containing at most *size* items.

    Used to split large ``__in`` query value lists so they stay under
    SQLite's bound-parameter limit (the 999-element chunks used elsewhere
    in this module).

    Args:
        iterable: A sliceable sequence supporting ``len()`` (list, tuple, str).
        size (int): Maximum number of items per chunk; must be >= 1.

    Yields:
        Consecutive slices of ``iterable``; the final slice may be shorter
        than ``size``.

    Raises:
        ValueError: If ``size`` is less than 1. The original code silently
            yielded nothing for a negative ``size`` and raised an opaque
            ``range()`` error for zero; fail fast with a clear message.
    """
    if size < 1:
        raise ValueError("size must be a positive integer")
    for start in range(0, len(iterable), size):
        yield iterable[start : start + size]
def normalize_phone(phone):
phone = str(phone).strip()
if phone.startswith("+"):
@@ -244,26 +251,34 @@ def valid_import_file_headers(data_frame):
def process_employee_records(data_frame):
created_count = 0
success_list, error_list = [], []
employee_dicts = data_frame.to_dict("records")
email_regex = re.compile(r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$")
phone_regex = re.compile(r"^\+?\d{10,15}$")
allowed_genders = {choice[0] for choice in Employee.choice_gender}
existing_badge_ids = set(Employee.objects.values_list("badge_id", flat=True))
existing_usernames = set(User.objects.values_list("username", flat=True))
existing_name_emails = set(
Employee.objects.values_list(
allowed_genders = frozenset(choice[0] for choice in Employee.choice_gender)
existing_badge_ids = frozenset(Employee.objects.values_list("badge_id", flat=True))
existing_usernames = frozenset(User.objects.values_list("username", flat=True))
existing_name_emails = frozenset(
(fname, lname, email)
for fname, lname, email in Employee.objects.values_list(
"employee_first_name", "employee_last_name", "email"
)
)
existing_companies = set(Company.objects.values_list("company", flat=True))
existing_companies = frozenset(Company.objects.values_list("company", flat=True))
success_list, error_list = [], []
employee_dicts = data_frame.to_dict("records")
created_count = 0
seen_badge_ids = set(existing_badge_ids)
seen_usernames = set(existing_usernames)
seen_name_emails = set(existing_name_emails)
today = date.today()
for emp in employee_dicts:
errors, save = {}, True
errors = {}
save = True
email = str(emp.get("Email", "")).strip()
email = str(emp.get("Email", "")).strip().lower()
raw_phone = emp.get("Phone", "")
phone = normalize_phone(raw_phone)
badge_id = clean_badge_id(emp.get("Badge ID"))
@@ -274,14 +289,14 @@ def process_employee_records(data_frame):
basic_salary = convert_nan("Basic Salary", emp)
salary_hour = convert_nan("Salary Hour", emp)
# Date validation
joining_date = import_valid_date(
emp.get("Date Joining"), "Joining Date", errors, "Joining Date Error"
)
if "Joining Date Error" in errors:
save = False
if joining_date and joining_date > date.today():
errors["Joining Date Error"] = "Joining date cannot be in the future."
save = False
if joining_date:
if joining_date > today:
errors["Joining Date Error"] = "Joining date cannot be in the future."
save = False
contract_end_date = import_valid_date(
emp.get("Contract End Date"),
@@ -289,58 +304,64 @@ def process_employee_records(data_frame):
errors,
"Contract Date Error",
)
if "Contract Error" in errors:
save = False
if contract_end_date and joining_date and contract_end_date < joining_date:
errors["Contract Date Error"] = (
"Contract end date cannot be before joining date."
)
save = False
# Email validation
if not email or not email_regex.match(email):
errors["Email Error"] = "Invalid email address."
save = False
# Name validation
if not first_name:
errors["First Name Error"] = "First name cannot be empty."
save = False
# Phone validation
if not phone_regex.match(phone):
errors["Phone Error"] = "Invalid phone number format."
save = False
if badge_id in existing_badge_ids:
# Badge ID validation
if badge_id in seen_badge_ids:
errors["Badge ID Error"] = "An employee with this badge ID already exists."
save = False
else:
emp["Badge ID"] = badge_id
existing_badge_ids.add(badge_id)
seen_badge_ids.add(badge_id)
if email in existing_usernames:
# Username/email uniqueness
if email in seen_usernames:
errors["User ID Error"] = "User with this email already exists."
save = False
else:
existing_usernames.add(email)
seen_usernames.add(email)
# Name+email uniqueness
name_email_tuple = (first_name, last_name, email)
if name_email_tuple in existing_name_emails:
if name_email_tuple in seen_name_emails:
errors["Name and Email Error"] = (
"This employee already exists in the system."
)
save = False
else:
existing_name_emails.add(name_email_tuple)
seen_name_emails.add(name_email_tuple)
# Gender validation
if gender and gender not in allowed_genders:
errors["Gender Error"] = (
f"Invalid gender. Allowed values: {', '.join(allowed_genders)}."
)
save = False
# Company validation
if company and company not in existing_companies:
errors["Company Error"] = f"Company '{company}' does not exist."
save = False
# Salary validation
if basic_salary not in [None, ""]:
try:
basic_salary_val = float(basic_salary)
@@ -361,6 +382,7 @@ def process_employee_records(data_frame):
)
save = False
# Final processing
if save:
emp["Phone"] = phone
emp["Date Joining"] = joining_date
@@ -376,73 +398,87 @@ def process_employee_records(data_frame):
def bulk_create_user_import(success_lists):
"""
Bulk creation of user instances based on the excel import of employees
Creates new User instances in bulk from a list of dictionaries containing user data.
Returns:
list: A list of created User instances. If no new users are created, returns an empty list.
"""
user_obj_list = []
existing_usernames = {
user.username
for user in User.objects.filter(
username__in=[row["Email"] for row in success_lists]
emails = [row["Email"] for row in success_lists]
existing_usernames = (
set(User.objects.filter(username__in=emails).values_list("username", flat=True))
if is_postgres
else set(
chain.from_iterable(
User.objects.filter(username__in=chunk).values_list(
"username", flat=True
)
for chunk in chunked(emails, 999)
)
)
}
)
for work_info in success_lists:
email = work_info["Email"]
if email in existing_usernames:
continue
phone = work_info["Phone"]
user_obj = User(
username=email,
email=email,
password=str(phone).strip(),
users_to_create = [
User(
username=row["Email"],
email=row["Email"],
password=str(row["Phone"]).strip(),
is_superuser=False,
)
user_obj_list.append(user_obj)
result = []
if user_obj_list:
result = User.objects.bulk_create(user_obj_list, batch_size=1000)
return result
for row in success_lists
if row["Email"] not in existing_usernames
]
created_users = []
if users_to_create:
with transaction.atomic():
created_users = User.objects.bulk_create(
users_to_create, batch_size=None if is_postgres else 999
)
return created_users
def bulk_create_employee_import(success_lists):
"""
Bulk creation of employee instances based on the excel import of employees
Creates Employee instances in bulk based on imported data.
Uses adaptive chunking for compatibility with SQLite, avoids chunking in PostgreSQL.
"""
employee_obj_list = []
emails = [row["Email"] for row in success_lists]
is_postgres = connection.vendor == "postgresql"
existing_users = {
user.username: user
for user in User.objects.filter(
username__in=[row["Email"] for row in success_lists]
for user in (
User.objects.filter(username__in=emails).only("id", "username")
if is_postgres
else chain.from_iterable(
User.objects.filter(username__in=chunk).only("id", "username")
for chunk in chunked(emails, 999)
)
)
}
for work_info in success_lists:
email = work_info["Email"]
user = existing_users.get(email)
if not user:
continue
badge_id = work_info["Badge ID"]
first_name = convert_nan("First Name", work_info)
last_name = convert_nan("Last Name", work_info)
phone = work_info["Phone"]
gender = work_info.get("Gender", "").lower()
employee_obj = Employee(
employee_user_id=user,
badge_id=badge_id,
employee_first_name=first_name,
employee_last_name=last_name,
email=email,
phone=phone,
gender=gender,
employees_to_create = [
Employee(
employee_user_id=existing_users[row["Email"]],
badge_id=row["Badge ID"],
employee_first_name=convert_nan("First Name", row),
employee_last_name=convert_nan("Last Name", row),
email=row["Email"],
phone=row["Phone"],
gender=row.get("Gender", "").lower(),
)
employee_obj_list.append(employee_obj)
result = []
if employee_obj_list:
result = Employee.objects.bulk_create(employee_obj_list, batch_size=1000)
for row in success_lists
if row["Email"] in existing_users
]
return result
created_employees = []
if employees_to_create:
with transaction.atomic():
created_employees = Employee.objects.bulk_create(
employees_to_create, batch_size=None if is_postgres else 999
)
return created_employees
def set_initial_password(employees):
@@ -483,162 +519,191 @@ def bulk_create_department_import(success_lists):
Bulk creation of department instances based on the excel import of employees
"""
departments_to_import = {
convert_nan("Department", work_info) for work_info in success_lists
dept
for work_info in success_lists
if (dept := convert_nan("Department", work_info))
}
existing_departments = {dep.department for dep in Department.objects.all()}
department_obj_list = []
for department in departments_to_import:
if department and department not in existing_departments:
department_obj = Department(department=department)
department_obj_list.append(department_obj)
existing_departments.add(department)
existing_departments = set(Department.objects.values_list("department", flat=True))
if department_obj_list:
Department.objects.bulk_create(department_obj_list)
new_departments = [
Department(department=dept)
for dept in departments_to_import - existing_departments
]
if new_departments:
with transaction.atomic():
Department.objects.bulk_create(
new_departments, batch_size=None if is_postgres else 999
)
def bulk_create_job_position_import(success_lists):
"""
Bulk creation of job position instances based on the excel import of employees
Optimized: Bulk creation of job position instances based on the Excel import of employees.
"""
# Step 1: Extract unique (job_position, department_name) pairs
job_positions_to_import = {
(convert_nan("Job Position", work_info), convert_nan("Department", work_info))
for work_info in success_lists
(convert_nan("Job Position", item), convert_nan("Department", item))
for item in success_lists
if convert_nan("Job Position", item) and convert_nan("Department", item)
}
departments = {dep.department: dep for dep in Department.objects.all()}
existing_job_positions = {
(job_position.job_position, job_position.department_id): job_position
for job_position in JobPosition.objects.all()
}
job_position_obj_list = []
for job_position, department_name in job_positions_to_import:
if not job_position or not department_name:
continue
department_obj = departments.get(department_name)
if not department_obj:
continue
if not job_positions_to_import:
return # No valid data to import
# Check if this job position already exists for this department
if (job_position, department_obj.id) not in existing_job_positions:
job_position_obj = JobPosition(
department_id=department_obj, job_position=job_position
# Step 2: Fetch all departments at once and build a name -> object map
department_objs = Department.objects.only("id", "department")
department_lookup = {dep.department: dep for dep in department_objs}
# Step 3: Filter out entries with unknown departments
valid_pairs = [
(jp, department_lookup[dept])
for jp, dept in job_positions_to_import
if dept in department_lookup
]
if not valid_pairs:
return # No valid (job_position, department_id) pairs to process
# Step 4: Fetch existing job positions
existing_pairs = set(
JobPosition.objects.filter(
department_id__in={dept_id for _, dept_id in valid_pairs}
).values_list("job_position", "department_id")
)
# Step 5: Create list of new JobPosition instances
new_positions = [
JobPosition(job_position=jp, department_id=dept_id)
for jp, dept_id in valid_pairs
if (jp, dept_id) not in existing_pairs
]
# Step 6: Bulk create in a transaction
if new_positions:
with transaction.atomic():
JobPosition.objects.bulk_create(
new_positions, batch_size=None if is_postgres else 999
)
job_position_obj_list.append(job_position_obj)
existing_job_positions[(job_position, department_obj.id)] = job_position_obj
if job_position_obj_list:
JobPosition.objects.bulk_create(job_position_obj_list)
def bulk_create_job_role_import(success_lists):
"""
Bulk creation of job role instances based on the excel import of employees
"""
# Collect job role names and their associated job positions into a set as tuples
# Extract unique (job_role, job_position) pairs, filtering out empty values
job_roles_to_import = {
(convert_nan("Job Role", work_info), convert_nan("Job Position", work_info))
(role, pos)
for work_info in success_lists
if (role := convert_nan("Job Role", work_info))
and (pos := convert_nan("Job Position", work_info))
}
job_positions = {jp.job_position: jp for jp in JobPosition.objects.all()}
existing_job_roles = {
(jr.job_role, jr.job_position_id): jr for jr in JobRole.objects.all()
}
# Prefetch existing data efficiently
job_positions = JobPosition.objects.only("id", "job_position")
existing_job_roles = set(JobRole.objects.values_list("job_role", "job_position_id"))
job_role_obj_list = []
# Create new job roles
new_job_roles = [
JobRole(job_role=role, job_position_id=job_positions[pos].id)
for role, pos in job_roles_to_import
if pos in job_positions
and (role, job_positions[pos].id) not in existing_job_roles
]
for job_role, job_position_name in job_roles_to_import:
if not job_role or not job_position_name:
continue
job_position_obj = job_positions.get(job_position_name)
if not job_position_obj:
continue
if (job_role, job_position_obj.id) not in existing_job_roles:
job_role_obj = JobRole(job_position_id=job_position_obj, job_role=job_role)
job_role_obj_list.append(job_role_obj)
existing_job_roles[(job_role, job_position_obj.id)] = job_role_obj
if job_role_obj_list:
JobRole.objects.bulk_create(job_role_obj_list)
# Bulk create if there are new roles
if new_job_roles:
with transaction.atomic():
JobRole.objects.bulk_create(
new_job_roles, batch_size=None if is_postgres else 999
)
def bulk_create_work_types(success_lists):
"""
Bulk creation of work type instances based on the excel import of employees
"""
# Collect unique work types
# Extract unique work types, filtering out None values
work_types_to_import = {
convert_nan("Work Type", work_info) for work_info in success_lists
wt for work_info in success_lists if (wt := convert_nan("Work Type", work_info))
}
work_types_to_import.discard(None)
# Fetch existing work types
existing_work_types = {wt.work_type: wt for wt in WorkType.objects.all()}
# Get existing work types in one optimized query
existing_work_types = set(WorkType.objects.values_list("work_type", flat=True))
# Prepare list for new WorkType objects
work_type_obj_list = [
WorkType(work_type=work_type)
for work_type in work_types_to_import
if work_type not in existing_work_types
# Create new work type objects
new_work_types = [
WorkType(work_type=wt) for wt in work_types_to_import - existing_work_types
]
# Bulk create new work types
if work_type_obj_list:
WorkType.objects.bulk_create(work_type_obj_list)
# Bulk create if there are new work types
if new_work_types:
with transaction.atomic():
WorkType.objects.bulk_create(
new_work_types, batch_size=None if is_postgres else 999
)
def bulk_create_shifts(success_lists):
"""
Bulk creation of shift instances based on the excel import of employees
"""
# Collect unique shifts
shifts_to_import = {convert_nan("Shift", work_info) for work_info in success_lists}
shifts_to_import.discard(None)
# Fetch existing shifts
existing_shifts = {
shift.employee_shift: shift for shift in EmployeeShift.objects.all()
# Extract unique shifts, filtering out None values
shifts_to_import = {
shift
for work_info in success_lists
if (shift := convert_nan("Shift", work_info))
}
# Prepare list for new EmployeeShift objects
shift_obj_list = [
# Get existing shifts in one optimized query
existing_shifts = set(
EmployeeShift.objects.values_list("employee_shift", flat=True)
)
# Create new shift objects
new_shifts = [
EmployeeShift(employee_shift=shift)
for shift in shifts_to_import
if shift not in existing_shifts
for shift in shifts_to_import - existing_shifts
]
# Bulk create new shifts
if shift_obj_list:
EmployeeShift.objects.bulk_create(shift_obj_list)
# Bulk create if there are new shifts
if new_shifts:
with transaction.atomic():
EmployeeShift.objects.bulk_create(
new_shifts, batch_size=None if is_postgres else 999
)
def bulk_create_employee_types(success_lists):
"""
Bulk creation of employee type instances based on the excel import of employees
"""
# Collect unique employee types
# Extract unique employee types, filtering out None values
employee_types_to_import = {
convert_nan("Employee Type", work_info) for work_info in success_lists
}
employee_types_to_import.discard(None)
# Fetch existing employee types
existing_employee_types = {
et.employee_type: et for et in EmployeeType.objects.all()
et
for work_info in success_lists
if (et := convert_nan("Employee Type", work_info))
}
# Prepare list for new EmployeeType objects
employee_type_obj_list = [
EmployeeType(employee_type=employee_type)
for employee_type in employee_types_to_import
if employee_type not in existing_employee_types
# Get existing employee types in one optimized query
existing_employee_types = set(
EmployeeType.objects.values_list("employee_type", flat=True)
)
# Create new employee type objects
new_employee_types = [
EmployeeType(employee_type=et)
for et in employee_types_to_import - existing_employee_types
]
# Bulk create new employee types
if employee_type_obj_list:
EmployeeType.objects.bulk_create(employee_type_obj_list)
# Bulk create if there are new types
if new_employee_types:
with transaction.atomic():
EmployeeType.objects.bulk_create(
new_employee_types, batch_size=None if is_postgres else 999
)
def create_contracts_in_thread(new_work_info_list, update_work_info_list):
@@ -687,18 +752,34 @@ def bulk_create_work_info_import(success_lists):
shifts = set(row.get("Shift") for row in success_lists)
companies = set(row.get("Company") for row in success_lists)
existing_employees = {
emp.badge_id: emp
for emp in Employee.objects.entire()
.filter(badge_id__in=badge_ids)
.only("badge_id")
}
chunk_size = None if is_postgres else 999
employee_qs = (
chain.from_iterable(
Employee.objects.entire().filter(badge_id__in=chunk).only("badge_id")
for chunk in chunked(badge_ids, chunk_size)
)
if chunk_size
else Employee.objects.entire().filter(badge_id__in=badge_ids).only("badge_id")
)
existing_employees = {emp.badge_id: emp for emp in employee_qs}
existing_employee_work_infos = {
emp.employee_id: emp
for emp in EmployeeWorkInformation.objects.filter(
employee_id__in=existing_employees.values()
).only("employee_id")
for emp in (
EmployeeWorkInformation.objects.filter(
employee_id__in=existing_employees.values()
).only("employee_id")
if is_postgres
else chain.from_iterable(
EmployeeWorkInformation.objects.filter(employee_id__in=chunk).only(
"employee_id"
)
for chunk in chunked(list(existing_employees.values()), 900)
)
)
}
existing_departments = {
dep.department: dep
for dep in Department.objects.filter(department__in=departments).only(
@@ -841,9 +922,10 @@ def bulk_create_work_info_import(success_lists):
employee_work_info.basic_salary = basic_salary
employee_work_info.salary_hour = salary_hour
update_work_info_list.append(employee_work_info)
if new_work_info_list:
EmployeeWorkInformation.objects.bulk_create(new_work_info_list, batch_size=1000)
EmployeeWorkInformation.objects.bulk_create(
new_work_info_list, batch_size=None if is_postgres else 999
)
if update_work_info_list:
EmployeeWorkInformation.objects.bulk_update(
update_work_info_list,
@@ -863,7 +945,7 @@ def bulk_create_work_info_import(success_lists):
"basic_salary",
"salary_hour",
],
batch_size=1000,
batch_size=None if is_postgres else 999,
)
if apps.is_installed("payroll"):

View File

@@ -2572,11 +2572,6 @@ def work_info_import(request):
try:
users = bulk_create_user_import(success_list)
employees = bulk_create_employee_import(success_list)
thread = threading.Thread(
target=set_initial_password, args=(employees,)
)
thread.start()
bulk_create_department_import(success_list)
bulk_create_job_position_import(success_list)
bulk_create_job_role_import(success_list)
@@ -2584,6 +2579,10 @@ def work_info_import(request):
bulk_create_shifts(success_list)
bulk_create_employee_types(success_list)
bulk_create_work_info_import(success_list)
thread = threading.Thread(
target=set_initial_password, args=(employees,)
)
thread.start()
except Exception as e:
messages.error(request, _("Error Occured {}").format(e))