Files
hrms/base/announcement.py

417 lines
15 KiB
Python
Raw Permalink Normal View History

"""
Module for managing announcements, including creation, updates, comments, and views.
"""
import json
from datetime import datetime, timedelta
from django.contrib import messages
from django.contrib.auth.models import User
from django.db.models import Q
from django.http import HttpResponse
from django.shortcuts import get_object_or_404, redirect, render
from django.urls import reverse
from django.utils.translation import gettext_lazy as _
from base.forms import AnnouncementCommentForm, AnnouncementForm
from base.methods import closest_numbers, filter_own_records
from base.models import (
Announcement,
AnnouncementComment,
AnnouncementExpire,
AnnouncementView,
Attachment,
)
from employee.models import Employee
from horilla.decorators import hx_request_required, login_required, permission_required
from notifications.signals import notify
@login_required
@hx_request_required
def announcement_list(request):
"""
Renders a list of announcements for the authenticated user.
This view fetches all announcements and updates their expiration dates if not already set.
It filters announcements based on the user's permissions and whether the announcements
are still valid (not expired). Additionally, it checks if the user has viewed each announcement.
"""
general_expire_date = (
AnnouncementExpire.objects.values_list("days", flat=True).first() or 30
)
announcements = Announcement.objects.all()
announcements_to_update = []
for announcement in announcements.filter(expire_date__isnull=True):
announcement.expire_date = announcement.created_at + timedelta(
days=general_expire_date
)
announcements_to_update.append(announcement)
if announcements_to_update:
Announcement.objects.bulk_update(announcements_to_update, ["expire_date"])
has_view_permission = request.user.has_perm("base.view_announcement")
announcements = announcements.filter(expire_date__gte=datetime.today().date())
announcement_items = (
announcements
if has_view_permission
else announcements.filter(
Q(employees=request.user.employee_get) | Q(employees__isnull=True)
)
)
filtered_announcements = announcement_items.prefetch_related(
"announcementview_set"
).order_by("-created_at")
for announcement in filtered_announcements:
announcement.has_viewed = announcement.announcementview_set.filter(
user=request.user, viewed=True
).exists()
instance_ids = json.dumps([instance.id for instance in filtered_announcements])
context = {
"announcements": filtered_announcements,
"general_expire_date": general_expire_date,
"instance_ids": instance_ids,
}
return render(request, "announcement/announcements_list.html", context)
@login_required
@hx_request_required
def create_announcement(request):
"""
Create a new announcement and notify relevant users.
"""
form = AnnouncementForm()
if request.method == "POST":
form = AnnouncementForm(request.POST, request.FILES)
if form.is_valid():
announcement, attachment_ids = form.save(commit=False)
announcement.save()
announcement.attachments.set(attachment_ids)
employees = form.cleaned_data["employees"]
departments = form.cleaned_data["department"]
job_positions = form.cleaned_data["job_position"]
company = form.cleaned_data["company_id"]
announcement.department.set(departments)
announcement.job_position.set(job_positions)
announcement.company_id.set(company)
dept_ids = departments.values_list("id", flat=True)
job_ids = job_positions.values_list("id", flat=True)
employees_from_dept = Employee.objects.filter(
employee_work_info__department_id__in=dept_ids
)
employees_from_job = Employee.objects.filter(
employee_work_info__job_position_id__in=job_ids
)
all_employees = (
employees | employees_from_dept | employees_from_job
).distinct()
announcement.employees.add(*all_employees)
all_emps = employees_from_dept | employees_from_job | employees
user_map = User.objects.filter(employee_get__in=all_emps).distinct()
dept_emp_ids = set(employees_from_dept.values_list("id", flat=True))
job_emp_ids = set(employees_from_job.values_list("id", flat=True))
direct_emp_ids = set(employees.values_list("id", flat=True))
notified_ids = dept_emp_ids.union(job_emp_ids)
direct_only_ids = direct_emp_ids - notified_ids
sender = request.user.employee_get
def send_notification(users, verb):
if users.exists():
notify.send(
sender,
recipient=users,
verb=verb,
verb_ar="لقد تم ذكرك في إعلان.",
verb_de="Sie wurden in einer Ankündigung erwähnt.",
verb_es="Has sido mencionado en un anuncio.",
verb_fr="Vous avez été mentionné dans une annonce.",
redirect="/",
icon="chatbox-ellipses",
)
send_notification(
user_map.filter(employee_get__id__in=dept_emp_ids),
_("Your department was mentioned in an announcement."),
)
send_notification(
user_map.filter(employee_get__id__in=job_emp_ids),
_("Your job position was mentioned in an announcement."),
)
send_notification(
user_map.filter(employee_get__id__in=direct_only_ids),
_("You have been mentioned in an announcement."),
)
messages.success(request, _("Announcement created successfully."))
form = AnnouncementForm() # Reset the form
return render(request, "announcement/announcement_form.html", {"form": form})
@login_required
@hx_request_required
def delete_announcement(request, anoun_id):
"""
This method is used to delete announcements.
"""
announcement = Announcement.find(anoun_id)
if announcement:
announcement.delete()
messages.success(request, _("Announcement deleted successfully."))
instance_ids = request.GET.get("instance_ids")
instance_ids_list = json.loads(instance_ids)
__, next_instance_id = (
closest_numbers(instance_ids_list, anoun_id)
if instance_ids_list
else (None, None)
)
if anoun_id in instance_ids_list:
instance_ids_list.remove(anoun_id)
if next_instance_id and next_instance_id != anoun_id:
url = reverse("announcement-single-view", kwargs={"anoun_id": next_instance_id})
return redirect(f"{url}?instance_ids={json.dumps(instance_ids_list)}")
return redirect(announcement_single_view)
@login_required
@hx_request_required
def update_announcement(request, anoun_id):
"""
This method renders form and template to update Announcement
"""
announcement = Announcement.objects.get(id=anoun_id)
form = AnnouncementForm(instance=announcement)
existing_attachments = list(announcement.attachments.all())
instance_ids = request.GET.get("instance_ids")
if request.method == "POST":
form = AnnouncementForm(request.POST, request.FILES, instance=announcement)
if form.is_valid():
anou, attachment_ids = form.save(commit=False)
anou.save()
if attachment_ids:
all_attachments = set(existing_attachments) | set(
Attachment.objects.filter(id__in=attachment_ids)
)
anou.attachments.set(all_attachments)
else:
anou.attachments.set(existing_attachments)
employees = form.cleaned_data["employees"]
departments = form.cleaned_data["department"]
job_positions = form.cleaned_data["job_position"]
company = form.cleaned_data["company_id"]
anou.department.set(departments)
anou.job_position.set(job_positions)
anou.company_id.set(company)
messages.success(request, _("Announcement updated successfully."))
emp_dep = User.objects.filter(
employee_get__employee_work_info__department_id__in=departments
)
emp_jobs = User.objects.filter(
employee_get__employee_work_info__job_position_id__in=job_positions
)
employees = employees | Employee.objects.filter(
employee_work_info__department_id__in=departments
)
employees = employees | Employee.objects.filter(
employee_work_info__job_position_id__in=job_positions
)
anou.employees.add(*employees)
notify.send(
request.user.employee_get,
recipient=emp_dep,
verb="Your department was mentioned in a post.",
verb_ar="تم ذكر قسمك في منشور.",
verb_de="Ihr Abteilung wurde in einem Beitrag erwähnt.",
verb_es="Tu departamento fue mencionado en una publicación.",
verb_fr="Votre département a été mentionné dans un post.",
redirect="/",
icon="chatbox-ellipses",
)
notify.send(
request.user.employee_get,
recipient=emp_jobs,
verb="Your job position was mentioned in a post.",
verb_ar="تم ذكر وظيفتك في منشور.",
verb_de="Ihre Arbeitsposition wurde in einem Beitrag erwähnt.",
verb_es="Tu puesto de trabajo fue mencionado en una publicación.",
verb_fr="Votre poste de travail a été mentionné dans un post.",
redirect="/",
icon="chatbox-ellipses",
)
return render(
request,
"announcement/announcement_update_form.html",
{
"form": form,
"instance_ids": instance_ids,
"hx_target": request.META.get("HTTP_HX_TARGET", ""),
},
)
@login_required
@hx_request_required
def remove_announcement_file(request, obj_id, attachment_id):
announcement = get_object_or_404(Announcement, id=obj_id)
attachment = get_object_or_404(Attachment, id=attachment_id)
announcement.attachments.remove(attachment)
messages.success(request, _("The file has been successfully deleted."))
return HttpResponse("<script>reloadMessage();</script>")
@login_required
@hx_request_required
def create_announcement_comment(request, anoun_id):
"""
This method renders form and template to create Announcement comments
"""
anoun = Announcement.objects.filter(id=anoun_id).first()
emp = request.user.employee_get
form = AnnouncementCommentForm(
initial={"employee_id": emp.id, "request_id": anoun_id}
)
comments = AnnouncementComment.objects.filter(announcement_id=anoun_id)
commentators = []
if comments:
for i in comments:
commentators.append(i.employee_id.employee_user_id)
unique_users = list(set(commentators))
if request.method == "POST":
form = AnnouncementCommentForm(request.POST)
if form.is_valid():
form.instance.employee_id = emp
form.instance.announcement_id = anoun
form.save()
form = AnnouncementCommentForm(
initial={"employee_id": emp.id, "request_id": anoun_id}
)
messages.success(request, _("You commented a post."))
notify.send(
request.user.employee_get,
recipient=unique_users,
verb=f"Comment under the announcement {anoun.title}.",
verb_ar=f"تعليق تحت الإعلان {anoun.title}.",
verb_de=f"Kommentar unter der Ankündigung {anoun.title}.",
verb_es=f"Comentario bajo el anuncio {anoun.title}.",
verb_fr=f"Commentaire sous l'annonce {anoun.title}.",
redirect="/",
icon="chatbox-ellipses",
)
return redirect("announcement-view-comment", anoun_id=anoun_id)
return render(
request,
"announcement/comment_view.html",
{"form": form, "request_id": anoun_id},
)
@login_required
@hx_request_required
def comment_view(request, anoun_id):
"""
This method is used to view all comments in the announcements
"""
announcement = Announcement.objects.get(id=anoun_id)
comments = AnnouncementComment.objects.filter(announcement_id=anoun_id).order_by(
"-created_at"
)
if not announcement.public_comments:
comments = filter_own_records(
request, comments, "base.view_announcementcomment"
)
no_comments = not comments.exists()
return render(
request,
"announcement/comment_view.html",
{
"comments": comments,
"no_comments": no_comments,
"request_id": anoun_id,
"announcement": announcement,
},
)
@login_required
@hx_request_required
def delete_announcement_comment(request, comment_id):
"""
This method is used to delete announcement comments
"""
comment = AnnouncementComment.objects.get(id=comment_id)
comment.delete()
messages.success(request, _("Comment deleted successfully!"))
return HttpResponse()
@login_required
@hx_request_required
def announcement_single_view(request, anoun_id=None):
"""
This method is used to render single announcements.
"""
announcement_instance = Announcement.find(anoun_id)
instance_ids = request.GET.get("instance_ids")
instance_ids_list = json.loads(instance_ids) if instance_ids else []
previous_instance_id, next_instance_id = (
closest_numbers(instance_ids_list, anoun_id)
if instance_ids_list
else (None, None)
)
if announcement_instance:
announcement_view_obj, _ = AnnouncementView.objects.get_or_create(
user=request.user, announcement=announcement_instance
)
announcement_view_obj.viewed = True
announcement_view_obj.save()
context = {
"announcement": announcement_instance,
"instance_ids": instance_ids,
"previous_instance_id": previous_instance_id,
"next_instance_id": next_instance_id,
}
return render(request, "announcement/announcement_one.html", context)
@login_required
@hx_request_required
@permission_required("base.view_announcement")
def viewed_by(request):
"""
This method is used to view the employees
"""
announcement_id = request.GET.get("announcement_id")
viewed_users = AnnouncementView.objects.filter(
announcement_id__id=announcement_id, viewed=True
)
return render(request, "announcement/viewed_by.html", {"viewed_by": viewed_users})