Files
ihrm/pms/signals.py

200 lines
8.3 KiB
Python

"""
pms/signals.py
"""
import copy
import logging
import threading
import types
from datetime import date
from django.db.models.signals import m2m_changed, post_delete, post_save, pre_save
from django.dispatch import receiver
from employee.methods.methods import check_relationship_with_employee_model
from horilla.horilla_middlewares import _thread_locals
from horilla.signals import pre_bulk_update
from pms.models import BonusPointSetting
logger = logging.getLogger(__name__)
SIGNAL_HANDLERS = []
INSTANCE_HANDLERS = []
def start_automation():
"""
Automation signals
"""
from horilla_automations.methods.methods import get_model_class, split_query_string
@receiver(post_delete, sender=BonusPointSetting)
@receiver(post_save, sender=BonusPointSetting)
def automation_pre_create(sender, instance, **kwargs):
"""
signal method to handle automation post save
"""
start_connection()
track_previous_instance()
def clear_connection():
"""
Method to clear signals handlers
"""
for handler in SIGNAL_HANDLERS:
post_save.disconnect(handler, sender=handler.model_class)
SIGNAL_HANDLERS.clear()
def start_connection():
"""
Method to start signal connection accordingly to the automation
"""
clear_connection()
bonus_point_settings = BonusPointSetting.objects.filter(is_active=True)
for bonus_point_setting in bonus_point_settings:
model_path = bonus_point_setting.model
model_class = get_model_class(model_path)
related_fields = check_relationship_with_employee_model(model_class)
field = None
type = None
for field_name, relation_type in related_fields:
if (
bonus_point_setting.applicable_for == "members"
and relation_type == "ManyToManyField"
):
field = field_name
type = relation_type
break
elif (
bonus_point_setting.applicable_for == "managers"
or bonus_point_setting.applicable_for == "owner"
and relation_type == "ForeignKey"
):
field = field_name
type = relation_type
break
def create_signal_handler(name, bonus_point_setting, type=None, field=None):
def signal_handler(sender, instance, *args, **kwargs):
"""
Signal handler for post-save events of the model instances.
"""
if type == "ManyToManyField":
@receiver(
m2m_changed, sender=getattr(model_class, field).through
)
def members_changed(sender, instance, action, **kwargs):
"""
Handle m2m_changed signal for the members field in YourModel.
"""
if (
action == "post_add"
or action == "post_remove"
or action == "post_clear"
or action == "post_save"
):
# These actions occur after members are added, removed, or cleared
field_1 = date.today()
field_2 = instance.end_date
if bonus_point_setting.bonus_for == instance.status:
if field and type:
field_value = getattr(instance, field)
if type == "ManyToManyField":
# Now this should give the updated members
employees = field_value.all()
else:
employees = field_value
for employee in employees:
bonus_point_setting.create_employee_bonus(
employee, field_1, field_2, instance
)
else:
logger("No type and field")
else:
logger("Not post add.")
else:
field_1 = date.today()
field_2 = instance.end_date
if bonus_point_setting.bonus_for == instance.status:
if field and type:
field_value = getattr(instance, field)
if type == "ManyToManyField":
# Now this should give the updated members
employees = field_value.all()
for employee in employees:
bonus_point_setting.create_employee_bonus(
employee, field_1, field_2, instance
)
else:
employee = field_value
bonus_point_setting.create_employee_bonus(
employee, field_1, field_2, instance
)
else:
logger("No type and field")
signal_handler.__name__ = name
signal_handler.model_class = model_class
signal_handler.type = type
signal_handler.field = field
signal_handler.bonus_point_setting = bonus_point_setting
return signal_handler
# Create and connect the signal handler
handler_name = f"{bonus_point_setting.id}_signal_handler"
dynamic_signal_handler = create_signal_handler(
handler_name, bonus_point_setting, type=type, field=field
)
SIGNAL_HANDLERS.append(dynamic_signal_handler)
post_save.connect(
dynamic_signal_handler, sender=dynamic_signal_handler.model_class
)
def track_previous_instance():
"""
method to add signal to track the automations model previous instances
"""
def clear_instance_signal_connection():
"""
Method to clear instance handler signals
"""
for handler in INSTANCE_HANDLERS:
pre_save.disconnect(handler, sender=handler.model_class)
pre_bulk_update.disconnect(handler, sender=handler.model_class)
INSTANCE_HANDLERS.clear()
clear_instance_signal_connection()
bonus_point_settings = BonusPointSetting.objects.filter(is_active=True)
for bonus_setting in bonus_point_settings:
model_class = get_model_class(bonus_setting.model)
@receiver(pre_save, sender=model_class)
def instance_handler(sender, instance, **kwargs):
"""
Signal handler for pres-save events of the model instances.
"""
# prevented storing the scheduled activities
request = getattr(_thread_locals, "request", None)
if instance.pk:
# to get the previous instance
instance = model_class.objects.filter(id=instance.pk).first()
if request:
_thread_locals.previous_record = {
"bonus_setting": bonus_setting,
"instance": instance,
}
instance_handler.__name__ = f"{bonus_setting.id}_instance_handler"
return instance_handler
instance_handler.model_class = model_class
instance_handler.bonus_setting = bonus_setting
INSTANCE_HANDLERS.append(instance_handler)
track_previous_instance()
start_connection()