1155 lines
37 KiB
Python
1155 lines
37 KiB
Python
import ast
|
|
import calendar
|
|
import json
|
|
import os
|
|
import random
|
|
from datetime import date, datetime, time, timedelta
|
|
|
|
import pandas as pd
|
|
import pdfkit
|
|
from django.apps import apps
|
|
from django.conf import settings
|
|
from django.contrib.auth.models import Group
|
|
from django.contrib.staticfiles import finders
|
|
from django.core.exceptions import ObjectDoesNotExist
|
|
from django.core.paginator import Paginator
|
|
from django.db import models
|
|
from django.db.models import ForeignKey, ManyToManyField, OneToOneField, Q
|
|
from django.db.models.functions import Lower
|
|
from django.forms.models import ModelChoiceField
|
|
from django.http import HttpResponse
|
|
from django.template.loader import render_to_string
|
|
from django.utils.translation import gettext as _
|
|
|
|
from base.models import Company, CompanyLeaves, DynamicPagination, Holidays
|
|
from employee.models import Employee, EmployeeWorkInformation
|
|
from horilla.horilla_apps import NESTED_SUBORDINATE_VISIBILITY
|
|
from horilla.horilla_middlewares import _thread_locals
|
|
from horilla.horilla_settings import HORILLA_DATE_FORMATS, HORILLA_TIME_FORMATS
|
|
|
|
|
|
def users_count(self):
|
|
"""
|
|
Restrict Group users_count to selected company context
|
|
"""
|
|
return Employee.objects.filter(employee_user_id__in=self.user_set.all()).count()
|
|
|
|
|
|
Group.add_to_class("users_count", property(users_count))
|
|
|
|
|
|
# def filtersubordinates(request, queryset, perm=None, field="employee_id"):
|
|
# """
|
|
# This method is used to filter out subordinates queryset element.
|
|
# """
|
|
# user = request.user
|
|
# if user.has_perm(perm):
|
|
# return queryset
|
|
|
|
# if not request:
|
|
# return queryset
|
|
# if NESTED_SUBORDINATE_VISIBILITY:
|
|
# current_managers = [
|
|
# request.user.employee_get.id,
|
|
# ]
|
|
# all_subordinates = Q(
|
|
# **{
|
|
# f"{field}__employee_work_info__reporting_manager_id__in": current_managers
|
|
# }
|
|
# )
|
|
|
|
# while True:
|
|
# sub_managers = queryset.filter(
|
|
# **{
|
|
# f"{field}__employee_work_info__reporting_manager_id__in": current_managers
|
|
# }
|
|
# ).values_list(f"{field}__id", flat=True)
|
|
# if not sub_managers.exists():
|
|
# break
|
|
# current_managers = sub_managers
|
|
# all_subordinates |= Q(
|
|
# **{
|
|
# f"{field}__employee_work_info__reporting_manager_id__in": sub_managers
|
|
# }
|
|
# )
|
|
|
|
# return queryset.filter(all_subordinates)
|
|
|
|
# manager = Employee.objects.filter(employee_user_id=user).first()
|
|
|
|
# if field:
|
|
# filter_expression = f"{field}__employee_work_info__reporting_manager_id"
|
|
# queryset = queryset.filter(**{filter_expression: manager})
|
|
# return queryset
|
|
|
|
# queryset = queryset.filter(
|
|
# employee_id__employee_work_info__reporting_manager_id=manager
|
|
# )
|
|
# return queryset
|
|
|
|
|
|
def filtersubordinates(
|
|
request,
|
|
queryset,
|
|
perm=None,
|
|
field="employee_id",
|
|
nested=NESTED_SUBORDINATE_VISIBILITY,
|
|
):
|
|
"""
|
|
Filters a queryset to include only the current user's subordinates.
|
|
Respects the user's permission: if the user has `perm`, returns full queryset.
|
|
|
|
Args:
|
|
request: HttpRequest
|
|
queryset: Django queryset to filter
|
|
perm: permission codename string
|
|
field: ForeignKey field pointing to Employee (default "employee_id")
|
|
nested: if True, include all nested subordinates; else only direct subordinates
|
|
|
|
Returns:
|
|
Filtered queryset
|
|
"""
|
|
user = request.user
|
|
|
|
if perm and user.has_perm(perm):
|
|
return queryset # User has permission to view all
|
|
|
|
if not hasattr(user, "employee_get") or user.employee_get is None:
|
|
return queryset.none() # No employee associated, return empty
|
|
|
|
# Get subordinate employee IDs
|
|
sub_ids = get_subordinate_employee_ids(request, nested=nested)
|
|
|
|
# Include own records explicitly
|
|
own_id = user.employee_get.id
|
|
|
|
# Build filter
|
|
filter_ids = sub_ids + [own_id] if sub_ids else [own_id]
|
|
|
|
# Return filtered queryset
|
|
return queryset.filter(**{f"{field}__id__in": filter_ids})
|
|
|
|
|
|
def filter_own_records(request, queryset, perm=None):
|
|
"""
|
|
This method is used to filter out subordinates queryset element.
|
|
"""
|
|
user = request.user
|
|
if user.has_perm(perm):
|
|
return queryset
|
|
queryset = queryset.filter(employee_id=request.user.employee_get)
|
|
return queryset
|
|
|
|
|
|
def filter_own_and_subordinate_recordes(request, queryset, perm=None):
|
|
"""
|
|
This method is used to filter out subordinates queryset along with own queryset element.
|
|
"""
|
|
user = request.user
|
|
if user.has_perm(perm):
|
|
return queryset
|
|
queryset = filter_own_records(request, queryset, perm) | filtersubordinates(
|
|
request, queryset, perm
|
|
)
|
|
return queryset
|
|
|
|
|
|
def filtersubordinatesemployeemodel(request, queryset, perm=None):
|
|
"""
|
|
This method is used to filter out all subordinates in the entire reporting chain.
|
|
"""
|
|
user = request.user
|
|
if user.has_perm(perm):
|
|
return queryset
|
|
|
|
if not request:
|
|
return queryset
|
|
|
|
if NESTED_SUBORDINATE_VISIBILITY:
|
|
# Initialize the set of subordinates with the current manager(s)
|
|
current_managers = [
|
|
request.user.employee_get.id,
|
|
]
|
|
all_subordinates = Q(
|
|
employee_work_info__reporting_manager_id__in=current_managers
|
|
)
|
|
|
|
# Iteratively find subordinates in the chain
|
|
while True:
|
|
sub_managers = queryset.filter(
|
|
employee_work_info__reporting_manager_id__in=current_managers
|
|
).values_list("id", flat=True)
|
|
|
|
if not sub_managers.exists():
|
|
break
|
|
|
|
current_managers = sub_managers
|
|
all_subordinates |= Q(
|
|
employee_work_info__reporting_manager_id__in=sub_managers
|
|
)
|
|
|
|
# Apply the filter to the queryset
|
|
return queryset.filter(all_subordinates).distinct()
|
|
|
|
manager = Employee.objects.filter(employee_user_id=user).first()
|
|
queryset = queryset.filter(employee_work_info__reporting_manager_id=manager)
|
|
return queryset
|
|
|
|
|
|
def is_reportingmanager(request):
|
|
"""
|
|
This method is used to check weather the employee is reporting manager or not.
|
|
"""
|
|
try:
|
|
user = request.user
|
|
return user.employee_get.reporting_manager.all().exists()
|
|
except:
|
|
return False
|
|
|
|
|
|
# def choosesubordinates(
|
|
# request,
|
|
# form,
|
|
# perm,
|
|
# ):
|
|
# user = request.user
|
|
# if user.has_perm(perm):
|
|
# return form
|
|
# manager = Employee.objects.filter(employee_user_id=user).first()
|
|
# queryset = Employee.objects.filter(employee_work_info__reporting_manager_id=manager)
|
|
# form.fields["employee_id"].queryset = queryset
|
|
# return form
|
|
|
|
|
|
def choosesubordinates(request, form, perm):
|
|
"""
|
|
Dynamically set subordinate choices for employee field based on permissions
|
|
and nested subordinate visibility.
|
|
"""
|
|
user = request.user
|
|
if user.has_perm(perm):
|
|
return form
|
|
manager = Employee.objects.filter(employee_user_id=user).first()
|
|
if not manager:
|
|
return form
|
|
|
|
# Start with direct subordinates
|
|
current_managers = [manager.id]
|
|
all_subordinates = Q(employee_work_info__reporting_manager_id__in=current_managers)
|
|
|
|
if NESTED_SUBORDINATE_VISIBILITY:
|
|
# Recursively find all subordinates in the chain
|
|
while True:
|
|
sub_managers = Employee.objects.filter(
|
|
employee_work_info__reporting_manager_id__in=current_managers
|
|
).values_list("id", flat=True)
|
|
|
|
if not sub_managers.exists():
|
|
break
|
|
|
|
current_managers = sub_managers
|
|
all_subordinates |= Q(
|
|
employee_work_info__reporting_manager_id__in=sub_managers
|
|
)
|
|
|
|
queryset = Employee.objects.filter(all_subordinates).distinct()
|
|
|
|
# Assign to form field
|
|
if "employee_id" in form.fields:
|
|
form.fields["employee_id"].queryset = queryset
|
|
|
|
return form
|
|
|
|
|
|
def get_subordinate_employee_ids(request, nested=NESTED_SUBORDINATE_VISIBILITY):
|
|
"""
|
|
Returns a list of subordinate Employee IDs under the current user.
|
|
|
|
If nested=True, includes all subordinates recursively across the reporting hierarchy.
|
|
If nested=False, includes only direct subordinates.
|
|
"""
|
|
user = request.user
|
|
if not hasattr(user, "employee_get"):
|
|
return []
|
|
|
|
manager_id = user.employee_get.id
|
|
|
|
if nested:
|
|
# Recursive approach for all levels
|
|
current_managers = [manager_id]
|
|
all_sub_ids = set()
|
|
|
|
while current_managers:
|
|
sub_ids = list(
|
|
Employee.objects.filter(
|
|
employee_work_info__reporting_manager_id__in=current_managers
|
|
).values_list("id", flat=True)
|
|
)
|
|
if not sub_ids:
|
|
break
|
|
all_sub_ids.update(sub_ids)
|
|
current_managers = sub_ids
|
|
|
|
return list(all_sub_ids)
|
|
else:
|
|
# Only direct subordinates
|
|
direct_sub_ids = list(
|
|
Employee.objects.filter(
|
|
employee_work_info__reporting_manager_id=manager_id
|
|
).values_list("id", flat=True)
|
|
)
|
|
return direct_sub_ids
|
|
|
|
|
|
def choosesubordinatesemployeemodel(request, form, perm):
|
|
user = request.user
|
|
if user.has_perm(perm):
|
|
return form
|
|
manager = Employee.objects.filter(employee_user_id=user).first()
|
|
queryset = Employee.objects.filter(employee_work_info__reporting_manager_id=manager)
|
|
|
|
form.fields["employee_id"].queryset = queryset
|
|
return form
|
|
|
|
|
|
orderingList = [
|
|
{
|
|
"id": "",
|
|
"field": "",
|
|
"ordering": "",
|
|
}
|
|
]
|
|
|
|
|
|
def sortby(request, queryset, key):
|
|
"""
|
|
This method is used to sort query set by asc or desc
|
|
"""
|
|
global orderingList
|
|
id = request.user.id
|
|
# here will create dictionary object to the global orderingList if not exists,
|
|
# if exists then method will switch corresponding object ordering.
|
|
filtered_list = [x for x in orderingList if x["id"] == id]
|
|
ordering = filtered_list[0] if filtered_list else None
|
|
if ordering is None:
|
|
ordering = {
|
|
"id": id,
|
|
"field": None,
|
|
"ordering": "-",
|
|
}
|
|
orderingList.append(ordering)
|
|
sortby = request.GET.get(key)
|
|
sort_count = request.GET.getlist(key).count(sortby)
|
|
order = None
|
|
if sortby is not None and sortby != "":
|
|
|
|
field_parts = sortby.split("__")
|
|
|
|
model_meta = queryset.model._meta
|
|
|
|
# here will update the orderingList
|
|
ordering["field"] = sortby
|
|
if sort_count % 2 == 0:
|
|
ordering["ordering"] = "-"
|
|
order = sortby
|
|
else:
|
|
ordering["ordering"] = ""
|
|
order = f"-{sortby}"
|
|
|
|
for part in field_parts:
|
|
field = model_meta.get_field(part)
|
|
if isinstance(field, models.ForeignKey):
|
|
model_meta = field.related_model._meta
|
|
else:
|
|
if isinstance(field, models.CharField):
|
|
queryset = queryset.annotate(lower_title=Lower(sortby))
|
|
queryset = queryset.order_by(f"{ordering['ordering']}lower_title")
|
|
else:
|
|
queryset = queryset.order_by(f'{ordering["ordering"]}{sortby}')
|
|
|
|
orderingList = [item for item in orderingList if item["id"] != id]
|
|
orderingList.append(ordering)
|
|
setattr(request, "sort_option", {})
|
|
request.sort_option["order"] = order
|
|
|
|
return queryset
|
|
|
|
|
|
def random_color_generator():
|
|
r = random.randint(0, 255)
|
|
g = random.randint(0, 255)
|
|
b = random.randint(0, 255)
|
|
if r == g or g == b or b == r:
|
|
random_color_generator()
|
|
return f"rgba({r}, {g}, {b} , 0.7)"
|
|
|
|
|
|
# color_palette=[]
|
|
# Function to generate distinct colors for each object
|
|
def generate_colors(num_colors):
|
|
# Define a color palette with distinct colors
|
|
color_palette = [
|
|
"rgba(255, 99, 132, 1)", # Red
|
|
"rgba(54, 162, 235, 1)", # Blue
|
|
"rgba(255, 206, 86, 1)", # Yellow
|
|
"rgba(75, 192, 192, 1)", # Green
|
|
"rgba(153, 102, 255, 1)", # Purple
|
|
"rgba(255, 159, 64, 1)", # Orange
|
|
]
|
|
|
|
if num_colors > len(color_palette):
|
|
for i in range(num_colors - len(color_palette)):
|
|
color_palette.append(random_color_generator())
|
|
|
|
colors = []
|
|
for i in range(num_colors):
|
|
# color=random_color_generator()
|
|
colors.append(color_palette[i % len(color_palette)])
|
|
|
|
return colors
|
|
|
|
|
|
def get_key_instances(model, data_dict):
|
|
# Get all the models in the Django project
|
|
all_models = apps.get_models()
|
|
|
|
# Initialize a list to store related models include the function argument model as foreignkey
|
|
related_models = []
|
|
|
|
# Iterate through all models
|
|
for other_model in all_models:
|
|
# Iterate through fields of the model
|
|
for field in other_model._meta.fields:
|
|
# Check if the field is a ForeignKey and related to the function argument model
|
|
if isinstance(field, ForeignKey) and field.related_model == model:
|
|
related_models.append(other_model)
|
|
break
|
|
|
|
# Iterate through related models to filter instances
|
|
for related_model in related_models:
|
|
# Get all fields of the related model
|
|
related_model_fields = related_model._meta.get_fields()
|
|
|
|
# Iterate through fields to find ForeignKey fields
|
|
for field in related_model_fields:
|
|
if isinstance(field, ForeignKey):
|
|
# Get the related name and field name
|
|
related_name = field.related_query_name()
|
|
field_name = field.name
|
|
|
|
# Check if the related name exists in data_dict
|
|
if related_name in data_dict:
|
|
# Get the related_id from data_dict
|
|
related_id_list = data_dict[related_name]
|
|
related_id = int(related_id_list[0])
|
|
|
|
# Filter instances based on the field and related_id
|
|
filtered_instance = related_model.objects.filter(
|
|
**{field_name: related_id}
|
|
).first()
|
|
|
|
# Store the filtered instance back in data_dict
|
|
data_dict[related_name] = [str(filtered_instance)]
|
|
|
|
# Get all the fields in the argument model
|
|
model_fields = model._meta.get_fields()
|
|
foreign_key_field_names = [
|
|
field.name
|
|
for field in model_fields
|
|
if isinstance(field, ForeignKey or OneToOneField)
|
|
]
|
|
# Create a list of field names that are present in data_dict
|
|
present_foreign_key_field_names = [
|
|
key for key in foreign_key_field_names if key in data_dict
|
|
]
|
|
|
|
for field_name in present_foreign_key_field_names:
|
|
try:
|
|
# Get the list of integer values from data_dict for the field
|
|
field_values = [int(value) for value in data_dict[field_name]]
|
|
|
|
# Get the related model of the ForeignKey field
|
|
related_model = model._meta.get_field(field_name).remote_field.model
|
|
|
|
# Get the instances of the related model using the field values
|
|
related_instances = related_model.objects.filter(id__in=field_values)
|
|
|
|
# Create a list of string representations of the instances
|
|
related_strings = [str(instance) for instance in related_instances]
|
|
|
|
# Update data_dict with the list of string representations
|
|
data_dict[field_name] = related_strings
|
|
except (ObjectDoesNotExist, ValueError):
|
|
pass
|
|
|
|
# Create a list of field names that are ManyToManyField
|
|
many_to_many_field_names = [
|
|
field.name for field in model_fields if isinstance(field, ManyToManyField)
|
|
]
|
|
# Create a list of field names that are present in data_dict for ManyToManyFields
|
|
present_many_to_many_field_names = [
|
|
key for key in many_to_many_field_names if key in data_dict
|
|
]
|
|
|
|
for field_name in present_many_to_many_field_names:
|
|
try:
|
|
# Get the related model of the ManyToMany field
|
|
related_model = model._meta.get_field(field_name).remote_field.model
|
|
# Get a list of integer values from data_dict for the field
|
|
field_values = [int(value) for value in data_dict[field_name]]
|
|
|
|
# Filter instances of the related model based on the field values
|
|
related_instances = related_model.objects.filter(id__in=field_values)
|
|
|
|
# Update data_dict with the string representations of related instances
|
|
data_dict[field_name] = [str(instance) for instance in related_instances]
|
|
except (ObjectDoesNotExist, ValueError):
|
|
pass
|
|
|
|
nested_fields = [
|
|
key
|
|
for key in data_dict
|
|
if "__" in key and not key.endswith("gte") and not key.endswith("lte")
|
|
]
|
|
for key in nested_fields:
|
|
field_names = key.split("__")
|
|
field_values = data_dict[key]
|
|
if (
|
|
field_values != ["unknown"]
|
|
and field_values != ["true"]
|
|
and field_values != ["false"]
|
|
):
|
|
nested_instance = get_nested_instances(model, field_names, field_values)
|
|
if nested_instance is not None:
|
|
data_dict[key] = nested_instance
|
|
|
|
if "id" in data_dict:
|
|
id = data_dict["id"][0]
|
|
object = model.objects.filter(id=id).first()
|
|
object = str(object)
|
|
del data_dict["id"]
|
|
data_dict["Object"] = [object]
|
|
keys_to_remove = [
|
|
key
|
|
for key, value in data_dict.items()
|
|
if value == ["unknown"]
|
|
or key
|
|
in [
|
|
"sortby",
|
|
"orderby",
|
|
"view",
|
|
"page",
|
|
"group_by",
|
|
"target",
|
|
"rpage",
|
|
"instances_ids",
|
|
"asset_list",
|
|
"vpage",
|
|
"opage",
|
|
"click_id",
|
|
"csrfmiddlewaretoken",
|
|
"assign_sortby",
|
|
"request_sortby",
|
|
"asset_under",
|
|
]
|
|
or "dynamic_page" in key
|
|
]
|
|
if not "search" in data_dict:
|
|
if "search_field" in data_dict:
|
|
del data_dict["search_field"]
|
|
|
|
for key in keys_to_remove:
|
|
del data_dict[key]
|
|
return data_dict
|
|
|
|
|
|
def get_nested_instances(model, field_names, field_values):
|
|
try:
|
|
related_model = model
|
|
for field_name in field_names:
|
|
try:
|
|
related_field = related_model._meta.get_field(field_name)
|
|
except:
|
|
pass
|
|
try:
|
|
related_model = related_field.remote_field.model
|
|
except:
|
|
pass
|
|
object_ids = [int(value) for value in field_values if value != "not_set"]
|
|
related_instances = related_model.objects.filter(id__in=object_ids)
|
|
result = [str(instance) for instance in related_instances]
|
|
if "not_set" in field_values:
|
|
result.insert(0, "not_set")
|
|
return result
|
|
except (ObjectDoesNotExist, ValueError):
|
|
return None
|
|
|
|
|
|
def closest_numbers(numbers: list, input_number: int) -> tuple:
|
|
"""
|
|
This method is used to find previous and next of numbers
|
|
"""
|
|
previous_number = input_number
|
|
next_number = input_number
|
|
try:
|
|
index = numbers.index(input_number)
|
|
if index > 0:
|
|
previous_number = numbers[index - 1]
|
|
else:
|
|
previous_number = numbers[-1]
|
|
if index + 1 == len(numbers):
|
|
next_number = numbers[0]
|
|
elif index < len(numbers):
|
|
next_number = numbers[index + 1]
|
|
else:
|
|
next_number = numbers[0]
|
|
except:
|
|
pass
|
|
return (previous_number, next_number)
|
|
|
|
|
|
def format_export_value(value, employee):
|
|
work_info = EmployeeWorkInformation.objects.filter(employee_id=employee).first()
|
|
time_format = (
|
|
work_info.company_id.time_format
|
|
if work_info and work_info.company_id
|
|
else "HH:mm"
|
|
)
|
|
date_format = (
|
|
work_info.company_id.date_format
|
|
if work_info and work_info.company_id
|
|
else "MMM. D, YYYY"
|
|
)
|
|
|
|
if isinstance(value, time):
|
|
# Convert the string to a datetime.time object
|
|
check_in_time = datetime.strptime(str(value).split(".")[0], "%H:%M:%S").time()
|
|
|
|
# Print the formatted time for each format
|
|
for format_name, format_string in HORILLA_TIME_FORMATS.items():
|
|
if format_name == time_format:
|
|
value = check_in_time.strftime(format_string)
|
|
|
|
elif type(value) == date:
|
|
# Convert the string to a datetime.date object
|
|
start_date = datetime.strptime(str(value), "%Y-%m-%d").date()
|
|
# Print the formatted date for each format
|
|
for format_name, format_string in HORILLA_DATE_FORMATS.items():
|
|
if format_name == date_format:
|
|
value = start_date.strftime(format_string)
|
|
|
|
elif isinstance(value, datetime):
|
|
value = str(value)
|
|
|
|
return value
|
|
|
|
|
|
def export_data(request, model, form_class, filter_class, file_name, perm=None):
|
|
fields_mapping = {
|
|
"male": _("Male"),
|
|
"female": _("Female"),
|
|
"other": _("Other"),
|
|
"draft": _("Draft"),
|
|
"active": _("Active"),
|
|
"expired": _("Expired"),
|
|
"terminated": _("Terminated"),
|
|
"weekly": _("Weekly"),
|
|
"monthly": _("Monthly"),
|
|
"after": _("After"),
|
|
"semi_monthly": _("Semi-Monthly"),
|
|
"hourly": _("Hourly"),
|
|
"daily": _("Daily"),
|
|
"monthly": _("Monthly"),
|
|
"full_day": _("Full Day"),
|
|
"first_half": _("First Half"),
|
|
"second_half": _("Second Half"),
|
|
"requested": _("Requested"),
|
|
"approved": _("Approved"),
|
|
"cancelled": _("Cancelled"),
|
|
"rejected": _("Rejected"),
|
|
"cancelled_and_rejected": _("Cancelled & Rejected"),
|
|
"late_come": _("Late Come"),
|
|
"early_out": _("Early Out"),
|
|
}
|
|
employee = request.user.employee_get
|
|
|
|
selected_columns = []
|
|
today_date = date.today().strftime("%Y-%m-%d")
|
|
file_name = f"{file_name}_{today_date}.xlsx"
|
|
data_export = {}
|
|
|
|
form = form_class()
|
|
model_fields = model._meta.get_fields()
|
|
export_objects = filter_class(request.GET).qs
|
|
if perm:
|
|
export_objects = filtersubordinates(request, export_objects, perm)
|
|
selected_fields = request.GET.getlist("selected_fields")
|
|
|
|
if not selected_fields:
|
|
selected_fields = form.fields["selected_fields"].initial
|
|
ids = request.GET.get("ids")
|
|
id_list = json.loads(ids)
|
|
export_objects = model.objects.filter(id__in=id_list)
|
|
|
|
for field in form.fields["selected_fields"].choices:
|
|
value = field[0]
|
|
key = field[1]
|
|
if value in selected_fields:
|
|
selected_columns.append((value, key))
|
|
|
|
for field_name, verbose_name in selected_columns:
|
|
if field_name in selected_fields:
|
|
data_export[verbose_name] = []
|
|
for obj in export_objects:
|
|
value = obj
|
|
nested_attributes = field_name.split("__")
|
|
for attr in nested_attributes:
|
|
value = getattr(value, attr, None)
|
|
if value is None:
|
|
break
|
|
if value is True:
|
|
value = _("Yes")
|
|
elif value is False:
|
|
value = _("No")
|
|
if value in fields_mapping:
|
|
value = fields_mapping[value]
|
|
if value == "None":
|
|
value = " "
|
|
if field_name == "month":
|
|
value = _(value.title())
|
|
|
|
# Check if the type of 'value' is time
|
|
value = format_export_value(value, employee)
|
|
data_export[verbose_name].append(value)
|
|
|
|
data_frame = pd.DataFrame(data=data_export)
|
|
styled_data_frame = data_frame.style.applymap(
|
|
lambda x: "text-align: center", subset=pd.IndexSlice[:, :]
|
|
)
|
|
|
|
response = HttpResponse(content_type="application/ms-excel")
|
|
response["Content-Disposition"] = f'attachment; filename="{file_name}"'
|
|
|
|
writer = pd.ExcelWriter(response, engine="xlsxwriter")
|
|
styled_data_frame.to_excel(writer, index=False, sheet_name="Sheet1")
|
|
worksheet = writer.sheets["Sheet1"]
|
|
worksheet.set_column("A:Z", 18)
|
|
writer.close()
|
|
|
|
return response
|
|
|
|
|
|
def reload_queryset(fields):
|
|
"""
|
|
Reloads querysets in the form based on active filters and selected company.
|
|
"""
|
|
request = getattr(_thread_locals, "request", None)
|
|
selected_company = request.session.get("selected_company") if request else None
|
|
|
|
recruitment_installed = apps.is_installed("recruitment")
|
|
model_filters = {
|
|
"Employee": {"is_active": True},
|
|
"Candidate": {"is_active": True} if recruitment_installed else None,
|
|
}
|
|
|
|
for field in fields.values():
|
|
if not isinstance(field, ModelChoiceField):
|
|
continue
|
|
|
|
model = field.queryset.model
|
|
model_name = model.__name__
|
|
|
|
if model_name == "Company" and selected_company and selected_company != "all":
|
|
field.queryset = model.objects.filter(id=selected_company)
|
|
elif (filters := model_filters.get(model_name)) is not None:
|
|
field.queryset = model.objects.filter(**filters)
|
|
else:
|
|
field.queryset = model.objects.all()
|
|
|
|
return fields
|
|
|
|
|
|
def check_manager(employee, instance):
|
|
|
|
try:
|
|
if isinstance(instance, Employee):
|
|
return instance.employee_work_info.reporting_manager_id == employee
|
|
return employee == instance.employee_id.employee_work_info.reporting_manager_id
|
|
except:
|
|
return False
|
|
|
|
|
|
def check_owner(employee, instance):
|
|
try:
|
|
if isinstance(instance, Employee):
|
|
return employee == instance
|
|
return employee == instance.employee_id
|
|
except:
|
|
return False
|
|
|
|
|
|
def link_callback(uri, rel):
|
|
"""
|
|
Convert HTML URIs to absolute system paths so xhtml2pdf can access those
|
|
resources
|
|
"""
|
|
if not uri.startswith("/static"):
|
|
return uri
|
|
uri = "payroll/fonts/Poppins_Regular.ttf"
|
|
result = finders.find(uri)
|
|
if result:
|
|
if not isinstance(result, (list, tuple)):
|
|
result = [result]
|
|
|
|
result = list(os.path.realpath(path) for path in result)
|
|
path = result[0]
|
|
|
|
else:
|
|
sUrl = settings.STATIC_URL
|
|
sRoot = settings.STATIC_ROOT
|
|
mUrl = settings.MEDIA_URL
|
|
mRoot = settings.MEDIA_ROOT
|
|
|
|
if uri.startswith(sUrl):
|
|
path = os.path.join(sRoot, uri.replace(sUrl, ""))
|
|
else:
|
|
return uri
|
|
|
|
if os.name == "nt":
|
|
return uri
|
|
|
|
if not os.path.isfile(path):
|
|
raise RuntimeError("media URI must start with %s or %s" % (sUrl, mUrl))
|
|
return path
|
|
|
|
|
|
# def generate_pdf(template_path, context, path=True, title=None, html=True):
|
|
# template_path = template_path
|
|
# context_data = context
|
|
# title = (
|
|
# f"""{context_data.get("employee")}'s payslip for {context_data.get("range")}.pdf"""
|
|
# if not title
|
|
# else title
|
|
# )
|
|
# response = HttpResponse(content_type="application/pdf")
|
|
# response["Content-Disposition"] = f"attachment; filename={title}"
|
|
|
|
# if html:
|
|
# html = template_path
|
|
# else:
|
|
# template = get_template(template_path)
|
|
# html = template.render(context_data)
|
|
|
|
# pisa_status = pisa.CreatePDF(
|
|
# html.encode("utf-8"),
|
|
# dest=response,
|
|
# link_callback=link_callback,
|
|
# )
|
|
|
|
# if pisa_status.err:
|
|
# return HttpResponse("We had some errors <pre>" + html + "</pre>")
|
|
|
|
# return response
|
|
|
|
|
|
def generate_pdf(template_path, context, path=True, title=None, html=True):
|
|
title = "Document" if not title else title
|
|
|
|
if html:
|
|
html = template_path
|
|
else:
|
|
html = render_to_string(template_path, context)
|
|
|
|
response = template_pdf(template=html, html=True, filename=title)
|
|
|
|
return response
|
|
|
|
|
|
def get_pagination():
|
|
from horilla.horilla_middlewares import _thread_locals
|
|
|
|
request = getattr(_thread_locals, "request", None)
|
|
user = request.user
|
|
page = DynamicPagination.objects.filter(user_id=user).first()
|
|
count = 20
|
|
if page:
|
|
count = page.pagination
|
|
return count
|
|
|
|
|
|
def paginator_qry(queryset, page_number):
|
|
"""
|
|
Common paginator method
|
|
"""
|
|
paginator = Paginator(queryset, get_pagination())
|
|
queryset = paginator.get_page(page_number)
|
|
return queryset
|
|
|
|
|
|
def is_holiday(date):
|
|
"""
|
|
Check if the given date is a holiday.
|
|
Args:
|
|
date (datetime.date): The date to check.
|
|
Returns:
|
|
Holidays or bool: The Holidays object if the date is a holiday, otherwise False.
|
|
"""
|
|
# Get holidays that either match the exact date range or are recurring
|
|
holiday = Holidays.objects.filter(
|
|
Q(start_date__lte=date, end_date__gte=date)
|
|
| Q(recurring=True, start_date__month=date.month, start_date__day=date.day)
|
|
).first()
|
|
return holiday if holiday else False
|
|
|
|
|
|
def is_company_leave(input_date):
|
|
"""
|
|
Check if the given date is a company leave.
|
|
Args:
|
|
input_date (datetime.date): The date to check.
|
|
Returns:
|
|
CompanyLeaves or bool: The CompanyLeaves object if the date is a company leave, otherwise False.
|
|
"""
|
|
# Calculate the week number within the month (0-4) and weekday (0 for Monday to 6 for Sunday)
|
|
first_day_of_month = input_date.replace(day=1)
|
|
adjusted_day = (
|
|
input_date.day + first_day_of_month.weekday()
|
|
) # Adjust day based on first day of the month
|
|
# Calculate the week number (0-based)
|
|
date_week_no = (adjusted_day - 1) // 7
|
|
# Get weekday (0 for Monday to 6 for Sunday)
|
|
date_week_day = input_date.weekday()
|
|
|
|
# Query for company leaves that match the week number and weekday
|
|
company_leave = CompanyLeaves.objects.filter(
|
|
Q(
|
|
based_on_week=None, based_on_week_day=date_week_day
|
|
) # Match week-independent leaves
|
|
| Q(
|
|
based_on_week=date_week_no, based_on_week_day=date_week_day
|
|
) # Match specific week and weekday
|
|
).first()
|
|
|
|
return company_leave if company_leave else False
|
|
|
|
|
|
def get_date_range(start_date, end_date):
|
|
"""
|
|
Returns a list of all dates within a given date range.
|
|
|
|
Args:
|
|
start_date (date): The start date of the range.
|
|
end_date (date): The end date of the range.
|
|
|
|
Returns:
|
|
list: A list of date objects representing all dates within the range.
|
|
|
|
Example:
|
|
start_date = date(2023, 1, 1)
|
|
end_date = date(2023, 1, 10)
|
|
date_range = get_date_range(start_date, end_date)
|
|
|
|
"""
|
|
date_list = []
|
|
delta = end_date - start_date
|
|
|
|
for i in range(delta.days + 1):
|
|
current_date = start_date + timedelta(days=i)
|
|
date_list.append(current_date)
|
|
return date_list
|
|
|
|
|
|
def get_holiday_dates(range_start: date, range_end: date) -> list:
|
|
"""
|
|
:return: this functions returns a list of all holiday dates.
|
|
"""
|
|
pay_range_dates = get_date_range(start_date=range_start, end_date=range_end)
|
|
query = Q()
|
|
for check_date in pay_range_dates:
|
|
query |= Q(start_date__lte=check_date, end_date__gte=check_date)
|
|
holidays = Holidays.objects.filter(query)
|
|
holiday_dates = set([])
|
|
for holiday in holidays:
|
|
holiday_dates = holiday_dates | (
|
|
set(
|
|
get_date_range(start_date=holiday.start_date, end_date=holiday.end_date)
|
|
)
|
|
)
|
|
return list(set(holiday_dates))
|
|
|
|
|
|
def get_company_leave_dates(year):
|
|
"""
|
|
:return: This function returns a list of all company leave dates
|
|
"""
|
|
company_leaves = CompanyLeaves.objects.all()
|
|
company_leave_dates = []
|
|
for company_leave in company_leaves:
|
|
based_on_week = company_leave.based_on_week
|
|
based_on_week_day = company_leave.based_on_week_day
|
|
for month in range(1, 13):
|
|
if based_on_week is not None:
|
|
# Set Sunday as the first day of the week
|
|
calendar.setfirstweekday(6)
|
|
month_calendar = calendar.monthcalendar(year, month)
|
|
weeks = month_calendar[int(based_on_week)]
|
|
weekdays_in_weeks = [day for day in weeks if day != 0]
|
|
for day in weekdays_in_weeks:
|
|
leave_date = datetime.strptime(
|
|
f"{year}-{month:02}-{day:02}", "%Y-%m-%d"
|
|
).date()
|
|
if (
|
|
leave_date.weekday() == int(based_on_week_day)
|
|
and leave_date not in company_leave_dates
|
|
):
|
|
company_leave_dates.append(leave_date)
|
|
else:
|
|
# Set Monday as the first day of the week
|
|
calendar.setfirstweekday(0)
|
|
month_calendar = calendar.monthcalendar(year, month)
|
|
for week in month_calendar:
|
|
if week[int(based_on_week_day)] != 0:
|
|
leave_date = datetime.strptime(
|
|
f"{year}-{month:02}-{week[int(based_on_week_day)]:02}",
|
|
"%Y-%m-%d",
|
|
).date()
|
|
if leave_date not in company_leave_dates:
|
|
company_leave_dates.append(leave_date)
|
|
return company_leave_dates
|
|
|
|
|
|
def get_working_days(start_date, end_date):
|
|
"""
|
|
This method is used to calculate the total working days, total leave, worked days on that period
|
|
|
|
Args:
|
|
start_date (_type_): the start date from the data needed
|
|
end_date (_type_): the end date till the date needed
|
|
"""
|
|
|
|
holiday_dates = get_holiday_dates(start_date, end_date)
|
|
|
|
# appending company/holiday leaves
|
|
# Note: Duplicate entry may exist
|
|
company_leave_dates = (
|
|
list(
|
|
set(
|
|
get_company_leave_dates(start_date.year)
|
|
+ get_company_leave_dates(end_date.year)
|
|
)
|
|
)
|
|
+ holiday_dates
|
|
)
|
|
|
|
date_range = get_date_range(start_date, end_date)
|
|
|
|
# making unique list of company/holiday leave dates then filtering
|
|
# the leave dates only between the start and end date
|
|
company_leave_dates = [
|
|
date
|
|
for date in list(set(company_leave_dates))
|
|
if start_date <= date <= end_date
|
|
]
|
|
|
|
working_days_between_ranges = list(set(date_range) - set(company_leave_dates))
|
|
total_working_days = len(working_days_between_ranges)
|
|
|
|
return {
|
|
# Total working days on that period
|
|
"total_working_days": total_working_days,
|
|
# All the working dates between the start and end date
|
|
"working_days_on": working_days_between_ranges,
|
|
# All the company/holiday leave dates between the range
|
|
"company_leave_dates": company_leave_dates,
|
|
}
|
|
|
|
|
|
def get_next_month_same_date(date_obj):
|
|
date_copy = date_obj
|
|
month = date_obj.month + 1
|
|
year = date_obj.year
|
|
if month > 12:
|
|
month = 1
|
|
year = year + 1
|
|
day = date_copy.day
|
|
total_days_in_month = calendar.monthrange(year, month)[1]
|
|
day = min(day, total_days_in_month)
|
|
return date(day=day, month=month, year=year)
|
|
|
|
|
|
def get_subordinates(request):
|
|
"""
|
|
This method is used to filter out subordinates queryset element.
|
|
"""
|
|
user = request.user.employee_get
|
|
subordinates = Employee.objects.filter(
|
|
employee_work_info__reporting_manager_id=user
|
|
)
|
|
return subordinates
|
|
|
|
|
|
def format_date(date_str):
|
|
# List of possible date formats to try
|
|
|
|
for format_name, format_string in HORILLA_DATE_FORMATS.items():
|
|
try:
|
|
return datetime.strptime(date_str, format_string).strftime("%Y-%m-%d")
|
|
except ValueError:
|
|
continue
|
|
raise ValueError(f"Invalid date format: {date_str}")
|
|
|
|
|
|
def eval_validate(value):
|
|
"""
|
|
Method to validate the dynamic value
|
|
"""
|
|
value = ast.literal_eval(value)
|
|
return value
|
|
|
|
|
|
def template_pdf(template, context={}, html=False, filename="payslip.pdf"):
|
|
"""
|
|
Generate a PDF file from an HTML template and context data.
|
|
|
|
Args:
|
|
template_path (str): The path to the HTML template.
|
|
context (dict): The context data to render the template.
|
|
html (bool): If True, return raw HTML instead of a PDF.
|
|
|
|
Returns:
|
|
HttpResponse: A response with the generated PDF file or raw HTML.
|
|
"""
|
|
try:
|
|
bootstrap_css = '<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/css/bootstrap.min.css" rel="stylesheet">'
|
|
html_content = f"{bootstrap_css}\n{template}"
|
|
|
|
pdf_options = {
|
|
"page-size": "A4",
|
|
"margin-top": "10mm",
|
|
"margin-bottom": "10mm",
|
|
"margin-left": "10mm",
|
|
"margin-right": "10mm",
|
|
"encoding": "UTF-8",
|
|
"enable-local-file-access": None,
|
|
"dpi": 300,
|
|
"zoom": 1.3,
|
|
"footer-center": "[page]/[topage]",
|
|
}
|
|
|
|
pdf = pdfkit.from_string(html_content, False, options=pdf_options)
|
|
|
|
response = HttpResponse(pdf, content_type="application/pdf")
|
|
response["Content-Disposition"] = f"inline; filename={filename}"
|
|
return response
|
|
except Exception as e:
|
|
return HttpResponse(f"Error generating PDF: {str(e)}", status=500)
|
|
|
|
|
|
def generate_otp():
|
|
"""
|
|
Function to generate a random 6-digit OTP (One-Time Password).
|
|
Returns:
|
|
str: A 6-digit random OTP as a string.
|
|
"""
|
|
return str(random.randint(100000, 999999))
|