[UPDT] HORILLA AUTOMATIONS: Updated automation method and initial load template

This commit is contained in:
Horilla
2025-05-09 11:44:07 +05:30
parent 0327a0a0c3
commit c51a31f8e5
9 changed files with 532 additions and 65 deletions

View File

@@ -34,6 +34,10 @@ class HorillaAutomationConfig(AppConfig):
path = f"{model.__module__}.{model.__name__}"
MODEL_CHOICES.append((path, model.__name__))
MODEL_CHOICES.append(("employee.models.Employee", "Employee"))
MODEL_CHOICES.append(
("pms.models.EmployeeKeyResult", "Employee Key Results")
)
MODEL_CHOICES = list(set(MODEL_CHOICES))
try:
start_automation()

View File

@@ -29,69 +29,128 @@ def get_related_models(model: HorillaModel) -> list:
return related_models
from horilla_automations.methods.recursive_relation import (
get_forward_relation_paths_separated,
)
def generate_choices(model_path):
"""
Generate mail to choice
"""
module_name, class_name = model_path.rsplit(".", 1)
module = __import__(module_name, fromlist=[class_name])
model_class: Employee = getattr(module, class_name)
fk_relation, m2m_relation = get_forward_relation_paths_separated(
model_class, Employee
)
all_fields = fk_relation + m2m_relation
to_fields = []
all_mail_to_field = []
mail_details_choice = []
for field in list(model_class._meta.fields) + list(model_class._meta.many_to_many):
if not getattr(field, "exclude_from_automation", False):
related_model = field.related_model
models = [Employee]
if recruitment_installed:
models.append(Candidate)
if related_model in models:
email_field = (
f"{field.name}__get_email",
f"{field.verbose_name.capitalize().replace(' id','')} mail field ",
for field_tuple in all_fields:
if not getattr(field_tuple[1], "exclude_from_automation", False):
all_mail_to_field.append(
(
f"{field_tuple[0]}__get_email",
f"({field_tuple[1].model.__name__}) {field_tuple[1].verbose_name.capitalize().replace(' id','')}'s mail ",
)
mail_detail = None
if not isinstance(field, django_models.ManyToManyField):
mail_detail = (
f"{field.name}__pk",
field.verbose_name.capitalize().replace(" id", "")
+ "(Template context)",
)
if field.related_model == Employee:
to_fields.append(
)
if not field_tuple[1].many_to_many:
mail_details_choice += [
(
f"{field_tuple[0]}__pk",
f"{field_tuple[1].verbose_name.capitalize().replace(' id','')} (Template context)",
),
]
# Adding reporting manager if the related model is Employee
if field_tuple[1].related_model == Employee:
# reporting manager mail to
all_mail_to_field.append(
(
f"{field.name}__employee_work_info__reporting_manager_id__get_email",
f"{field.verbose_name.capitalize().replace(' id','')}'s reporting manager",
f"{field_tuple[0]}__employee_work_info__reporting_manager_id__get_email",
f"{field_tuple[1].verbose_name.capitalize().replace(' id','')} / Reporting Manager's mail ",
)
)
if not isinstance(field, django_models.ManyToManyField):
mail_details_choice.append(
(
f"{field.name}__employee_work_info__reporting_manager_id__pk",
f"{field.verbose_name.capitalize().replace(' id','')}'s reporting manager (Template context)",
)
# reporting manager template context
mail_details_choice.append(
(
f"{field_tuple[0]}__employee_work_info__reporting_manager_id__pk",
f"{field_tuple[1].verbose_name.capitalize().replace(' id','')} / Reporting Manager (Template context) ",
)
to_fields.append(email_field)
if mail_detail:
mail_details_choice.append(mail_detail)
text_area_fields = get_textfield_paths(model_class)
mail_details_choice = mail_details_choice + text_area_fields
models = [Employee]
if recruitment_installed:
models.append(Candidate)
if model_class in models:
to_fields.append(
)
if model_class == Employee:
# reporting manager mail to
all_mail_to_field.append(
(
"get_email",
f"{model_class.__name__}'s mail",
f"employee_work_info__reporting_manager_id__get_email",
f"Reporting Manager's mail ",
)
)
mail_to_related_fields = getattr(model_class, "mail_to_related_fields", [])
to_fields = to_fields + mail_to_related_fields
mail_details_choice.append(("pk", model_class.__name__))
if model_path == "employee.models.Employee":
all_mail_to_field.append(("get_email", "Employee's mail"))
elif model_path == "recruitment.models.Candidate":
all_mail_to_field.append(("get_email", "Candidate's mail"))
to_fields = []
# mail_details_choice = []
# for field in list(model_class._meta.fields) + list(model_class._meta.many_to_many):
# if not getattr(field, "exclude_from_automation", False):
# related_model = field.related_model
# models = [Employee]
# if recruitment_installed:
# models.append(Candidate)
# if related_model in models:
# email_field = (
# f"{field.name}__get_email",
# f"{field.verbose_name.capitalize().replace(' id','')} mail field ",
# )
# mail_detail = None
# if not isinstance(field, django_models.ManyToManyField):
# mail_detail = (
# f"{field.name}__pk",
# field.verbose_name.capitalize().replace(" id", "")
# + "(Template context)",
# )
# if field.related_model == Employee:
# to_fields.append(
# (
# f"{field.name}__employee_work_info__reporting_manager_id__get_email",
# f"{field.verbose_name.capitalize().replace(' id','')}'s reporting manager",
# )
# )
# if not isinstance(field, django_models.ManyToManyField):
# mail_details_choice.append(
# (
# f"{field.name}__employee_work_info__reporting_manager_id__pk",
# f"{field.verbose_name.capitalize().replace(' id','')}'s reporting manager (Template context)",
# )
# )
# to_fields.append(email_field)
# if mail_detail:
# mail_details_choice.append(mail_detail)
text_area_fields = get_textfield_paths(model_class)
mail_details_choice = mail_details_choice + text_area_fields
# models = [Employee]
# if recruitment_installed:
# models.append(Candidate)
# if model_class in models:
# to_fields.append(
# (
# "get_email",
# f"{model_class.__name__}'s mail ({model_class.__name__})",
# )
# )
# mail_to_related_fields = getattr(model_class, "mail_to_related_fields", [])
# to_fields = to_fields + mail_to_related_fields
# mail_details_choice.append(("pk", model_class.__name__))
to_fields = list(set(to_fields))
return to_fields, mail_details_choice, model_class
return all_mail_to_field, mail_details_choice, model_class
def get_model_class(model_path):

View File

@@ -0,0 +1,180 @@
"""
horilla_automation/recursive_relation.py
"""
from django.apps import apps
from django.db.models.fields.related import ForeignKey, OneToOneField, ManyToManyField
from django.db.models.fields.reverse_related import (
ManyToOneRel,
ManyToManyRel,
OneToOneRel,
)
# Set a recursion depth limit to prevent cycles
MAX_DEPTH = 4
def get_all_relation_paths(source_model, target_model, max_depth=5):
relation_paths = []
def walk(model, path, visited_models, depth):
if depth > max_depth or model in visited_models or is_history_model(model):
return
visited_models.add(model)
for field in model._meta.get_fields():
if not field.is_relation:
continue
# Forward relations
if not field.auto_created:
related_model = field.related_model
if not related_model or is_history_model(related_model):
continue
new_path = f"{path}__{field.name}" if path else field.name
if related_model == target_model:
relation_paths.append(new_path)
else:
walk(related_model, new_path, visited_models.copy(), depth + 1)
# Reverse relations (related_name or default accessor)
elif isinstance(field, ForeignObjectRel):
related_model = field.related_model
if not related_model or is_history_model(related_model):
continue
accessor_name = field.get_accessor_name()
new_path = f"{path}__{accessor_name}" if path else accessor_name
if related_model == target_model:
relation_paths.append(new_path)
else:
walk(related_model, new_path, visited_models.copy(), depth + 1)
walk(source_model, "", set(), 0)
return relation_paths
def is_history_model(model):
return (
model._meta.model_name.endswith("_history")
or model._meta.app_label == "simple_history"
or model.__name__.lower().endswith("history")
)
def get_simple_relation_paths(source_model, target_model, max_depth=5):
results = []
all_paths = set()
def walk(model, path, visited_models, depth):
if depth > max_depth or model in visited_models:
return
visited_models = visited_models | {model}
for field in model._meta.get_fields():
if not field.is_relation or isinstance(field, ManyToManyField):
continue
# Skip fields without a valid remote_field or related_model
remote = getattr(field, "remote_field", None)
related_model = getattr(remote, "model", None)
if related_model is None:
continue
# Determine accessor name
if field.auto_created and not field.concrete:
accessor = field.get_accessor_name()
else:
accessor = field.name
print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
print(accessor)
print(field)
new_path = f"{path}__{accessor}" if path else accessor
if related_model == target_model:
results.append(new_path)
all_paths.add(new_path)
elif depth + 1 < max_depth:
walk(related_model, new_path, visited_models, depth + 1)
walk(source_model, "", set(), 0)
# Post-process to remove paths that are strict supersets of others
unique_paths = []
for path in sorted(results, key=lambda p: p.count("__")): # shortest first
if not any(
path != other and path.startswith(other + "__") for other in unique_paths
):
unique_paths.append(path)
return unique_paths
def is_history_model(model):
return (
model._meta.model_name.endswith("_history")
or model._meta.app_label == "simple_history"
or model.__name__.lower().endswith("history")
)
def get_forward_relation_paths_separated(source_model, target_model, max_depth=5):
"""
Recursively find forward relation paths from source_model to target_model,
separating ForeignKey and ManyToManyField paths, excluding history models.
"""
fk_paths = []
m2m_paths = []
def walk(model, path, visited_models, depth):
if depth > max_depth or model in visited_models or is_history_model(model):
return
visited_models.add(model)
for field in model._meta.get_fields():
if not field.is_relation or field.auto_created:
continue # Skip non-relational fields and reverse relations
related_model = field.related_model
if not related_model or is_history_model(related_model):
continue
new_path = f"{path}__{field.name}" if path else field.name
if related_model == target_model:
if field.many_to_many:
m2m_paths.append((new_path, field))
else:
fk_paths.append((new_path, field))
else:
walk(related_model, new_path, visited_models.copy(), depth + 1)
walk(source_model, "", set(), 0)
return fk_paths, m2m_paths
_a = {
"pms.models.EmployeeKeyResult": {
"mail_to": [
("employee_objective_id__employee_id__get_email", "Employee's mail"),
(
"employee_objective_id__employee_id__employee_work_inf__reporting_manager_id__get_email",
"Reporting manager's mail",
),
(
"employee_objective_id__objective_id__managers__get_email",
"Objective manager's mail",
),
],
"mail_instance": [
("employee_objective_id__employee_id", "Employee"),
],
}
}

View File

@@ -40,7 +40,7 @@ class MailAutomation(HorillaModel):
title = models.CharField(max_length=256, unique=True)
method_title = models.CharField(max_length=50, editable=False)
model = models.CharField(max_length=100, choices=MODEL_CHOICES, null=False)
mail_to = models.TextField(verbose_name="Mail to")
mail_to = models.TextField(verbose_name="Mail to/Notify to")
mail_details = models.CharField(
max_length=250,
help_text=_trans(

View File

@@ -449,8 +449,10 @@ def send_mail(request, automation, instance):
title_template = template.Template(automation.title)
title_context = template.Context({"instance": instance, "self": sender})
render_title = title_template.render(title_context)
print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
soup = BeautifulSoup(render_bdy, "html.parser")
plain_text = soup.get_text(separator="\n")
print(render_title)
email = EmailMessage(
subject=render_title,
@@ -466,11 +468,14 @@ def send_mail(request, automation, instance):
def _send_mail(email):
try:
print("MAIL SENTTTTTTTTTTTTT")
email.send()
except Exception as e:
print("ERRRRRR")
logger.error(e)
def _send_notification(text):
print(text)
notify.send(
sender,
recipient=user_ids,
@@ -478,6 +483,7 @@ def send_mail(request, automation, instance):
icon="person-remove",
redirect="",
)
print("NOTIFICATION SENTTTTTTTTTTTTTT")
if automation.delivary_channel != "notification":
thread = threading.Thread(

View File

@@ -45,7 +45,9 @@ function getToMail(element) {
tr = `
<tr class="dynamic-condition-row">
<td class="sn">${totalRows}</td>
<td id="conditionalField"></td>
<td id="conditionalField">
<div hidden>${JSON.stringify(response.serialized_form)}</div>
</td>
<td>
<select name="condition" onchange="addSelectedAttr(event)" class="w-100">
<option value="==">==</option>
@@ -127,9 +129,7 @@ function getHtml() {
function populateSelect(data, response) {
const selectElement = $(
`<select class="w-100" onchange="updateValue($(this));addSelectedAttr(event)" data-response='${JSON.stringify(
response.serialized_form
).toString()}'></select>`
`<select class="w-100" onchange="updateValue($(this));addSelectedAttr(event)"></select>`
);
data.forEach((item) => {
@@ -139,17 +139,21 @@ function populateSelect(data, response) {
selectElement.append($option);
});
return selectElement;
}
function updateValue(element) {
field = element.val();
attr = element.attr("data-response");
attr = attr
.replace(/[\u0000-\u001F\u007F-\u009F]/g, "")
.replace(/\\n/g, "\\\\n")
.replace(/\\t/g, "\\\\t");
console.log(">>>>>>>>>>>>>>>>>>>>>>")
json = element.closest('table').find('#conditionalField div[hidden]').text()
console.log(json)
response = JSON.parse(attr);
field = element.val();
// attr = json
// .replace(/[\u0000-\u001F\u007F-\u009F]/g, "")
// .replace(/\\n/g, "\\\\n")
// .replace(/\\t/g, "\\\\t");
response = JSON.parse(json);
valueElement = createElement(field, response);
element.closest("tr").find(".condition-value-th").html("");

View File

@@ -0,0 +1,75 @@
{% load static i18n %}
<div class="oh-modal__dialog-header">
<span class="oh-modal__dialog-title">{% trans "Load Automations" %}</span>
<button class="oh-modal__close" aria-label="Close">
<ion-icon name="close-outline"></ion-icon>
</button>
</div>
<div class="oh-modal__dialog-body">
<div class="oh-card">
<form hx-post="{{request.path}}" hx-target="#genericModalBody">
{% csrf_token %}
<div
class="oh-layout--grid-3"
style="
grid-template-columns: repeat(auto-fill, minmax(48%, 1fr));
"
>
{% for automation in automations %}
<div class="oh-card rounded">
<div class="oh-kanban-card__details">
<div class="d-flex-justify-between mb-2">
<span class="oh-kanban-card__title"
>{{automation.fields.title}}</span
>
<span>
<input
name="{{automation.pk}}"
type="checkbox"
class="custom-radio-checkmark"
/>
</span>
</div>
<div
class="oh-kanban-card__subtitle truncated-text"
style="
height: 120px;
white-space: wrap;
"
>
{{automation.template_body|safe}}
</div>
<ul class="oh-faq__tags m-0">
{% if automation.fields.delivary_channel == 'email' %}
<li class="oh-faq__tag text-light bg-primary">
{% trans "Email" %}
</li>
{% elif automation.fields.delivary_channel == 'notification' %}
<li class="oh-faq__tag text-light bg-danger">
{% trans "Notification" %}
</li>
{% elif automation.fields.delivary_channel == 'both' %}
<li class="oh-faq__tag text-light bg-primary">
{% trans "Email" %}
</li>
<li class="oh-faq__tag text-light bg-danger">
{% trans "Notification" %}
</li>
{% endif %}
</ul>
</div>
</div>
{% endfor %}
</div>
<div class="d-flex flex-row-reverse">
<button
type="submit"
class="oh-btn oh-btn--secondary mt-2 mr-0 pl-4 pr-5 oh-btn--w-100-resp"
>
{% trans "Add" %}
</button>
</div>
</form>
</div>
</div>

View File

@@ -47,4 +47,9 @@ urlpatterns = [
views.delete_automation,
name="delete-automation",
),
path(
"load-automations",
cbvs.LoadAutomationsView.as_view(),
name="load-automations",
),
]

View File

@@ -2,11 +2,20 @@
horilla_automations/views/cbvs.py
"""
import json
import os
from django.conf import settings
from django.contrib import messages
from django.urls import reverse_lazy
from django.core import serializers
from django.http import HttpResponse
from django.shortcuts import render
from django.urls import reverse, reverse_lazy
from django.utils.decorators import method_decorator
from django.utils.translation import gettext_lazy as _trans
from django.views import View
from base.models import HorillaMailTemplate
from horilla.decorators import login_required, permission_required
from horilla_automations import models
from horilla_automations.filters import AutomationFilter
@@ -45,12 +54,38 @@ class AutomationNavView(views.HorillaNavView):
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.create_attrs = f"""
hx-get="{reverse_lazy("create-automation")}"
hx-target="#genericModalBody"
data-target="#genericModal"
data-toggle="oh-modal-toggle"
"""
self.actions = []
if self.request.user.has_perm("horilla_automation.add_mailautomation"):
self.create_attrs = f"""
hx-get="{reverse_lazy("create-automation")}"
hx-target="#genericModalBody"
data-target="#genericModal"
data-toggle="oh-modal-toggle"
"""
self.actions.append(
{
"action": "Load Automations",
"attrs": f"""
data-toggle="oh-modal-toggle"
data-target="#genericModal"
hx-target="#genericModalBody"
hx-get="{reverse_lazy('load-automations')}"
style="cursor: pointer;"
""",
}
)
self.actions.append(
{
"action": "Refresh Automations",
"attrs": f"""
hx-get="{reverse_lazy('mail-automations-list-view')}"
hx-target="#listContainer"
class="oh-btn oh-btn--light-bkg"
""",
}
)
nav_title = _trans("Automations")
search_url = reverse_lazy("mail-automations-list-view")
@@ -190,3 +225,102 @@ class AutomationDetailedView(views.HorillaDetailedView):
""",
},
]
@method_decorator(login_required, name="dispatch")
@method_decorator(
permission_required("horilla_automation.add_mailautomation"), name="dispatch"
)
class LoadAutomationsView(View):
template_name = "horilla_automations/load_automation.html"
template_file = os.path.join(settings.BASE_DIR, "load_data", "mail_templates.json")
automation_file = os.path.join(
settings.BASE_DIR, "load_data", "mail_automations.json"
)
def load_json_files(self):
with open(self.template_file, "r") as tf:
templates_raw = json.load(tf)
with open(self.automation_file, "r") as af:
automations_raw = json.load(af)
return templates_raw, automations_raw
def get(self, request):
templates_raw, automations_raw = self.load_json_files()
template_lookup = {item["pk"]: item["fields"]["body"] for item in templates_raw}
processed_automations = []
for automation in automations_raw:
processed = automation.copy()
template_pk = automation["fields"].get("mail_template")
processed["template_body"] = template_lookup.get(template_pk, "")
processed_automations.append(processed)
return render(
request,
self.template_name,
{"automations": processed_automations},
)
def post(self, request):
templates_raw, automations_raw = self.load_json_files()
template_lookup = {item["pk"]: item["fields"]["body"] for item in templates_raw}
selected_ids = [int(k) for k in request.POST.keys() if k.isdigit()]
selected_automations = [a for a in automations_raw if a["pk"] in selected_ids]
required_template_pks = {
a["fields"].get("mail_template")
for a in selected_automations
if a["fields"].get("mail_template")
}
for template_json in templates_raw:
if template_json["pk"] in required_template_pks:
template_data = list(
serializers.deserialize("json", json.dumps([template_json]))
)[0].object
existing = HorillaMailTemplate.objects.filter(
title=template_data.title
).first()
if not existing:
template_data.pk = None
template_data.save()
for automation_json in selected_automations:
deserialized = list(
serializers.deserialize("json", json.dumps([automation_json]))
)[0]
automation_obj = deserialized.object
template_pk = automation_json["fields"].get("mail_template")
template_body = template_lookup.get(template_pk)
mail_template = HorillaMailTemplate.objects.filter(
body=template_body
).first()
automation_obj.mail_template = mail_template
if not models.MailAutomation.objects.filter(
title=automation_obj.title
).exists():
automation_obj.pk = None
automation_obj.save()
messages.success(
request, f"Automation '{automation_obj.title}' added successfully."
)
else:
messages.warning(
request, f"Automation '{automation_obj.title}' already exists."
)
script = """
<script>
$("#reloadMessagesButton").click();
$('#applyFilter').click();
$('.oh-modal--show').first().removeClass('oh-modal--show');
</script>
"""
return HttpResponse(script)