Files
ihrm/base/methods.py
2024-02-17 14:38:13 +05:30

625 lines
22 KiB
Python

from datetime import date, datetime, time
import io
import json
import random
from django.apps import apps
from django.core.exceptions import ObjectDoesNotExist
from django.db.models import ForeignKey, ManyToManyField, OneToOneField
from django.forms.models import ModelChoiceField
from django.http import HttpResponse
from django.template.loader import render_to_string
from django.utils.translation import gettext as _
import pandas as pd
from xhtml2pdf import pisa
from base.models import Company, DynamicPagination
from employee.models import Employee, EmployeeWorkInformation
from horilla.decorators import login_required
from leave.models import LeaveRequest, LeaveRequestConditionApproval
def filtersubordinates(request, queryset, perm=None, field=None):
    """
    Narrow ``queryset`` down to the rows of the current user's subordinates.

    A user holding ``perm`` receives the queryset unchanged. Otherwise the
    rows are filtered so that the reporting manager reached through the
    employee relation is the ``Employee`` linked to ``request.user``.

    ``field`` names the relation leading to the employee; when it is empty
    the relation ``employee_id`` is used.
    """
    user = request.user
    if user.has_perm(perm):
        return queryset
    manager = Employee.objects.filter(employee_user_id=user).first()
    # Both variants share the same lookup tail; only the leading relation differs.
    relation = field if field else "employee_id"
    lookup = f"{relation}__employee_work_info__reporting_manager_id"
    return queryset.filter(**{lookup: manager})
def filter_own_records(request, queryset, perm=None):
    """
    Narrow ``queryset`` down to the records owned by the current user.

    A user holding ``perm`` receives the queryset unchanged; everybody else
    only gets the rows whose ``employee_id`` is ``request.user.employee_get``.
    """
    current_user = request.user
    if not current_user.has_perm(perm):
        queryset = queryset.filter(employee_id=request.user.employee_get)
    return queryset
def filter_own_and_subordinate_recordes(request, queryset, perm=None):
    """
    Narrow ``queryset`` down to the user's own records plus those of the
    user's subordinates.

    A user holding ``perm`` receives the queryset unchanged. Otherwise the
    result is the union (``|``) of ``filter_own_records`` and
    ``filtersubordinates`` applied to the same queryset.
    """
    if request.user.has_perm(perm):
        return queryset
    own_records = filter_own_records(request, queryset, perm)
    subordinate_records = filtersubordinates(request, queryset, perm)
    return own_records | subordinate_records
def filtersubordinatesemployeemodel(request, queryset, perm=None):
    """
    Narrow an employee queryset down to the current user's subordinates.

    A user holding ``perm`` receives the queryset unchanged. Otherwise only
    rows whose ``employee_work_info__reporting_manager_id`` is the
    ``Employee`` linked to ``request.user`` are kept.
    """
    current_user = request.user
    if not current_user.has_perm(perm):
        manager = Employee.objects.filter(employee_user_id=current_user).first()
        queryset = queryset.filter(employee_work_info__reporting_manager_id=manager)
    return queryset
def is_reportingmanager(request):
    """
    Check whether the employee of the current user is a reporting manager.

    Returns True when ``request.user.employee_get.reporting_manager`` holds
    at least one row. Returns False when it holds none, or when any step of
    that attribute chain fails (for example a user without an employee).
    """
    try:
        return request.user.employee_get.reporting_manager.all().exists()
    except Exception:
        # Best-effort check: any ordinary failure means "not a manager".
        # ``Exception`` (instead of a bare ``except``) lets KeyboardInterrupt
        # and SystemExit propagate.
        return False
def choosesubordinates(
    request,
    form,
    perm,
):
    """
    Limit the ``employee_id`` choices of ``form`` to the user's subordinates.

    A user holding ``perm`` receives the form unchanged. Otherwise the
    queryset of ``form.fields["employee_id"]`` is replaced by the employees
    whose reporting manager is the ``Employee`` linked to ``request.user``.
    The same form object is returned in both cases.
    """
    current_user = request.user
    if not current_user.has_perm(perm):
        manager = Employee.objects.filter(employee_user_id=current_user).first()
        form.fields["employee_id"].queryset = Employee.objects.filter(
            employee_work_info__reporting_manager_id=manager
        )
    return form
def choosesubordinatesemployeemodel(request, form, perm):
    """
    Limit the ``employee_id`` choices of ``form`` to the user's subordinates.

    Users holding ``perm`` get the form back untouched; for everyone else
    the ``employee_id`` field only offers employees reporting to the
    ``Employee`` linked to ``request.user``.
    """
    if request.user.has_perm(perm):
        return form
    reporting_manager = Employee.objects.filter(
        employee_user_id=request.user
    ).first()
    subordinates = Employee.objects.filter(
        employee_work_info__reporting_manager_id=reporting_manager
    )
    form.fields["employee_id"].queryset = subordinates
    return form
# Sort state remembered between calls of ``sortby``: one dict per user id
# holding the last sorted field and the direction prefix to use next.
orderingList = [
    {"id": "", "field": "", "ordering": ""},
]


def sortby(request, queryset, key):
    """
    Order ``queryset`` by the field named in ``request.GET[key]``.

    The direction alternates on every call that carries a sort field: the
    per-user entry in ``orderingList`` stores the prefix (``"-"`` or ``""``)
    for the next call and is flipped after each use. A new user starts with
    ``"-"``.

    ``request.sort_option`` is always replaced by ``{"order": ...}`` built
    from the stored prefix and the requested field.
    """
    global orderingList
    user_id = request.user.id
    # Reuse the caller's entry when one exists, otherwise register a new one.
    state = next((entry for entry in orderingList if entry["id"] == user_id), None)
    if state is None:
        state = {"id": user_id, "field": None, "ordering": "-"}
        orderingList.append(state)
    requested_field = request.GET.get(key)
    if not (requested_field is None or requested_field == ""):
        state["field"] = requested_field
        queryset = queryset.order_by(f'{state["ordering"]}{requested_field}')
        # Flip the direction for the next request on this user.
        state["ordering"] = "" if state["ordering"] == "-" else "-"
        # Rebind the global to a fresh list with this user's entry moved last.
        orderingList = [entry for entry in orderingList if entry["id"] != user_id]
        orderingList.append(state)
    request.sort_option = {"order": f'{state["ordering"]}{requested_field}'}
    return queryset
def random_color_generator():
    """
    Return a random ``rgba(...)`` colour string with an alpha of 0.7.

    The three channels are drawn from 0-255 and are redrawn until all three
    differ from each other. The previous implementation retried through a
    recursive call whose result was thrown away, so a colour with two equal
    channels was returned anyway.
    """
    while True:
        r = random.randint(0, 255)
        g = random.randint(0, 255)
        b = random.randint(0, 255)
        if r != g and g != b and b != r:
            return f"rgba({r}, {g}, {b} , 0.7)"
# Function to generate distinct colors for each object
def generate_colors(num_colors):
    """
    Return a list of ``num_colors`` ``rgba(...)`` colour strings.

    The first six entries come from a fixed palette; when more are requested
    the palette is topped up with values from ``random_color_generator``.
    """
    palette = [
        "rgba(255, 99, 132, 1)",  # Red
        "rgba(54, 162, 235, 1)",  # Blue
        "rgba(255, 206, 86, 1)",  # Yellow
        "rgba(75, 192, 192, 1)",  # Green
        "rgba(153, 102, 255, 1)",  # Purple
        "rgba(255, 159, 64, 1)",  # Orange
    ]
    shortfall = num_colors - len(palette)
    if shortfall > 0:
        palette.extend(random_color_generator() for _ in range(shortfall))
    size = len(palette)
    return [palette[position % size] for position in range(num_colors)]
def get_key_instances(model, data_dict):
    """
    Replace the raw ids in a filter ``data_dict`` with readable labels.

    ``data_dict`` maps filter keys to lists of string values. It is modified
    in place and also returned. In order, the function:

    1. rewrites reverse-relation keys (related query names of foreign keys
       declared on models that point at ``model``) to ``[str(instance)]``;
    2. rewrites keys named after ``model``'s own ForeignKey / OneToOneField /
       ManyToManyField fields to the ``str()`` of the referenced rows;
    3. rewrites nested ``a__b`` keys through ``get_nested_instances``;
    4. turns ``id`` into an ``"Object"`` entry and drops housekeeping keys
       (csrf token, paging, sorting, grouping, ``["unknown"]`` values).
    """
    # Models that declare at least one ForeignKey pointing at ``model``.
    related_models = [
        other_model
        for other_model in apps.get_models()
        if any(
            isinstance(field, ForeignKey) and field.related_model == model
            for field in other_model._meta.fields
        )
    ]

    # Reverse lookups: every ForeignKey of those models is matched against
    # data_dict by its related query name.
    for related_model in related_models:
        for field in related_model._meta.get_fields():
            if not isinstance(field, ForeignKey):
                continue
            related_name = field.related_query_name()
            if related_name not in data_dict:
                continue
            try:
                related_id = int(data_dict[related_name][0])
            except (ValueError, TypeError, IndexError):
                # Not an id: either it never was one, or an earlier field
                # sharing this related name already replaced it by a label.
                continue
            filtered_instance = related_model.objects.filter(
                **{field.name: related_id}
            ).first()
            data_dict[related_name] = [str(filtered_instance)]

    # Forward relations of ``model`` itself. The tuple spells out every
    # relation type; ``ForeignKey or OneToOneField`` only ever tested
    # ``ForeignKey``.
    for field in model._meta.get_fields():
        if not isinstance(field, (ForeignKey, OneToOneField, ManyToManyField)):
            continue
        field_name = field.name
        if field_name not in data_dict:
            continue
        try:
            field_values = [int(value) for value in data_dict[field_name]]
            related_model = field.remote_field.model
            related_instances = related_model.objects.filter(id__in=field_values)
            data_dict[field_name] = [str(instance) for instance in related_instances]
        except (ObjectDoesNotExist, ValueError):
            # Non-numeric values are left exactly as they were submitted.
            pass

    # Nested lookups such as ``employee_id__department``; range filters
    # (``...gte`` / ``...lte``) carry plain values and are skipped.
    nested_fields = [
        key
        for key in data_dict
        if "__" in key and not key.endswith(("gte", "lte"))
    ]
    placeholder_values = (["unknown"], ["true"], ["false"])
    for key in nested_fields:
        field_values = data_dict[key]
        if field_values in placeholder_values:
            continue
        nested_instance = get_nested_instances(model, key.split("__"), field_values)
        if nested_instance is not None:
            data_dict[key] = nested_instance

    for transient_key in ("csrfmiddlewaretoken", "vpage"):
        if transient_key in data_dict:
            del data_dict[transient_key]

    if "id" in data_dict:
        object_id = data_dict["id"][0]
        label = str(model.objects.filter(id=object_id).first())
        del data_dict["id"]
        data_dict["Object"] = [label]

    housekeeping_keys = {
        "sortby",
        "orderby",
        "view",
        "page",
        "group_by",
        "target",
        "rpage",
    }
    keys_to_remove = [
        key
        for key, value in data_dict.items()
        if value == ["unknown"] or key in housekeeping_keys or "dynamic_page" in key
    ]
    for key in keys_to_remove:
        del data_dict[key]
    return data_dict
def get_nested_instances(model, field_names, field_values):
    """
    Resolve the ids of a nested lookup (``a__b__c``) to readable labels.

    ``field_names`` is the lookup split on ``"__"``. Starting at ``model``,
    each name that is a relation on the current model moves on to the model
    it points at; names that are not fields, or that are not relations,
    leave the current model unchanged.

    Returns the ``str()`` of every row of the final model whose id is in
    ``field_values``, with ``"not_set"`` put first when it was among the
    values. Returns None when a value is not numeric.
    """
    try:
        related_model = model
        for field_name in field_names:
            try:
                related_field = related_model._meta.get_field(field_name)
            except Exception:
                # Not a field of the current model (for instance a lookup
                # suffix): stay on the current model.
                continue
            remote_field = getattr(related_field, "remote_field", None)
            remote_model = getattr(remote_field, "model", None)
            if remote_model is not None:
                related_model = remote_model
        object_ids = [int(value) for value in field_values if value != "not_set"]
        related_instances = related_model.objects.filter(id__in=object_ids)
        result = [str(instance) for instance in related_instances]
        if "not_set" in field_values:
            result.insert(0, "not_set")
        return result
    except (ObjectDoesNotExist, ValueError):
        return None
def closest_numbers(numbers: list, input_number: int) -> tuple:
    """
    Return the neighbours of ``input_number`` inside ``numbers``.

    The list is treated as circular: the predecessor of the first element is
    the last one and the successor of the last element is the first one.
    When ``input_number`` is not in ``numbers`` (or ``numbers`` is not a
    sequence), ``(input_number, input_number)`` is returned.
    """
    try:
        index = numbers.index(input_number)
    except (ValueError, AttributeError, TypeError):
        return (input_number, input_number)
    # index - 1 is -1 for the first element, which already wraps to the end.
    previous_number = numbers[index - 1]
    next_number = numbers[(index + 1) % len(numbers)]
    return (previous_number, next_number)
# Company display-format names mapped to their ``strftime`` patterns.
_EXPORT_TIME_FORMATS = {
    "hh:mm A": "%I:%M %p",  # 12-hour format
    "HH:mm": "%H:%M",  # 24-hour format
}
_EXPORT_DATE_FORMATS = {
    "DD-MM-YYYY": "%d-%m-%Y",
    "DD.MM.YYYY": "%d.%m.%Y",
    "DD/MM/YYYY": "%d/%m/%Y",
    "MM/DD/YYYY": "%m/%d/%Y",
    "YYYY-MM-DD": "%Y-%m-%d",
    "YYYY/MM/DD": "%Y/%m/%d",
    "MMMM D, YYYY": "%B %d, %Y",
    "DD MMMM, YYYY": "%d %B, %Y",
    "MMM. D, YYYY": "%b. %d, %Y",
    "D MMM. YYYY": "%d %b. %Y",
    "dddd, MMMM D, YYYY": "%A, %B %d, %Y",
}


def _company_display_format(request, attribute, default):
    """Return ``attribute`` of the requesting user's company, or ``default``."""
    employee = request.user.employee_get
    employee_company = None
    # The last work-information row wins.
    for work_info in EmployeeWorkInformation.objects.filter(employee_id=employee):
        employee_company = work_info.company_id
    if employee_company is None:
        # No work information, or one without a company.
        return default
    emp_company = Company.objects.filter(id=employee_company.id).first()
    return getattr(emp_company, attribute) if emp_company else default


@login_required
def export_data(request, model, form_class, filter_class, file_name):
    """
    Export rows of ``model`` as an ``.xlsx`` download.

    Parameters:
        request: current request; ``GET`` supplies the filter values, the
            ``selected_fields`` list and, optionally, a JSON list ``ids``.
        model: model class whose rows are exported.
        form_class: form with a ``selected_fields`` field; its choices are
            ``(attribute path, column title)`` pairs and its ``initial`` is
            the column selection used when the request names none.
        filter_class: filter set class; ``filter_class(request.GET).qs``
            provides the rows unless ``ids`` is used.
        file_name: file name prefix; the current date and ``.xlsx`` are
            appended.

    When no ``selected_fields`` are sent, the form's initial columns are used
    and, if ``ids`` is present, exactly those rows are exported.

    Cell values are read by following the ``__``-separated attribute path.
    Booleans and known choice keys are translated. ``time`` and ``date``
    values are rendered with the time/date format of the requesting user's
    company; a value whose format name is unknown is written unchanged.

    Returns the ``HttpResponse`` holding the workbook.
    """
    fields_mapping = {
        "male": _("Male"),
        "female": _("Female"),
        "other": _("Other"),
        "draft": _("Draft"),
        "active": _("Active"),
        "expired": _("Expired"),
        "terminated": _("Terminated"),
        "weekly": _("Weekly"),
        "monthly": _("Monthly"),
        "after": _("After"),
        "semi_monthly": _("Semi-Monthly"),
        "hourly": _("Hourly"),
        "daily": _("Daily"),
        "full_day": _("Full Day"),
        "first_half": _("First Half"),
        "second_half": _("Second Half"),
        "requested": _("Requested"),
        "approved": _("Approved"),
        "cancelled": _("Cancelled"),
        "rejected": _("Rejected"),
        "cancelled_and_rejected": _("Cancelled & Rejected"),
        "late_come": _("Late Come"),
        "early_out": _("Early Out"),
    }
    today_date = date.today().strftime("%Y-%m-%d")
    file_name = f"{file_name}_{today_date}.xlsx"
    form = form_class()
    export_objects = filter_class(request.GET).qs

    selected_fields = request.GET.getlist("selected_fields")
    if not selected_fields:
        selected_fields = form.fields["selected_fields"].initial
        ids = request.GET.get("ids")
        if ids:
            # Without ``ids`` the filtered queryset above is exported.
            export_objects = model.objects.filter(id__in=json.loads(ids))

    selected_columns = [
        (choice[0], choice[1])
        for choice in form.fields["selected_fields"].choices
        if choice[0] in selected_fields
    ]

    # The display formats depend only on the requesting user, so each one is
    # looked up at most once, and only if a time/date value actually occurs.
    format_cache = {}

    def display_format(attribute, default):
        if attribute not in format_cache:
            format_cache[attribute] = _company_display_format(
                request, attribute, default
            )
        return format_cache[attribute]

    data_export = {}
    for field_name, verbose_name in selected_columns:
        column = []
        data_export[verbose_name] = column
        nested_attributes = field_name.split("__")
        for obj in export_objects:
            value = obj
            for attr in nested_attributes:
                value = getattr(value, attr, None)
                if value is None:
                    break
            if value is True:
                value = _("Yes")
            elif value is False:
                value = _("No")
            # Only strings can be mapping keys; the type check also keeps
            # unhashable values from raising in the membership test.
            if isinstance(value, str) and value in fields_mapping:
                value = fields_mapping[value]
            if value == "None":
                value = " "
            if field_name == "month" and value is not None:
                value = _(value.title())

            if isinstance(value, time):
                # Format the time object directly; parsing its string form
                # fails for values carrying microseconds or a UTC offset.
                pattern = _EXPORT_TIME_FORMATS.get(
                    display_format("time_format", "hh:mm A")
                )
                if pattern:
                    value = value.strftime(pattern)
            elif type(value) is date:
                # Exact type check on purpose: datetime values are skipped.
                pattern = _EXPORT_DATE_FORMATS.get(
                    display_format("date_format", "MMM. D, YYYY")
                )
                if pattern:
                    value = value.strftime(pattern)
            column.append(value)

    data_frame = pd.DataFrame(data=data_export)
    styled_data_frame = data_frame.style.applymap(
        lambda x: "text-align: center", subset=pd.IndexSlice[:, :]
    )
    response = HttpResponse(content_type="application/ms-excel")
    response["Content-Disposition"] = f'attachment; filename="{file_name}"'
    # The context manager closes (and thereby writes) the workbook even when
    # writing a sheet raises.
    with pd.ExcelWriter(response, engine="xlsxwriter") as writer:
        styled_data_frame.to_excel(writer, index=False, sheet_name="Sheet1")
        writer.sheets["Sheet1"].set_column("A:Z", 18)
    return response
def reload_queryset(fields):
    """
    Refresh the queryset of every ``ModelChoiceField`` in ``fields``.

    ``fields`` is a mapping of field name to form field. Fields whose
    queryset model is ``Employee`` get the employees with ``is_active=True``;
    every other model choice field gets all rows of its model. Nothing is
    returned.
    """
    for form_field in fields.values():
        if not isinstance(form_field, ModelChoiceField):
            continue
        if form_field.queryset.model == Employee:
            fresh = form_field.queryset.model.objects.filter(is_active=True)
        else:
            fresh = form_field.queryset.model.objects.all()
        form_field.queryset = fresh
    return
def check_manager(employee, instance):
    """
    Tell whether ``employee`` is the reporting manager behind ``instance``.

    For an ``Employee`` instance its own work information is consulted; for
    any other object the work information of ``instance.employee_id`` is
    used. Returns False when the comparison cannot be made (a missing
    attribute or related row).
    """
    try:
        if isinstance(instance, Employee):
            return instance.employee_work_info.reporting_manager_id == employee
        return employee == instance.employee_id.employee_work_info.reporting_manager_id
    except Exception:
        # Best-effort check; unlike a bare ``except`` this does not swallow
        # KeyboardInterrupt or SystemExit.
        return False
def check_owner(employee, instance):
    """
    Tell whether ``instance`` belongs to ``employee``.

    An ``Employee`` instance is compared with ``employee`` directly; for any
    other object its ``employee_id`` attribute is compared. Returns False
    when the comparison cannot be made (for example a missing attribute).
    """
    try:
        if isinstance(instance, Employee):
            return employee == instance
        return employee == instance.employee_id
    except Exception:
        # Best-effort check; unlike a bare ``except`` this does not swallow
        # KeyboardInterrupt or SystemExit.
        return False
def generate_pdf(template_path, context, path=True, title=None):
    """
    Render HTML to a PDF and wrap it in a download response.

    When ``path`` is true, ``template_path`` is a template name rendered
    with ``context``; otherwise ``template_path`` is used as the HTML text
    itself. ``title`` is the download file name; when it is empty a payslip
    title is built from ``context["employee"]`` and ``context["range"]``.

    Returns the ``HttpResponse`` with the PDF, or None when the conversion
    reported an error.
    """
    if not title:
        title = f"""{context.get("employee")}'s payslip for {context.get("range")}.pdf"""
    html = render_to_string(template_path, context) if path else template_path
    pdf_buffer = io.BytesIO()
    conversion = pisa.pisaDocument(io.BytesIO(html.encode("utf-8")), pdf_buffer)
    if conversion.err:
        return None
    response = HttpResponse(pdf_buffer.getvalue(), content_type="application/pdf")
    response["Content-Disposition"] = f'''attachment;filename="{title}"'''
    return response
def filter_conditional_leave_request(request):
    """
    Return the leave requests currently awaiting the user's conditional approval.

    Every ``LeaveRequestConditionApproval`` row assigned to the ``Employee``
    linked to ``request.user`` is inspected. A row with sequence 1 always
    contributes its leave request. A later row contributes only when the
    row one step earlier for the same leave request exists and is approved.
    """
    approval_manager = Employee.objects.filter(employee_user_id=request.user).first()
    leave_request_ids = []
    approvals = LeaveRequestConditionApproval.objects.filter(
        manager_id=approval_manager
    )
    for approval in approvals:
        if approval.sequence > 1:
            previous_approval = LeaveRequestConditionApproval.objects.filter(
                leave_request_id=approval.leave_request_id,
                sequence=approval.sequence - 1,
            ).first()
            # A missing previous step is treated as "not approved yet"
            # instead of raising AttributeError on None.
            if previous_approval is None or not previous_approval.is_approved:
                continue
        leave_request_ids.append(approval.leave_request_id.id)
    return LeaveRequest.objects.filter(pk__in=leave_request_ids)
def get_pagination():
    """
    Return the page size configured for the current user.

    The request is read from the thread-local storage of
    ``base.thread_local_middleware``. The value is the ``pagination`` of the
    user's ``DynamicPagination`` row, or 50 when the user has none or when no
    request (or no user on it) is available in the current thread.
    """
    # Imported here rather than at module level, as in the original code.
    from base.thread_local_middleware import _thread_locals

    default_count = 50
    request = getattr(_thread_locals, "request", None)
    user = getattr(request, "user", None)
    if user is None:
        # No request stored for this thread: fall back to the default
        # instead of failing on ``None.user``.
        return default_count
    page = DynamicPagination.objects.filter(user_id=user).first()
    return page.pagination if page else default_count