Files
ihrm/payroll/methods/methods.py
Ashwanth Balakrishnan 58be33a8d7 Added Pre-Commit Hooks (#175)
* Added pre commit hook

* Run pre commit hook on all files

---------

Co-authored-by: Horilla <131998600+horilla-opensource@users.noreply.github.com>
2024-05-07 12:23:36 +05:30

642 lines
21 KiB
Python

"""
methods.py
Payroll related module to write custom calculation methods
"""
import calendar
from datetime import date, datetime, timedelta
from dateutil.relativedelta import relativedelta
from django.core.paginator import Paginator
from django.db.models import F, Q
from attendance.models import Attendance
from base.methods import get_pagination
from leave.models import CompanyLeave, Holiday
from payroll.models.models import Contract, Deduction, Payslip
def get_holiday_dates(range_start: date, range_end: date) -> list:
"""
:return: this functions returns a list of all holiday dates.
"""
pay_range_dates = get_date_range(start_date=range_start, end_date=range_end)
query = Q()
for check_date in pay_range_dates:
query |= Q(start_date__lte=check_date, end_date__gte=check_date)
holidays = Holiday.objects.filter(query)
holiday_dates = set([])
for holiday in holidays:
holiday_dates = holiday_dates | (
set(
get_date_range(start_date=holiday.start_date, end_date=holiday.end_date)
)
)
return list(set(holiday_dates))
def get_company_leave_dates(year):
"""
:return: This function returns a list of all company leave dates
"""
company_leaves = CompanyLeave.objects.all()
company_leave_dates = []
for company_leave in company_leaves:
based_on_week = company_leave.based_on_week
based_on_week_day = company_leave.based_on_week_day
for month in range(1, 13):
if based_on_week is not None:
# Set Sunday as the first day of the week
calendar.setfirstweekday(6)
month_calendar = calendar.monthcalendar(year, month)
weeks = month_calendar[int(based_on_week)]
weekdays_in_weeks = [day for day in weeks if day != 0]
for day in weekdays_in_weeks:
leave_date = datetime.strptime(
f"{year}-{month:02}-{day:02}", "%Y-%m-%d"
).date()
if (
leave_date.weekday() == int(based_on_week_day)
and leave_date not in company_leave_dates
):
company_leave_dates.append(leave_date)
else:
# Set Monday as the first day of the week
calendar.setfirstweekday(0)
month_calendar = calendar.monthcalendar(year, month)
for week in month_calendar:
if week[int(based_on_week_day)] != 0:
leave_date = datetime.strptime(
f"{year}-{month:02}-{week[int(based_on_week_day)]:02}",
"%Y-%m-%d",
).date()
if leave_date not in company_leave_dates:
company_leave_dates.append(leave_date)
return company_leave_dates
def get_date_range(start_date, end_date):
"""
Returns a list of all dates within a given date range.
Args:
start_date (date): The start date of the range.
end_date (date): The end date of the range.
Returns:
list: A list of date objects representing all dates within the range.
Example:
start_date = date(2023, 1, 1)
end_date = date(2023, 1, 10)
date_range = get_date_range(start_date, end_date)
for date_obj in date_range:
print(date_obj)
"""
date_list = []
delta = end_date - start_date
for i in range(delta.days + 1):
current_date = start_date + timedelta(days=i)
date_list.append(current_date)
return date_list
def get_total_days(start_date, end_date):
"""
Calculates the total number of days in a given period.
Args:
start_date (date): The start date of the period.
end_date (date): The end date of the period.
Returns:
int: The total number of days in the period, including the end date.
Example:
start_date = date(2023, 1, 1)
end_date = date(2023, 1, 10)
days_on_period = get_total_days(start_date, end_date)
print(days_on_period) # Output: 10
"""
delta = end_date - start_date
total_days = delta.days + 1 # Add 1 to include the end date itself
return total_days
def get_working_days(start_date, end_date):
"""
This method is used to calculate the total working days, total leave, worked days on that period
Args:
start_date (_type_): the start date from the data needed
end_date (_type_): the end date till the date needed
"""
holiday_dates = get_holiday_dates(start_date, end_date)
# appending company/holiday leaves
# Note: Duplicate entry may exist
company_leave_dates = (
list(
set(
get_company_leave_dates(start_date.year)
+ get_company_leave_dates(end_date.year)
)
)
+ holiday_dates
)
date_range = get_date_range(start_date, end_date)
# making unique list of company/holiday leave dates then filtering
# the leave dates only between the start and end date
company_leave_dates = [
date
for date in list(set(company_leave_dates))
if start_date <= date <= end_date
]
working_days_between_ranges = list(set(date_range) - set(company_leave_dates))
total_working_days = len(working_days_between_ranges)
return {
# Total working days on that period
"total_working_days": total_working_days,
# All the working dates between the start and end date
"working_days_on": working_days_between_ranges,
# All the company/holiday leave dates between the range
"company_leave_dates": company_leave_dates,
}
def get_leaves(employee, start_date, end_date):
"""
This method is used to return all the leaves taken by the employee
between the period.
Args:
employee (obj): Employee model instance
start_date (obj): the start date from the data needed
end_date (obj): the end date till the date needed
"""
approved_leaves = employee.leaverequest_set.filter(status="approved")
paid_leave = 0
unpaid_leave = 0
paid_half = 0
unpaid_half = 0
paid_leave_dates = []
unpaid_leave_dates = []
company_leave_dates = get_working_days(start_date, end_date)["company_leave_dates"]
if approved_leaves.exists():
for instance in approved_leaves:
if instance.leave_type_id.payment == "paid":
# if the taken leave is paid
# for the start date
all_the_paid_leave_taken_dates = instance.requested_dates()
paid_leave_dates = paid_leave_dates + [
date
for date in all_the_paid_leave_taken_dates
if start_date <= date <= end_date
]
else:
# if the taken leave is unpaid
# for the start date
all_unpaid_leave_taken_dates = instance.requested_dates()
unpaid_leave_dates = unpaid_leave_dates + [
date
for date in all_unpaid_leave_taken_dates
if start_date <= date <= end_date
]
half_day_data = find_half_day_leaves()
unpaid_half = half_day_data["half_unpaid_leaves"]
paid_half = half_day_data["half_paid_leaves"]
paid_leave_dates = list(set(paid_leave_dates) - set(company_leave_dates))
unpaid_leave_dates = list(set(unpaid_leave_dates) - set(company_leave_dates))
paid_leave = len(paid_leave_dates) - paid_half
unpaid_leave = len(unpaid_leave_dates) - unpaid_half
return {
"paid_leave": paid_leave,
"unpaid_leaves": unpaid_leave,
"total_leaves": paid_leave + unpaid_leave,
# List of paid leave date between range
"paid_leave_dates": paid_leave_dates,
# List of un paid date between range
"unpaid_leave_dates": unpaid_leave_dates,
"leave_dates": unpaid_leave_dates + paid_leave_dates,
}
def get_attendance(employee, start_date, end_date):
"""
This method is used to render attendance details between the range
Args:
employee (obj): Employee user instance
start_date (obj): start date of the period
end_date (obj): end date of the period
"""
attendances_on_period = Attendance.objects.filter(
employee_id=employee,
attendance_date__range=(start_date, end_date),
attendance_validated=True,
)
present_on = [attendance.attendance_date for attendance in attendances_on_period]
working_days_between_range = get_working_days(start_date, end_date)[
"working_days_on"
]
leave_dates = get_leaves(employee, start_date, end_date)["leave_dates"]
conflict_dates = list(
set(working_days_between_range) - set(attendances_on_period) - set(leave_dates)
)
conflict_dates = conflict_dates + [
date
for date in present_on
if date in get_holiday_dates(start_date, end_date)
or date
in list(
set(
get_company_leave_dates(start_date.year)
+ get_company_leave_dates(end_date.year)
)
)
]
return {
"attendances_on_period": attendances_on_period,
"present_on": present_on,
"conflict_dates": conflict_dates,
}
def hourly_computation(employee, wage, start_date, end_date):
"""
Hourly salary computation for period.
Args:
employee (obj): Employee instance
wage (float): wage of the employee
start_date (obj): start of the pay period
end_date (obj): end date of the period
"""
attendance_data = get_attendance(employee, start_date, end_date)
attendances_on_period = attendance_data["attendances_on_period"]
total_worked_hour_in_second = 0
for attendance in attendances_on_period:
total_worked_hour_in_second = total_worked_hour_in_second + (
attendance.at_work_second - attendance.overtime_second
)
# to find wage per second
# wage_per_second = wage_per_hour / total_seconds_in_hour
wage_in_second = wage / 3600
basic_pay = float(f"{(wage_in_second * total_worked_hour_in_second):.2f}")
return {
"basic_pay": basic_pay,
"loss_of_pay": 0,
}
def find_half_day_leaves():
"""
This method is used to return the half day leave details
Args:
employee (obj): Employee model instance
start_date (obj): start date of the period
end_date (obj): end date of the period
"""
paid_queryset = []
unpaid_queryset = []
paid_leaves = list(filter(None, list(set(paid_queryset))))
unpaid_leaves = list(filter(None, list(set(unpaid_queryset))))
paid_half = len(paid_leaves) * 0.5
unpaid_half = len(unpaid_leaves) * 0.5
queryset = paid_leaves + unpaid_leaves
total_leaves = len(queryset) * 0.50
return {
"half_day_query_set": queryset,
"half_day_leaves": total_leaves,
"half_paid_leaves": paid_half,
"half_unpaid_leaves": unpaid_half,
}
def daily_computation(employee, wage, start_date, end_date):
"""
Hourly salary computation for period.
Args:
employee (obj): Employee instance
wage (float): wage of the employee
start_date (obj): start of the pay period
end_date (obj): end date of the period
"""
working_day_data = get_working_days(start_date, end_date)
total_working_days = working_day_data["total_working_days"]
leave_data = get_leaves(employee, start_date, end_date)
contract = employee.contract_set.filter(contract_status="active").first()
basic_pay = wage * total_working_days
loss_of_pay = 0
date_range = get_date_range(start_date, end_date)
half_day_leaves_between_period_on_start_date = (
employee.leaverequest_set.filter(
leave_type_id__payment="unpaid",
start_date__in=date_range,
status="approved",
)
.exclude(start_date_breakdown="full_day")
.count()
)
half_day_leaves_between_period_on_end_date = (
employee.leaverequest_set.filter(
leave_type_id__payment="unpaid", end_date__in=date_range, status="approved"
)
.exclude(end_date_breakdown="full_day")
.exclude(start_date=F("end_date"))
.count()
)
unpaid_half_leaves = (
half_day_leaves_between_period_on_start_date
+ half_day_leaves_between_period_on_end_date
) * 0.5
contract = employee.contract_set.filter(
is_active=True, contract_status="active"
).first()
unpaid_leaves = leave_data["unpaid_leaves"] - unpaid_half_leaves
if contract.calculate_daily_leave_amount:
loss_of_pay = (unpaid_leaves) * wage
else:
fixed_penalty = contract.deduction_for_one_leave_amount
loss_of_pay = (unpaid_leaves) * fixed_penalty
if contract.deduct_leave_from_basic_pay:
basic_pay = basic_pay - loss_of_pay
return {
"basic_pay": basic_pay,
"loss_of_pay": loss_of_pay,
}
def get_daily_salary(wage, wage_date) -> dict:
"""
This method is used to calculate daily salary for the date
"""
last_day = calendar.monthrange(wage_date.year, wage_date.month)[1]
end_date = date(wage_date.year, wage_date.month, last_day)
start_date = date(wage_date.year, wage_date.month, 1)
working_days = get_working_days(start_date, end_date)["total_working_days"]
day_wage = wage / working_days # if working_days != 0 else 0
return {
"day_wage": day_wage,
}
def months_between_range(wage, start_date, end_date):
"""
This method is used to find the months between range
"""
months_data = []
for current_date in (
start_date + relativedelta(months=i)
for i in range(
(end_date.year - start_date.year) * 12
+ end_date.month
- start_date.month
+ 1
)
):
month = current_date.month
year = current_date.year
days_in_month = (
current_date + relativedelta(day=1, months=1) - relativedelta(days=1)
).day
# Calculate the end date for the current month
current_end_date = current_date + relativedelta(day=days_in_month)
current_end_date = min(current_end_date, end_date)
working_days_on_month = get_working_days(
current_date.replace(day=1), current_date.replace(day=days_in_month)
)["total_working_days"]
month_start_date = (
date(year=year, month=month, day=1)
if start_date < date(year=year, month=month, day=1)
else start_date
)
total_working_days_on_period = get_working_days(
month_start_date, current_end_date
)["total_working_days"]
month_info = {
"month": month,
"year": year,
"days": days_in_month,
"start_date": month_start_date.strftime("%Y-%m-%d"),
"end_date": current_end_date.strftime("%Y-%m-%d"),
# month period
"working_days_on_period": total_working_days_on_period,
"working_days_on_month": working_days_on_month,
"per_day_amount": wage
/ working_days_on_month, # if working_days_on_month != 0 else 0,
}
months_data.append(month_info)
# Set the start date for the next month as the first day of the next month
current_date = (current_date + relativedelta(day=1, months=1)).replace(day=1)
return months_data
def monthly_computation(employee, wage, start_date, end_date):
"""
Hourly salary computation for period.
Args:
employee (obj): Employee instance
wage (float): wage of the employee
start_date (obj): start of the pay period
end_date (obj): end date of the period
"""
basic_pay = 0
month_data = months_between_range(wage, start_date, end_date)
leave_data = get_leaves(employee, start_date, end_date)
for data in month_data:
basic_pay = basic_pay + (
data["working_days_on_period"] * data["per_day_amount"]
)
contract = employee.contract_set.filter(contract_status="active").first()
loss_of_pay = 0
date_range = get_date_range(start_date, end_date)
half_day_leaves_between_period_on_start_date = (
employee.leaverequest_set.filter(
leave_type_id__payment="unpaid",
start_date__in=date_range,
status="approved",
)
.exclude(start_date_breakdown="full_day")
.count()
)
half_day_leaves_between_period_on_end_date = (
employee.leaverequest_set.filter(
leave_type_id__payment="unpaid", end_date__in=date_range, status="approved"
)
.exclude(end_date_breakdown="full_day")
.exclude(start_date=F("end_date"))
.count()
)
unpaid_half_leaves = (
half_day_leaves_between_period_on_start_date
+ half_day_leaves_between_period_on_end_date
) * 0.5
contract = employee.contract_set.filter(
is_active=True, contract_status="active"
).first()
unpaid_leaves = abs(leave_data["unpaid_leaves"] - unpaid_half_leaves)
paid_days = month_data[0]["working_days_on_period"] - unpaid_leaves
daily_computed_salary = get_daily_salary(wage=wage, wage_date=start_date)[
"day_wage"
]
if contract.calculate_daily_leave_amount:
loss_of_pay = (unpaid_leaves) * daily_computed_salary
else:
fixed_penalty = contract.deduction_for_one_leave_amount
loss_of_pay = (unpaid_leaves) * fixed_penalty
if contract.deduct_leave_from_basic_pay:
basic_pay = basic_pay - loss_of_pay
return {
"basic_pay": basic_pay,
"loss_of_pay": loss_of_pay,
"month_data": month_data,
"unpaid_days": unpaid_leaves,
"paid_days": paid_days,
}
def compute_salary_on_period(employee, start_date, end_date, wage=None):
"""
This method is used to compute salary on the start to end date period
Args:
employee (obj): Employee instance
start_date (obj): start date of the period
end_date (obj): end date of the period
"""
contract = Contract.objects.filter(
employee_id=employee, contract_status="active"
).first()
if contract is None:
return contract
wage = contract.wage if wage is None else wage
wage_type = contract.wage_type
data = None
if wage_type == "hourly":
data = hourly_computation(employee, wage, start_date, end_date)
month_data = months_between_range(wage, start_date, end_date)
data["month_data"] = month_data
elif wage_type == "daily":
data = daily_computation(employee, wage, start_date, end_date)
month_data = months_between_range(wage, start_date, end_date)
data["month_data"] = month_data
else:
data = monthly_computation(employee, wage, start_date, end_date)
data["contract_wage"] = wage
data["contract"] = contract
return data
def paginator_qry(qryset, page_number):
"""
This method is used to paginate queryset
"""
paginator = Paginator(qryset, get_pagination())
qryset = paginator.get_page(page_number)
return qryset
def calculate_employer_contribution(data):
"""
This method is used to calculate the employer contribution
"""
pay_head_data = data["pay_data"]
deductions_to_process = [
pay_head_data.get("pretax_deductions"),
pay_head_data.get("post_tax_deductions"),
pay_head_data.get("tax_deductions"),
pay_head_data.get("net_deductions"),
]
for deductions in deductions_to_process:
if deductions:
for deduction in deductions:
if (
deduction.get("deduction_id")
and deduction.get("employer_contribution_rate", 0) > 0
):
object = Deduction.objects.filter(
id=deduction.get("deduction_id")
).first()
if object:
amount = pay_head_data.get(object.based_on)
employer_contribution_amount = (
amount * object.employer_rate
) / 100
deduction["based_on"] = object.based_on
deduction["employer_contribution_amount"] = (
employer_contribution_amount
)
return data
def save_payslip(**kwargs):
"""
This method is used to save the generated payslip
"""
filtered_instance = Payslip.objects.filter(
employee_id=kwargs["employee"],
start_date=kwargs["start_date"],
end_date=kwargs["end_date"],
).first()
instance = filtered_instance if filtered_instance is not None else Payslip()
instance.employee_id = kwargs["employee"]
instance.group_name = kwargs.get("group_name")
instance.start_date = kwargs["start_date"]
instance.end_date = kwargs["end_date"]
instance.status = kwargs["status"]
instance.basic_pay = round(kwargs["basic_pay"], 2)
instance.contract_wage = round(kwargs["contract_wage"], 2)
instance.gross_pay = round(kwargs["gross_pay"], 2)
instance.deduction = round(kwargs["deduction"], 2)
instance.net_pay = round(kwargs["net_pay"], 2)
instance.pay_head_data = kwargs["pay_data"]
instance.save()
instance.installment_ids.set(kwargs["installments"])
return instance