From c7311f54715abab1e41dbd3276623a0bd2b594a5 Mon Sep 17 00:00:00 2001
From: Horilla
Date: Mon, 26 May 2025 14:22:47 +0530
Subject: [PATCH] [UPDT] EMPLOYEE: Optimized employee import method for faster
 execution

---
 employee/methods/methods.py | 462 +++++++++++++++++++++---------------
 employee/views.py           |   9 +-
 2 files changed, 276 insertions(+), 195 deletions(-)

diff --git a/employee/methods/methods.py b/employee/methods/methods.py
index 71332dc43..f66a656c5 100644
--- a/employee/methods/methods.py
+++ b/employee/methods/methods.py
@@ -6,12 +6,12 @@ import logging
 import re
 import threading
 from datetime import date, datetime
-from itertools import groupby
+from itertools import chain, groupby
 
 import pandas as pd
 from django.apps import apps
 from django.contrib.auth.models import User
-from django.db import models
+from django.db import connection, models, transaction
 from django.utils.translation import gettext as _
 
 from base.context_processors import get_initial_prefix
@@ -28,6 +28,8 @@ from employee.models import Employee, EmployeeWorkInformation
 
 logger = logging.getLogger(__name__)
 
+is_postgres = connection.vendor == "postgresql"
+
 error_data_template = {
     field: []
     for field in [
@@ -66,6 +68,11 @@ error_data_template = {
 }
 
 
+def chunked(iterable, size):
+    for i in range(0, len(iterable), size):
+        yield iterable[i : i + size]
+
+
 def normalize_phone(phone):
     phone = str(phone).strip()
     if phone.startswith("+"):
@@ -244,26 +251,34 @@ def valid_import_file_headers(data_frame):
 
 
 def process_employee_records(data_frame):
-    created_count = 0
-    success_list, error_list = [], []
-    employee_dicts = data_frame.to_dict("records")
+
     email_regex = re.compile(r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$")
     phone_regex = re.compile(r"^\+?\d{10,15}$")
-    allowed_genders = {choice[0] for choice in Employee.choice_gender}
-
-    existing_badge_ids = set(Employee.objects.values_list("badge_id", flat=True))
-    existing_usernames = set(User.objects.values_list("username", flat=True))
-    existing_name_emails = set(
-        Employee.objects.values_list(
+    allowed_genders = frozenset(choice[0] for choice in Employee.choice_gender)
+    existing_badge_ids = frozenset(Employee.objects.values_list("badge_id", flat=True))
+    existing_usernames = frozenset(User.objects.values_list("username", flat=True))
+    existing_name_emails = frozenset(
+        (fname, lname, email)
+        for fname, lname, email in Employee.objects.values_list(
             "employee_first_name", "employee_last_name", "email"
         )
     )
-    existing_companies = set(Company.objects.values_list("company", flat=True))
+    existing_companies = frozenset(Company.objects.values_list("company", flat=True))
+    success_list, error_list = [], []
+    employee_dicts = data_frame.to_dict("records")
+
+    created_count = 0
+    seen_badge_ids = set(existing_badge_ids)
+    seen_usernames = set(existing_usernames)
+    seen_name_emails = set(existing_name_emails)
+
+    today = date.today()
 
     for emp in employee_dicts:
-        errors, save = {}, True
+        errors = {}
+        save = True
 
-        email = str(emp.get("Email", "")).strip()
+        email = str(emp.get("Email", "")).strip().lower()
         raw_phone = emp.get("Phone", "")
         phone = normalize_phone(raw_phone)
         badge_id = clean_badge_id(emp.get("Badge ID"))
@@ -274,14 +289,14 @@ def process_employee_records(data_frame):
         basic_salary = convert_nan("Basic Salary", emp)
         salary_hour = convert_nan("Salary Hour", emp)
 
+        # Date validation
         joining_date = import_valid_date(
            emp.get("Date Joining"), "Joining Date", errors, "Joining Date Error"
         )
-        if "Joining Date Error" in errors:
-            save = False
-        if joining_date and joining_date > date.today():
-            errors["Joining Date Error"] = "Joining date cannot be in the future."
-            save = False
+        if joining_date:
+            if joining_date > today:
+                errors["Joining Date Error"] = "Joining date cannot be in the future."
+                save = False
 
         contract_end_date = import_valid_date(
             emp.get("Contract End Date"),
@@ -289,58 +304,64 @@ def process_employee_records(data_frame):
             errors,
             "Contract Date Error",
         )
-        if "Contract Error" in errors:
-            save = False
         if contract_end_date and joining_date and contract_end_date < joining_date:
             errors["Contract Date Error"] = (
                 "Contract end date cannot be before joining date."
             )
             save = False
 
+        # Email validation
         if not email or not email_regex.match(email):
             errors["Email Error"] = "Invalid email address."
             save = False
 
+        # Name validation
         if not first_name:
             errors["First Name Error"] = "First name cannot be empty."
             save = False
 
+        # Phone validation
         if not phone_regex.match(phone):
             errors["Phone Error"] = "Invalid phone number format."
             save = False
 
-        if badge_id in existing_badge_ids:
+        # Badge ID validation
+        if badge_id in seen_badge_ids:
             errors["Badge ID Error"] = "An employee with this badge ID already exists."
             save = False
         else:
-            emp["Badge ID"] = badge_id
-            existing_badge_ids.add(badge_id)
+            seen_badge_ids.add(badge_id)
 
-        if email in existing_usernames:
+        # Username/email uniqueness
+        if email in seen_usernames:
             errors["User ID Error"] = "User with this email already exists."
             save = False
         else:
-            existing_usernames.add(email)
+            seen_usernames.add(email)
 
+        # Name+email uniqueness
         name_email_tuple = (first_name, last_name, email)
-        if name_email_tuple in existing_name_emails:
+        if name_email_tuple in seen_name_emails:
             errors["Name and Email Error"] = (
                 "This employee already exists in the system."
             )
             save = False
         else:
-            existing_name_emails.add(name_email_tuple)
+            seen_name_emails.add(name_email_tuple)
 
+        # Gender validation
         if gender and gender not in allowed_genders:
             errors["Gender Error"] = (
                 f"Invalid gender. Allowed values: {', '.join(allowed_genders)}."
             )
             save = False
 
+        # Company validation
         if company and company not in existing_companies:
             errors["Company Error"] = f"Company '{company}' does not exist."
             save = False
 
+        # Salary validation
         if basic_salary not in [None, ""]:
             try:
                 basic_salary_val = float(basic_salary)
@@ -361,6 +382,7 @@ def process_employee_records(data_frame):
             )
             save = False
 
+        # Final processing
         if save:
             emp["Phone"] = phone
             emp["Date Joining"] = joining_date
@@ -376,73 +398,87 @@ def bulk_create_user_import(success_lists):
     """
-    Bulk creation of user instances based on the excel import of employees
+    Creates new User instances in bulk from a list of dictionaries containing user data.
+
+    Returns:
+        list: A list of created User instances. If no new users are created, returns an empty list.
""" - user_obj_list = [] - existing_usernames = { - user.username - for user in User.objects.filter( - username__in=[row["Email"] for row in success_lists] + emails = [row["Email"] for row in success_lists] + existing_usernames = ( + set(User.objects.filter(username__in=emails).values_list("username", flat=True)) + if is_postgres + else set( + chain.from_iterable( + User.objects.filter(username__in=chunk).values_list( + "username", flat=True + ) + for chunk in chunked(emails, 999) + ) ) - } + ) - for work_info in success_lists: - email = work_info["Email"] - if email in existing_usernames: - continue - - phone = work_info["Phone"] - user_obj = User( - username=email, - email=email, - password=str(phone).strip(), + users_to_create = [ + User( + username=row["Email"], + email=row["Email"], + password=str(row["Phone"]).strip(), is_superuser=False, ) - user_obj_list.append(user_obj) - result = [] - if user_obj_list: - result = User.objects.bulk_create(user_obj_list, batch_size=1000) - return result + for row in success_lists + if row["Email"] not in existing_usernames + ] + + created_users = [] + if users_to_create: + with transaction.atomic(): + created_users = User.objects.bulk_create( + users_to_create, batch_size=None if is_postgres else 999 + ) + return created_users def bulk_create_employee_import(success_lists): """ - Bulk creation of employee instances based on the excel import of employees + Creates Employee instances in bulk based on imported data. + Uses adaptive chunking for compatibility with SQLite, avoids chunking in PostgreSQL. """ - employee_obj_list = [] + emails = [row["Email"] for row in success_lists] + is_postgres = connection.vendor == "postgresql" + existing_users = { user.username: user - for user in User.objects.filter( - username__in=[row["Email"] for row in success_lists] + for user in ( + User.objects.filter(username__in=emails).only("id", "username") + if is_postgres + else chain.from_iterable( + User.objects.filter(username__in=chunk).only("id", "username") + for chunk in chunked(emails, 999) + ) ) } - for work_info in success_lists: - email = work_info["Email"] - user = existing_users.get(email) - if not user: - continue - - badge_id = work_info["Badge ID"] - first_name = convert_nan("First Name", work_info) - last_name = convert_nan("Last Name", work_info) - phone = work_info["Phone"] - gender = work_info.get("Gender", "").lower() - employee_obj = Employee( - employee_user_id=user, - badge_id=badge_id, - employee_first_name=first_name, - employee_last_name=last_name, - email=email, - phone=phone, - gender=gender, + employees_to_create = [ + Employee( + employee_user_id=existing_users[row["Email"]], + badge_id=row["Badge ID"], + employee_first_name=convert_nan("First Name", row), + employee_last_name=convert_nan("Last Name", row), + email=row["Email"], + phone=row["Phone"], + gender=row.get("Gender", "").lower(), ) - employee_obj_list.append(employee_obj) - result = [] - if employee_obj_list: - result = Employee.objects.bulk_create(employee_obj_list, batch_size=1000) + for row in success_lists + if row["Email"] in existing_users + ] - return result + created_employees = [] + if employees_to_create: + with transaction.atomic(): + created_employees = Employee.objects.bulk_create( + employees_to_create, batch_size=None if is_postgres else 999 + ) + + return created_employees def set_initial_password(employees): @@ -483,162 +519,191 @@ def bulk_create_department_import(success_lists): Bulk creation of department instances based on the excel import of employees """ 
     departments_to_import = {
-        convert_nan("Department", work_info) for work_info in success_lists
+        dept
+        for work_info in success_lists
+        if (dept := convert_nan("Department", work_info))
     }
-    existing_departments = {dep.department for dep in Department.objects.all()}
-    department_obj_list = []
 
-    for department in departments_to_import:
-        if department and department not in existing_departments:
-            department_obj = Department(department=department)
-            department_obj_list.append(department_obj)
-            existing_departments.add(department)
+    existing_departments = set(Department.objects.values_list("department", flat=True))
 
-    if department_obj_list:
-        Department.objects.bulk_create(department_obj_list)
+    new_departments = [
+        Department(department=dept)
+        for dept in departments_to_import - existing_departments
+    ]
+
+    if new_departments:
+        with transaction.atomic():
+            Department.objects.bulk_create(
+                new_departments, batch_size=None if is_postgres else 999
+            )
 
 
 def bulk_create_job_position_import(success_lists):
     """
-    Bulk creation of job position instances based on the excel import of employees
+    Optimized: Bulk creation of job position instances based on the Excel import of employees.
     """
+
+    # Step 1: Extract unique (job_position, department_name) pairs
     job_positions_to_import = {
-        (convert_nan("Job Position", work_info), convert_nan("Department", work_info))
-        for work_info in success_lists
+        (convert_nan("Job Position", item), convert_nan("Department", item))
+        for item in success_lists
+        if convert_nan("Job Position", item) and convert_nan("Department", item)
     }
-    departments = {dep.department: dep for dep in Department.objects.all()}
-    existing_job_positions = {
-        (job_position.job_position, job_position.department_id): job_position
-        for job_position in JobPosition.objects.all()
-    }
-    job_position_obj_list = []
-    for job_position, department_name in job_positions_to_import:
-        if not job_position or not department_name:
-            continue
 
-        department_obj = departments.get(department_name)
-        if not department_obj:
-            continue
+    if not job_positions_to_import:
+        return  # No valid data to import
 
-        # Check if this job position already exists for this department
-        if (job_position, department_obj.id) not in existing_job_positions:
-            job_position_obj = JobPosition(
-                department_id=department_obj, job_position=job_position
+    # Step 2: Fetch all departments at once and build a name -> object map
+    department_objs = Department.objects.only("id", "department")
+    department_lookup = {dep.department: dep for dep in department_objs}
+
+    # Step 3: Filter out entries with unknown departments
+    valid_pairs = [
+        (jp, department_lookup[dept])
+        for jp, dept in job_positions_to_import
+        if dept in department_lookup
+    ]
+
+    if not valid_pairs:
+        return  # No valid (job_position, department) pairs to process
+
+    # Step 4: Fetch existing job positions
+    existing_pairs = set(
+        JobPosition.objects.filter(
+            department_id__in={dept.id for _, dept in valid_pairs}
+        ).values_list("job_position", "department_id")
+    )
+
+    # Step 5: Create list of new JobPosition instances
+    new_positions = [
+        JobPosition(job_position=jp, department_id=dept)
+        for jp, dept in valid_pairs
+        if (jp, dept.id) not in existing_pairs
+    ]
+
+    # Step 6: Bulk create in a transaction
+    if new_positions:
+        with transaction.atomic():
+            JobPosition.objects.bulk_create(
+                new_positions, batch_size=None if is_postgres else 999
             )
-            job_position_obj_list.append(job_position_obj)
-            existing_job_positions[(job_position, department_obj.id)] = job_position_obj
-
-    if job_position_obj_list:
-        JobPosition.objects.bulk_create(job_position_obj_list)
 
 
 def bulk_create_job_role_import(success_lists):
     """
     Bulk creation of job role instances based on the excel import of employees
     """
-    # Collect job role names and their associated job positions into a set as tubles
+    # Extract unique (job_role, job_position) pairs, filtering out empty values
     job_roles_to_import = {
-        (convert_nan("Job Role", work_info), convert_nan("Job Position", work_info))
+        (role, pos)
         for work_info in success_lists
+        if (role := convert_nan("Job Role", work_info))
+        and (pos := convert_nan("Job Position", work_info))
     }
 
-    job_positions = {jp.job_position: jp for jp in JobPosition.objects.all()}
-    existing_job_roles = {
-        (jr.job_role, jr.job_position_id): jr for jr in JobRole.objects.all()
-    }
+    # Prefetch existing data efficiently
+    job_positions = {jp.job_position: jp for jp in JobPosition.objects.only("id", "job_position")}
+    existing_job_roles = set(JobRole.objects.values_list("job_role", "job_position_id"))
 
-    job_role_obj_list = []
+    # Create new job roles
+    new_job_roles = [
+        JobRole(job_role=role, job_position_id=job_positions[pos])
+        for role, pos in job_roles_to_import
+        if pos in job_positions
+        and (role, job_positions[pos].id) not in existing_job_roles
+    ]
 
-    for job_role, job_position_name in job_roles_to_import:
-
-        if not job_role or not job_position_name:
-            continue
-
-        job_position_obj = job_positions.get(job_position_name)
-        if not job_position_obj:
-            continue
-
-        if (job_role, job_position_obj.id) not in existing_job_roles:
-            job_role_obj = JobRole(job_position_id=job_position_obj, job_role=job_role)
-            job_role_obj_list.append(job_role_obj)
-            existing_job_roles[(job_role, job_position_obj.id)] = job_role_obj
-
-    if job_role_obj_list:
-        JobRole.objects.bulk_create(job_role_obj_list)
+    # Bulk create if there are new roles
+    if new_job_roles:
+        with transaction.atomic():
+            JobRole.objects.bulk_create(
+                new_job_roles, batch_size=None if is_postgres else 999
+            )
 
 
 def bulk_create_work_types(success_lists):
     """
     Bulk creation of work type instances based on the excel import of employees
     """
-    # Collect unique work types
+    # Extract unique work types, filtering out None values
     work_types_to_import = {
-        convert_nan("Work Type", work_info) for work_info in success_lists
+        wt for work_info in success_lists if (wt := convert_nan("Work Type", work_info))
     }
-    work_types_to_import.discard(None)
 
-    # Fetch existing work types
-    existing_work_types = {wt.work_type: wt for wt in WorkType.objects.all()}
+    # Get existing work types in one optimized query
+    existing_work_types = set(WorkType.objects.values_list("work_type", flat=True))
 
-    # Prepare list for new WorkType objects
-    work_type_obj_list = [
-        WorkType(work_type=work_type)
-        for work_type in work_types_to_import
-        if work_type not in existing_work_types
+    # Create new work type objects
+    new_work_types = [
+        WorkType(work_type=wt) for wt in work_types_to_import - existing_work_types
     ]
-    # Bulk create new work types
-    if work_type_obj_list:
-        WorkType.objects.bulk_create(work_type_obj_list)
+
+    # Bulk create if there are new work types
+    if new_work_types:
+        with transaction.atomic():
+            WorkType.objects.bulk_create(
+                new_work_types, batch_size=None if is_postgres else 999
+            )
 
 
 def bulk_create_shifts(success_lists):
     """
     Bulk creation of shift instances based on the excel import of employees
     """
-    # Collect unique shifts
-    shifts_to_import = {convert_nan("Shift", work_info) for work_info in success_lists}
-    shifts_to_import.discard(None)
-
-    # Fetch existing shifts
-    existing_shifts = {
-        shift.employee_shift: shift for shift in EmployeeShift.objects.all()
+    # Extract unique shifts, filtering out None values
+    shifts_to_import = {
+        shift
+        for work_info in success_lists
+        if (shift := convert_nan("Shift", work_info))
     }
 
-    # Prepare list for new EmployeeShift objects
-    shift_obj_list = [
+    # Get existing shifts in one optimized query
+    existing_shifts = set(
+        EmployeeShift.objects.values_list("employee_shift", flat=True)
+    )
+
+    # Create new shift objects
+    new_shifts = [
         EmployeeShift(employee_shift=shift)
-        for shift in shifts_to_import
-        if shift not in existing_shifts
+        for shift in shifts_to_import - existing_shifts
     ]
-    # Bulk create new shifts
-    if shift_obj_list:
-        EmployeeShift.objects.bulk_create(shift_obj_list)
+
+    # Bulk create if there are new shifts
+    if new_shifts:
+        with transaction.atomic():
+            EmployeeShift.objects.bulk_create(
+                new_shifts, batch_size=None if is_postgres else 999
+            )
 
 
 def bulk_create_employee_types(success_lists):
     """
     Bulk creation of employee type instances based on the excel import of employees
     """
-    # Collect unique employee types
+    # Extract unique employee types, filtering out None values
     employee_types_to_import = {
-        convert_nan("Employee Type", work_info) for work_info in success_lists
-    }
-    employee_types_to_import.discard(None)
-
-    # Fetch existing employee types
-    existing_employee_types = {
-        et.employee_type: et for et in EmployeeType.objects.all()
+        et
+        for work_info in success_lists
+        if (et := convert_nan("Employee Type", work_info))
     }
 
-    # Prepare list for new EmployeeType objects
-    employee_type_obj_list = [
-        EmployeeType(employee_type=employee_type)
-        for employee_type in employee_types_to_import
-        if employee_type not in existing_employee_types
+    # Get existing employee types in one optimized query
+    existing_employee_types = set(
+        EmployeeType.objects.values_list("employee_type", flat=True)
+    )
+
+    # Create new employee type objects
+    new_employee_types = [
+        EmployeeType(employee_type=et)
+        for et in employee_types_to_import - existing_employee_types
     ]
-    # Bulk create new employee types
-    if employee_type_obj_list:
-        EmployeeType.objects.bulk_create(employee_type_obj_list)
+
+    # Bulk create if there are new types
+    if new_employee_types:
+        with transaction.atomic():
+            EmployeeType.objects.bulk_create(
+                new_employee_types, batch_size=None if is_postgres else 999
+            )
 
 
 def create_contracts_in_thread(new_work_info_list, update_work_info_list):
@@ -687,18 +752,34 @@ def bulk_create_work_info_import(success_lists):
     shifts = set(row.get("Shift") for row in success_lists)
     companies = set(row.get("Company") for row in success_lists)
 
-    existing_employees = {
-        emp.badge_id: emp
-        for emp in Employee.objects.entire()
-        .filter(badge_id__in=badge_ids)
-        .only("badge_id")
-    }
+    chunk_size = None if is_postgres else 999
+    employee_qs = (
+        chain.from_iterable(
+            Employee.objects.entire().filter(badge_id__in=chunk).only("badge_id")
+            for chunk in chunked(badge_ids, chunk_size)
+        )
+        if chunk_size
+        else Employee.objects.entire().filter(badge_id__in=badge_ids).only("badge_id")
+    )
+
+    existing_employees = {emp.badge_id: emp for emp in employee_qs}
 
     existing_employee_work_infos = {
         emp.employee_id: emp
-        for emp in EmployeeWorkInformation.objects.filter(
-            employee_id__in=existing_employees.values()
-        ).only("employee_id")
+        for emp in (
+            EmployeeWorkInformation.objects.filter(
+                employee_id__in=existing_employees.values()
+            ).only("employee_id")
+            if is_postgres
+            else chain.from_iterable(
+                EmployeeWorkInformation.objects.filter(employee_id__in=chunk).only(
+                    "employee_id"
+                )
+                for chunk in chunked(list(existing_employees.values()), 900)
+            )
+        )
     }
+
     existing_departments = {
         dep.department: dep
         for dep in Department.objects.filter(department__in=departments).only(
@@ -841,9 +922,10 @@ def bulk_create_work_info_import(success_lists):
             employee_work_info.basic_salary = basic_salary
             employee_work_info.salary_hour = salary_hour
             update_work_info_list.append(employee_work_info)
-    if new_work_info_list:
-        EmployeeWorkInformation.objects.bulk_create(new_work_info_list, batch_size=1000)
+    EmployeeWorkInformation.objects.bulk_create(
+        new_work_info_list, batch_size=None if is_postgres else 999
+    )
     if update_work_info_list:
         EmployeeWorkInformation.objects.bulk_update(
             update_work_info_list,
             [
@@ -863,7 +945,7 @@ def bulk_create_work_info_import(success_lists):
                 "basic_salary",
                 "salary_hour",
             ],
-            batch_size=1000,
+            batch_size=None if is_postgres else 999,
         )
 
     if apps.is_installed("payroll"):
diff --git a/employee/views.py b/employee/views.py
index d39c7d305..8afcdbb06 100755
--- a/employee/views.py
+++ b/employee/views.py
@@ -2572,11 +2572,6 @@ def work_info_import(request):
             try:
                 users = bulk_create_user_import(success_list)
                 employees = bulk_create_employee_import(success_list)
-                thread = threading.Thread(
-                    target=set_initial_password, args=(employees,)
-                )
-                thread.start()
-
                 bulk_create_department_import(success_list)
                 bulk_create_job_position_import(success_list)
                 bulk_create_job_role_import(success_list)
@@ -2584,6 +2579,10 @@ def work_info_import(request):
                 bulk_create_shifts(success_list)
                 bulk_create_employee_types(success_list)
                 bulk_create_work_info_import(success_list)
+                thread = threading.Thread(
+                    target=set_initial_password, args=(employees,)
+                )
+                thread.start()
             except Exception as e:
                 messages.error(request, _("Error Occured {}").format(e))
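
Note on the pattern above: nearly every hunk in this patch applies the same recipe. Detect the database backend once at import time via connection.vendor, chunk "__in" lookups on SQLite so each query stays under its bound-parameter limit (historically 999 variables), and let PostgreSQL run a single unchunked query with batch_size=None. A minimal, self-contained sketch of that recipe follows; the Tag model, its name field, and the myapp module are hypothetical stand-ins for illustration, not part of Horilla:

    from itertools import chain

    from django.db import connection, transaction

    from myapp.models import Tag  # hypothetical model with a unique "name" field

    IS_POSTGRES = connection.vendor == "postgresql"
    SQLITE_MAX_PARAMS = 999  # conservative bound-parameter budget for SQLite


    def chunked(items, size):
        # Yield successive slices of at most `size` items from a list.
        for i in range(0, len(items), size):
            yield items[i : i + size]


    def bulk_import_names(names):
        # Insert any names that do not already exist, backend-aware.
        wanted = set(names)
        name_list = list(wanted)

        # One query on PostgreSQL; several smaller ones on SQLite so the
        # IN (...) clause never exceeds the parameter limit.
        existing = (
            set(Tag.objects.filter(name__in=name_list).values_list("name", flat=True))
            if IS_POSTGRES
            else set(
                chain.from_iterable(
                    Tag.objects.filter(name__in=chunk).values_list("name", flat=True)
                    for chunk in chunked(name_list, SQLITE_MAX_PARAMS)
                )
            )
        )

        new_rows = [Tag(name=name) for name in wanted - existing]
        if new_rows:
            with transaction.atomic():
                # batch_size=None lets PostgreSQL insert everything in one
                # statement; SQLite gets capped batches for the same reason
                # the reads above are chunked.
                Tag.objects.bulk_create(
                    new_rows, batch_size=None if IS_POSTGRES else SQLITE_MAX_PARAMS
                )
        return new_rows

Django already caps bulk_create batches to what the backend accepts when batch_size is None, so the explicit cap on SQLite is mostly belt and braces; the chunked read-side "__in" queries are the part SQLite genuinely needs, since an oversized IN clause raises an OperationalError rather than being split automatically.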