""" views.py This module contains the view functions for handling HTTP requests and rendering responses in pms app. """ import datetime import json import logging from itertools import tee from urllib.parse import parse_qs, urlencode, urlparse from dateutil.relativedelta import relativedelta from django import forms from django.contrib import messages from django.contrib.auth.models import User from django.core.paginator import Paginator from django.db.models import ProtectedError, Q from django.db.utils import IntegrityError from django.forms import modelformset_factory from django.http import HttpResponse, HttpResponseRedirect, JsonResponse from django.shortcuts import get_object_or_404, redirect, render from django.template.loader import render_to_string from django.urls import reverse from django.utils.translation import gettext_lazy as _ from base.methods import ( closest_numbers, eval_validate, get_key_instances, get_pagination, paginator_qry, sortby, ) from employee.models import Employee, EmployeeWorkInformation from horilla.decorators import ( hx_request_required, login_required, manager_can_enter, meeting_manager_can_enter, owner_can_enter, permission_required, ) from horilla.group_by import group_by_queryset from horilla_automations.methods.methods import generate_choices from horilla_automations.methods.serialize import serialize_form from notifications.signals import notify from pms.filters import ( ActualKeyResultFilter, ActualObjectiveFilter, AnonymousFeedbackFilter, EmployeeObjectiveFilter, FeedbackFilter, KeyResultFilter, MeetingsFilter, ObjectiveFilter, ObjectiveReGroup, ) from pms.forms import ( AddAssigneesForm, AnonymousFeedbackForm, EmployeeKeyResultForm, EmployeeObjectiveCreateForm, EmployeeObjectiveForm, FeedbackForm, KeyResultForm, KRForm, MeetingsForm, ObjectiveCommentForm, ObjectiveForm, PeriodForm, QuestionForm, QuestionTemplateForm, ) from pms.methods import ( check_permission_feedback_detailed_view, get_anonymous_feedbacks, pms_owner_and_manager_can_enter, ) from pms.models import ( AnonymousFeedback, Answer, BonusPointSetting, Comment, EmployeeBonusPoint, EmployeeKeyResult, EmployeeObjective, Feedback, KeyResult, KeyResultFeedback, Meetings, MeetingsAnswer, Objective, Period, Question, QuestionOptions, QuestionTemplate, ) logger = logging.getLogger(__name__) # objectives @login_required def objective_list_view(request): """ This view is used to show all the objectives and returns some objects. Returns: Objects based on userlevel. """ employee = request.user.employee_get template = "okr/okr_view.html" objective_own = EmployeeObjective.objects.filter( employee_id=employee, archive=False ) objective_own = objective_own.distinct() context = objective_filter_pagination(request, objective_own) return render(request, template, context) def obj_form_save(request, objective_form): """ This view is used to save objective form """ objective = objective_form.save() assignees = objective_form.cleaned_data["assignees"] start_date = objective_form.cleaned_data["start_date"] default_krs = objective_form.cleaned_data["key_result_id"] messages.success(request, _("Objective created")) if assignees: for emp in assignees: emp_objective = EmployeeObjective( objective_id=objective, employee_id=emp, start_date=start_date ) emp_objective.save() # assigning default key result if default_krs: for key in default_krs: emp_kr = EmployeeKeyResult( employee_objective_id=emp_objective, key_result_id=key, progress_type=key.progress_type, target_value=key.target_value, ) emp_kr.save() notify.send( request.user.employee_get, recipient=emp.employee_user_id, verb="You got an OKR!.", verb_ar="لقد حققت هدفًا ونتيجة رئيسية!", verb_de="Du hast ein Ziel-Key-Ergebnis erreicht!", verb_es="¡Has logrado un Resultado Clave de Objetivo!", verb_fr="Vous avez atteint un Résultat Clé d'Objectif !", redirect=reverse( "objective-detailed-view", kwargs={"obj_id": objective.id} ), ) @login_required @manager_can_enter(perm="pms.add_employeeobjective") def objective_creation(request): """ This view is for objective creation , and returns a objective form. Returns: GET: objective form, period, department, job position, employee, department POST: Objective created, and returns to key result creation function """ employee = request.user.employee_get objective_form = ObjectiveForm(employee=employee) if request.GET.get("key_result_id") is not None: objective_form = ObjectiveForm(request.GET) if request.method == "POST": objective_form = ObjectiveForm(request.POST) if objective_form.is_valid(): obj_form_save(request, objective_form) return HttpResponse("") context = { "objective_form": objective_form, "p_form": PeriodForm(), "k_form": KRForm(), } return render(request, "okr/objective_creation.html", context=context) @login_required @hx_request_required @manager_can_enter(perm="pms.change_employeeobjective") def objective_update(request, obj_id): """ This view takes one arguments, id , and returns a HttpResponse object.,using htmx Args: id (int): Primary key of EmployeeObjective. Returns: A HttpResponse object with the content Form errors. """ instance = Objective.objects.get(id=obj_id) objective_form = ObjectiveForm(instance=instance) if request.GET.get("key_result_id") is not None: objective_form = ObjectiveForm(request.GET) if request.method == "POST": objective_form = ObjectiveForm(request.POST, instance=instance) if objective_form.is_valid(): objective = objective_form.save() assignees = objective_form.cleaned_data["assignees"] start_date = objective_form.cleaned_data["start_date"] default_krs = objective_form.cleaned_data["key_result_id"] new_emp = [assignee for assignee in assignees] delete_list = [] if objective.employee_objective.exists(): emp_objectives = objective.employee_objective.all() existing_emp = [emp.employee_id for emp in emp_objectives] delete_list = [ employee for employee in existing_emp if employee not in new_emp ] if len(delete_list) > 0: for emp in delete_list: EmployeeObjective.objects.filter( employee_id=emp, objective_id=objective ).delete() for emp in new_emp: if EmployeeObjective.objects.filter( employee_id=emp, objective_id=objective ).exists(): emp_obj = EmployeeObjective.objects.filter( employee_id=emp, objective_id=objective ).first() emp_obj.start_date = start_date else: emp_obj = EmployeeObjective( employee_id=emp, objective_id=objective, start_date=start_date ) emp_obj.save() # assiging default key result if default_krs: for key in default_krs: if not EmployeeKeyResult.objects.filter( employee_objective_id=emp_obj, key_result_id=key ).exists(): emp_kr = EmployeeKeyResult.objects.create( employee_objective_id=emp_obj, key_result_id=key, progress_type=key.progress_type, target_value=key.target_value, ) emp_kr.save() notify.send( request.user.employee_get, recipient=emp.employee_user_id, verb="You got an OKR!.", verb_ar="لقد حققت هدفًا ونتيجة رئيسية!", verb_de="Du hast ein Ziel-Key-Ergebnis erreicht!", verb_es="¡Has logrado un Resultado Clave de Objetivo!", verb_fr="Vous avez atteint un Résultat Clé d'Objectif !", redirect=reverse( "objective-detailed-view", kwargs={"obj_id": objective.id} ), ) messages.success( request, _("Objective %(objective)s Updated") % {"objective": instance}, ) return HttpResponse("") context = {"objective_form": objective_form, "k_form": KRForm(), "update": True} return render(request, "okr/objective_creation.html", context) # key result @login_required @manager_can_enter("pms.view_keyresult") def view_key_result(request): """ This method is used render template to view all the key result instances """ krs = KeyResult.objects.all() krs_filter = ActualKeyResultFilter(request.GET) krs = paginator_qry(krs, request.GET.get("page")) krs_ids = json.dumps([instance.id for instance in krs.object_list]) context = { "krs": krs, "f": krs_filter, "krs_ids": krs_ids, } return render(request, "okr/key_result/view_kr.html", context) @login_required @hx_request_required @permission_required("pms.view_key_result") def filter_key_result(request): """ Filter and retrieve a list of key results based on the provided query parameters. """ query_string = request.GET.urlencode() krs = ActualKeyResultFilter(request.GET).qs template = "okr/key_result/kr_card.html" if request.GET.get("view") == "list": template = "okr/key_result/kr_list.html" krs = sortby(request, krs, "sortby") krs = paginator_qry(krs, request.GET.get("page")) allowance_ids = json.dumps([instance.id for instance in krs.object_list]) data_dict = parse_qs(query_string) get_key_instances(KeyResult, data_dict) return render( request, template, { "krs": krs, "pd": query_string, "filter_dict": data_dict, "krs_ids": allowance_ids, }, ) @login_required @hx_request_required @permission_required("pms.add_key_result") def key_result_create(request): """ This method renders form and template to create key result """ form = KRForm() redirect_url = request.GET.get("data") dataUrl = request.GET.get("dataUrl") if request.method == "POST": form = KRForm(request.POST) if form.is_valid(): instance = form.save() messages.success( request, _("Key result %(key_result)s created successfully") % {"key_result": instance}, ) mutable_get = request.GET.copy() key_result_ids = mutable_get.getlist("key_result_id", []) if "create_new_key_result" in key_result_ids: key_result_ids.remove("create_new_key_result") key_result_ids.append(str(instance.id)) mutable_get.setlist("key_result_id", key_result_ids) data = mutable_get.urlencode() redirect_url = f"/pms/{dataUrl}{data}" parsed_url = urlparse(redirect_url) query_params = parse_qs(parsed_url.query) # Remove the 'dataUrl' parameter query_params.pop("dataUrl", None) # Reconstruct the URL without 'dataUrl' new_query_string = urlencode(query_params, doseq=True) redirect_url = f"{parsed_url.path}?{new_query_string}" else: redirect_url = request.GET.urlencode() return render( request, "okr/key_result/key_result_form.html", {"k_form": form, "redirect_url": redirect_url, "dataUrl": dataUrl}, ) @login_required @hx_request_required @permission_required("pms.add_keyresult") def kr_create_or_update(request, kr_id=None): """ View function for creating or updating a Key Result. Parameters: - request: HttpRequest object. - kr_id: ID of the Key Result to update (optional). Returns: Renders a form to create or update a Key Result. """ form = KRForm() kr = False key_result = False if kr_id is not None: key_result = KeyResult.objects.filter(id=kr_id).first() form = KRForm(instance=key_result) if request.method == "POST": if key_result: form = KRForm(request.POST, instance=key_result) if form.is_valid(): instance = form.save() messages.success( request, _("Key result %(key_result)s updated successfully") % {"key_result": instance}, ) return HttpResponse("") else: form = KRForm(request.POST) if form.is_valid(): instance = form.save() messages.success( request, _("Key result %(key_result)s created successfully") % {"key_result": instance}, ) return HttpResponse("") return render(request, "okr/key_result/real_kr_form.html", {"form": form}) @login_required @hx_request_required def add_assignees(request, obj_id): """ this function is used to add assigneesto the objective args: obj_id(int) : pimarykey of Objective return: redirect to add assignees form """ objective = Objective.objects.get(id=obj_id) form = AddAssigneesForm(instance=objective) if request.method == "POST": form = AddAssigneesForm(request.POST, instance=objective) if form.is_valid(): objective = form.save(commit=False) assignees = form.cleaned_data["assignees"] start_date = form.cleaned_data["start_date"] for emp in assignees: objective.assignees.add(emp) if not EmployeeObjective.objects.filter( employee_id=emp, objective_id=objective ).exists(): emp_obj = EmployeeObjective( employee_id=emp, objective_id=objective, start_date=start_date ) emp_obj.save() # assiging default key result default_krs = objective.key_result_id.all() if default_krs: for key_result in default_krs: if not EmployeeKeyResult.objects.filter( employee_objective_id=emp_obj, key_result_id=key_result ).exists(): emp_kr = EmployeeKeyResult.objects.create( employee_objective_id=emp_obj, key_result_id=key_result, progress_type=key_result.progress_type, target_value=key_result.target_value, start_date=start_date, ) notify.send( request.user.employee_get, recipient=emp.employee_user_id, verb="You got an OKR!.", verb_ar="لقد حققت هدفًا ونتيجة رئيسية!", verb_de="Du hast ein Ziel-Key-Ergebnis erreicht!", verb_es="¡Has logrado un Resultado Clave de Objetivo!", verb_fr="Vous avez atteint un Résultat Clé d'Objectif !", redirect=reverse( "objective-detailed-view", kwargs={"obj_id": objective.id} ), ) objective.save() messages.success( request, _("Objective %(objective)s Updated") % {"objective": objective}, ) return HttpResponse("") context = { "form": form, "objective": objective, } return render(request, "okr/add_assignees.html", context) @login_required @manager_can_enter(perm="pms.delete_employeeobjective") def objective_delete(request, obj_id): """ This view takes one arguments, id and returns redirecting to a view. Args: id (int) : primarykey of the EmployeeObjective. Returns: Redirect to Objective_list_view". """ try: objective = Objective.objects.get(id=obj_id) if not objective.employee_objective.exists(): objective.delete() messages.success( request, _("Objective %(objective)s deleted") % {"objective": objective}, ) else: messages.warning( request, _("You can't delete objective %(objective)s,related entries exists") % {"objective": objective}, ) except EmployeeObjective.DoesNotExist: messages.error(request, _("Objective not found.")) return redirect(objective_list_view) @login_required @permission_required("pms.change_objective") def objective_manager_remove(request, obj_id, manager_id): """ Removes a manager from an objective. Parameters: - request: HttpRequest object. - obj_id: ID of the Objective from which to remove the manager. - manager_id: ID of the manager to be removed. Returns: HttpResponse indicating success. """ objective = get_object_or_404(Objective, id=obj_id) objective.managers.remove(manager_id) return HttpResponse("") @login_required @permission_required("pms.delete_keyresult") def key_result_remove(request, obj_id, kr_id): """ Removes a Key Result from an objective. Parameters: - request: HttpRequest object. - obj_id: ID of the Objective from which to remove the Key Result. - kr_id: ID of the Key Result to be removed. Returns: HttpResponse indicating success. """ objective = get_object_or_404(Objective, id=obj_id) objective.key_result_id.remove(kr_id) return HttpResponse("") @login_required @manager_can_enter("pms.delete_employeeobjective") def assignees_remove(request, obj_id, emp_id): """ Removes an assignee from an objective. Parameters: - request: HttpRequest object. - obj_id: ID of the Objective from which to remove the assignee. - emp_id: ID of the employee to be removed as an assignee. Returns: HttpResponse indicating success. """ objective = get_object_or_404(Objective, id=obj_id) get_object_or_404( EmployeeObjective, employee_id=emp_id, objective_id=obj_id ).delete() objective.assignees.remove(emp_id) return HttpResponse() def objective_filter_pagination(request, objective_own): """ This view takes two arguments, all_objective,own_objecitve and returns some objects. Args: all_objective (queryset) : Queryset of objectives own_objective (queryset) : Queryset of objectives Returns: All the filtered and paginated object will return. """ previous_data = request.GET.urlencode() initial_data = {"archive": False} # set initial value of archive filter to False field = request.GET.get("field") objective_filter_own = ObjectiveFilter( request.GET or initial_data, queryset=objective_own ).qs.order_by("-id") employee = request.user.employee_get manager = False objectives = ( Objective.objects.filter(Q(managers=employee) | Q(assignees=employee)) ).distinct() if request.user.has_perm("pms.view_objective"): objectives = Objective.objects.all() manager = True elif Objective.objects.filter(managers=employee).exists(): manager = True objectives = ActualObjectiveFilter( request.GET or initial_data, queryset=objectives ).qs objectives = Paginator(objectives, get_pagination()) objective_paginator_own = Paginator(objective_filter_own, get_pagination()) own_page = request.GET.get("page") all_page = request.GET.get("all_page") objectives_own = objective_paginator_own.get_page(own_page) objectives = objectives.get_page(all_page) now = datetime.datetime.now() data_dict = parse_qs(previous_data) get_key_instances(EmployeeObjective, data_dict) context = { "manager": manager, "superuser": "true", "objectives": objectives, "own_objectives": objectives_own, "objective_filer_form": ActualObjectiveFilter().form, "pg": previous_data, "current_date": now, "filter_dict": data_dict, "gp_fields": ObjectiveReGroup.fields, "field": field, } return context @login_required @hx_request_required def objective_list_search(request): """ This view is used to to search objective, returns searched and filtered objects. Returns: All the filtered and searched object will based on userlevel. """ search_val = request.GET.get("search") if search_val is None: search_val = "" employee = request.user.employee_get objective_own = EmployeeObjective.objects.filter(employee_id=employee) context = objective_filter_pagination(request, objective_own) template = "okr/okr_list.html" if request.GET.get("field") != "" and request.GET.get("field") is not None: template = "okr/group_by.html" return render(request, template, context) @login_required # @hx_request_required def objective_dashboard_view(request): """ This view is used to to search objective, returns searched and filtered objects. Returns: All the filtered and searched object will based on userlevel. """ emp_objectives = EmployeeObjectiveFilter(request.GET).qs return render( request, "okr/emp_objective/emp_objective_dashboard_view.html", {"emp_objectives": emp_objectives}, ) def objective_history(emp_obj_id): """ This view is used to get history of EmployeeObjective, return objects. Args: id (int): Primarykey of EmployeeObjective. Returns: All the history of EmployeeObjective. """ def pair_history(iterable): """this function return two history pair""" a, b = tee(iterable) next(b, None) return zip(a, b) changed_key_results = [] def key_result_history(key_result): """key result history""" key_result_iterator = ( key_result.history.all().order_by("history_date").iterator() ) for record_pair in pair_history(key_result_iterator): old_record, new_record = record_pair delta = new_record.diff_against(old_record) history_user_id = delta.new_record.history_user history_change_date = delta.new_record.history_date employee = Employee.objects.filter(employee_user_id=history_user_id).first() key_result = delta.new_record.key_result_id changed_key_results.append( { "delta": delta, "changed_user": employee, "changed_date": history_change_date, "k_r": key_result, } ) obj_objective = EmployeeObjective.objects.get(id=emp_obj_id) all_key_results = EmployeeKeyResult.objects.filter( employee_objective_id=obj_objective ) for key_result in all_key_results: # loop each key result and generate it's history key_result_history(key_result) changed_key_results.reverse() return changed_key_results @login_required def objective_detailed_view(request, obj_id, **kwargs): """ this function is used to update the key result of objectives args: obj_id(int) : pimarykey of EmployeeObjective return: objects to objective_detailed_view """ objective = Objective.objects.get(id=obj_id) emp_objectives = EmployeeObjective.objects.filter( objective_id=objective, archive=False ) previous_data = request.GET.urlencode() data_dict = parse_qs(previous_data) now = datetime.datetime.now() context = { "emp_objectives": emp_objectives, "pd": previous_data, "filter_dict": data_dict, "objective": objective, "key_result_form": KeyResultForm, "objective_key_result_status": EmployeeKeyResult.STATUS_CHOICES, "comment_form": ObjectiveCommentForm, "current_date": now, "emp_obj_form": EmployeeObjectiveFilter(), } return render(request, "okr/okr_detailed_view.html", context) @login_required @hx_request_required def objective_detailed_view_activity(request, id): """ This view is used to show objective activity template ,using htmx Args: id (int): Primary key of EmployeeObjective. Returns: it will return history,comment object to objective_detailed_view_activity. """ objective = EmployeeObjective.objects.get(id=id) if ( request.user.employee_get == objective.employee_id or request.user.employee_get in objective.objective_id.managers.all() or request.user.has_perm("pms.view_comment") ): key_result_history = objective_history(id) history = objective.tracking() comments = Comment.objects.filter(employee_objective_id=objective) activity_list = [] for hist in history: hist["date"] = hist["pair"][0].history_date activity_list.append(hist) for com in comments: comment = { "type": "comment", "comment": com, "date": com.created_at, } activity_list.append(comment) for key in key_result_history: key_result = { "type": "key_result", "key_result": key, "date": key["changed_date"], } activity_list.append(key_result) activity_list = sorted(activity_list, key=lambda x: x["date"], reverse=True) context = { "objective": objective, "historys": history, "comments": comments, "activity_list": activity_list, } return render(request, "okr/objective_detailed_view_activity.html", context) else: messages.info(request, _("You dont have permission.")) return HttpResponse("") @login_required @hx_request_required def objective_detailed_view_comment(request, id): """ This view is used to create comment object for objective activity, using htmx Args: id (int): Primary key of EmployeeObjective. Returns: it will redirect to objective_detailed_view_activity. """ comment_form = ObjectiveCommentForm(request.POST) if comment_form.is_valid(): objective = EmployeeObjective.objects.get(id=id) form = comment_form.save(commit=False) form.employee_id = request.user.employee_get form.employee_objective_id = objective form.save() return redirect(objective_detailed_view_activity, id) return redirect(objective_detailed_view_activity, id) @login_required @hx_request_required def emp_objective_search(request, obj_id): """ This view is used to to search employee objective,returns searched and filtered objects. Returns: All the filtered and searched object will based on userlevel. """ objective = Objective.objects.get(id=obj_id) emp_objectives = objective.employee_objective.all() search_val = request.GET.get("search") if search_val is None: search_val = "" emp_objectives = EmployeeObjectiveFilter(request.GET, emp_objectives).qs if not request.GET.get("archive") == "true": emp_objectives = emp_objectives.filter(archive=False) previous_data = request.GET.urlencode() emp_objectives = Paginator(emp_objectives, get_pagination()) page = request.GET.get("page") emp_objectives = emp_objectives.get_page(page) data_dict = parse_qs(previous_data) get_key_instances(EmployeeObjective, data_dict) context = { "emp_objectives": emp_objectives, "filter_dict": data_dict, "pg": previous_data, "objective": objective, } template = "okr/emp_objective/emp_objective_list.html" if request.GET.get("field") != "" and request.GET.get("field") is not None: template = "okr/group_by.html" return render(request, template, context) @login_required @hx_request_required def kr_table_view(request, emp_objective_id): """ Renders a table view of Key Results associated with an employee objective. Parameters: - request: HttpRequest object. - emp_objective_id: ID of the EmployeeObjective to display Key Results for. Returns: Renders the 'okr/kr_list.html' template with Key Results associated with the specified EmployeeObjective. """ emp_objective = EmployeeObjective.objects.get(id=emp_objective_id) krs = emp_objective.employee_key_result.all() krs = Paginator(krs, get_pagination()) krs_page = request.GET.get("krs_page") krs = krs.get_page(krs_page) previous_data = request.GET.urlencode() context = { "krs": krs, "key_result_status": EmployeeKeyResult.STATUS_CHOICES, "emp_objective": emp_objective, "pd": previous_data, "today": datetime.datetime.today().date(), } template = "okr/kr_list.html" return render(request, template, context) @login_required @hx_request_required def objective_detailed_view_objective_status(request, id): """ This view is used to update status of objective in objective detailed view, redirect to objective_detailed_view_activity. using htmx Args: obj_id (int): Primary key of EmployeeObjective. Returns: All the filtered and searched object will based on userlevel. """ objective = EmployeeObjective.objects.get(id=id) status = request.POST.get("objective_status") objective.status = status objective.save() messages.info( request, _("Objective %(objective)s status updated") % {"objective": objective.objective}, ) return redirect(objective_detailed_view_activity, id) @login_required @hx_request_required def objective_detailed_view_key_result_status(request, obj_id, kr_id): """ This view is used to update status of key result in objective detailed view, redirect to objective_detailed_view_activity. using htmx Args: obj_id (int): Primarykey of EmployeeObjective. kr_id (int): Primarykey of EmployeeKeyResult. Returns: All the filtered and searched object will based on userlevel. """ status = request.POST.get("key_result_status") employee_key_result = EmployeeKeyResult.objects.get(id=kr_id) current_value = employee_key_result.current_value target_value = employee_key_result.target_value if current_value >= target_value: employee_key_result.status = "Closed" else: employee_key_result.status = status employee_key_result.save() messages.info(request, _("Status has been updated")) # return redirect(objective_detailed_view_activity, id=obj_id) response = redirect(objective_detailed_view_activity, id=obj_id) return HttpResponse( response.content.decode("utf-8") + "" ) @login_required @hx_request_required def objective_detailed_view_current_value(request, kr_id): """ This view is used to update current value of key result, return redirect to view . using htmx Args: kr_id (int): Primarykey of EmployeeKeyresult. Returns: All the history of EmployeeObjective. """ if request.method == "POST": current_value = request.POST.get("current_value") employee_key_result = EmployeeKeyResult.objects.get(id=kr_id) target_value = employee_key_result.target_value objective_id = employee_key_result.employee_objective_id.id if int(current_value) < target_value: employee_key_result.current_value = current_value employee_key_result.save() messages.info( request, _("Current value of %(employee_key_result)s updated") % {"employee_key_result": employee_key_result}, ) return redirect(objective_detailed_view_activity, objective_id) elif int(current_value) == target_value: employee_key_result.current_value = current_value employee_key_result.status = "Closed" employee_key_result.save() messages.info( request, _("Current value of %(employee_key_result)s updated") % {"employee_key_result": employee_key_result}, ) # return redirect(objective_detailed_view_activity, objective_id) response = redirect(objective_detailed_view_activity, objective_id) return HttpResponse( response.content.decode("utf-8") + "" ) elif int(current_value) > target_value: messages.warning(request, _("Current value is greater than target value")) return redirect(objective_detailed_view_activity, objective_id) messages.error(request, _("Error occurred during current value updation")) return redirect(objective_detailed_view_activity, objective_id) @login_required def objective_archive(request, id): """ this function is used to archive the objective args: id(int) : pimarykey of EmployeeObjective return: redirect to objective_list_view """ objective = Objective.objects.get(id=id) if objective.archive: objective.archive = False objective.save() messages.info(request, _("Objective un-archived successfully!.")) elif not objective.archive: objective.archive = True objective.save() messages.info(request, _("Objective archived successfully!.")) return redirect(f"/pms/objective-list-view?{request.environ['QUERY_STRING']}") @login_required @hx_request_required @pms_owner_and_manager_can_enter(perm="pms.view_employeeobjective") def view_employee_objective(request, emp_obj_id): """ This function is used to render individual view of the employee objective args: emp_obj_id(int) : pimarykey of EmployeeObjective return: redirect to individual view of employee objective """ emp_objective = EmployeeObjective.objects.get(id=emp_obj_id) context = { "instance": emp_objective, "objective_key_result_status": EmployeeObjective.STATUS_CHOICES, } return render(request, "okr/emp_obj_single.html", context) @login_required @hx_request_required @manager_can_enter(perm="pms.add_employeeobjective") def create_employee_objective(request): """ This function is used to create the employee objective args: emp_obj_id(int) : pimarykey of EmployeeObjective return: redirect to form of employee objective """ data = request.GET.copy() # Convert QueryDict to a regular dictionary cleaned_data = { key: value if len(value) > 1 else value[0] for key, value in data.lists() } form = EmployeeObjectiveCreateForm(initial=cleaned_data) if request.method == "POST": form = EmployeeObjectiveCreateForm(request.POST) if form.is_valid(): # get key result on form krs = list(form.cleaned_data["key_result_id"]) emp_obj = form.save(commit=False) obj = emp_obj.objective_id # Add this employee as assignee obj.assignees.add(emp_obj.employee_id) krs.extend([key_result for key_result in obj.key_result_id.all()]) set_krs = set(krs) emp_obj.save() # Add all key results for kr in set_krs: emp_obj.key_result_id.add(kr) if not EmployeeKeyResult.objects.filter( employee_objective_id=emp_obj, key_result_id=kr ).exists(): emp_kr = EmployeeKeyResult.objects.create( employee_objective_id=emp_obj, key_result_id=kr, progress_type=kr.progress_type, target_value=kr.target_value, start_date=emp_obj.start_date, ) messages.success(request, _("Employee objective created successfully")) return HttpResponse("") context = {"form": form, "k_form": KRForm(), "emp_obj": True} return render( request, "okr/emp_objective/emp_objective_create_form.html", context=context ) @login_required @hx_request_required def update_employee_objective(request, emp_obj_id): """ This function is used to update the employee objective args: emp_obj_id(int) : pimarykey of EmployeeObjective return: redirect to form of employee objective """ emp_objective = EmployeeObjective.objects.get(id=emp_obj_id) if ( request.user.employee_get == emp_objective.employee_id or request.user.employee_get in emp_objective.objective_id.managers.all() or request.user.has_perm("pms.change_employeeobjective") ): form = EmployeeObjectiveForm(instance=emp_objective) if request.method == "POST": form = EmployeeObjectiveForm(request.POST, instance=emp_objective) if form.is_valid(): emp_obj = form.save(commit=False) emp_obj.save() messages.success(request, _("Employee objective Updated successfully")) return HttpResponse("") context = {"form": form, "k_form": KRForm()} return render(request, "okr/emp_objective_form.html", context=context) else: messages.info(request, _("You don't have permission.")) return HttpResponse("") @login_required @manager_can_enter(perm="pms.delete_employeeobjective") def archive_employee_objective(request, emp_obj_id): """ This function is used to archive or unarchive the employee objective args: emp_obj_id(int) : pimarykey of EmployeeObjective return: redirect to detailed of employee objective """ emp_objective = EmployeeObjective.objects.get(id=emp_obj_id) obj_id = emp_objective.objective_id.id if emp_objective.archive: emp_objective.archive = False emp_objective.save() messages.success(request, _("Objective un-archived successfully!.")) elif not emp_objective.archive: emp_objective.archive = True emp_objective.save() messages.success(request, _("Objective archived successfully!.")) return HttpResponseRedirect(request.META.get("HTTP_REFERER", "/")) @login_required @manager_can_enter(perm="pms.delete_employeeobjective") def delete_employee_objective(request, emp_obj_id): """ This function is used to delete the employee objective args: emp_obj_id(int) : pimarykey of EmployeeObjective return: redirect to detailed of employee objective """ emp_objective = EmployeeObjective.objects.get(id=emp_obj_id) single_view = request.GET.get("single_view") if emp_objective.employee_key_result.exists(): messages.warning( request, _("You can't delete this objective,related entries exists") ) else: employee = emp_objective.employee_id objective = emp_objective.objective_id emp_objective.delete() objective.assignees.remove(employee) messages.success(request, _("Objective deleted successfully!.")) if not single_view: return HttpResponseRedirect(request.META.get("HTTP_REFERER", "/")) else: return HttpResponse("") @login_required @hx_request_required def change_employee_objective_status(request): """ This function is used to change status of the employee objective args: hx-request return: a message """ emp_obj = request.GET.get("empObjId") emp_objective = EmployeeObjective.objects.filter(id=emp_obj).first() status = request.GET.get("status") if ( request.user.has_perm("pms.change_employeeobjective") or emp_objective.employee_id == request.user.employee_get or request.user.employee_get in emp_objective.objective_id.managers.all() ): if emp_objective.status != status: emp_objective.status = status emp_objective.save() messages.success( request, _( f"The status of the objective '{emp_objective.objective_id}'\ has been changed to {emp_objective.status}." ), ) notify.send( request.user.employee_get, recipient=emp_objective.employee_id.employee_user_id, verb=f"The status of the objective '{emp_objective.objective_id}'\ has been changed to {emp_objective.status}.", verb_ar=f"تم تغيير حالة الهدف '{emp_objective.objective_id}'\ إلى {emp_objective.status}.", verb_de=f"Der Status des Ziels '{emp_objective.objective_id}'\ wurde zu {emp_objective.status} geändert.", verb_es=f"El estado del objetivo '{emp_objective.objective_id}'\ ha sido cambiado a {emp_objective.status}.", verb_fr=f"Le statut de l'objectif '{emp_objective.objective_id}'\ a été changé à {emp_objective.status}.", redirect=reverse( "objective-detailed-view", kwargs={"obj_id": emp_objective.objective_id.id}, ), ) else: messages.info( request, _("The status of the objective is the same as selected.") ) else: messages.info(request, _("You dont have permission.")) return HttpResponse("") @login_required @manager_can_enter(perm="pms.add_employeekeyresult") def key_result_view(request): """ This view is used to view key result, Args: request: Request object. Returns: if errorr occur it will return errorr message. """ krs = KeyResultFilter(request.GET).qs krs = group_by_queryset( krs, "employee_objective_id__employee_id", request.GET.get("page"), "page" ) context = { "krs": krs, "key_result_status": EmployeeKeyResult.STATUS_CHOICES, } return render(request, "okr/key_result/kr_dashboard_view.html", context=context) @login_required @manager_can_enter(perm="pms.add_employeekeyresult") def key_result_creation(request, obj_id, obj_type): """ This view is used to create key result, Args: id (int): Primarykey of EmployeeObjective. obj_type (str): type of objecitve Returns: if errorr occur it will return errorr message . """ employee = request.user.employee_get if obj_type == "individual": objective = EmployeeObjective.objects.filter(id=int(obj_id)) start_date = None end_date = None for obj in objective: start_date = obj.start_date end_date = obj.end_date key_result_form = KeyResultForm( employee=employee, initial={"start_date": start_date, "end_date": end_date} ) else: objective_ids = json.loads(obj_id) for objective_id in objective_ids: objective = EmployeeObjective.objects.filter(id=objective_id).first() start_date = objective.start_date end_date = objective.end_date key_result_form = KeyResultForm( employee=employee, initial={"start_date": start_date, "end_date": end_date} ) context = { "key_result_form": key_result_form, "objective_id": obj_id, "objective_type": obj_type, } if obj_type == "multiple": # for job position or department the context should have all the related object ids value = context.pop("objective_id") context["objective_ids"] = value if request.method == "POST": if obj_type == "individual": employee_objective_id = EmployeeObjective.objects.get(id=int(obj_id)) form_key_result = KeyResultForm( request.POST, initial={"employee_objective_id": employee_objective_id} ) if form_key_result.is_valid(): form = form_key_result.save(commit=False) form.start_value = form.current_value form.employee_objective_id = employee_objective_id form.save() messages.success(request, _("Key result created")) return redirect(objective_detailed_view, obj_id) else: context["key_result_form"] = form_key_result elif obj_type == "multiple": # If the objective is for job postion or department # The id will be list of objective id objective_ids = json.loads(obj_id) for objective_id in objective_ids: objective = EmployeeObjective.objects.filter(id=objective_id).first() form_key_result = KeyResultForm( request.POST, initial={"employee_objective_id": objective} ) if form_key_result.is_valid(): form = form_key_result.save(commit=False) form.start_value = form.current_value form.employee_id = objective.employee_id form.employee_objective_id = objective form.save() else: context["key_result_form"] = form_key_result return render( request, "okr/key_result/key_result_creation.html", context ) messages.success(request, _("Key results created")) return redirect(objective_list_view) return render(request, "okr/key_result/key_result_creation.html", context) @login_required @hx_request_required @manager_can_enter(perm="pms.add_employeekeyresult") def key_result_creation_htmx(request, id): """ This view is used to create key result in objective detailed view, using htmx Args: id (int): Primarykey of EmployeeObjective. obj_type (str): type of objecitve Returns: if errorr occure it will return errorr message . """ object = EmployeeObjective.objects.filter(id=id) start_date = None end_date = None for obj in object: start_date = obj.start_date end_date = obj.end_date key_result_form = KeyResultForm( initial={"start_date": start_date, "end_date": end_date} ) context = {"key_result_form": key_result_form, "objecitve_id": id} objective = EmployeeObjective.objects.get(id=id) if request.method == "POST": initial_data = {"employee_objective_id": objective} form_key_result = KeyResultForm(request.POST, initial=initial_data) if form_key_result.is_valid(): form = form_key_result.save(commit=False) form.start_value = form.current_value form.employee_objective_id = objective form.save() messages.success(request, _("Key result created")) response = render( request, "okr/key_result/key_result_creation_htmx.html", context ) return HttpResponse( response.content.decode("utf-8") + "" ) context["key_result_form"] = form_key_result return render(request, "okr/key_result/key_result_creation_htmx.html", context) @login_required @hx_request_required @manager_can_enter(perm="pms.update_employeekeyresult") def key_result_update(request, id): """ This view is used to update key result, using htmx Args: id (int): Primarykey of EmployeeKeyResult. Returns: success or errors message. """ key_result = EmployeeKeyResult.objects.get(id=id) key_result_form = KeyResultForm(instance=key_result) context = {"key_result_form": key_result_form, "key_result_id": key_result.id} if request.method == "POST": key_result_form = KeyResultForm(request.POST, instance=key_result) key_result_form.initial["employee_objective_id"] = ( key_result.employee_objective_id ) # adding intial objective value to the form if key_result_form.is_valid(): key_result_form.save() messages.info(request, _("Key result updated")) response = render(request, "okr/key_result/key_result_update.html", context) return HttpResponse( response.content.decode("utf-8") + "" ) else: context["key_result_form"] = key_result_form return render(request, "okr/key_result/key_result_update.html", context) # feedback section def send_feedback_notifications(request, form): # Send notification to employee if form.employee_id: employee = form.employee_id notify.send( request.user.employee_get, recipient=employee.employee_user_id, verb="You have received feedback!", verb_ar="لقد تلقيت ملاحظات!", verb_de="Sie haben Feedback erhalten!", verb_es="¡Has recibido retroalimentación!", verb_fr="Vous avez reçu des commentaires !", redirect=reverse("feedback-detailed-view", kwargs={"id": form.id}), icon="chatbox-ellipses", ) # Send notification to manager if form.manager_id: manager = form.manager_id notify.send( request.user.employee_get, recipient=manager.employee_user_id, verb="You have been assigned as a manager in a feedback!", verb_ar="لقد تم تعيينك كمدير في ملاحظة!", verb_de="Sie wurden als Manager in einem Feedback zugewiesen!", verb_es="¡Has sido asignado como manager en un feedback!", verb_fr="Vous avez été désigné comme manager dans un commentaire !", redirect=reverse("feedback-detailed-view", kwargs={"id": form.id}), icon="chatbox-ellipses", ) # Send notification to subordinates if form.subordinate_id: subordinates = form.subordinate_id.all() for subordinate in subordinates: notify.send( request.user.employee_get, recipient=subordinate.employee_user_id, verb="You have been assigned as a subordinate in a feedback!", verb_ar="لقد تم تعيينك كمرؤوس في ملاحظة!", verb_de="Sie wurden als Untergebener in einem Feedback zugewiesen!", verb_es="¡Has sido asignado como subordinado en un feedback!", verb_fr="Vous avez été désigné comme subordonné dans un commentaire !", redirect=reverse("feedback-detailed-view", kwargs={"id": form.id}), icon="chatbox-ellipses", ) # Send notification to colleagues if form.colleague_id: colleagues = form.colleague_id.all() for colleague in colleagues: notify.send( request.user.employee_get, recipient=colleague.employee_user_id, verb="You have been assigned as a colleague in a feedback!", verb_ar="لقد تم تعيينك كزميل في ملاحظة!", verb_de="Sie wurden als Kollege in einem Feedback zugewiesen!", verb_es="¡Has sido asignado como colega en un feedback!", verb_fr="Vous avez été désigné comme collègue dans un commentaire !", redirect=reverse("feedback-detailed-view", kwargs={"id": form.id}), icon="chatbox-ellipses", ) @login_required @manager_can_enter(perm="pms.add_feedback") def feedback_creation(request): """ This view is used to create feedback object. Returns: it will return feedback creation html. """ form = FeedbackForm() context = { "feedback_form": form, } if request.method == "POST": form = FeedbackForm(request.POST) if form.is_valid(): employees = form.data.getlist("subordinate_id") if key_result_ids := request.POST.getlist("employee_key_results_id"): for key_result_id in key_result_ids: key_result = EmployeeKeyResult.objects.filter( id=key_result_id ).first() feedback_form = form.save() feedback_form.employee_key_results_id.add(key_result) instance = form.save() instance.subordinate_id.set(employees) messages.success(request, _("Feedback created successfully.")) send_feedback_notifications(request, form=instance) return redirect(feedback_list_view) else: context["feedback_form"] = form return render(request, "feedback/feedback_creation.html", context) # @login_required # @manager_can_enter(perm="pms.add_feedback") # def feedback_creation_ajax(request): # """ # This view is used to create feedback object. # Returns: # it will return feedback creation html. # """ # # this ajax request is used to get the Key result and manager of the choosen employee # is_ajax = request.headers.get("X-Requested-With") == "XMLHttpRequest" # if is_ajax: # if request.method == "POST": # employee_id = request.POST.get("employee_id") # key_results = EmployeeKeyResult.objects.filter( # employee_objective_id__employee_id=employee_id # ).values() # employee_work_info = EmployeeWorkInformation.objects.filter( # employee_id__id=employee_id # ).first() # reporting_manager_id = employee_work_info.reporting_manager_id # if reporting_manager_id: # reporting_manager = { # "id": reporting_manager_id.id or None, # "employee_first_name": reporting_manager_id.employee_first_name # or None, # "employee_last_name": reporting_manager_id.employee_last_name # or None, # } # else: # reporting_manager = None # return JsonResponse( # { # "key_results": list(key_results), # "reporting_manager": reporting_manager, # } # ) # return JsonResponse({"status": "Invalid request"}, status=400) @login_required @hx_request_required @manager_can_enter(perm="pms.change_feedback") def feedback_update(request, id): """ This view is used to update the feedback. Args: id(int) : primarykey of the feedback. Returns: it will redirect to feedback_detailed_view. """ feedback = Feedback.objects.get(id=id) form = FeedbackForm(instance=feedback) feedback_started = Answer.objects.filter(feedback_id=feedback) context = {"feedback_form": form} if feedback_started: messages.error(request, _("Ongoing feedback is not editable!.")) response = render(request, "feedback/feedback_update.html", context) return HttpResponse( response.content.decode("utf-8") + "" ) if request.method == "POST": form = FeedbackForm(request.POST, instance=feedback) if form.is_valid(): employees = form.data.getlist("subordinate_id") if key_result_ids := request.POST.getlist("employee_key_results_id"): for key_result_id in key_result_ids: key_result = EmployeeKeyResult.objects.filter( id=key_result_id ).first() feedback_form = form.save() feedback_form.employee_key_results_id.add(key_result) instance = form.save() instance.subordinate_id.set(employees) form = form.save() messages.info(request, _("Feedback updated successfully!.")) send_feedback_notifications(request, form) response = render(request, "feedback/feedback_update.html", context) return HttpResponse( response.content.decode("utf-8") + "" ) else: context["feedback_form"] = form return render(request, "feedback/feedback_update.html", context) @login_required def filter_pagination_feedback( request, self_feedback, requested_feedback, all_feedback, anonymous_feedback ): """ This view is used to filter or search the feedback object , Args: self_feedback (queryset): self feedback filtered queryset. requested_feedback (queryset): requested feedback filtered queryset. all_feedback (queryset): all feedback filtered queryset. Returns: it will return the filtered and searched object. """ previous_data = request.GET.urlencode() initial_data = {"archive": False} # set initial value of archive filter to False feedback_filter_own = FeedbackFilter( request.GET or initial_data, queryset=self_feedback ) feedback_filter_requested = FeedbackFilter( request.GET or initial_data, queryset=requested_feedback ) feedback_filter_all = FeedbackFilter( request.GET or initial_data, queryset=all_feedback ) anonymous_feedback = AnonymousFeedbackFilter( request.GET, queryset=anonymous_feedback ).qs feedback_paginator_own = Paginator(feedback_filter_own.qs, get_pagination()) feedback_paginator_requested = Paginator( feedback_filter_requested.qs, get_pagination() ) feedback_paginator_all = Paginator(feedback_filter_all.qs, get_pagination()) page_number = request.GET.get("page") feedbacks_own = feedback_paginator_own.get_page(page_number) feedbacks_requested = feedback_paginator_requested.get_page(page_number) feedbacks_all = feedback_paginator_all.get_page(page_number) now = datetime.datetime.today().date() data_dict = parse_qs(previous_data) get_key_instances(Feedback, data_dict) context = { "superuser": "true", "self_feedback": feedbacks_own, "requested_feedback": feedbacks_requested, "anonymous_feedback": anonymous_feedback, "all_feedbacks": feedbacks_all, "feedback_filter_form": feedback_filter_own.form, "pg": previous_data, "current_date": now, "filter_dict": data_dict, } return context @login_required @hx_request_required def feedback_list_search(request): """ This view is used to filter or search the feedback object , Args: Returns: it will return the filtered and searched object. """ feedback = request.GET.get("search") # if the search is none the filter will works if feedback is None: feedback = "" employee_id = Employee.objects.get(employee_user_id=request.user) self_feedback = Feedback.objects.filter(employee_id=employee_id).filter( review_cycle__icontains=feedback ) requested_feedback_ids = [] requested_feedback_ids.extend( [i.id for i in Feedback.objects.filter(manager_id=employee_id)] ) requested_feedback_ids.extend( [i.id for i in Feedback.objects.filter(colleague_id=employee_id)] ) requested_feedback_ids.extend( [i.id for i in Feedback.objects.filter(subordinate_id=employee_id)] ) requested_feedback = Feedback.objects.filter( pk__in=requested_feedback_ids, review_cycle__icontains=feedback, start_date__lte=datetime.date.today(), end_date__gte=datetime.date.today(), ) all_feedback = Feedback.objects.none() if request.user.has_perm("pms.view_feedback"): all_feedback = Feedback.objects.all().filter(review_cycle__icontains=feedback) else: # feedbacks to review if employee is a manager all_feedback = Feedback.objects.filter(manager_id=employee_id).filter( review_cycle__icontains=feedback ) # Anonymous feedbacks anonymous_feedback = ( AnonymousFeedback.objects.filter( anonymous_feedback_id=request.user.id, archive=False ) if not request.user.has_perm("pms.view_feedback") else AnonymousFeedback.objects.filter(archive=False) ) related_anonymous_feedbacks = get_anonymous_feedbacks(employee_id) anonymous_feedback = (related_anonymous_feedbacks | anonymous_feedback).distinct() context = filter_pagination_feedback( request, self_feedback, requested_feedback, all_feedback, anonymous_feedback ) return render(request, "feedback/feedback_list.html", context) @login_required def feedback_list_view(request): """ This view is used to filter or search the feedback object , Args: Returns: it will return the filtered and searched object. """ user = request.user employee = user.employee_get # filter own feedbacks (self feedbacks) feedback_own = Feedback.objects.filter( employee_id=employee, archive=False, ) # feedbacks to answer feedback_requested = Feedback.objects.filter( Q(manager_id=employee) | Q(colleague_id=employee) | Q(subordinate_id=employee), start_date__lte=datetime.date.today(), end_date__gte=datetime.date.today(), ).distinct() if user.has_perm("pms.view_feedback"): feedback_all = Feedback.objects.filter(archive=False) else: # feedbacks to review if employee is a manager feedback_all = Feedback.objects.filter(manager_id=employee, archive=False) # Anonymous feedbacks anonymous_feedback = ( AnonymousFeedback.objects.filter( anonymous_feedback_id=request.user.id, archive=False ) if not request.user.has_perm("pms.view_feedback") else AnonymousFeedback.objects.filter(archive=False) ) related_anonymous_feedbacks = get_anonymous_feedbacks(employee) anonymous_feedback = ( related_anonymous_feedbacks.filter(archive=False) | anonymous_feedback ).distinct() context = filter_pagination_feedback( request, feedback_own, feedback_requested, feedback_all, anonymous_feedback ) # checking condition for empty page if Feedback.objects.all().exists(): template = "feedback/feedback_list_view.html" else: template = "feedback/feedback_empty.html" return render(request, template, context) @login_required def feedback_detailed_view(request, id, **kwargs): """ This view is used to for detailed view of feedback, Args: id(int) : primarykey of the feedback Returns: it will return the feedback object to feedback_detailed_view template . """ feedback = Feedback.objects.get(id=id) is_have_perm = check_permission_feedback_detailed_view( request, feedback, "pms.view_feedback" ) if is_have_perm: feedback_started = Answer.objects.filter(feedback_id=id) employees = feedback.requested_employees() yes = [] no = [] for employee in employees: if Answer.objects.filter( feedback_id=feedback, employee_id=employee ).exists(): yes.append(employee) else: no.append(employee) employee_statics = {"yes": yes, "no": no} context = { "feedback": feedback, "feedback_started": feedback_started, "feedback_status": Feedback.STATUS_CHOICES, "employee_statics": employee_statics, "today": datetime.datetime.today().date(), } return render(request, "feedback/feedback_detailed_view.html", context) else: messages.info(request, _("You dont have permission.")) previous_url = request.META.get("HTTP_REFERER", "/") script = f'' key = "HTTP_HX_REQUEST" if key in request.META.keys(): return render(request, "decorator_404.html") return HttpResponse(script) @login_required def feedback_detailed_view_answer(request, id, emp_id): """ This view is used show answer , Args: id(int) : primarykey of the feedback. emp_id(int) : primarykey of the Employee. Returns: it will return the answers . """ employee = Employee.objects.filter(id=emp_id).first() feedback = Feedback.objects.filter(id=id).first() is_have_perm = check_permission_feedback_detailed_view( request, feedback, "pms.view_feedback" ) kr_feedbacks = KeyResultFeedback.objects.filter( feedback_id=feedback, employee_id=employee ) if is_have_perm: answers = Answer.objects.filter(employee_id=employee, feedback_id=feedback) context = { "answers": answers, "kr_feedbacks": kr_feedbacks, } return render(request, "feedback/feedback_detailed_view_answer.html", context) else: messages.info(request, _("You dont have permission.")) previous_url = request.META.get("HTTP_REFERER", "/") script = f'' key = "HTTP_HX_REQUEST" if key in request.META.keys(): return render(request, "decorator_404.html") return HttpResponse(script) @login_required def feedback_answer_get(request, id, **kwargs): """ This view is used to render the feedback questions , Args: id(int) : primarykey of the feedback. Returns: it will redirect to feedaback_answer.html . """ feedback = Feedback.objects.get(id=id) # check if the feedback start_date is not started yet if feedback.start_date > datetime.date.today(): messages.info(request, _("Feedback not started yet")) return redirect(feedback_list_view) # check if the feedback end_date is not over if feedback.end_date and feedback.end_date < datetime.date.today(): messages.info(request, _("Feedback is due")) return redirect(feedback_list_view) user = request.user employee = Employee.objects.filter(employee_user_id=user).first() answer = Answer.objects.filter(feedback_id=feedback, employee_id=employee) question_template = feedback.question_template_id questions = question_template.question.all() options = QuestionOptions.objects.all() feedback_employees = ( [feedback.employee_id] + [feedback.manager_id] + list(feedback.colleague_id.all()) + list(feedback.subordinate_id.all()) ) if not employee in feedback_employees: messages.info(request, _("You are not allowed to answer")) return redirect(feedback_list_view) # Employee does not have an answer object for employee in feedback_employees: has_answer = Answer.objects.filter( employee_id=employee, feedback_id=feedback ).exists() if has_answer: feedback.status = "Closed" feedback.save() # Check if the feedback has already been answered if answer: messages.info(request, _("Feedback already answered")) return redirect(feedback_list_view) context = { "questions": questions, "options": options, "feedback": feedback, } return render(request, "feedback/answer/feedback_answer.html", context) @login_required def feedback_answer_post(request, id): """ This view is used to create feedback answer , Args: id(int) : primarykey of the feedback. Returns: it will redirect to feedback_list_view if the form was success full. """ user = request.user employee = Employee.objects.filter(employee_user_id=user).first() feedback = Feedback.objects.get(id=id) question_template = feedback.question_template_id questions = question_template.question.all() if request.method == "POST": for question in questions: if request.POST.get(f"answer{question.id}"): answer = request.POST.get(f"answer{question.id}") Answer.objects.get_or_create( answer={"answer": answer}, question_id=question, feedback_id=feedback, employee_id=employee, ) feedback.status = "On Track" feedback.save() for key_result in feedback.employee_key_results_id.all(): if request.POST.get(f"key_result{key_result.id}"): answer = request.POST.get(f"key_result{key_result.id}") KeyResultFeedback.objects.get_or_create( answer={"answer": answer}, key_result_id=key_result, feedback_id=feedback, employee_id=request.user.employee_get, ) messages.success( request, _("Feedback %(review_cycle)s has been answered successfully!.") % {"review_cycle": feedback.review_cycle}, ) return redirect(feedback_list_view) @login_required def feedback_answer_view(request, id, **kwargs): """ This view is used to view the feedback for employee. Args: id(int) : primarykey of the feedback. Returns: it will return feedback answer object to feedback_answer_view. """ user = request.user employee = Employee.objects.filter(employee_user_id=user).first() feedback = Feedback.objects.get(id=id) answers = Answer.objects.filter(feedback_id=feedback, employee_id=employee) key_result_feedback = KeyResultFeedback.objects.filter( feedback_id=feedback, employee_id=employee ) if not answers: messages.info(request, _("Feedback is not answered yet")) return redirect(feedback_list_view) context = { "answers": answers, "feedback_id": feedback, "key_result_feedback": key_result_feedback, } return render(request, "feedback/answer/feedback_answer_view.html", context) @login_required @manager_can_enter(perm="pms.delete_feedback") def feedback_delete(request, id): """ This view is used to delete the feedback. Args: id(int) : primarykey of the feedback. Returns: it will redirect to feedback_list_view. """ try: feedback = Feedback.objects.filter(id=id).first() answered = Answer.objects.filter(feedback_id=feedback).first() if ( feedback.status == "Closed" or feedback.status == "Not Started" and not answered ): feedback.delete() messages.success( request, _("Feedback %(review_cycle)s deleted successfully!") % {"review_cycle": feedback.review_cycle}, ) else: messages.warning( request, _("You can't delete feedback %(review_cycle)s with status %(status)s") % {"review_cycle": feedback.review_cycle, "status": feedback.status}, ) return redirect(feedback_list_view) except Feedback.DoesNotExist: messages.error(request, _("Feedback not found.")) except ProtectedError: messages.error(request, _("Related entries exists")) return redirect(feedback_list_view) @login_required @hx_request_required def feedback_detailed_view_status(request, id): """ This view is used to update status of feedback. Args: obj_id (int): Primarykey of feedback. Returns: message to the view """ status = request.POST.get("feedback_status") feedback = get_object_or_404(Feedback, id=id) answer = Answer.objects.filter(feedback_id=feedback) if status == "Not Started" and answer: messages.warning(request, _("Feedback is already started")) return HttpResponse("") feedback.status = status feedback.save() if (feedback.status) == status: messages.success( request, _("Feedback status updated to %(status)s") % {"status": _(status)} ) return HttpResponse("") messages.info( request, _("Error occurred during status update to %(status)s") % {"status": _(status)}, ) return HttpResponse("") @login_required def get_feedback_overview(request, obj_id): """ overview of feedback """ feedback = Feedback.objects.filter(id=obj_id).first() if obj_id else None if feedback and check_permission_feedback_detailed_view( request, feedback, perm="pms.view_feedback" ): question_template = feedback.question_template_id questions = question_template.question.all() feedback_answers = feedback.feedback_answer.all() kr_feedbacks = feedback.feedback_key_result.all() feedback_overview = {} for question in questions: answer_list = [] for answer in feedback_answers: if answer.question_id == question: answer_list.append( { answer.employee_id: [ answer.answer, {"type": answer.question_id.question_type}, ] } ) feedback_overview[question] = answer_list for kr_feedback in kr_feedbacks: answer_list = [] answer_list.append( {kr_feedback.employee_id: [kr_feedback.answer, {"type": "6"}]} ) feedback_overview[ f"Feedback about keyresult: {kr_feedback.key_result_id.key_result_id}" ] = answer_list return render( request, "feedback/feedback_overview.html", context={"feedback_overview": feedback_overview}, ) @login_required @manager_can_enter(perm="pms.delete_feedback") def feedback_archive(request, id): """ this function is used to archive the feedback for employee args: id(int): primarykey of feedback """ feedback = Feedback.objects.get(id=id) if feedback.archive: feedback.archive = False feedback.save() messages.info(request, _("Feedback un-archived successfully!.")) elif not feedback.archive: feedback.archive = True feedback.save() messages.info(request, _("Feedback archived successfully!.")) return redirect(feedback_list_view) @login_required def get_collegues(request): """ Get collegues and subordinates for the manager. """ try: employee_id = request.GET.get("employee_id") employee = Employee.objects.get(id=int(employee_id)) if employee_id else None if employee: employees_queryset = Employee.objects.none() reporting_manager = ( employee.employee_work_info.reporting_manager_id if employee.employee_work_info else None ) if request.GET.get("data") == "colleagues": department = employee.get_department() # employee ids to exclude from collegue list exclude_ids = [employee.id] if reporting_manager: exclude_ids.append(reporting_manager.id) # Get employees in the same department as the employee employees_queryset = Employee.objects.filter( is_active=True, employee_work_info__department_id=department ).exclude(id__in=exclude_ids) elif request.GET.get("data") == "manager": if reporting_manager: employees_queryset = Employee.objects.filter( id=reporting_manager.id ) elif request.GET.get("data") == "subordinates": employees_queryset = Employee.objects.filter( is_active=True, employee_work_info__reporting_manager_id=employee ) elif request.GET.get("data") == "keyresults": employees_queryset = EmployeeKeyResult.objects.filter( employee_objective_id__employee_id=employee ).values_list("id", "key_result_id__title") # Convert QuerySets to a list employees = [(employee.id, employee) for employee in employees_queryset] context = {"employees": employees} employee_html = render_to_string("employee/employees_select.html", context) return HttpResponse(employee_html) else: return JsonResponse({"error": "Employee not found"}, status=404) except Employee.DoesNotExist: return JsonResponse({"error": "Invalid Employee ID"}, status=400) except Exception as e: return JsonResponse({"error": str(e)}, status=500) @login_required def feedback_status(request): """this function is used to un-archive the feedback args: id(int): primarykey of feedback emp_id(int): primarykey of feedback """ is_ajax = request.headers.get("X-Requested-With") == "XMLHttpRequest" if is_ajax: if request.method == "POST": employee_id = request.POST.get("employee_id") feedback_id = request.POST.get("feedback_id") feedback = Feedback.objects.get(id=feedback_id) employee = Employee.objects.filter(id=employee_id).first() answer = Answer.objects.filter(employee_id=employee, feedback_id=feedback) status = _("Completed") if answer else _("Not-completed") return JsonResponse({"status": status}) return JsonResponse({"status": "Invalid request"}, status=400) @login_required @manager_can_enter(perm="pms.add_question") def question_creation(request, id): """ This view is used to create question object. Args: id(int) : primarykey of the question template. Returns: it will redirect to question_template_detailed_view. """ if request.method == "POST": form = QuestionForm(request.POST) question_template = QuestionTemplate.objects.get(id=id) feedback_ongoing = Feedback.objects.filter( question_template_id=question_template ).first() if feedback_ongoing: messages.info(request, _("Question template is used in feedback.")) return redirect(question_template_detailed_view, id) if form.is_valid(): obj_question = form.save(commit=False) obj_question.template_id = question_template obj_question.save() if obj_question.question_type == "4": # checking the question type is multichoice option_a = request.POST.get("option_a") option_b = request.POST.get("option_b") option_c = request.POST.get("option_c") option_d = request.POST.get("option_d") QuestionOptions( question_id=obj_question, option_a=option_a, option_b=option_b, option_c=option_c, option_d=option_d, ).save() messages.success(request, _("Question created successfully.")) return redirect(question_template_detailed_view, id) messages.success(request, _("Question created successfully.")) return redirect(question_template_detailed_view, id) else: messages.error(request, _("Error occurred during question creation!")) return redirect(question_template_detailed_view, id) @login_required def question_view(request, id): """ This view is used to view question object. Args: id(int) : primarykey of the question template. Returns: it will redirect to question_template_detailed_view. """ question_template = QuestionTemplate.objects.get(id=id) question_formset = modelformset_factory(Question, form=QuestionForm, extra=0) questions = question_template.question.all() formset = question_formset(queryset=questions) options = [] question_types = ["text", "ratings", "boolean", "Multi-choices", "likert"] for question in questions: question_options = QuestionOptions.objects.filter(question_id=question) options.extend(question_options) context = { "question_template": question_template, "questions": questions, "question_options": options, "question_types": question_types, "formset": formset, } return render( request, "feedback/question_template/question_template_detailed_view.html", context, ) @login_required @manager_can_enter(perm="pms.change_question") def question_update(request, temp_id, q_id): """ This view is used to update question object. Args: id (int): primarykey of question temp_id (int): primarykey of question_template Returns: it will redirect to question_template_detailed_view. """ if request.method == "POST": question = Question.objects.get(id=q_id) form = QuestionForm(request.POST, instance=question) if form.is_valid(): question_type = form.cleaned_data["question_type"] if question_type == "4": # if question is Multi-choices option_a = form.cleaned_data["option_a"] option_b = form.cleaned_data["option_b"] option_c = form.cleaned_data["option_c"] option_d = form.cleaned_data["option_d"] options, created = QuestionOptions.objects.get_or_create( question_id=question ) options.option_a = option_a options.option_b = option_b options.option_c = option_c options.option_d = option_d options.save() form.save() messages.info(request, _("Question updated successfully.")) return redirect(question_template_detailed_view, temp_id) else: form.save() question_options = QuestionOptions.objects.filter(question_id=question) if question_options: question_options.delete() messages.info(request, _("Question updated successfully.")) return redirect(question_template_detailed_view, temp_id) else: # Form submission had errors messages.error( request, "\n".join( [ f"{field}: {error}" for field, errors in form.errors.items() for error in errors ] ), ) return redirect(question_template_detailed_view, temp_id) @login_required @manager_can_enter(perm="pms.delete_question") def question_delete(request, id): """ This view is used to delete question object. Args: id (int): primarykey of question Returns: it will redirect to question_template_detailed_view. """ try: # Code that may trigger the FOREIGN KEY constraint failed error question = Question.objects.filter(id=id).first() temp_id = question.template_id.id QuestionOptions.objects.filter(question_id=question).delete() question.delete() messages.success(request, _("Question deleted successfully!")) return HttpResponse("") except Question.DoesNotExist: messages.error(request, _("Question not found.")) except IntegrityError: messages.error( request, _("Failed to delete question: Question template is in use.") ) except ProtectedError: messages.error(request, _("Related entries exist.")) except Exception as e: messages.error(request, _(f"Unexpected error: {str(e)}")) return HttpResponse("") @login_required @manager_can_enter(perm="pms.add_questiontemplate") def question_template_creation(request): """ This view is used to create a question template object. Args: Returns: It will redirect to question_template_detailed_view. """ form = QuestionTemplateForm() if request.method == "POST": form = QuestionTemplateForm(request.POST) if form.is_valid(): form.save() messages.success(request, _("Question template created successfully!")) return render( request, "feedback/question_template/question_template_form.html", {"form": form}, ) @login_required @manager_can_enter(perm="pms.view_questiontemplate") def question_template_view(request): """ This view is used to view question template object. Returns: it will redirect to question_template_detailed_view. """ question_templates = QuestionTemplate.objects.all() context = {"form": QuestionTemplateForm, "question_templates": question_templates} return render( request, "feedback/question_template/question_template_view.html", context ) @login_required @hx_request_required @manager_can_enter(perm="pms.view_questiontemplate") def question_template_hx_view(request): """ This view is used to view question template object in htmx. """ question_templates = QuestionTemplate.objects.all() context = {"question_templates": question_templates} return render( request, "feedback/question_template/question_template_list.html", context ) @login_required @manager_can_enter(perm="pms.view_questiontemplate") def question_template_detailed_view(request, template_id, **kwargs): """ This view is used to view question template object. Args: id (int): primarykey of question template temp_id (int): primarykey of question_template Returns: it will redirect to question_template_detailed_view. """ question_template = QuestionTemplate.objects.filter(id=template_id).first() if not question_template: messages.error(request, _("Question template does not exist")) return redirect(question_template_view) questions = question_template.question.all().order_by("-id") question_types = ["text", "ratings", "boolean", "multi-choices", "likert"] options = QuestionOptions.objects.filter(question_id__in=questions) # passing individual form question_form_list = [QuestionForm(instance=question) for question in questions] context = { "question_template": question_template, "questions": questions, "question_options": options, "question_types": question_types, "form": QuestionForm, "form_list": question_form_list, } return render( request, "feedback/question_template/question_template_detailed_view.html", context, ) @login_required @hx_request_required @manager_can_enter(perm="pms.change_questiontemplate") def question_template_update(request, template_id): """ This view is used to update question template object. Args: id (int): primarykey of question template Returns: it will redirect to question_template_view. """ question_template = QuestionTemplate.objects.filter(id=template_id).first() form = QuestionTemplateForm(instance=question_template) context = {"form": form} if request.method == "POST": form = QuestionTemplateForm(request.POST, instance=question_template) if form.is_valid(): form.save() messages.success(request, _("Question template updated")) context["form"] = form return render( request, "feedback/question_template/question_template_form.html", context ) @login_required @manager_can_enter(perm="pms.delete_questiontemplate") def question_template_delete(request, template_id): """ This view is used to delete question template object. Args: id (int): primarykey of question template Returns: it will redirect to question_template_view. """ try: question_template = QuestionTemplate.objects.get(id=template_id) if Feedback.objects.filter(question_template_id=question_template): messages.info(request, _("This template is using in a feedback")) else: question_template.delete() messages.success( request, _("The question template is deleted successfully !.") ) except QuestionTemplate.DoesNotExist: messages.error(request, _("question template not found.")) except ProtectedError: messages.error(request, _("Related entries exists")) return redirect("question-template-hx-view") @login_required @manager_can_enter(perm="pms.view_period") def period_view(request): """ This view is used to view period objects. Returns: it will return to period_view. """ periods = Period.objects.all() context = { "periods": periods, } return render(request, "period/period_view.html", context) @login_required @manager_can_enter(perm="pms.view_period") @hx_request_required def period_hx_view(request): """ Renders a view displaying periods used for tracking Key Results' completion time. Parameters: - request: HttpRequest object. Returns: Renders the 'period/period_list.html' template with a list of historical periods used for tracking Key Results. """ periods = Period.objects.all() context = { "periods": periods, } return render(request, "period/period_list.html", context=context) @login_required @manager_can_enter(perm="pms.add_period") @hx_request_required def period_create(request): """ This view is used to create period objects. Returns: it will redirect to period_view. """ context = {"form": PeriodForm()} if request.method == "POST": form = PeriodForm(request.POST) if form.is_valid(): form.save() messages.success(request, _("Period creation was Successful ")) else: context["form"] = form return render(request, "period/period_create.html", context) @login_required @manager_can_enter(perm="pms.change_period") def period_update(request, period_id): """ This view is used to update period objects. Args: id (int): primarykey of period Returns: it will redirect to period_view. """ period = Period.objects.filter(id=period_id).first() form = PeriodForm(instance=period) context = {"form": form} if request.method == "POST": form = PeriodForm(request.POST, instance=period) if form.is_valid(): form.save() messages.success(request, _("Period updated Successfully. ")) else: context["form"] = form return render(request, "period/period_update.html", context) @login_required @manager_can_enter(perm="pms.delete_period") def period_delete(request, period_id): """ This view is used to delete period objects. Args: id (int): primarykey of period Returns: it will redirect to period_view. """ try: obj_period = Period.objects.get(id=period_id) obj_period.delete() messages.success(request, _("Period deleted successfully.")) except Period.DoesNotExist: messages.error(request, _("Period not found.")) except ProtectedError: messages.error(request, _("Related entries exists")) return redirect("period-hx-view") @login_required def period_change(request): """ this function is used to detect the period change and return the start and end date of that period """ is_ajax = request.headers.get("X-Requested-With") == "XMLHttpRequest" if is_ajax: if request.method == "POST": data = json.load(request) period_obj = Period.objects.get(id=data) start_date = period_obj.start_date end_date = period_obj.end_date return JsonResponse({"start_date": start_date, "end_date": end_date}) return JsonResponse({"failed": "failed"}) return HttpResponse(status=204) @login_required def dashboard_view(request): """ This view is used to view dashboard. Returns: it will redirect to dashboard. """ user = request.user employee = Employee.objects.filter(employee_user_id=user).first() is_manager = Employee.objects.filter( employee_work_info__reporting_manager_id=employee ) count_key_result = KeyResult.objects.all().count() if user.has_perm("pms.view_employeeobjective") and user.has_perm( "pms.view_feedback" ): count_objective = EmployeeObjective.objects.all().count() count_feedback = Feedback.objects.all().count() okr_at_risk = EmployeeObjective.objects.filter(status="At Risk") elif is_manager: employees_ids = [employee.id for employee in is_manager] count_objective = EmployeeObjective.objects.filter( employee_id__in=employees_ids ).count() count_feedback = Feedback.objects.filter(employee_id__in=employees_ids).count() okr_at_risk = EmployeeObjective.objects.filter( employee_id__in=employees_ids ).filter(status="At Risk") else: count_objective = EmployeeObjective.objects.filter(employee_id=employee).count() count_key_result = EmployeeKeyResult.objects.filter( employee_objective_id__employee_id=employee ).count() count_feedback = Feedback.objects.filter(employee_id=employee).count() okr_at_risk = EmployeeObjective.objects.filter(employee_id=employee).filter( status="At Risk" ) context = { "count_objective": count_objective, "count_key_result": count_key_result, "count_feedback": count_feedback, "okr_at_risk": okr_at_risk, } return render(request, "dashboard/pms_dashboard.html", context) @login_required def dashboard_objective_status(request): """objective dashboard data""" is_ajax = request.headers.get("X-Requested-With") == "XMLHttpRequest" if is_ajax and request.method == "GET": objective_status = EmployeeObjective.STATUS_CHOICES data = {"message": _("No records available at the moment.")} for status in objective_status: objectives = EmployeeObjective.objects.filter( status=status[0], archive=False ) objectives_count = filtersubordinates( request, queryset=objectives, perm="pms.view_employeeobjective" ).count() if objectives_count: data.setdefault("objective_label", []).append(status[1]) data.setdefault("objective_value", []).append(objectives_count) return JsonResponse(data) @login_required def dashboard_key_result_status(request): """key result dashboard data""" is_ajax = request.headers.get("X-Requested-With") == "XMLHttpRequest" if is_ajax and request.method == "GET": key_result_status = EmployeeKeyResult.STATUS_CHOICES data = {"message": _("No records available at the moment.")} for i in key_result_status: key_results = EmployeeKeyResult.objects.filter(status=i[0]) key_results_count = filtersubordinates( request, queryset=key_results, perm="pms.view_employeekeyresult", field="employee_objective_id__employee_id", ).count() if key_results_count: data.setdefault("key_result_label", []).append(i[1]) data.setdefault("key_result_value", []).append(key_results_count) return JsonResponse(data) @login_required def dashboard_feedback_status(request): """feedback dashboard data""" is_ajax = request.headers.get("X-Requested-With") == "XMLHttpRequest" if is_ajax and request.method == "GET": feedback_status = Feedback.STATUS_CHOICES data = {"message": _("No records available at the moment.")} for i in feedback_status: feedbacks = Feedback.objects.filter(status=i[0]) feedback_count = filtersubordinates( request, queryset=feedbacks, perm="pms.view_feedback" ).count() if feedback_count: data.setdefault("feedback_label", []).append(i[1]) data.setdefault("feedback_value", []).append(feedback_count) return JsonResponse(data) def filtersubordinates(request, queryset, perm=None, field=None): """ This method is used to filter out subordinates queryset element. """ user = request.user if user.has_perm(perm): return queryset manager = Employee.objects.filter(employee_user_id=user).first() if manager: if field is not None: queryset = queryset.filter( **{f"{field}__employee_work_info__reporting_manager_id": manager} ) | queryset.filter(**{field: manager}) else: queryset = queryset.filter( employee_id__employee_work_info__reporting_manager_id=manager ) | queryset.filter(employee_id=manager) return queryset else: queryset = queryset.filter(employee_id=user.employee_get) return queryset @login_required def create_period(request): """ This is an ajax method to return json response to create stage related to the project in the task-all form fields """ if request.method == "GET": form = PeriodForm() if request.method == "POST": form = PeriodForm(request.POST) if form.is_valid(): instance = form.save() return JsonResponse( { "id": instance.id, "name": instance.period_name, "start_date": instance.start_date, "end_date": instance.end_date, } ) errors = form.errors.as_json() return JsonResponse({"errors": errors}) return render(request, "okr/create_period.html", context={"form": form}) @login_required def objective_bulk_archive(request): """ This method is used to archive/un-archive bulk objectivs """ ids = request.POST["ids"] ids = json.loads(ids) is_active = False message = _("un-archived") if request.GET.get("is_active") == "False": is_active = True message = _("archived") for objective_id in ids: objective_obj = EmployeeObjective.objects.get(id=objective_id) objective_obj.archive = is_active objective_obj.save() messages.success( request, _("{objective} is {message}").format( objective=objective_obj, message=message ), ) return JsonResponse({"message": "Success"}) @login_required @manager_can_enter(perm="pms.delete_employeeobjective") def objective_bulk_delete(request): """ This method is used to bulk delete objective """ ids = request.POST["ids"] ids = json.loads(ids) for objective_id in ids: try: objective = EmployeeObjective.objects.get(id=objective_id) if objective.status == "Not Started" or objective.status == "Closed": objective.delete() messages.success( request, _("%(employee)s's %(objective)s deleted") % { "objective": objective.objective, "employee": objective.employee_id, }, ) else: messages.warning( request, _("You can't delete objective %(objective)s with status %(status)s") % {"objective": objective.objective, "status": objective.status}, ) except EmployeeObjective.DoesNotExist: messages.error(request, _("Objective not found.")) return JsonResponse({"message": "Success"}) @login_required def feedback_bulk_archive(request): """ This method is used to archive/un-archive bulk feedbacks """ ids = request.POST["ids"] announy_ids = request.POST["announy_ids"] ids = json.loads(ids) announy_ids = json.loads(announy_ids) is_active = False message = _("un-archived") if request.GET.get("is_active") == "False": is_active = True message = _("archived") for feedback_id in ids: feedback_id = Feedback.objects.get(id=feedback_id) feedback_id.archive = is_active feedback_id.save() messages.success( request, _("{feedback} is {message}").format(feedback=feedback_id, message=message), ) for feedback_id in announy_ids: feedback_id = AnonymousFeedback.objects.get(id=feedback_id) feedback_id.archive = is_active feedback_id.save() messages.success( request, _("{feedback} is {message}").format( feedback=feedback_id.feedback_subject, message=message ), ) return JsonResponse({"message": "Success"}) @login_required @manager_can_enter(perm="pms.delete_feedback") def feedback_bulk_delete(request): """ This method is used to bulk delete feedbacks """ ids = request.POST["ids"] ids = json.loads(ids) for feedback_id in ids: try: feedback = Feedback.objects.get(id=feedback_id) if feedback.status == "Closed" or feedback.status == "Not Started": feedback.delete() messages.success( request, _("Feedback %(review_cycle)s deleted successfully!") % {"review_cycle": feedback.review_cycle}, ) else: messages.warning( request, _( "You can't delete feedback %(review_cycle)s with status %(status)s" ) % { "review_cycle": feedback.review_cycle, "status": feedback.status, }, ) except Feedback.DoesNotExist: messages.error(request, _("Feedback not found.")) return JsonResponse({"message": "Success"}) @login_required def objective_select(request): """ This method is used to return all the id of the employees to select the employee row """ page_number = request.GET.get("page") table = request.GET.get("tableName") user = request.user.employee_get employees = EmployeeObjective.objects.filter(employee_id=user, archive=False) if page_number == "all": if table == "all": if request.user.has_perm("pms.view_employeeobjective"): employees = EmployeeObjective.objects.filter(archive=False) else: employees = EmployeeObjective.objects.filter( employee_id__employee_user_id=request.user ) | EmployeeObjective.objects.filter( employee_id__employee_work_info__reporting_manager_id__employee_user_id=request.user ) else: employees = EmployeeObjective.objects.filter( employee_id=user, archive=False ) employee_ids = [str(emp.id) for emp in employees] total_count = employees.count() context = {"employee_ids": employee_ids, "total_count": total_count} return JsonResponse(context, safe=False) @login_required def objective_select_filter(request): """ This method is used to return all the ids of the filtered employees """ page_number = request.GET.get("page") filtered = request.GET.get("filter") filters = json.loads(filtered) if filtered else {} table = request.GET.get("tableName") user = request.user.employee_get employee_filter = ObjectiveFilter(filters, queryset=EmployeeObjective.objects.all()) if page_number == "all": if table == "all": if request.user.has_perm("pms.view_employeeobjective"): employee_filter = ObjectiveFilter( filters, queryset=EmployeeObjective.objects.all() ) else: employee_filter = ObjectiveFilter( filters, queryset=EmployeeObjective.objects.filter( employee_id__employee_work_info__reporting_manager_id__employee_user_id=request.user ), ) else: employee_filter = ObjectiveFilter( filters, queryset=EmployeeObjective.objects.filter(employee_id=user) ) # Get the filtered queryset filtered_employees = employee_filter.qs employee_ids = [str(emp.id) for emp in filtered_employees] total_count = filtered_employees.count() context = {"employee_ids": employee_ids, "total_count": total_count} return JsonResponse(context) @login_required @hx_request_required def anonymous_feedback_add(request): """ View function for adding anonymous feedback. Parameters: - request: HttpRequest object. Returns: - If request method is POST and form is valid: Saves the submitted feedback and sends a notification if based on an employee. Returns a JavaScript snippet to reload the page. - If request method is GET or form is invalid: Renders the 'anonymous/anonymous_feedback_form.html' template with the feedback form. """ if request.method == "POST": form = AnonymousFeedbackForm(request.POST) anonymous_id = request.user.id if form.is_valid(): feedback = form.save(commit=False) feedback.anonymous_feedback_id = anonymous_id feedback.save() if feedback.based_on == "employee": try: notify.send( User.objects.filter(username="Horilla Bot").first(), recipient=feedback.employee_id.employee_user_id, verb="You received an anonymous feedback!", verb_ar="لقد تلقيت تقييمًا مجهولًا!", verb_de="Sie haben anonymes Feedback erhalten!", verb_es="¡Has recibido un comentario anónimo!", verb_fr="Vous avez reçu un feedback anonyme!", redirect=reverse("feedback-view"), icon="bag-check", ) except: pass return HttpResponse("") else: form = AnonymousFeedbackForm() context = {"form": form, "create": True} return render(request, "anonymous/anonymous_feedback_form.html", context) @login_required @hx_request_required def edit_anonymous_feedback(request, obj_id): """ View function for editing anonymous feedback. Parameters: - request: HttpRequest object. - id: ID of the AnonymousFeedback instance to be edited. Returns: - If request method is POST and form is valid: Saves the edited feedback. Returns a JavaScript snippet to reload the page. - If request method is GET or form is invalid: Renders the 'anonymous/anonymous_feedback_form.html' template with the feedback form pre-filled with existing data. """ feedback = AnonymousFeedback.objects.get(id=obj_id) # checking feedback owner if str(request.user.id) == feedback.anonymous_feedback_id or request.user.has_perm( "pms.change_anonymousfeedback" ): form = AnonymousFeedbackForm(instance=feedback) anonymous_id = request.user.id if request.method == "POST": form = AnonymousFeedbackForm(request.POST, instance=feedback) if form.is_valid(): feedback = form.save(commit=False) feedback.anonymous_feedback_id = anonymous_id feedback.save() return HttpResponse("") context = {"form": form, "create": False} return render(request, "anonymous/anonymous_feedback_form.html", context) else: messages.info(request, _("You are don't have permissions.")) return HttpResponse("") @login_required def archive_anonymous_feedback(request, obj_id): """ this function is used to archive the feedback for employee args: id(int): primarykey of feedback """ feedback = AnonymousFeedback.objects.get(id=obj_id) # checking feedback owner if str(request.user.id) == feedback.anonymous_feedback_id or request.user.has_perm( "pms.anonymousfeedback" ): if feedback.archive: feedback.archive = False feedback.save() messages.info(request, _("Feedback un-archived successfully!.")) elif not feedback.archive: feedback.archive = True feedback.save() messages.info(request, _("Feedback archived successfully!.")) else: messages.info(request, _("You are don't have permissions.")) return redirect(feedback_list_view) @login_required @permission_required("pms.delete_anonymousfeedback") def delete_anonymous_feedback(request, obj_id): """ Deletes an anonymous feedback entry. Parameters: - request: HttpRequest object. - id: ID of the AnonymousFeedback instance to be deleted. Returns: Redirects to the feedback list view after deleting the feedback. """ try: feedback = AnonymousFeedback.objects.get(id=obj_id) feedback.delete() messages.success(request, _("Feedback deleted successfully!")) except IntegrityError: messages.error( request, _("Failed to delete feedback: Feedback template is in use.") ) except AnonymousFeedback.DoesNotExist: messages.error(request, _("Feedback not found.")) except ProtectedError: messages.error(request, _("Related entries exists")) return redirect(feedback_list_view) @login_required @hx_request_required def view_single_anonymous_feedback(request, obj_id): """ Renders a view to display a single anonymous feedback entry. Parameters: - request: HttpRequest object. - id: ID of the AnonymousFeedback instance to be displayed. Returns: Renders the 'anonymous/single_view.html' template with the details of the specified anonymous feedback. """ feedback = AnonymousFeedback.objects.get(id=obj_id) return render(request, "anonymous/single_view.html", {"feedback": feedback}) @login_required @hx_request_required def employee_keyresult_creation(request, emp_obj_id): """ This view is for employee keyresult creation , and returns a employee keyresult form. Returns: GET: employee keyresult form POST: employee keyresult created, and returnes to employee objective details view """ emp_objective = EmployeeObjective.objects.get(id=emp_obj_id) if ( request.user.employee_get in emp_objective.objective_id.managers.all() or request.user.has_perm("pms.add_employeekeyresult") ): employee = emp_objective.employee_id data = request.GET.copy() # Convert QueryDict to a regular dictionary cleaned_data = { key: value if len(value) > 1 else value[0] for key, value in data.lists() } if not cleaned_data.get("employee_objective_id"): cleaned_data["employee_objective_id"] = emp_obj_id emp_key_result = EmployeeKeyResultForm(initial=cleaned_data) if request.method == "POST": emp_key_result = EmployeeKeyResultForm(request.POST) if emp_key_result.is_valid(): emp_key_result.save() emp_objective.update_objective_progress() key_result = emp_key_result.cleaned_data["key_result_id"] emp_objective.key_result_id.add(key_result) # assignees = emp_key_result.cleaned_data['assignees'] # start_date =emp_key_result.cleaned_data['start_date'] messages.success(request, _("Key result assigned sucessfully.")) notify.send( request.user.employee_get, recipient=employee.employee_user_id, verb="You got an Key Result!.", verb_ar="لقد حصلت على نتيجة رئيسية!", verb_de="Du hast ein Schlüsselergebnis erreicht!", verb_es="¡Has conseguido un Resultado Clave!", verb_fr="Vous avez obtenu un Résultat Clé!", redirect=reverse( "objective-detailed-view", kwargs={"obj_id": emp_objective.objective_id.id}, ), ) return HttpResponse("") context = { "form": emp_key_result, "emp_objective": emp_objective, } return render(request, "okr/key_result/kr_form.html", context=context) else: messages.info(request, _("You are don't have permissions.")) return HttpResponse("") @login_required @hx_request_required @manager_can_enter(perm="pms.add_employeekeyresult") def employee_keyresult_update(request, kr_id): """ This function is for update employee keyresult, and returns a employee keyresult form. Returns: GET: employee keyresult form POST: employee keyresult updated, and returnes to employee objective details view """ emp_kr = EmployeeKeyResult.objects.get(id=kr_id) employee = emp_kr.employee_objective_id.employee_id data = request.GET.copy() if len(data) > 1: # Convert QueryDict to a regular dictionary cleaned_data = { key: value if len(value) > 1 else value[0] for key, value in data.lists() } emp_key_result = EmployeeKeyResultForm(initial=cleaned_data, instance=emp_kr) else: emp_key_result = EmployeeKeyResultForm(instance=emp_kr) if request.method == "POST": emp_key_result = EmployeeKeyResultForm(request.POST, instance=emp_kr) if emp_key_result.is_valid(): emp_key_result.save() emp_kr.employee_objective_id.update_objective_progress() messages.success(request, _("Key result Updated sucessfully.")) notify.send( request.user.employee_get, recipient=employee.employee_user_id, verb="Your Key Result updated.", verb_ar="تم تحديث نتيجتك الرئيسية.", verb_de="Ihr Schlüsselergebnis wurde aktualisiert.", verb_es="Se ha actualizado su Resultado Clave.", verb_fr="Votre Résultat Clé a été mis à jour.", redirect=reverse( "objective-detailed-view", kwargs={"obj_id": emp_kr.employee_objective_id.objective_id.id}, ), ) return HttpResponse("") context = { "form": emp_key_result, "update": True, } return render(request, "okr/key_result/kr_form.html", context=context) @login_required @manager_can_enter(perm="pms.delete_employeekeyresult") def delete_employee_keyresult(request, kr_id): """ This function is used to delete the employee key result args: kr_id(int) : pimarykey of EmployeeKeyResult return: redirect to detailed of employee objective """ emp_kr = EmployeeKeyResult.objects.get(id=kr_id) # employee = emp_kr.employee_id objective = emp_kr.employee_objective_id.objective_id emp_objective = emp_kr.employee_objective_id emp_kr.delete() emp_objective.update_objective_progress() # objective.assignees.remove(employee) messages.success(request, _("Objective deleted successfully!.")) if request.GET.get("dashboard"): return redirect(f"/pms/dashboard-view") return redirect(f"/pms/objective-detailed-view/{objective.id}") @login_required def employee_keyresult_update_status(request, kr_id): """ This function is used to delete the employee key result args: kr_id(int) : pimarykey of EmployeeKeyResult return: redirect to detailed of employee objective """ emp_kr = EmployeeKeyResult.objects.get(id=kr_id) status = request.POST.get("key_result_status") emp_kr.status = status emp_kr.save() messages.success(request, _("Key result sattus changed to {}.").format(status)) return redirect( f"/pms/kr-table-view/{emp_kr.employee_objective_id.id}?&objective_id={emp_kr.employee_objective_id.objective_id.id}" ) @login_required def key_result_current_value_update(request): """ This method is used to update keyresult current value """ try: current_value = eval_validate(request.POST.get("current_value")) emp_kr_id = eval_validate(request.POST.get("emp_key_result_id")) emp_kr = EmployeeKeyResult.objects.get(id=emp_kr_id) if current_value <= emp_kr.target_value: emp_kr.current_value = current_value emp_kr.save() emp_kr.employee_objective_id.update_objective_progress() return JsonResponse({"type": "sucess"}) except: return JsonResponse({"type": "error"}) @login_required def get_keyresult_data(request): """ Function to get the data of keyresult and return to the form. """ key_id = ( request.GET.get("key_result_id") if request.GET.get("key_result_id") else None ) key_result = KeyResult.objects.filter(id=key_id).first() if key_result: if request.GET.get("data-update") == "target_value": return HttpResponse( f'' ) if request.GET.get("data-update") == "end_date": start_date = datetime.datetime.strptime( request.GET.get("start_date"), "%Y-%m-%d" ) end_date = (start_date + relativedelta(days=key_result.duration)).date() return HttpResponse( f'' ) else: if request.GET.get("data-update") == "target_value": return HttpResponse( f'' ) if request.GET.get("data-update") == "end_date": return HttpResponse( f'' ) @login_required def view_meetings(request): """ This view is used to view the meeting , Returns: it will redirect to view_meetings.html . """ previous_data = request.GET.urlencode() meetings = Meetings.objects.filter(is_active=True) if not request.user.has_perm("pms.view_meetings"): employee_id = request.user.employee_get meetings = meetings.filter( Q(employee_id=employee_id) | Q(manager=employee_id) ).distinct() meetings = meetings.order_by("-id") filter_form = MeetingsFilter() meetings = paginator_qry(meetings, request.GET.get("page")) requests_ids = json.dumps([instance.id for instance in meetings.object_list]) data_dict = parse_qs(previous_data) get_key_instances(Meetings, data_dict) all_meetings = Meetings.objects.filter() context = { "all_meetings": all_meetings, "meetings": meetings, "filter_form": filter_form.form, "requests_ids": requests_ids, } return render(request, "meetings/view_meetings.html", context) @login_required @hx_request_required def create_meetings(request): """ This view is used to create the meeting , Returns: Get: it renders form.html to create the meeting. Post: it will redirect to view_meetings.html . """ instance_id = eval_validate(str(request.GET.get("instance_id"))) instance = None initial = {"manager": request.user.employee_get, "employee_id": None} if instance_id and isinstance(instance_id, int): instance = Meetings.objects.filter(id=instance_id).first() initial = {} form = MeetingsForm(instance=instance, initial=initial) if request.method == "POST": form = MeetingsForm(request.POST, instance=instance) if form.is_valid(): instance = form.save() managers = [ manager.employee_user_id for manager in form.cleaned_data["manager"] ] answer_employees = [ answer_emp.employee_user_id for answer_emp in form.cleaned_data["answer_employees"] ] employees = form.cleaned_data["employee_id"] employees = [ employee.employee_user_id for employee in employees.exclude( id__in=form.cleaned_data["answer_employees"] ) ] try: notify.send( request.user.employee_get, recipient=answer_employees, verb=f"You have been added as an answerable employee for the meeting {instance.title}", verb_ar=f"لقد تمت إضافتك كموظف مسؤول عن الاجتماع {instance.title}", verb_de=f"Du wurden als Mitarbeiter zum Ausfüllen für das {instance.title}-Meeting hinzugefügt", verb_es=f"Se le ha agregado como empleado responsable de la reunión {instance.title}", verb_fr=f"Vous avez été ajouté en tant que employé responsable pour la réunion {instance.title}", icon="information", redirect=reverse("view-meetings") + f"?search={instance.title}", ) except Exception as error: pass try: notify.send( request.user.employee_get, recipient=employees, verb=f"You have been added to the meeting {instance.title}", verb_ar=f"لقد تمت إضافتك إلى اجتماع {instance.title}.", verb_de=f"Sie wurden zur {instance.title} Besprechung hinzugefügt", verb_es=f"Te han agregado a la reunión {instance.title}", verb_fr=f"Vous avez été ajouté à la réunion {instance.title}", icon="information", redirect=reverse("view-meetings") + f"?search={instance.title}", ) except Exception as error: pass try: notify.send( request.user.employee_get, recipient=managers, verb=f"You have been added as a manager for the meeting {instance.title}", verb_ar=f"لقد تمت إضافتك كمدير للاجتماع {instance.title}", verb_de=f"Sie wurden als Manager für das Meeting {instance.title} hinzugefügt", verb_es=f"Se le ha agregado como administrador de la reunión {instance.title}", verb_fr=f"Vous avez été ajouté en tant que responsable de réunion {instance.title}", icon="information", redirect=reverse("view-meetings") + f"?search={instance.title}", ) except Exception as error: pass messages.success(request, _("Meeting added successfully")) return render( request, "meetings/form.html", { "form": form, }, ) from django.db.models import F @login_required @permission_required("pms.change_meetings") def archive_meetings(request, obj_id): """ This view is used to archive and unarchive the meeting , Args: meet_id(int) : primarykey of the meeting. employee_id(int) : primarykey of the employee Returns: it will redirect to view_meetings.html . """ meeting = Meetings.find(obj_id) meeting.is_active = not meeting.is_active meeting.save() message = ( _("Meeting unarchived successfully") if meeting.is_active else _("Meeting archived successfully") ) messages.success(request, message) return HttpResponse("") @login_required @permission_required("pms.change_meetings") def meeting_manager_remove(request, meet_id, manager_id): """ This view is used to remove the manager from the meeting , Args: meet_id(int) : primarykey of the meeting. employee_id(int) : primarykey of the employee Returns: it will redirect to view_meetings.html . """ meeting = Meetings.objects.filter(id=meet_id).first() meeting.manager.remove(manager_id) meeting.save() messages.success( request, _("Manager has been successfully removed from the meeting.") ) return HttpResponse("") @login_required def meeting_employee_remove(request, meet_id, employee_id): """ This view is used to remove the employees from the meeting , Args: meet_id(int) : primarykey of the meeting. employee_id(int) : primarykey of the employee Returns: it will redirect to view_meetings.html . """ meeting = Meetings.objects.filter(id=meet_id).first() meeting.employee_id.remove(employee_id) meeting.save() messages.success( request, _("Employee has been successfully removed from the meeting.") ) return HttpResponse("") @login_required @hx_request_required def filter_meetings(request): """ This view is used to filter the meeting , Returns: it will render to meeting_list.html . """ previous_data = request.GET.urlencode() filter_obj = MeetingsFilter(request.GET).qs if not request.user.has_perm("pms.view_meetings"): employee_id = request.user.employee_get filter_obj = filter_obj.filter( Q(employee_id=employee_id) | Q(manager=employee_id) ).distinct() if ( request.GET.get("is_active") is None or request.GET.get("is_active") == "unknown" ): filter_obj = filter_obj.filter(is_active=True) filter_obj = filter_obj.order_by("-id") filter_obj = sortby(request, filter_obj, "sortby") filter_obj = paginator_qry(filter_obj, request.GET.get("page")) requests_ids = json.dumps([instance.id for instance in filter_obj.object_list]) data_dict = parse_qs(previous_data) get_key_instances(EmployeeObjective, data_dict) return render( request, "meetings/meetings_list.html", { "meetings": filter_obj, "pd": previous_data, "filter_dict": data_dict, "requests_ids": requests_ids, }, ) @login_required @meeting_manager_can_enter("pms.change_meetings") def add_response(request, obj_id): """ This view is used to add the MoM to the meeting , Args: id(int) : primarykey of the meeting. Returns: it will redirect to view_meetings.html . """ meeting = Meetings.find(obj_id) if request.method == "POST": response = request.POST.get("response") meeting.response = response meeting.save() messages.success( request, _("Minutes of Meeting (MoM) have been created successfully") ) return render(request, "meetings/mom_form.html", {"meeting": meeting}) @login_required @hx_request_required @meeting_manager_can_enter("pms.change_meetings", answerable=True) def meeting_answer_get(request, id, **kwargs): """ This view is used to render the Meeting questions , Args: id(int) : primarykey of the meeting. Returns: it will redirect to meeting_answer.html . """ employee = request.user.employee_get if employee_id := request.GET.get("emp_id"): employee = Employee.objects.filter(id=employee_id).first() meeting = Meetings.objects.get(id=id) answer = MeetingsAnswer.objects.filter(meeting_id=meeting, employee_id=employee) questions = meeting.question_template.question.all() options = QuestionOptions.objects.all() meeting_employees = meeting.manager.all() | meeting.employee_id.all() if answer or request.GET.get("emp_id"): return redirect("meeting-answer-view", id=meeting.id, emp_id=employee.id) if not employee in meeting_employees: messages.info(request, _("You are not allowed to answer")) return redirect(view_meetings) context = { "questions": questions, "options": options, "meeting": meeting, } return render(request, "meetings/meeting_answer.html", context) @login_required @meeting_manager_can_enter("pms.change_meetings", answerable=True) def meeting_answer_post(request, id): """ This view is used to create meeting answer , Args: id(int) : primarykey of the meeting. Returns: it will redirect to view_meeting if the form was success full. """ employee = request.user.employee_get meeting = Meetings.objects.get(id=id) question_template = meeting.question_template.question.all() if request.method == "POST": for question in question_template: if request.POST.get(f"answer{question.id}"): answer = request.POST.get(f"answer{question.id}") MeetingsAnswer.objects.get_or_create( answer={"answer": answer}, question_id=question, meeting_id=meeting, employee_id=employee, ) messages.success( request, _("Questions for meeting %(meeting)s has been answered successfully!.") % {"meeting": meeting.title}, ) return redirect(view_meetings) @login_required @meeting_manager_can_enter("pms.change_meetings", answerable=True) def meeting_answer_view(request, id, emp_id, **kwargs): """ This view is used to view the meeting for employee. Args: id(int) : primarykey of the meeting. emp_id(int) : id of the employee Returns: it will return meeting answer object to meeting_answer_view. """ employee = Employee.objects.filter(id=emp_id).first() meeting = Meetings.objects.get(id=id) answers = MeetingsAnswer.objects.filter(meeting_id=meeting, employee_id=employee) context = { "answers": answers, "meeting": meeting, } return render(request, "meetings/meeting_answer_view.html", context) @login_required @hx_request_required @meeting_manager_can_enter("pms.change_meetings", answerable=True) def meeting_question_template_view(request, meet_id): """ This view is used to view the activity sidebar page for employee. Args: id(int) : primarykey of the meeting. Returns: it will return meeting answer object to meeting_question_template_view. """ employee = request.user.employee_get meeting = Meetings.objects.get(id=meet_id) answer = MeetingsAnswer.objects.filter(meeting_id=meeting, employee_id=employee) is_answered = False if answer: is_answered = True context = { "is_answered": is_answered, "meeting": meeting, } return render(request, "meetings/meeting_question_template_view.html", context) @login_required def meeting_single_view(request, id): meeting = Meetings.objects.filter(id=id).first() context = {"meeting": meeting} requests_ids_json = request.GET.get("requests_ids") if requests_ids_json: requests_ids = json.loads(requests_ids_json) previous_id, next_id = closest_numbers(requests_ids, id) context["requests_ids"] = requests_ids_json context["previous"] = previous_id context["next"] = next_id return render(request, "meetings/meeting_single_view.html", context) @login_required @hx_request_required @owner_can_enter("pms.view_feedback", Employee) def performance_tab(request, emp_id): """ This function is used to view performance tab of an employee in employee individual & profile view. Parameters: request (HttpRequest): The HTTP request object. emp_id (int): The id of the employee. Returns: return performance-tab template """ feedback_own = Feedback.objects.filter(employee_id=emp_id, archive=False) today = datetime.datetime.today() context = { "self_feedback": feedback_own, "current_date": today, } return render(request, "tabs/performance-tab.html", context=context) @login_required def dashboard_feedback_answer(request): previous_data = request.GET.urlencode() page_number = request.GET.get("page") employee = request.user.employee_get feedback_requested = Feedback.objects.filter( Q(manager_id=employee, manager_id__is_active=True) | Q(colleague_id=employee, colleague_id__is_active=True) | Q(subordinate_id=employee, subordinate_id__is_active=True) ).distinct() feedbacks = feedback_requested.exclude(feedback_answer__employee_id=employee) feedbacks = paginator_qry(feedbacks, page_number) return render( request, "request_and_approve/feedback_answer.html", { "feedbacks": feedbacks, "pd": previous_data, "current_date": datetime.date.today(), }, ) @login_required @permission_required("pms.delete_bonuspointsetting") def delete_bonus_point_setting(request, pk): """ Delete bonus point setting """ try: BonusPointSetting.objects.get(id=pk).delete() messages.success(request, "Bonus Point Setting deleted") except Exception as e: logger(e) messages.error(request, "Something went wrong") return redirect(reverse("bonus-point-setting-list-view")) @login_required @permission_required("pms.delete_employeebonuspoint") def delete_employee_bonus_point(request, pk): """ Automation delete view """ try: bonus = EmployeeBonusPoint.objects.get(id=pk) bonus.delete() messages.success(request, _(f"{bonus} deleted")) except Exception as e: logger(e) messages.error(request, _("Something went wrong")) return redirect(reverse("employee-bonus-point-list-view")) @login_required def bonus_setting_form_values(request): model = request.GET["model"] """ This method is to render `mail to` fields """ model_path = request.GET["model"] to_fields, mail_details_choice, model_class = generate_choices(model_path) class InstantModelForm(forms.ModelForm): """ InstantModelForm """ class Meta: model = model_class fields = "__all__" serialized_form = serialize_form(InstantModelForm(), "automation_multiple_") return JsonResponse( { "choices": to_fields, "mail_details_choice": mail_details_choice, "serialized_form": serialized_form, } ) @login_required @permission_required("pms.update_bonuspointsetting") def update_isactive_bonuspoint_setting(request, obj_id): """ htmx function to update is active field in BonusPointSetting. Args: - is_active: Boolean value representing the state of BonusPointSetting, - obj_id: Id of BonusPointSetting object. """ is_active = request.POST.get("is_active") bonus_point_setting = BonusPointSetting.objects.get(id=obj_id) if is_active == "on": bonus_point_setting.is_active = True messages.success(request, _("Bonus point setting activated successfully.")) else: bonus_point_setting.is_active = False messages.success(request, _("Bonus point setting deactivated successfully.")) bonus_point_setting.save() return HttpResponse("")