[ADD] HORILLA_API: Helpdesk api and swagger setup

This commit is contained in:
Horilla
2026-01-05 17:34:08 +05:30
parent 3cbf7f6e0e
commit a67077e23e
22 changed files with 1097 additions and 87 deletions

View File

@@ -73,7 +73,7 @@ If endpoints are not appearing in documentation:
- Ensure the URL is included in the `patterns` list in `horilla_api/urls.py`
- Check that the view is using proper decorators
- Verify the serializer is correctly defined
If you see a Basic authorization option in Swagger:
- Clear your browser cache and refresh `/api/swagger/`
- Confirm `SWAGGER_SETTINGS` only contains the `Bearer` scheme
- Confirm `SWAGGER_SETTINGS` only contains the `Bearer` scheme

View File

@@ -3,5 +3,6 @@ from horilla.settings import INSTALLED_APPS
INSTALLED_APPS.append("geofencing")
INSTALLED_APPS.append("facedetection")
# Import Swagger settings to ensure they're applied
from . import swagger_settings

View File

@@ -2,6 +2,13 @@ from functools import wraps
from django.http import HttpResponseForbidden
from django.utils.decorators import method_decorator
from django.utils.translation import gettext_lazy as _
from rest_framework import status
from rest_framework.response import Response
from accessibility.methods import check_is_accessible
from horilla.horilla_middlewares import _thread_locals
from horilla_views.cbv_methods import decorator_with_arguments
def or_condition(*decorators):
@@ -24,3 +31,47 @@ def or_condition(*decorators):
return _wrapped_view
return decorator
@decorator_with_arguments
def enter_if_accessible(function, feature, perm=None, method=None):
"""
Accessibility check decorator for API views (DRF APIView)
Returns proper API responses instead of redirects
"""
def check_accessible(self, *args, **kwargs):
"""
Check accessible
"""
request = getattr(_thread_locals, "request")
if not getattr(self, "request", None):
self.request = request
accessible = False
session_key = getattr(request.session, "session_key", None)
if session_key:
cache_key = session_key + "accessibility_filter"
employee = getattr(request.user, "employee_get", None)
if employee:
accessible = check_is_accessible(feature, cache_key, employee)
has_perm = True
if perm:
has_perm = request.user.has_perm(perm)
method_result = False
if method:
try:
method_result = method(request, *args, **kwargs)
except Exception:
method_result = False
if accessible or has_perm or method_result:
return function(self, *args, **kwargs)
return Response(
{"error": _("You dont have access to the feature")},
status=status.HTTP_403_FORBIDDEN,
)
return check_accessible

View File

@@ -46,6 +46,10 @@ def groupby_queryset(request, url, field_name, queryset):
def permission_based_queryset(user, perm, queryset, user_obj=None):
# Handle AnonymousUser during schema generation
if not user.is_authenticated:
return queryset.none()
if user.has_perm(perm):
return queryset

View File

@@ -0,0 +1,4 @@
"""
horilla_api/api_serializers/helpdesk/__init__.py
"""

View File

@@ -0,0 +1,205 @@
"""
horilla_api/api_serializers/helpdesk/serializers.py
"""
from rest_framework import serializers
from helpdesk.models import (
Ticket,
TicketType,
FAQ,
FAQCategory,
Comment,
Attachment,
ClaimRequest,
DepartmentManager,
)
from employee.models import Employee
from base.models import Tags
class TicketTypeSerializer(serializers.ModelSerializer):
class Meta:
model = TicketType
fields = "__all__"
class FAQCategorySerializer(serializers.ModelSerializer):
class Meta:
model = FAQCategory
fields = "__all__"
class FAQSerializer(serializers.ModelSerializer):
category = FAQCategorySerializer(read_only=True)
category_id = serializers.PrimaryKeyRelatedField(
queryset=FAQCategory.objects.all(), source="category", write_only=True
)
tags = serializers.PrimaryKeyRelatedField(
queryset=Tags.objects.all(), many=True, required=False
)
class Meta:
model = FAQ
fields = "__all__"
class DepartmentManagerSerializer(serializers.ModelSerializer):
manager = serializers.SerializerMethodField()
department = serializers.SerializerMethodField()
class Meta:
model = DepartmentManager
fields = "__all__"
def get_manager(self, obj):
if obj.manager:
return {
"id": obj.manager.id,
"employee_first_name": obj.manager.employee_first_name,
"employee_last_name": obj.manager.employee_last_name,
"get_full_name": obj.manager.get_full_name(),
}
return None
def get_department(self, obj):
if obj.department:
return {
"id": obj.department.id,
"department": obj.department.department,
}
return None
class CommentSerializer(serializers.ModelSerializer):
employee_id = serializers.SerializerMethodField()
employee_id_write = serializers.PrimaryKeyRelatedField(
queryset=Employee.objects.all(), source="employee_id", write_only=True, required=False
)
ticket_id = serializers.PrimaryKeyRelatedField(
queryset=Ticket.objects.all(), source="ticket", write_only=True
)
class Meta:
model = Comment
fields = "__all__"
def get_employee_id(self, obj):
if obj.employee_id:
return {
"id": obj.employee_id.id,
"employee_first_name": obj.employee_id.employee_first_name,
"employee_last_name": obj.employee_id.employee_last_name,
"get_full_name": obj.employee_id.get_full_name(),
}
return None
class AttachmentSerializer(serializers.ModelSerializer):
ticket_id = serializers.PrimaryKeyRelatedField(
queryset=Ticket.objects.all(), source="ticket", write_only=True, required=False
)
comment_id = serializers.PrimaryKeyRelatedField(
queryset=Comment.objects.all(), source="comment", write_only=True, required=False
)
class Meta:
model = Attachment
fields = "__all__"
class ClaimRequestSerializer(serializers.ModelSerializer):
ticket_id = serializers.SerializerMethodField()
ticket_id_write = serializers.PrimaryKeyRelatedField(
queryset=Ticket.objects.all(), source="ticket_id", write_only=True
)
employee_id = serializers.SerializerMethodField()
employee_id_write = serializers.PrimaryKeyRelatedField(
queryset=Employee.objects.all(), source="employee_id", write_only=True
)
class Meta:
model = ClaimRequest
fields = "__all__"
def get_ticket_id(self, obj):
if obj.ticket_id:
return {
"id": obj.ticket_id.id,
"title": obj.ticket_id.title,
}
return None
def get_employee_id(self, obj):
if obj.employee_id:
return {
"id": obj.employee_id.id,
"employee_first_name": obj.employee_id.employee_first_name,
"employee_last_name": obj.employee_id.employee_last_name,
"get_full_name": obj.employee_id.get_full_name(),
}
return None
class TicketSerializer(serializers.ModelSerializer):
employee_id = serializers.SerializerMethodField()
employee_id_write = serializers.PrimaryKeyRelatedField(
queryset=Employee.objects.all(), source="employee_id", write_only=True, required=False
)
ticket_type = TicketTypeSerializer(read_only=True)
ticket_type_id = serializers.PrimaryKeyRelatedField(
queryset=TicketType.objects.all(), source="ticket_type", write_only=True
)
assigned_to = serializers.SerializerMethodField()
assigned_to_ids = serializers.PrimaryKeyRelatedField(
queryset=Employee.objects.all(), many=True, source="assigned_to", write_only=True, required=False
)
tags = serializers.SerializerMethodField()
tags_ids = serializers.PrimaryKeyRelatedField(
queryset=Tags.objects.all(), many=True, source="tags", write_only=True, required=False
)
raised_on_display = serializers.SerializerMethodField()
comments = CommentSerializer(many=True, read_only=True)
attachments = AttachmentSerializer(many=True, read_only=True)
class Meta:
model = Ticket
fields = "__all__"
extra_kwargs = {
'assigned_to': {'read_only': True},
'tags': {'read_only': True},
}
def get_employee_id(self, obj):
if obj.employee_id:
return {
"id": obj.employee_id.id,
"employee_first_name": obj.employee_id.employee_first_name,
"employee_last_name": obj.employee_id.employee_last_name,
"get_full_name": obj.employee_id.get_full_name(),
}
return None
def get_assigned_to(self, obj):
if obj.assigned_to.exists():
return [
{
"id": emp.id,
"employee_first_name": emp.employee_first_name,
"employee_last_name": emp.employee_last_name,
"get_full_name": emp.get_full_name(),
}
for emp in obj.assigned_to.all()
]
return []
def get_tags(self, obj):
if obj.tags.exists():
return [{"id": tag.id, "name": tag.name} for tag in obj.tags.all()]
return []
def get_raised_on_display(self, obj):
try:
return obj.get_raised_on()
except:
return obj.raised_on

View File

@@ -0,0 +1,4 @@
"""
horilla_api/api_urls/helpdesk/__init__.py
"""

View File

@@ -0,0 +1,47 @@
"""
horilla_api/api_urls/helpdesk/urls.py
"""
from django.urls import path
from horilla_api.api_views.helpdesk.views import *
urlpatterns = [
# Ticket Type URLs
path("ticket-type/", TicketTypeGetCreateAPIView.as_view()),
path("ticket-type/<int:pk>/", TicketTypeGetUpdateDeleteAPIView.as_view()),
# FAQ Category URLs
path("faq-category/", FAQCategoryGetCreateAPIView.as_view()),
path("faq-category/<int:pk>/", FAQCategoryGetUpdateDeleteAPIView.as_view()),
# FAQ URLs
path("faq/", FAQGetCreateAPIView.as_view()),
path("faq/<int:pk>/", FAQGetUpdateDeleteAPIView.as_view()),
path("faq/category/<int:category_id>/", FAQGetCreateAPIView.as_view()),
# Ticket URLs
path("ticket/", TicketGetCreateAPIView.as_view()),
path("ticket/<int:pk>/", TicketGetUpdateDeleteAPIView.as_view()),
path("ticket/<int:pk>/status/", TicketChangeStatusAPIView.as_view()),
path("ticket/<int:pk>/archive/", TicketArchiveAPIView.as_view()),
# Comment URLs
path("ticket/<int:ticket_id>/comment/", CommentGetCreateAPIView.as_view()),
path("comment/<int:pk>/", CommentGetUpdateDeleteAPIView.as_view()),
# Attachment URLs
path("ticket/<int:ticket_id>/attachment/", AttachmentGetCreateAPIView.as_view()),
path("comment/<int:comment_id>/attachment/", AttachmentGetCreateAPIView.as_view()),
path("attachment/<int:pk>/", AttachmentGetDeleteAPIView.as_view()),
# Claim Request URLs
path("claim-request/", ClaimRequestGetCreateAPIView.as_view()),
path("ticket/<int:ticket_id>/claim-request/", ClaimRequestGetCreateAPIView.as_view()),
path("claim-request/<int:pk>/approve/", ClaimRequestApproveAPIView.as_view()),
path("claim-request/<int:pk>/reject/", ClaimRequestRejectAPIView.as_view()),
# Department Manager URLs
path("department-manager/", DepartmentManagerGetCreateAPIView.as_view()),
path("department-manager/<int:pk>/", DepartmentManagerGetUpdateDeleteAPIView.as_view()),
]

View File

@@ -196,7 +196,10 @@ class AttendanceView(APIView):
permission_classes = [IsAuthenticated]
filterset_class = AttendanceFilters
def get_queryset(self, request, type):
def get_queryset(self, request=None, type=None):
# Handle schema generation for DRF-YASG
if getattr(self, "swagger_fake_view", False) or request is None:
return Attendance.objects.none()
if type == "ot":
condition = AttendanceValidationCondition.objects.first()
@@ -758,7 +761,7 @@ class OfflineEmployeesCountView(APIView):
class OfflineEmployeesListView(APIView):
"""
Li sts active employees who have not clocked in today, including their leave status.
Lists active employees who have not clocked in today, including their leave status.
"""
permission_classes = [IsAuthenticated]

View File

@@ -1,23 +1,23 @@
"""
Example API view with documentation
"""
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
from rest_framework.response import Response
from rest_framework.views import APIView
from horilla_api.docs import document_api
class ExampleDocumentedView(APIView):
"""
Example view demonstrating API documentation
"""
@document_api(
operation_description="Get API documentation example",
responses={
200: "Example response with documentation",
404: "Not found error"
},
tags=['Documentation Example']
responses={200: "Example response with documentation", 404: "Not found error"},
tags=["Documentation Example"],
)
def get(self, request):
"""
@@ -25,5 +25,5 @@ class ExampleDocumentedView(APIView):
"""
return Response(
{"message": "API documentation is working correctly"},
status=status.HTTP_200_OK
)
status=status.HTTP_200_OK,
)

View File

@@ -340,7 +340,10 @@ class WorkTypeRequestView(APIView):
filterset_class = WorkTypeRequestFilter
permission_classes = [IsAuthenticated]
def get_queryset(self, request):
def get_queryset(self, request=None):
# Handle schema generation for DRF-YASG
if getattr(self, "swagger_fake_view", False) or request is None:
return WorkTypeRequest.objects.none()
queryset = WorkTypeRequest.objects.all()
user = request.user
# checking user level permissions
@@ -919,7 +922,10 @@ class ShiftRequestView(APIView):
filterset_class = ShiftRequestFilter
permission_classes = [IsAuthenticated]
def get_queryset(self, request):
def get_queryset(self, request=None):
# Handle schema generation for DRF-YASG
if getattr(self, "swagger_fake_view", False) or request is None:
return ShiftRequest.objects.none()
queryset = ShiftRequest.objects.all()
user = request.user
# checking user level permissions

View File

@@ -250,8 +250,14 @@ class EmployeeBankDetailsAPIView(APIView):
permission_classes = [IsAuthenticated]
def get_queryset(self):
# Handle schema generation for DRF-YASG
if getattr(self, "swagger_fake_view", False):
return EmployeeBankDetails.objects.none()
queryset = EmployeeBankDetails.objects.all()
user = self.request.user
# Handle AnonymousUser during schema generation
if not user.is_authenticated:
return EmployeeBankDetails.objects.none()
# checking user level permissions
perm = "base.view_employeebankdetails"
queryset = permission_based_queryset(user, perm, queryset)

View File

@@ -0,0 +1,4 @@
"""
horilla_api/api_views/helpdesk/__init__.py
"""

View File

@@ -0,0 +1,574 @@
"""
horilla_api/api_views/helpdesk/views.py
"""
from rest_framework.views import APIView
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework import status
from rest_framework.pagination import PageNumberPagination
from django_filters.rest_framework import DjangoFilterBackend
from django.shortcuts import get_object_or_404
from django.http import Http404
from django.contrib.auth.models import AnonymousUser
from django.db.models import Q
from horilla_api.api_serializers.helpdesk.serializers import (
TicketSerializer,
TicketTypeSerializer,
FAQSerializer,
FAQCategorySerializer,
CommentSerializer,
AttachmentSerializer,
ClaimRequestSerializer,
DepartmentManagerSerializer,
)
from helpdesk.models import (
Ticket,
TicketType,
FAQ,
FAQCategory,
Comment,
Attachment,
ClaimRequest,
DepartmentManager,
)
from helpdesk.filter import TicketFilter, FAQFilter, FAQCategoryFilter
from ...api_methods.base.methods import groupby_queryset, permission_based_queryset
from ...api_decorators.base.decorators import (
manager_permission_required,
permission_required,
)
from base.methods import filtersubordinates
def object_check(cls, pk):
try:
obj = cls.objects.get(id=pk)
return obj
except cls.DoesNotExist:
return None
# Ticket Type Views
class TicketTypeGetCreateAPIView(APIView):
permission_classes = [IsAuthenticated]
filter_backends = [DjangoFilterBackend]
def get_queryset(self):
# Handle schema generation for DRF-YASG
if getattr(self, 'swagger_fake_view', False):
return TicketType.objects.none()
return TicketType.objects.all()
def get(self, request):
ticket_types = self.get_queryset()
paginator = PageNumberPagination()
page = paginator.paginate_queryset(ticket_types, request)
serializer = TicketTypeSerializer(page, many=True)
return paginator.get_paginated_response(serializer.data)
@permission_required("helpdesk.add_tickettype")
def post(self, request):
serializer = TicketTypeSerializer(data=request.data)
if serializer.is_valid():
serializer.save()
return Response(serializer.data, status=status.HTTP_201_CREATED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
class TicketTypeGetUpdateDeleteAPIView(APIView):
permission_classes = [IsAuthenticated]
def get(self, request, pk):
ticket_type = object_check(TicketType, pk)
if ticket_type is None:
return Response({"error": "TicketType not found"}, status=404)
serializer = TicketTypeSerializer(ticket_type)
return Response(serializer.data, status=200)
@permission_required("helpdesk.change_tickettype")
def put(self, request, pk):
ticket_type = object_check(TicketType, pk)
if ticket_type is None:
return Response({"error": "TicketType not found"}, status=404)
serializer = TicketTypeSerializer(ticket_type, data=request.data)
if serializer.is_valid():
serializer.save()
return Response(serializer.data, status=200)
return Response(serializer.errors, status=400)
@permission_required("helpdesk.delete_tickettype")
def delete(self, request, pk):
ticket_type = object_check(TicketType, pk)
if ticket_type is None:
return Response({"error": "TicketType not found"}, status=404)
try:
ticket_type.delete()
return Response(status=status.HTTP_204_NO_CONTENT)
except Exception as e:
return Response({"error": str(e)}, status=400)
# FAQ Category Views
class FAQCategoryGetCreateAPIView(APIView):
permission_classes = [IsAuthenticated]
filter_backends = [DjangoFilterBackend]
filterset_class = FAQCategoryFilter
def get_queryset(self):
# Handle schema generation for DRF-YASG
if getattr(self, 'swagger_fake_view', False):
return FAQCategory.objects.none()
return FAQCategory.objects.all()
def get(self, request):
faq_categories = self.get_queryset()
filterset = self.filterset_class(request.GET, queryset=faq_categories)
paginator = PageNumberPagination()
page = paginator.paginate_queryset(filterset.qs, request)
serializer = FAQCategorySerializer(page, many=True)
return paginator.get_paginated_response(serializer.data)
@permission_required("helpdesk.add_faqcategory")
def post(self, request):
serializer = FAQCategorySerializer(data=request.data)
if serializer.is_valid():
serializer.save()
return Response(serializer.data, status=status.HTTP_201_CREATED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
class FAQCategoryGetUpdateDeleteAPIView(APIView):
permission_classes = [IsAuthenticated]
def get(self, request, pk):
faq_category = object_check(FAQCategory, pk)
if faq_category is None:
return Response({"error": "FAQCategory not found"}, status=404)
serializer = FAQCategorySerializer(faq_category)
return Response(serializer.data, status=200)
@permission_required("helpdesk.change_faqcategory")
def put(self, request, pk):
faq_category = object_check(FAQCategory, pk)
if faq_category is None:
return Response({"error": "FAQCategory not found"}, status=404)
serializer = FAQCategorySerializer(faq_category, data=request.data)
if serializer.is_valid():
serializer.save()
return Response(serializer.data, status=200)
return Response(serializer.errors, status=400)
@permission_required("helpdesk.delete_faqcategory")
def delete(self, request, pk):
faq_category = object_check(FAQCategory, pk)
if faq_category is None:
return Response({"error": "FAQCategory not found"}, status=404)
try:
faq_category.delete()
return Response(status=status.HTTP_204_NO_CONTENT)
except Exception as e:
return Response({"error": str(e)}, status=400)
# FAQ Views
class FAQGetCreateAPIView(APIView):
permission_classes = [IsAuthenticated]
filter_backends = [DjangoFilterBackend]
filterset_class = FAQFilter
def get_queryset(self):
# Handle schema generation for DRF-YASG
if getattr(self, 'swagger_fake_view', False):
return FAQ.objects.none()
return FAQ.objects.all()
def get(self, request, category_id=None):
if category_id:
faqs = FAQ.objects.filter(category_id=category_id)
else:
faqs = self.get_queryset()
filterset = self.filterset_class(request.GET, queryset=faqs)
paginator = PageNumberPagination()
page = paginator.paginate_queryset(filterset.qs, request)
serializer = FAQSerializer(page, many=True)
return paginator.get_paginated_response(serializer.data)
@permission_required("helpdesk.add_faq")
def post(self, request):
serializer = FAQSerializer(data=request.data)
if serializer.is_valid():
serializer.save()
return Response(serializer.data, status=status.HTTP_201_CREATED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
class FAQGetUpdateDeleteAPIView(APIView):
permission_classes = [IsAuthenticated]
def get(self, request, pk):
faq = object_check(FAQ, pk)
if faq is None:
return Response({"error": "FAQ not found"}, status=404)
serializer = FAQSerializer(faq)
return Response(serializer.data, status=200)
@permission_required("helpdesk.change_faq")
def put(self, request, pk):
faq = object_check(FAQ, pk)
if faq is None:
return Response({"error": "FAQ not found"}, status=404)
serializer = FAQSerializer(faq, data=request.data)
if serializer.is_valid():
serializer.save()
return Response(serializer.data, status=200)
return Response(serializer.errors, status=400)
@permission_required("helpdesk.delete_faq")
def delete(self, request, pk):
faq = object_check(FAQ, pk)
if faq is None:
return Response({"error": "FAQ not found"}, status=404)
try:
faq.delete()
return Response(status=status.HTTP_204_NO_CONTENT)
except Exception as e:
return Response({"error": str(e)}, status=400)
# Ticket Views
class TicketGetCreateAPIView(APIView):
permission_classes = [IsAuthenticated]
filter_backends = [DjangoFilterBackend]
filterset_class = TicketFilter
def get_queryset(self):
# Handle schema generation for DRF-YASG
if getattr(self, 'swagger_fake_view', False):
return Ticket.objects.none()
if not self.request.user.is_authenticated:
return Ticket.objects.none()
user = self.request.user
perm = "helpdesk.view_ticket"
queryset = Ticket.objects.all()
queryset = permission_based_queryset(user, perm, queryset, user_obj=True)
return queryset
def get(self, request):
tickets = self.get_queryset()
filterset = self.filterset_class(request.GET, queryset=tickets)
field_name = request.GET.get("groupby_field", None)
if field_name:
url = request.build_absolute_uri()
return groupby_queryset(request, url, field_name, filterset.qs)
paginator = PageNumberPagination()
page = paginator.paginate_queryset(filterset.qs, request)
serializer = TicketSerializer(page, many=True)
return paginator.get_paginated_response(serializer.data)
def post(self, request):
# Set employee_id from request user if not provided
data = request.data.copy()
if not data.get("employee_id_write") and not data.get("employee_id") and request.user.is_authenticated:
data["employee_id_write"] = request.user.employee_get.id
serializer = TicketSerializer(data=data)
if serializer.is_valid():
ticket = serializer.save()
return Response(TicketSerializer(ticket).data, status=status.HTTP_201_CREATED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
class TicketGetUpdateDeleteAPIView(APIView):
permission_classes = [IsAuthenticated]
def get(self, request, pk):
ticket = object_check(Ticket, pk)
if ticket is None:
return Response({"error": "Ticket not found"}, status=404)
# Check permissions
user = request.user
if not (user.has_perm("helpdesk.view_ticket") or
ticket.employee_id == user.employee_get or
ticket.assigned_to.filter(id=user.employee_get.id).exists()):
return Response({"error": "Permission denied"}, status=403)
serializer = TicketSerializer(ticket)
return Response(serializer.data, status=200)
@permission_required("helpdesk.change_ticket")
def put(self, request, pk):
ticket = object_check(Ticket, pk)
if ticket is None:
return Response({"error": "Ticket not found"}, status=404)
serializer = TicketSerializer(ticket, data=request.data, partial=True)
if serializer.is_valid():
serializer.save()
return Response(serializer.data, status=200)
return Response(serializer.errors, status=400)
@permission_required("helpdesk.delete_ticket")
def delete(self, request, pk):
ticket = object_check(Ticket, pk)
if ticket is None:
return Response({"error": "Ticket not found"}, status=404)
try:
ticket.delete()
return Response(status=status.HTTP_204_NO_CONTENT)
except Exception as e:
return Response({"error": str(e)}, status=400)
class TicketChangeStatusAPIView(APIView):
permission_classes = [IsAuthenticated]
@permission_required("helpdesk.change_ticket")
def put(self, request, pk):
ticket = object_check(Ticket, pk)
if ticket is None:
return Response({"error": "Ticket not found"}, status=404)
status_value = request.data.get("status")
if status_value not in [choice[0] for choice in Ticket._meta.get_field("status").choices]:
return Response({"error": "Invalid status"}, status=400)
ticket.status = status_value
if status_value == "resolved":
from datetime import date
ticket.resolved_date = date.today()
ticket.save()
return Response(TicketSerializer(ticket).data, status=200)
class TicketArchiveAPIView(APIView):
permission_classes = [IsAuthenticated]
@permission_required("helpdesk.change_ticket")
def put(self, request, pk):
ticket = object_check(Ticket, pk)
if ticket is None:
return Response({"error": "Ticket not found"}, status=404)
ticket.is_active = False
ticket.save()
return Response(TicketSerializer(ticket).data, status=200)
# Comment Views
class CommentGetCreateAPIView(APIView):
permission_classes = [IsAuthenticated]
def get(self, request, ticket_id):
ticket = object_check(Ticket, ticket_id)
if ticket is None:
return Response({"error": "Ticket not found"}, status=404)
comments = Comment.objects.filter(ticket_id=ticket_id)
paginator = PageNumberPagination()
page = paginator.paginate_queryset(comments, request)
serializer = CommentSerializer(page, many=True)
return paginator.get_paginated_response(serializer.data)
def post(self, request, ticket_id):
ticket = object_check(Ticket, ticket_id)
if ticket is None:
return Response({"error": "Ticket not found"}, status=404)
data = request.data.copy()
data["ticket_id"] = ticket_id
if not data.get("employee_id_write") and not data.get("employee_id") and request.user.is_authenticated:
data["employee_id_write"] = request.user.employee_get.id
serializer = CommentSerializer(data=data)
if serializer.is_valid():
comment = serializer.save()
return Response(CommentSerializer(comment).data, status=status.HTTP_201_CREATED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
class CommentGetUpdateDeleteAPIView(APIView):
permission_classes = [IsAuthenticated]
def get(self, request, pk):
comment = object_check(Comment, pk)
if comment is None:
return Response({"error": "Comment not found"}, status=404)
serializer = CommentSerializer(comment)
return Response(serializer.data, status=200)
def put(self, request, pk):
comment = object_check(Comment, pk)
if comment is None:
return Response({"error": "Comment not found"}, status=404)
# Check if user owns the comment
if comment.employee_id != request.user.employee_get:
return Response({"error": "Permission denied"}, status=403)
serializer = CommentSerializer(comment, data=request.data, partial=True)
if serializer.is_valid():
serializer.save()
return Response(serializer.data, status=200)
return Response(serializer.errors, status=400)
def delete(self, request, pk):
comment = object_check(Comment, pk)
if comment is None:
return Response({"error": "Comment not found"}, status=404)
# Check if user owns the comment or has permission
if comment.employee_id != request.user.employee_get and not request.user.has_perm("helpdesk.delete_comment"):
return Response({"error": "Permission denied"}, status=403)
try:
comment.delete()
return Response(status=status.HTTP_204_NO_CONTENT)
except Exception as e:
return Response({"error": str(e)}, status=400)
# Attachment Views
class AttachmentGetCreateAPIView(APIView):
permission_classes = [IsAuthenticated]
def get(self, request, ticket_id=None, comment_id=None):
if ticket_id:
attachments = Attachment.objects.filter(ticket_id=ticket_id)
elif comment_id:
attachments = Attachment.objects.filter(comment_id=comment_id)
else:
return Response({"error": "ticket_id or comment_id required"}, status=400)
paginator = PageNumberPagination()
page = paginator.paginate_queryset(attachments, request)
serializer = AttachmentSerializer(page, many=True)
return paginator.get_paginated_response(serializer.data)
def post(self, request):
serializer = AttachmentSerializer(data=request.data)
if serializer.is_valid():
attachment = serializer.save()
return Response(AttachmentSerializer(attachment).data, status=status.HTTP_201_CREATED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
class AttachmentGetDeleteAPIView(APIView):
permission_classes = [IsAuthenticated]
def get(self, request, pk):
attachment = object_check(Attachment, pk)
if attachment is None:
return Response({"error": "Attachment not found"}, status=404)
serializer = AttachmentSerializer(attachment)
return Response(serializer.data, status=200)
def delete(self, request, pk):
attachment = object_check(Attachment, pk)
if attachment is None:
return Response({"error": "Attachment not found"}, status=404)
try:
attachment.delete()
return Response(status=status.HTTP_204_NO_CONTENT)
except Exception as e:
return Response({"error": str(e)}, status=400)
# Claim Request Views
class ClaimRequestGetCreateAPIView(APIView):
permission_classes = [IsAuthenticated]
def get(self, request, ticket_id=None):
if ticket_id:
claim_requests = ClaimRequest.objects.filter(ticket_id=ticket_id)
else:
claim_requests = ClaimRequest.objects.all()
paginator = PageNumberPagination()
page = paginator.paginate_queryset(claim_requests, request)
serializer = ClaimRequestSerializer(page, many=True)
return paginator.get_paginated_response(serializer.data)
def post(self, request):
serializer = ClaimRequestSerializer(data=request.data)
if serializer.is_valid():
claim_request = serializer.save()
return Response(ClaimRequestSerializer(claim_request).data, status=status.HTTP_201_CREATED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
class ClaimRequestApproveAPIView(APIView):
permission_classes = [IsAuthenticated]
@permission_required("helpdesk.change_ticket")
def put(self, request, pk):
claim_request = object_check(ClaimRequest, pk)
if claim_request is None:
return Response({"error": "ClaimRequest not found"}, status=404)
claim_request.is_approved = True
claim_request.is_rejected = False
claim_request.save()
# Add employee to assigned_to
ticket = claim_request.ticket_id
ticket.assigned_to.add(claim_request.employee_id)
return Response(ClaimRequestSerializer(claim_request).data, status=200)
class ClaimRequestRejectAPIView(APIView):
permission_classes = [IsAuthenticated]
@permission_required("helpdesk.change_ticket")
def put(self, request, pk):
claim_request = object_check(ClaimRequest, pk)
if claim_request is None:
return Response({"error": "ClaimRequest not found"}, status=404)
claim_request.is_approved = False
claim_request.is_rejected = True
claim_request.save()
return Response(ClaimRequestSerializer(claim_request).data, status=200)
# Department Manager Views
class DepartmentManagerGetCreateAPIView(APIView):
permission_classes = [IsAuthenticated]
def get_queryset(self):
# Handle schema generation for DRF-YASG
if getattr(self, 'swagger_fake_view', False):
return DepartmentManager.objects.none()
return DepartmentManager.objects.all()
def get(self, request):
department_managers = self.get_queryset()
paginator = PageNumberPagination()
page = paginator.paginate_queryset(department_managers, request)
serializer = DepartmentManagerSerializer(page, many=True)
return paginator.get_paginated_response(serializer.data)
@permission_required("helpdesk.add_departmentmanager")
def post(self, request):
serializer = DepartmentManagerSerializer(data=request.data)
if serializer.is_valid():
serializer.save()
return Response(serializer.data, status=status.HTTP_201_CREATED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
class DepartmentManagerGetUpdateDeleteAPIView(APIView):
permission_classes = [IsAuthenticated]
def get(self, request, pk):
department_manager = object_check(DepartmentManager, pk)
if department_manager is None:
return Response({"error": "DepartmentManager not found"}, status=404)
serializer = DepartmentManagerSerializer(department_manager)
return Response(serializer.data, status=200)
@permission_required("helpdesk.change_departmentmanager")
def put(self, request, pk):
department_manager = object_check(DepartmentManager, pk)
if department_manager is None:
return Response({"error": "DepartmentManager not found"}, status=404)
serializer = DepartmentManagerSerializer(department_manager, data=request.data)
if serializer.is_valid():
serializer.save()
return Response(serializer.data, status=200)
return Response(serializer.errors, status=400)
@permission_required("helpdesk.delete_departmentmanager")
def delete(self, request, pk):
department_manager = object_check(DepartmentManager, pk)
if department_manager is None:
return Response({"error": "DepartmentManager not found"}, status=404)
try:
department_manager.delete()
return Response(status=status.HTTP_204_NO_CONTENT)
except Exception as e:
return Response({"error": str(e)}, status=400)

View File

@@ -15,7 +15,7 @@ from base.methods import filtersubordinates
from horilla_api.api_serializers.leave.serializers import *
from leave.filters import *
from leave.methods import filter_conditional_leave_request
from leave.models import LeaveRequest
from leave.models import AvailableLeave, LeaveAllocationRequest, LeaveRequest, LeaveType
from notifications.signals import notify
from ...api_decorators.base.decorators import manager_permission_required
@@ -39,6 +39,15 @@ class EmployeeLeaveRequestGetCreateAPIView(APIView):
filter_backends = [DjangoFilterBackend]
filterset_class = UserLeaveRequestFilter
def get_queryset(self):
# Handle schema generation for DRF-YASG
if getattr(self, "swagger_fake_view", False):
return LeaveRequest.objects.none()
if not self.request.user.is_authenticated:
return LeaveRequest.objects.none()
employee = self.request.user.employee_get
return employee.leaverequest_set.all().order_by("-id")
def get(self, request):
employee = request.user.employee_get
leave_request = employee.leaverequest_set.all().order_by("-id")
@@ -143,6 +152,12 @@ class LeaveTypeGetCreateAPIView(APIView):
filter_backends = [DjangoFilterBackend]
filterset_class = LeaveTypeFilter
def get_queryset(self):
# Handle schema generation for DRF-YASG
if getattr(self, "swagger_fake_view", False):
return LeaveType.objects.none()
return LeaveType.objects.all()
# @method_decorator(permission_required('leave.view_leavetype', raise_exception=True), name='dispatch')
def get(self, request):
leave_type = LeaveType.objects.all()
@@ -209,6 +224,17 @@ class LeaveAllocationRequestGetCreateAPIView(APIView):
filter_backends = [DjangoFilterBackend]
filterset_class = LeaveAllocationRequestFilter
def get_queryset(self):
# Handle schema generation for DRF-YASG
if getattr(self, "swagger_fake_view", False):
return LeaveAllocationRequest.objects.none()
if not self.request.user.is_authenticated:
return LeaveAllocationRequest.objects.none()
allocation_requests = LeaveAllocationRequest.objects.all().order_by("-id")
return filtersubordinates(
self.request, allocation_requests, "leave.view_leaveallocationrequest"
)
def get_user(self, request):
user = request.user
if isinstance(user, AnonymousUser):
@@ -305,6 +331,17 @@ class AssignLeaveGetCreateAPIView(APIView):
filter_backends = [DjangoFilterBackend]
filterset_class = AssignedLeaveFilter
def get_queryset(self):
# Handle schema generation for DRF-YASG
if getattr(self, "swagger_fake_view", False):
return AvailableLeave.objects.none()
if not self.request.user.is_authenticated:
return AvailableLeave.objects.none()
available_leave = AvailableLeave.objects.all().order_by("-id")
return filtersubordinates(
self.request, available_leave, "leave.view_availableleave"
)
@method_decorator(
permission_required("leave.view_availableleave", raise_exception=True),
name="dispatch",
@@ -405,6 +442,19 @@ class LeaveRequestGetCreateAPIView(APIView):
filter_backends = [DjangoFilterBackend]
filterset_class = LeaveRequestFilter
def get_queryset(self):
# Handle schema generation for DRF-YASG
if getattr(self, "swagger_fake_view", False):
return LeaveRequest.objects.none()
if not self.request.user.is_authenticated:
return LeaveRequest.objects.none()
leave_request = LeaveRequest.objects.all().order_by("-id")
multiple_approvals = filter_conditional_leave_request(self.request)
return (
filtersubordinates(self.request, leave_request, "leave.view_leaverequest")
| multiple_approvals
)
@manager_permission_required("leave.view_leaverequest")
def get(self, request):
leave_request = LeaveRequest.objects.all().order_by("-id")
@@ -921,6 +971,15 @@ class EmployeeLeaveAllocationGetCreateAPIView(APIView):
filter_backends = [DjangoFilterBackend]
filterset_class = LeaveAllocationRequestFilter
def get_queryset(self):
# Handle schema generation for DRF-YASG
if getattr(self, "swagger_fake_view", False):
return LeaveAllocationRequest.objects.none()
if not self.request.user.is_authenticated:
return LeaveAllocationRequest.objects.none()
employee = self.request.user.employee_get
return employee.leaveallocationrequest_set.all().order_by("-id")
def get_user(self, request):
user = request.user
if isinstance(user, AnonymousUser):

View File

@@ -1,30 +1,33 @@
"""
Authentication utilities for the API
"""
from rest_framework import authentication
from rest_framework_simplejwt.authentication import JWTAuthentication
from rest_framework.exceptions import AuthenticationFailed
from rest_framework_simplejwt.authentication import JWTAuthentication
class SwaggerAuthentication(authentication.BaseAuthentication):
"""
Custom authentication class for Swagger UI
"""
def authenticate(self, request):
# Get the authentication header
auth_header = request.META.get('HTTP_AUTHORIZATION', '')
auth_header = request.META.get("HTTP_AUTHORIZATION", "")
# Try JWT authentication first
if auth_header.startswith('Bearer '):
if auth_header.startswith("Bearer "):
jwt_auth = JWTAuthentication()
try:
return jwt_auth.authenticate(request)
except:
pass
# Fall back to session authentication
if request.user and request.user.is_authenticated:
return (request.user, None)
return None
@@ -32,8 +35,11 @@ class RejectBasicAuthentication(authentication.BaseAuthentication):
"""
Explicitly reject HTTP Basic Auth across the API with a clear error message.
"""
def authenticate(self, request):
auth_header = request.META.get('HTTP_AUTHORIZATION', '')
if auth_header.startswith('Basic '):
raise AuthenticationFailed('Basic authentication is disabled. Use Bearer token (JWT) in the Authorization header.')
return None
auth_header = request.META.get("HTTP_AUTHORIZATION", "")
if auth_header.startswith("Basic "):
raise AuthenticationFailed(
"Basic authentication is disabled. Use Bearer token (JWT) in the Authorization header."
)
return None

View File

@@ -1,20 +1,25 @@
"""
Decorators for API views
"""
from functools import wraps
from rest_framework.response import Response
from rest_framework import status
from rest_framework.response import Response
def api_authentication_required(view_func):
"""
Decorator to ensure API views require authentication
"""
@wraps(view_func)
def wrapped_view(request, *args, **kwargs):
if not request.user.is_authenticated:
return Response(
{"detail": "Authentication credentials were not provided."},
status=status.HTTP_401_UNAUTHORIZED
status=status.HTTP_401_UNAUTHORIZED,
)
return view_func(request, *args, **kwargs)
return wrapped_view
return wrapped_view

View File

@@ -1,45 +1,54 @@
"""
Documentation helpers for API views
"""
from drf_yasg.utils import swagger_auto_schema
from drf_yasg import openapi
from drf_yasg.utils import swagger_auto_schema
from rest_framework import authentication
# Module tags for organizing endpoints
MODULE_TAGS = {
'auth': 'Authentication',
'asset': 'Asset Management',
'base': 'Base',
'employee': 'Employee Management',
'notifications': 'Notifications',
'payroll': 'Payroll',
'attendance': 'Attendance',
'leave': 'Leave Management',
"auth": "Authentication",
"asset": "Asset Management",
"base": "Base",
"employee": "Employee Management",
"notifications": "Notifications",
"payroll": "Payroll",
"attendance": "Attendance",
"leave": "Leave Management",
}
# Common response schemas
error_response = openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
'error': openapi.Schema(type=openapi.TYPE_STRING),
'detail': openapi.Schema(type=openapi.TYPE_STRING),
}
"error": openapi.Schema(type=openapi.TYPE_STRING),
"detail": openapi.Schema(type=openapi.TYPE_STRING),
},
)
success_response = openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
'success': openapi.Schema(type=openapi.TYPE_BOOLEAN),
'message': openapi.Schema(type=openapi.TYPE_STRING),
}
"success": openapi.Schema(type=openapi.TYPE_BOOLEAN),
"message": openapi.Schema(type=openapi.TYPE_STRING),
},
)
# Common parameters
pagination_params = [
openapi.Parameter('page', openapi.IN_QUERY, description="Page number", type=openapi.TYPE_INTEGER),
openapi.Parameter('page_size', openapi.IN_QUERY, description="Number of results per page", type=openapi.TYPE_INTEGER),
openapi.Parameter(
"page", openapi.IN_QUERY, description="Page number", type=openapi.TYPE_INTEGER
),
openapi.Parameter(
"page_size",
openapi.IN_QUERY,
description="Number of results per page",
type=openapi.TYPE_INTEGER,
),
]
def document_api(
operation_description=None,
request_body=None,
@@ -51,9 +60,9 @@ def document_api(
):
"""
Decorator for documenting API views with authentication
Example usage:
@document_api(
operation_description="List all employees",
responses={200: EmployeeSerializer(many=True)},
@@ -63,9 +72,9 @@ def document_api(
...
"""
# Add pagination parameters for list views
if manual_parameters is None and query_params == 'paginated':
if manual_parameters is None and query_params == "paginated":
manual_parameters = pagination_params
# Add common error responses
if responses and 400 not in responses:
responses[400] = error_response
@@ -73,10 +82,10 @@ def document_api(
responses[401] = error_response
if responses and 403 not in responses:
responses[403] = error_response
# Add security requirement (Bearer only)
security = [{'Bearer': []}]
security = [{"Bearer": []}]
return swagger_auto_schema(
operation_description=operation_description,
request_body=request_body,
@@ -85,4 +94,4 @@ def document_api(
tags=tags,
security=security,
**kwargs
)
)

View File

@@ -11,13 +11,13 @@ class RejectBasicAuthMiddleware:
self.get_response = get_response
def __call__(self, request):
auth_header = request.META.get('HTTP_AUTHORIZATION', '')
if isinstance(auth_header, str) and auth_header.startswith('Basic '):
auth_header = request.META.get("HTTP_AUTHORIZATION", "")
if isinstance(auth_header, str) and auth_header.startswith("Basic "):
return JsonResponse(
{
"error": "Basic authentication is disabled",
"detail": "Use Bearer token (JWT) in the Authorization header."
"detail": "Use Bearer token (JWT) in the Authorization header.",
},
status=401
status=401,
)
return self.get_response(request)
return self.get_response(request)

View File

@@ -1,15 +1,18 @@
"""
Schema configuration for API documentation
"""
from drf_yasg import openapi
from drf_yasg.generators import OpenAPISchemaGenerator
from drf_yasg.inspectors import SwaggerAutoSchema
from drf_yasg.utils import swagger_auto_schema
from drf_yasg.generators import OpenAPISchemaGenerator
class ModuleTaggingAutoSchema(SwaggerAutoSchema):
"""
Custom schema generator that automatically tags operations based on their module
"""
def get_tags(self, operation_keys):
# Extract module name from the operation keys
if len(operation_keys) > 1:
@@ -24,13 +27,23 @@ class OrderedTagSchemaGenerator(OpenAPISchemaGenerator):
Places 'auth' first, followed by remaining tags sorted alphabetically.
"""
def get_schema(self, request=None, public=False):
schema = super().get_schema(request=request, public=public)
# Collect all tag names used in operations
tag_names = set()
for path_item in schema.paths.values():
for method_name in ("get", "put", "post", "delete", "options", "head", "patch", "trace"):
for method_name in (
"get",
"put",
"post",
"delete",
"options",
"head",
"patch",
"trace",
):
operation = getattr(path_item, method_name, None)
if operation and getattr(operation, "tags", None):
for t in operation.tags:
@@ -44,12 +57,13 @@ class OrderedTagSchemaGenerator(OpenAPISchemaGenerator):
schema.tags = [{"name": name} for name in ordered_names]
return schema
def api_doc(**kwargs):
"""
Decorator for documenting API views
Example usage:
@api_doc(
responses={200: EmployeeSerializer(many=True)},
operation_description="List all employees",
@@ -60,27 +74,35 @@ def api_doc(**kwargs):
"""
return swagger_auto_schema(**kwargs)
# Common response schemas
error_response = openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
'error': openapi.Schema(type=openapi.TYPE_STRING),
'detail': openapi.Schema(type=openapi.TYPE_STRING),
}
"error": openapi.Schema(type=openapi.TYPE_STRING),
"detail": openapi.Schema(type=openapi.TYPE_STRING),
},
)
success_response = openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
'success': openapi.Schema(type=openapi.TYPE_BOOLEAN),
'message': openapi.Schema(type=openapi.TYPE_STRING),
}
"success": openapi.Schema(type=openapi.TYPE_BOOLEAN),
"message": openapi.Schema(type=openapi.TYPE_STRING),
},
)
# Common parameters
pagination_params = [
openapi.Parameter('page', openapi.IN_QUERY, description="Page number", type=openapi.TYPE_INTEGER),
openapi.Parameter('page_size', openapi.IN_QUERY, description="Number of results per page", type=openapi.TYPE_INTEGER),
openapi.Parameter(
"page", openapi.IN_QUERY, description="Page number", type=openapi.TYPE_INTEGER
),
openapi.Parameter(
"page_size",
openapi.IN_QUERY,
description="Number of results per page",
type=openapi.TYPE_INTEGER,
),
]
# Security definitions are already configured in rest_conf.py
# Security definitions are already configured in rest_conf.py

View File

@@ -1,30 +1,29 @@
"""
Custom Swagger settings for the API
"""
from django.conf import settings
# Define security definitions for Swagger UI
SWAGGER_SETTINGS = {
'SECURITY_DEFINITIONS': {
'Bearer': {
'type': 'apiKey',
'name': 'Authorization',
'in': 'header',
'description': 'JWT Token Authentication: Enter your token with the "Bearer " prefix, e.g. "Bearer abcde12345"'
"SECURITY_DEFINITIONS": {
"Bearer": {
"type": "apiKey",
"name": "Authorization",
"in": "header",
"description": 'JWT Token Authentication: Enter your token with the "Bearer " prefix, e.g. "Bearer abcde12345"',
}
},
'USE_SESSION_AUTH': False,
'DEFAULT_UI_SETTINGS': {
"USE_SESSION_AUTH": False,
"DEFAULT_UI_SETTINGS": {
# Keep tag order as defined in the generated spec
'tagsSorter': 'none'
"tagsSorter": "none"
},
'SECURITY_REQUIREMENTS': [
{'Bearer': []}
],
"SECURITY_REQUIREMENTS": [{"Bearer": []}],
}
# Apply settings
if hasattr(settings, 'SWAGGER_SETTINGS'):
if hasattr(settings, "SWAGGER_SETTINGS"):
settings.SWAGGER_SETTINGS.update(SWAGGER_SETTINGS)
else:
setattr(settings, 'SWAGGER_SETTINGS', SWAGGER_SETTINGS)
setattr(settings, "SWAGGER_SETTINGS", SWAGGER_SETTINGS)

View File

@@ -42,4 +42,5 @@ urlpatterns = [
path("payroll/", include("horilla_api.api_urls.payroll.urls")),
path("attendance/", include("horilla_api.api_urls.attendance.urls")),
path("leave/", include("horilla_api.api_urls.leave.urls")),
path("helpdesk/", include("horilla_api.api_urls.helpdesk.urls")),
]