200 lines
8.3 KiB
Python
200 lines
8.3 KiB
Python
"""
|
|
pms/signals.py
|
|
"""
|
|
|
|
import copy
|
|
import logging
|
|
import threading
|
|
import types
|
|
from datetime import date
|
|
|
|
from django.db.models.signals import m2m_changed, post_delete, post_save, pre_save
|
|
from django.dispatch import receiver
|
|
|
|
from employee.methods.methods import check_relationship_with_employee_model
|
|
from horilla.horilla_middlewares import _thread_locals
|
|
from horilla.signals import pre_bulk_update
|
|
from pms.models import BonusPointSetting
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
SIGNAL_HANDLERS = []
|
|
INSTANCE_HANDLERS = []
|
|
|
|
|
|
def start_automation():
|
|
"""
|
|
Automation signals
|
|
"""
|
|
from horilla_automations.methods.methods import get_model_class, split_query_string
|
|
|
|
@receiver(post_delete, sender=BonusPointSetting)
|
|
@receiver(post_save, sender=BonusPointSetting)
|
|
def automation_pre_create(sender, instance, **kwargs):
|
|
"""
|
|
signal method to handle automation post save
|
|
"""
|
|
start_connection()
|
|
track_previous_instance()
|
|
|
|
def clear_connection():
|
|
"""
|
|
Method to clear signals handlers
|
|
"""
|
|
for handler in SIGNAL_HANDLERS:
|
|
post_save.disconnect(handler, sender=handler.model_class)
|
|
SIGNAL_HANDLERS.clear()
|
|
|
|
def start_connection():
|
|
"""
|
|
Method to start signal connection accordingly to the automation
|
|
"""
|
|
clear_connection()
|
|
bonus_point_settings = BonusPointSetting.objects.filter(is_active=True)
|
|
for bonus_point_setting in bonus_point_settings:
|
|
model_path = bonus_point_setting.model
|
|
model_class = get_model_class(model_path)
|
|
related_fields = check_relationship_with_employee_model(model_class)
|
|
field = None
|
|
type = None
|
|
for field_name, relation_type in related_fields:
|
|
if (
|
|
bonus_point_setting.applicable_for == "members"
|
|
and relation_type == "ManyToManyField"
|
|
):
|
|
field = field_name
|
|
type = relation_type
|
|
break
|
|
elif (
|
|
bonus_point_setting.applicable_for == "managers"
|
|
or bonus_point_setting.applicable_for == "owner"
|
|
and relation_type == "ForeignKey"
|
|
):
|
|
field = field_name
|
|
type = relation_type
|
|
break
|
|
|
|
def create_signal_handler(name, bonus_point_setting, type=None, field=None):
|
|
def signal_handler(sender, instance, *args, **kwargs):
|
|
"""
|
|
Signal handler for post-save events of the model instances.
|
|
"""
|
|
if type == "ManyToManyField":
|
|
|
|
@receiver(
|
|
m2m_changed, sender=getattr(model_class, field).through
|
|
)
|
|
def members_changed(sender, instance, action, **kwargs):
|
|
"""
|
|
Handle m2m_changed signal for the members field in YourModel.
|
|
"""
|
|
if (
|
|
action == "post_add"
|
|
or action == "post_remove"
|
|
or action == "post_clear"
|
|
or action == "post_save"
|
|
):
|
|
# These actions occur after members are added, removed, or cleared
|
|
field_1 = date.today()
|
|
field_2 = instance.end_date
|
|
if bonus_point_setting.bonus_for == instance.status:
|
|
if field and type:
|
|
field_value = getattr(instance, field)
|
|
if type == "ManyToManyField":
|
|
# Now this should give the updated members
|
|
employees = field_value.all()
|
|
else:
|
|
employees = field_value
|
|
for employee in employees:
|
|
bonus_point_setting.create_employee_bonus(
|
|
employee, field_1, field_2, instance
|
|
)
|
|
else:
|
|
logger("No type and field")
|
|
else:
|
|
logger("Not post add.")
|
|
|
|
else:
|
|
field_1 = date.today()
|
|
field_2 = instance.end_date
|
|
if bonus_point_setting.bonus_for == instance.status:
|
|
if field and type:
|
|
field_value = getattr(instance, field)
|
|
if type == "ManyToManyField":
|
|
# Now this should give the updated members
|
|
employees = field_value.all()
|
|
for employee in employees:
|
|
bonus_point_setting.create_employee_bonus(
|
|
employee, field_1, field_2, instance
|
|
)
|
|
else:
|
|
employee = field_value
|
|
bonus_point_setting.create_employee_bonus(
|
|
employee, field_1, field_2, instance
|
|
)
|
|
else:
|
|
logger("No type and field")
|
|
|
|
signal_handler.__name__ = name
|
|
signal_handler.model_class = model_class
|
|
signal_handler.type = type
|
|
signal_handler.field = field
|
|
signal_handler.bonus_point_setting = bonus_point_setting
|
|
return signal_handler
|
|
|
|
# Create and connect the signal handler
|
|
handler_name = f"{bonus_point_setting.id}_signal_handler"
|
|
dynamic_signal_handler = create_signal_handler(
|
|
handler_name, bonus_point_setting, type=type, field=field
|
|
)
|
|
SIGNAL_HANDLERS.append(dynamic_signal_handler)
|
|
post_save.connect(
|
|
dynamic_signal_handler, sender=dynamic_signal_handler.model_class
|
|
)
|
|
|
|
def track_previous_instance():
|
|
"""
|
|
method to add signal to track the automations model previous instances
|
|
"""
|
|
|
|
def clear_instance_signal_connection():
|
|
"""
|
|
Method to clear instance handler signals
|
|
"""
|
|
for handler in INSTANCE_HANDLERS:
|
|
pre_save.disconnect(handler, sender=handler.model_class)
|
|
pre_bulk_update.disconnect(handler, sender=handler.model_class)
|
|
INSTANCE_HANDLERS.clear()
|
|
|
|
clear_instance_signal_connection()
|
|
bonus_point_settings = BonusPointSetting.objects.filter(is_active=True)
|
|
for bonus_setting in bonus_point_settings:
|
|
model_class = get_model_class(bonus_setting.model)
|
|
|
|
@receiver(pre_save, sender=model_class)
|
|
def instance_handler(sender, instance, **kwargs):
|
|
"""
|
|
Signal handler for pres-save events of the model instances.
|
|
"""
|
|
# prevented storing the scheduled activities
|
|
request = getattr(_thread_locals, "request", None)
|
|
if instance.pk:
|
|
# to get the previous instance
|
|
instance = model_class.objects.filter(id=instance.pk).first()
|
|
if request:
|
|
_thread_locals.previous_record = {
|
|
"bonus_setting": bonus_setting,
|
|
"instance": instance,
|
|
}
|
|
instance_handler.__name__ = f"{bonus_setting.id}_instance_handler"
|
|
return instance_handler
|
|
|
|
instance_handler.model_class = model_class
|
|
instance_handler.bonus_setting = bonus_setting
|
|
|
|
INSTANCE_HANDLERS.append(instance_handler)
|
|
|
|
track_previous_instance()
|
|
start_connection()
|