diff --git a/accessibility/__init__.py b/accessibility/__init__.py new file mode 100644 index 0000000..8bd3cfb Binary files /dev/null and b/accessibility/__init__.py differ diff --git a/accessibility/accessibility.py b/accessibility/accessibility.py new file mode 100644 index 0000000..77e8f82 --- /dev/null +++ b/accessibility/accessibility.py @@ -0,0 +1,10 @@ +""" +accessibility/accessibility.py +""" + +from django.utils.translation import gettext_lazy as _ + +ACCESSBILITY_FEATURE = [ + ("employee_view", _("Default Employee View")), + ("employee_detailed_view", _("Default Employee Detailed View")), +] diff --git a/accessibility/admin.py b/accessibility/admin.py new file mode 100644 index 0000000..97e651b --- /dev/null +++ b/accessibility/admin.py @@ -0,0 +1,3 @@ +from django.contrib import admin + +# Register your models here. diff --git a/accessibility/apps.py b/accessibility/apps.py new file mode 100644 index 0000000..baf6246 --- /dev/null +++ b/accessibility/apps.py @@ -0,0 +1,15 @@ +from django.apps import AppConfig + + +class AccessibilityConfig(AppConfig): + default_auto_field = "django.db.models.BigAutoField" + name = "accessibility" + + def ready(self) -> None: + from accessibility import signals + from horilla.urls import include, path, urlpatterns + + urlpatterns.append( + path("", include("accessibility.urls")), + ) + return super().ready() diff --git a/accessibility/cbv_decorators.py b/accessibility/cbv_decorators.py new file mode 100644 index 0000000..d2f21f4 --- /dev/null +++ b/accessibility/cbv_decorators.py @@ -0,0 +1,56 @@ +""" +employee/decorators.py +""" + +from django.contrib import messages +from django.http import HttpResponse +from django.shortcuts import redirect +from django.utils.translation import gettext_lazy as _ + +from accessibility.methods import check_is_accessible +from base.decorators import decorator_with_arguments +from horilla.horilla_middlewares import _thread_locals + + +@decorator_with_arguments +def enter_if_accessible(function, feature, perm=None, method=None): + """ + accessible check decorator for cbv + """ + + def check_accessible(self, *args, **kwargs): + """ + Check accessible + """ + path = "/" + request = getattr(_thread_locals, "request") + if not getattr(self, "request", None): + self.request = request + referrer = request.META.get("HTTP_REFERER") + if referrer and request.path not in referrer: + path = request.META["HTTP_REFERER"] + accessible = False + cache_key = request.session.session_key + "accessibility_filter" + employee = getattr(request.user, "employee_get") + if employee: + accessible = check_is_accessible(feature, cache_key, employee) + has_perm = True + if perm: + has_perm = request.user.has_perm(perm) + + if accessible or has_perm or method(request): + return function(self, *args, **kwargs) + key = "HTTP_HX_REQUEST" + keys = request.META.keys() + messages.info(request, _("You dont have access to the feature")) + if key in keys: + return HttpResponse( + f""" + + """ + ) + return redirect(path) + + return check_accessible diff --git a/accessibility/decorators.py b/accessibility/decorators.py new file mode 100644 index 0000000..925e9bd --- /dev/null +++ b/accessibility/decorators.py @@ -0,0 +1,52 @@ +""" +employee/decorators.py +""" + +from django.contrib import messages +from django.http import HttpResponse +from django.shortcuts import redirect +from django.utils.translation import gettext_lazy as _ + +from accessibility.methods import check_is_accessible +from base.decorators import decorator_with_arguments + + +@decorator_with_arguments +def enter_if_accessible(function, feature, perm=None, method=None): + """ + accessiblie check decorator + """ + + def check_accessible(request, *args, **kwargs): + """ + Check accessible + """ + path = "/" + referrer = request.META.get("HTTP_REFERER") + if referrer and request.path not in referrer: + path = request.META["HTTP_REFERER"] + accessible = False + cache_key = request.session.session_key + "accessibility_filter" + employee = getattr(request.user, "employee_get") + if employee: + accessible = check_is_accessible(feature, cache_key, employee) + has_perm = True + if perm: + has_perm = request.user.has_perm(perm) + + if accessible or has_perm or (method and method(request, *args, **kwargs)): + return function(request, *args, **kwargs) + key = "HTTP_HX_REQUEST" + keys = request.META.keys() + messages.info(request, _("You dont have access to the feature")) + if key in keys: + return HttpResponse( + f""" + + """ + ) + return redirect(path) + + return check_accessible diff --git a/accessibility/filters.py b/accessibility/filters.py new file mode 100644 index 0000000..1777abe --- /dev/null +++ b/accessibility/filters.py @@ -0,0 +1,117 @@ +""" +accessibility/filters.py +""" + +from functools import reduce + +import django_filters +from django.db.models import Q +from django.template.loader import render_to_string +from django.utils.translation import gettext as _ + +from employee.models import Employee +from horilla.filters import HorillaFilterSet +from horilla.horilla_middlewares import _thread_locals + + +def _filter_form_structured(self): + """ + Render the form fields as HTML table rows with Bootstrap styling. + """ + request = getattr(_thread_locals, "request", None) + context = { + "form": self, + "request": request, + } + table_html = render_to_string("accessibility/filter_form_body.html", context) + return table_html + + +class AccessibilityFilter(HorillaFilterSet): + """ + Accessibility Filter with dynamic OR logic between fields + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.form.structured = _filter_form_structured(self.form) + + pk = django_filters.ModelMultipleChoiceFilter( + queryset=Employee.objects.all(), + field_name="pk", + lookup_expr="in", + label=_("Employee"), + ) + excluded_employees = django_filters.ModelMultipleChoiceFilter( + queryset=Employee.objects.all(), + label=_("Exclude Employees"), + ) + + verbose_name = { + "employee_work_info__job_position_id": _("Job Position"), + "employee_work_info__department_id": _("Department"), + "employee_work_info__work_type_id": _("Work Type"), + "employee_work_info__employee_type_id": _("Employee Type"), + "employee_work_info__job_role_id": _("Job Role"), + "employee_work_info__company_id": _("Company"), + "employee_work_info__shift_id": _("Shift"), + "employee_work_info__tags": _("Tags"), + "employee_user_id__groups": _("Groups"), + "employee_user_id__user_permissions": _("Permissions"), + } + + class Meta: + """ + Meta class for additional options + """ + + model = Employee + fields = [ + "pk", + "employee_work_info__job_position_id", + "employee_work_info__department_id", + "employee_work_info__work_type_id", + "employee_work_info__employee_type_id", + "employee_work_info__job_role_id", + "employee_work_info__company_id", + "employee_work_info__shift_id", + "employee_work_info__tags", + "employee_user_id__groups", + "employee_user_id__user_permissions", + ] + + def filter_queryset(self, queryset): + """ + Dynamically apply OR condition between all specified fields + """ + or_conditions = [] + + for field in self.Meta.fields: + field_value = self.data.get(field) + if field_value: + # Ensure field_value is always a list of strings (IDs) + if not isinstance(field_value, (list, tuple)): + field_value = [field_value] + + # Convert all to ints + try: + field_value = [int(v) for v in field_value if v] + except ValueError: + continue # skip invalid values + + # For related fields, use __in + if "__" in field: + or_conditions.append(Q(**{f"{field}__id__in": field_value})) + else: + or_conditions.append(Q(**{f"{field}__in": field_value})) + + if or_conditions: + queryset = queryset.filter(reduce(lambda x, y: x | y, or_conditions)) + + excluded_employees = self.data.get("excluded_employees") + if excluded_employees: + if not isinstance(excluded_employees, (list, tuple)): + excluded_employees = [excluded_employees] + queryset = queryset.exclude(pk__in=excluded_employees) + + return queryset diff --git a/accessibility/methods.py b/accessibility/methods.py new file mode 100644 index 0000000..8456b88 --- /dev/null +++ b/accessibility/methods.py @@ -0,0 +1,47 @@ +""" +accessibility/methods.py +""" + +from django.core.cache import cache + +from accessibility.accessibility import ACCESSBILITY_FEATURE +from accessibility.filters import AccessibilityFilter +from accessibility.models import DefaultAccessibility +from horilla.horilla_middlewares import _thread_locals + + +def check_is_accessible(feature, cache_key, employee): + """ + Method to check the employee is accessible for the feature or not + """ + if not employee: + return False + + accessibility = DefaultAccessibility.objects.filter( + feature=feature, is_enabled=True + ).first() + + if accessibility and accessibility.exclude_all: + return False + if not feature or not accessibility: + return True + + data: dict = cache.get(cache_key, default={}) + if data and data.get(feature) is not None: + return data.get(feature) + + employees = accessibility.employees.all() + accessible = employee in employees + return accessible + + +def update_employee_accessibility_cache(cache_key, employee): + """ + Cache for get all the queryset + """ + feature_accessible = {} + for accessibility, _display in ACCESSBILITY_FEATURE: + feature_accessible[accessibility] = check_is_accessible( + accessibility, cache_key, employee + ) + cache.set(cache_key, feature_accessible) diff --git a/accessibility/middlewares.py b/accessibility/middlewares.py new file mode 100644 index 0000000..e6f191c --- /dev/null +++ b/accessibility/middlewares.py @@ -0,0 +1,48 @@ +""" +accessibility/middlewares.py +""" + +from django.core.cache import cache + +from accessibility.methods import check_is_accessible +from accessibility.models import ACCESSBILITY_FEATURE + +ACCESSIBILITY_CACHE_USER_KEYS = {} + + +def update_accessibility_cache(cache_key, request): + """Cache for get all the queryset""" + feature_accessible = {} + for accessibility, _display in ACCESSBILITY_FEATURE: + feature_accessible[accessibility] = check_is_accessible( + accessibility, cache_key, getattr(request.user, "employee_get") + ) + cache.set(cache_key, feature_accessible) + + +class AccessibilityMiddleware: + """ + AccessibilityMiddleware + """ + + def __init__(self, get_response): + self.get_response = get_response + + def __call__(self, request): + session_key = request.session.session_key + if session_key: + cache_key = session_key + "accessibility_filter" + exists_user_cache_key = ACCESSIBILITY_CACHE_USER_KEYS.get( + request.user.id, [] + ) + if not exists_user_cache_key: + ACCESSIBILITY_CACHE_USER_KEYS[request.user.id] = exists_user_cache_key + if ( + session_key + and getattr(request.user, "employee_get", None) + and not cache.get(cache_key) + ): + exists_user_cache_key.append(cache_key) + update_accessibility_cache(cache_key, request) + response = self.get_response(request) + return response diff --git a/accessibility/models.py b/accessibility/models.py new file mode 100644 index 0000000..255f3a4 --- /dev/null +++ b/accessibility/models.py @@ -0,0 +1,23 @@ +""" +accessibility/models.py +""" + +from django.db import models + +from accessibility.accessibility import ACCESSBILITY_FEATURE +from employee.models import Employee +from horilla.models import HorillaModel + + +class DefaultAccessibility(HorillaModel): + """ + DefaultAccessibilityModel + """ + + feature = models.CharField(max_length=100, choices=ACCESSBILITY_FEATURE) + filter = models.JSONField() + exclude_all = models.BooleanField(default=False) + employees = models.ManyToManyField( + Employee, blank=True, related_name="default_accessibility" + ) + is_enabled = models.BooleanField(default=True) diff --git a/accessibility/signals.py b/accessibility/signals.py new file mode 100644 index 0000000..90ce956 --- /dev/null +++ b/accessibility/signals.py @@ -0,0 +1,71 @@ +""" +accessibility/signals.py +""" + +import threading + +from django.core.cache import cache +from django.db.models.signals import post_save +from django.dispatch import receiver + +from accessibility.middlewares import ACCESSIBILITY_CACHE_USER_KEYS +from accessibility.models import DefaultAccessibility +from employee.models import EmployeeWorkInformation +from horilla.signals import post_bulk_update + + +def _clear_accessibility_cache(): + for _user_id, cache_keys in ACCESSIBILITY_CACHE_USER_KEYS.copy().items(): + for key in cache_keys: + cache.delete(key) + + +def _clear_bulk_employees_cache(queryset): + for instance in queryset: + cache_key = None + if instance.employee_id and instance.employee_id.employee_user_id: + cache_key = ACCESSIBILITY_CACHE_USER_KEYS.get( + instance.employee_id.employee_user_id.id + ) + if cache_key: + cache.delete(cache_key) + + +@receiver(post_save, sender=EmployeeWorkInformation) +def monitor_employee_update(sender, instance, created, **kwargs): + """ + This method tracks updates to an employee's work information instance. + """ + + _sender = sender + _created = created + + if instance.employee_id and instance.employee_id.employee_user_id: + user_id = instance.employee_id.employee_user_id.id + cache_keys = ACCESSIBILITY_CACHE_USER_KEYS.get(user_id, []) + + for key in cache_keys: + cache.delete(key) + + +@receiver(post_save, sender=DefaultAccessibility) +def monitor_accessibility_update(sender, instance, created, **kwargs): + """ + This method is used to track accessibility updates + """ + _sender = sender + _created = created + _instance = instance + thread = threading.Thread(target=_clear_accessibility_cache) + thread.start() + + +@receiver(post_bulk_update, sender=EmployeeWorkInformation) +def monitor_employee_bulk_update(sender, queryset, *args, **kwargs): + """ + This method is used to track accessibility updates + """ + _sender = sender + _queryset = queryset + thread = threading.Thread(target=_clear_bulk_employees_cache(queryset)) + thread.start() diff --git a/accessibility/tests.py b/accessibility/tests.py new file mode 100644 index 0000000..bccdb2f --- /dev/null +++ b/accessibility/tests.py @@ -0,0 +1,3 @@ +from django.test import TestCase + +# Create your tests here. diff --git a/accessibility/urls.py b/accessibility/urls.py new file mode 100644 index 0000000..874bfad --- /dev/null +++ b/accessibility/urls.py @@ -0,0 +1,20 @@ +""" +accessibility/urls.py +""" + +from django.urls import path + +from accessibility import views as accessibility + +urlpatterns = [ + path( + "user-accessibility/", + accessibility.user_accessibility, + name="user-accessibility", + ), + path( + "get-initial-accessibility-data", + accessibility.get_accessibility_data, + name="get-initial-accessibility-data", + ), +] diff --git a/accessibility/views.py b/accessibility/views.py new file mode 100644 index 0000000..861ebd1 --- /dev/null +++ b/accessibility/views.py @@ -0,0 +1,63 @@ +""" +employee/accessibility.py + +Employee accessibility related methods and functionalites +""" + +from django.contrib import messages +from django.http import HttpResponse, JsonResponse +from django.shortcuts import render +from django.utils.translation import gettext_lazy as _ + +from accessibility.accessibility import ACCESSBILITY_FEATURE +from accessibility.filters import AccessibilityFilter +from accessibility.models import DefaultAccessibility +from horilla.decorators import login_required, permission_required + + +@login_required +@permission_required("auth.change_permission") +def user_accessibility(request): + """ + User accessibility method + """ + if request.POST: + feature = request.POST["feature"] + accessibility = DefaultAccessibility.objects.filter(feature=feature).first() + accessibility = accessibility if accessibility else DefaultAccessibility() + accessibility.feature = feature + accessibility.filter = dict(request.POST) + accessibility.exclude_all = bool(request.POST.get("exclude_all")) + accessibility.save() + employees = AccessibilityFilter(data=accessibility.filter).qs + accessibility.employees.set(employees) + + if len(request.POST.keys()) > 1: + messages.success(request, _("Accessibility filter saved")) + else: + messages.info(request, _("All filter cleared")) + + return HttpResponse("") + + accessibility_filter = AccessibilityFilter() + return render( + request, + "accessibility/accessibility.html", + { + "accessibility": ACCESSBILITY_FEATURE, + "accessibility_filter": accessibility_filter, + }, + ) + + +@login_required +@permission_required("auth.change_permission") +def get_accessibility_data(request): + """ + Save accessibility filter method + """ + feature = request.GET["feature"] + accessibility = DefaultAccessibility.objects.filter(feature=feature).first() + if not accessibility: + return JsonResponse("", safe=False) + return JsonResponse(accessibility.filter)