2684 lines
98 KiB
Python
2684 lines
98 KiB
Python
"""
|
|
Module for managing biometric devices and employee attendance.
|
|
|
|
Includes classes and functions for adding, editing, and deleting biometric devices,
|
|
as well as scheduling attendance capture. Also provides views for managing employees,
|
|
registered on biometric devices.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
from threading import Event, Thread
|
|
from urllib.parse import parse_qs, unquote
|
|
|
|
import pytz
|
|
from apscheduler.schedulers.background import BackgroundScheduler
|
|
from django.contrib import messages
|
|
from django.http import HttpResponse, JsonResponse
|
|
from django.shortcuts import redirect, render
|
|
from django.template.loader import render_to_string
|
|
from django.utils import timezone as django_timezone
|
|
from django.utils.translation import gettext as __
|
|
from django.utils.translation import gettext_lazy as _
|
|
from zk import ZK
|
|
from zk import exception as zk_exception
|
|
|
|
from attendance.methods.utils import Request
|
|
from attendance.models import AttendanceActivity
|
|
from attendance.views.clock_in_out import clock_in, clock_out
|
|
from base.methods import get_key_instances, get_pagination
|
|
from employee.models import Employee, EmployeeWorkInformation
|
|
from horilla.decorators import (
|
|
hx_request_required,
|
|
install_required,
|
|
login_required,
|
|
permission_required,
|
|
)
|
|
from horilla.filters import HorillaPaginator
|
|
from horilla.horilla_settings import BIO_DEVICE_THREADS
|
|
from horilla.settings import TIME_ZONE
|
|
|
|
from .anviz import CrossChexCloudAPI
|
|
from .cosec import COSECBiometric
|
|
from .dahua import DahuaAPI
|
|
from .etimeoffice import ETimeOfficeAPI
|
|
from .filters import BiometricDeviceFilter
|
|
from .forms import (
|
|
BiometricDeviceForm,
|
|
BiometricDeviceSchedulerForm,
|
|
CosecUserAddForm,
|
|
COSECUserForm,
|
|
DahuaUserForm,
|
|
EmployeeBiometricAddForm,
|
|
MapBioUsers,
|
|
)
|
|
from .models import BiometricDevices, BiometricEmployees, COSECAttendanceArguments
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def str_time_seconds(time):
|
|
"""
|
|
this method is used reconvert time in H:M formate string back to seconds and return it
|
|
args:
|
|
time : time in H:M format
|
|
"""
|
|
|
|
ftr = [3600, 60, 1]
|
|
return sum(a * b for a, b in zip(ftr, map(int, time.split(":"))))
|
|
|
|
|
|
def paginator_qry(qryset, page_number):
|
|
"""
|
|
This method is used to paginate query set
|
|
"""
|
|
paginator = HorillaPaginator(qryset, get_pagination())
|
|
qryset = paginator.get_page(page_number)
|
|
return qryset
|
|
|
|
|
|
def biometric_paginator_qry(data_list, page_number, per_page=25):
|
|
"""
|
|
This function is used to paginate a list of dictionaries.
|
|
"""
|
|
start_index = (page_number - 1) * per_page
|
|
end_index = page_number * per_page
|
|
paginated_data = {}
|
|
paginated_data["users"] = data_list[start_index:end_index]
|
|
|
|
total_items = len(data_list)
|
|
total_pages = (total_items + per_page - 1) // per_page
|
|
has_previous = page_number > 1
|
|
has_next = page_number < total_pages
|
|
|
|
paginated_data["paginator"] = {
|
|
"number": page_number,
|
|
"previous_page_number": page_number - 1 if has_previous else None,
|
|
"next_page_number": page_number + 1 if has_next else None,
|
|
"num_pages": total_pages,
|
|
"has_previous": has_previous,
|
|
"has_next": has_next,
|
|
}
|
|
return paginated_data
|
|
|
|
|
|
def biometric_set_time(conn):
|
|
"""
|
|
Sets the time on the biometric device using the provided connection.
|
|
|
|
Parameters:
|
|
- conn: The connection to the biometric device.
|
|
|
|
Returns:
|
|
None
|
|
"""
|
|
new_time = datetime.today()
|
|
conn.set_time(new_time)
|
|
|
|
|
|
class ZKBioAttendance(Thread):
|
|
"""
|
|
Represents a thread for capturing live attendance data from a ZKTeco biometric device.
|
|
|
|
Attributes:
|
|
- machine_ip: The IP address of the ZKTeco biometric device.
|
|
- port_no: The port number for communication with the ZKTeco biometric device.
|
|
- conn: The connection object to the ZKTeco biometric device.
|
|
- _stop_event: Event flag to signal thread termination.
|
|
|
|
Methods:
|
|
- run(): Overrides the run method of the Thread class to capture live attendance data.
|
|
- stop(): Sets the _stop_event to signal the thread to stop gracefully.
|
|
"""
|
|
|
|
def __init__(self, machine_ip, port_no, password):
|
|
super().__init__()
|
|
self.machine_ip = machine_ip
|
|
self.port_no = port_no
|
|
self.password = int(password)
|
|
self._stop_event = Event() # Initialize stop event
|
|
self.conn = None
|
|
|
|
def run(self):
|
|
try:
|
|
zk_device = ZK(
|
|
self.machine_ip,
|
|
port=self.port_no,
|
|
timeout=5,
|
|
password=self.password,
|
|
force_udp=False,
|
|
ommit_ping=False,
|
|
)
|
|
patch_direction = {"in": 0, "out": 1}
|
|
conn = zk_device.connect()
|
|
self.conn = conn
|
|
if conn:
|
|
device = BiometricDevices.objects.filter(
|
|
machine_ip=self.machine_ip, port=self.port_no
|
|
).first()
|
|
if device and device.is_live:
|
|
while not self._stop_event.is_set():
|
|
attendances = conn.live_capture()
|
|
for attendance in attendances:
|
|
if attendance:
|
|
user_id = attendance.user_id
|
|
punch_code = (
|
|
patch_direction[device.device_direction]
|
|
if device.device_direction in patch_direction
|
|
else attendance.punch
|
|
)
|
|
date_time = django_timezone.make_aware(
|
|
attendance.timestamp
|
|
)
|
|
# date_time = attendance.timestamp
|
|
date = date_time.date()
|
|
time = date_time.time()
|
|
device.last_fetch_date = date
|
|
device.last_fetch_time = time
|
|
device.save()
|
|
bio_id = BiometricEmployees.objects.filter(
|
|
user_id=user_id, device_id=device
|
|
).first()
|
|
if bio_id:
|
|
if punch_code in {0, 3, 4}:
|
|
try:
|
|
clock_in(
|
|
Request(
|
|
user=bio_id.employee_id.employee_user_id,
|
|
date=date,
|
|
time=time,
|
|
datetime=date_time,
|
|
)
|
|
)
|
|
except Exception as error:
|
|
logger.error(
|
|
"Got an error in clock_in %s", error
|
|
)
|
|
|
|
continue
|
|
else:
|
|
try:
|
|
clock_out(
|
|
Request(
|
|
user=bio_id.employee_id.employee_user_id,
|
|
date=date,
|
|
time=time,
|
|
datetime=date_time,
|
|
)
|
|
)
|
|
except Exception as error:
|
|
logger.error(
|
|
"Got an error in clock_out", error
|
|
)
|
|
continue
|
|
else:
|
|
continue
|
|
except ConnectionResetError as error:
|
|
ZKBioAttendance(self.machine_ip, self.port_no, self.password).start()
|
|
|
|
def stop(self):
|
|
"""To stop the ZK live capture mode"""
|
|
self.conn.end_live_capture = True
|
|
|
|
|
|
class COSECBioAttendanceThread(Thread):
|
|
"""
|
|
A thread class that handles the real-time retrieval and processing of
|
|
biometric attendance data from a COSEC biometric device.
|
|
|
|
Attributes:
|
|
device_id (int): The ID of the biometric device to interact with.
|
|
_stop_event (threading.Event): An event to signal when to stop the thread.
|
|
|
|
Methods:
|
|
run():
|
|
Continuously fetches attendance data from the COSEC device, processes
|
|
it, and updates the last fetched sequence and rollover count.
|
|
|
|
stop():
|
|
Signals the thread to stop by setting the _stop_event.
|
|
"""
|
|
|
|
def __init__(self, device_id):
|
|
super().__init__()
|
|
self.device_id = device_id
|
|
self._stop_event = Event()
|
|
|
|
def run(self):
|
|
try:
|
|
device = BiometricDevices.objects.get(id=self.device_id)
|
|
if not device.is_live:
|
|
return
|
|
|
|
device_args = COSECAttendanceArguments.objects.filter(
|
|
device_id=device
|
|
).first()
|
|
last_fetch_roll_ovr_count = (
|
|
int(device_args.last_fetch_roll_ovr_count) if device_args else 0
|
|
)
|
|
last_fetch_seq_number = (
|
|
int(device_args.last_fetch_seq_number) if device_args else 1
|
|
)
|
|
|
|
cosec = COSECBiometric(
|
|
device.machine_ip,
|
|
device.port,
|
|
device.bio_username,
|
|
device.bio_password,
|
|
timeout=10,
|
|
)
|
|
while not self._stop_event.is_set():
|
|
attendances = cosec.get_attendance_events(
|
|
last_fetch_roll_ovr_count, int(last_fetch_seq_number) + 1
|
|
)
|
|
if not isinstance(attendances, list):
|
|
self._stop_event.wait(5)
|
|
continue
|
|
|
|
for attendance in attendances:
|
|
ref_user_id = attendance["detail-1"]
|
|
employee = BiometricEmployees.objects.filter(
|
|
ref_user_id=ref_user_id
|
|
).first()
|
|
if not employee:
|
|
continue
|
|
|
|
date_str = attendance["date"]
|
|
time_str = attendance["time"]
|
|
attendance_date = datetime.strptime(date_str, "%d/%m/%Y").date()
|
|
attendance_time = datetime.strptime(time_str, "%H:%M:%S").time()
|
|
attendance_datetime = datetime.combine(
|
|
attendance_date, attendance_time
|
|
)
|
|
punch_code = attendance["detail-2"]
|
|
|
|
request_data = Request(
|
|
user=employee.employee_id.employee_user_id,
|
|
date=attendance_date,
|
|
time=attendance_time,
|
|
datetime=django_timezone.make_aware(attendance_datetime),
|
|
)
|
|
try:
|
|
if punch_code in ["1", "3", "5", "7", "9", "0"]:
|
|
clock_in(request_data)
|
|
elif punch_code in ["2", "4", "6", "8", "10"]:
|
|
clock_out(request_data)
|
|
except Exception as error:
|
|
logger.error("Error processing attendance: ", error)
|
|
|
|
if attendances:
|
|
last_attendance = attendances[-1]
|
|
last_fetch_seq_number = last_attendance["seq-No"]
|
|
last_fetch_roll_ovr_count = last_attendance["roll-over-count"]
|
|
COSECAttendanceArguments.objects.update_or_create(
|
|
device_id=device,
|
|
defaults={
|
|
"last_fetch_roll_ovr_count": last_fetch_roll_ovr_count,
|
|
"last_fetch_seq_number": last_fetch_seq_number,
|
|
},
|
|
)
|
|
# Sleep to prevent overwhelming the device with requests
|
|
self._stop_event.wait(2)
|
|
|
|
except Exception as error:
|
|
device = BiometricDevices.objects.get(id=self.device_id)
|
|
device.is_live = False
|
|
device.save()
|
|
logger.error("Error in COSECBioAttendanceThread: ", error)
|
|
|
|
def stop(self):
|
|
"""Set the stop event to signal the thread to stop gracefully."""
|
|
self._stop_event.set()
|
|
|
|
|
|
@login_required
|
|
@install_required
|
|
@permission_required("biometric.view_biometricdevices")
|
|
def biometric_devices_view(request):
|
|
"""
|
|
Renders and filters the list of biometric devices based on query parameters.
|
|
|
|
Handles both initial page load and HTMX-based filter/search requests.
|
|
|
|
Template:
|
|
- "biometric/view_biometric_devices.html"
|
|
|
|
Context:
|
|
- biometric_form (BiometricDeviceForm): Form for adding new biometric devices.
|
|
- devices (QuerySet): Filtered and paginated queryset of biometric devices.
|
|
- f (BiometricDeviceFilter): Filter form.
|
|
- pd (str): URL-encoded query params for HTMX push.
|
|
- filter_dict (dict): Parsed filter query params.
|
|
"""
|
|
previous_data = request.GET.urlencode()
|
|
is_active = request.GET.get("is_active")
|
|
|
|
# Apply filters
|
|
filter_form = BiometricDeviceFilter(request.GET)
|
|
biometric_devices = filter_form.qs.order_by("-created_at")
|
|
|
|
# Default to is_active=True if not specified or "unknown"
|
|
if not is_active or is_active == "unknown":
|
|
biometric_devices = biometric_devices.filter(is_active=True)
|
|
|
|
# Paginate
|
|
biometric_devices = paginator_qry(biometric_devices, request.GET.get("page"))
|
|
|
|
# Parse filters for reuse
|
|
data_dict = parse_qs(previous_data)
|
|
get_key_instances(BiometricDevices, data_dict)
|
|
|
|
# Render
|
|
return render(
|
|
request,
|
|
"biometric/view_biometric_devices.html",
|
|
{
|
|
"biometric_form": BiometricDeviceForm(),
|
|
"devices": biometric_devices,
|
|
"f": filter_form,
|
|
"pd": previous_data,
|
|
"filter_dict": data_dict,
|
|
},
|
|
)
|
|
|
|
|
|
@login_required
|
|
@install_required
|
|
@permission_required("biometric.change_biometricdevices")
|
|
def biometric_device_schedule(request, device_id):
|
|
"""
|
|
Handles scheduling of attendance capture from a biometric device.
|
|
|
|
Parameters:
|
|
- request (HttpRequest): The HTTP request object.
|
|
- device_id (uuid): The ID of the biometric device for which scheduling is being done.
|
|
|
|
Returns:
|
|
- HttpResponse: HTML response indicating success or failure of the scheduling operation.
|
|
"""
|
|
device = BiometricDevices.objects.get(id=device_id)
|
|
initial_data = {"scheduler_duration": device.scheduler_duration}
|
|
scheduler_form = BiometricDeviceSchedulerForm(initial=initial_data)
|
|
context = {
|
|
"scheduler_form": scheduler_form,
|
|
"device_id": device_id,
|
|
}
|
|
if request.method == "POST":
|
|
scheduler_form = BiometricDeviceSchedulerForm(request.POST)
|
|
if scheduler_form.is_valid():
|
|
if device.machine_type == "zk":
|
|
try:
|
|
port_no = device.port
|
|
machine_ip = device.machine_ip
|
|
password = device.zk_password
|
|
conn = None
|
|
zk_device = ZK(
|
|
machine_ip,
|
|
port=port_no,
|
|
timeout=5,
|
|
password=int(password),
|
|
force_udp=False,
|
|
ommit_ping=False,
|
|
)
|
|
conn = zk_device.connect()
|
|
conn.test_voice(index=0)
|
|
duration = request.POST.get("scheduler_duration")
|
|
device = BiometricDevices.objects.get(id=device_id)
|
|
device.scheduler_duration = duration
|
|
device.is_scheduler = True
|
|
device.is_live = False
|
|
device.save()
|
|
scheduler = BackgroundScheduler()
|
|
scheduler.add_job(
|
|
lambda: zk_biometric_attendance_scheduler(device.id),
|
|
"interval",
|
|
seconds=str_time_seconds(device.scheduler_duration),
|
|
)
|
|
scheduler.start()
|
|
return HttpResponse("<script>window.location.reload()</script>")
|
|
except Exception as error:
|
|
logger.error("An error comes in biometric_device_schedule ", error)
|
|
script = """
|
|
<script>
|
|
Swal.fire({
|
|
title : "Schedule Attendance unsuccessful",
|
|
text: "Please double-check the accuracy of the provided IP Address and Port Number for correctness",
|
|
icon: "warning",
|
|
showConfirmButton: false,
|
|
timer: 3500,
|
|
timerProgressBar: true,
|
|
didClose: () => {
|
|
location.reload();
|
|
},
|
|
});
|
|
</script>
|
|
"""
|
|
return HttpResponse(script)
|
|
elif device.machine_type == "anviz":
|
|
duration = request.POST.get("scheduler_duration")
|
|
device.is_scheduler = True
|
|
device.scheduler_duration = duration
|
|
device.save()
|
|
scheduler = BackgroundScheduler()
|
|
scheduler.add_job(
|
|
lambda: anviz_biometric_attendance_scheduler(device.id),
|
|
"interval",
|
|
seconds=str_time_seconds(device.scheduler_duration),
|
|
)
|
|
scheduler.start()
|
|
return HttpResponse("<script>window.location.reload()</script>")
|
|
elif device.machine_type == "dahua":
|
|
duration = request.POST.get("scheduler_duration")
|
|
device.is_scheduler = True
|
|
device.is_live = False
|
|
device.scheduler_duration = duration
|
|
device.save()
|
|
scheduler = BackgroundScheduler()
|
|
scheduler.add_job(
|
|
lambda: dahua_biometric_attendance_scheduler(device.id),
|
|
"interval",
|
|
seconds=str_time_seconds(device.scheduler_duration),
|
|
)
|
|
scheduler.start()
|
|
return HttpResponse("<script>window.location.reload()</script>")
|
|
elif device.machine_type == "cosec":
|
|
duration = request.POST.get("scheduler_duration")
|
|
device.is_scheduler = True
|
|
device.is_live = False
|
|
device.scheduler_duration = duration
|
|
device.save()
|
|
scheduler = BackgroundScheduler()
|
|
existing_thread = BIO_DEVICE_THREADS.get(device.id)
|
|
if existing_thread:
|
|
existing_thread.stop()
|
|
del BIO_DEVICE_THREADS[device.id]
|
|
scheduler.add_job(
|
|
lambda: cosec_biometric_attendance_scheduler(device.id),
|
|
"interval",
|
|
seconds=str_time_seconds(device.scheduler_duration),
|
|
)
|
|
scheduler.start()
|
|
return HttpResponse("<script>window.location.reload()</script>")
|
|
elif device.machine_type == "etimeoffice":
|
|
duration = request.POST.get("scheduler_duration")
|
|
device.is_scheduler = True
|
|
device.is_live = False
|
|
device.scheduler_duration = duration
|
|
device.save()
|
|
scheduler = BackgroundScheduler()
|
|
scheduler.add_job(
|
|
lambda: etimeoffice_biometric_attendance_scheduler(device.id),
|
|
"interval",
|
|
seconds=str_time_seconds(device.scheduler_duration),
|
|
)
|
|
scheduler.start()
|
|
return HttpResponse("<script>window.location.reload()</script>")
|
|
else:
|
|
return HttpResponse("<script>window.location.reload()</script>")
|
|
|
|
context["scheduler_form"] = scheduler_form
|
|
response = render(request, "biometric/scheduler_device_form.html", context)
|
|
return HttpResponse(
|
|
response.content.decode("utf-8")
|
|
+ "<script>$('#BiometricDeviceTestModal').removeClass('oh-modal--show');\
|
|
$('#BiometricDeviceModal').toggleClass('oh-modal--show');</script>"
|
|
)
|
|
return render(request, "biometric/scheduler_device_form.html", context)
|
|
|
|
|
|
@login_required
|
|
@install_required
|
|
@hx_request_required
|
|
@permission_required("biometric.change_biometricdevices")
|
|
def biometric_device_unschedule(request, device_id):
|
|
"""
|
|
Handles unschedule of attendance capture for a biometric device.
|
|
|
|
Parameters:
|
|
- request (HttpRequest): The HTTP request object.
|
|
- device_id (uuid): The ID of the biometric device for which unscheduling is being done.
|
|
|
|
Returns:
|
|
- HttpResponseRedirect: Redirects to the biometric devices view after unscheduling.
|
|
"""
|
|
previous_data = request.GET.urlencode()
|
|
device = BiometricDevices.objects.get(id=device_id)
|
|
device.is_scheduler = False
|
|
device.save()
|
|
messages.success(request, _("Biometric device unscheduled successfully"))
|
|
return redirect(f"/biometric/view-biometric-devices/?{previous_data}")
|
|
|
|
|
|
@login_required
|
|
@install_required
|
|
@hx_request_required
|
|
@permission_required("biometric.add_biometricdevices")
|
|
def biometric_device_add(request):
|
|
"""
|
|
Handles the addition of a new biometric device.
|
|
|
|
Parameters:
|
|
- request (HttpRequest): The HTTP request object containing data about the request.
|
|
|
|
Returns:
|
|
- HttpResponse: Renders the 'add_biometric_device.html' template with the biometric device form.
|
|
"""
|
|
previous_data = unquote(request.GET.urlencode())
|
|
previous_data = (
|
|
previous_data[3:] if previous_data.startswith("pd=") else previous_data
|
|
)
|
|
biometric_form = BiometricDeviceForm()
|
|
if request.method == "POST":
|
|
biometric_form = BiometricDeviceForm(request.POST)
|
|
if biometric_form.is_valid():
|
|
biometric_form.save()
|
|
messages.success(request, _("Biometric device added successfully."))
|
|
biometric_form = BiometricDeviceForm()
|
|
context = {"biometric_form": biometric_form, "pd": previous_data}
|
|
return render(request, "biometric/biometric_device_form.html", context)
|
|
|
|
|
|
@login_required
|
|
@install_required
|
|
@hx_request_required
|
|
@permission_required("biometric.change_biometricdevices")
|
|
def biometric_device_edit(request, device_id):
|
|
"""
|
|
Handles the editing of an existing biometric device.
|
|
|
|
Parameters:
|
|
- request (HttpRequest): The HTTP request object containing data about the request.
|
|
- device_id (uuid): The ID of the biometric device to be edited.
|
|
|
|
Returns:
|
|
- HttpResponse: Renders the 'edit_biometric_device.html' template with the biometric
|
|
device form pre-filled with existing data.
|
|
"""
|
|
device = BiometricDevices.find(device_id)
|
|
if not device:
|
|
messages.error(request, _("Biometric device not found."))
|
|
return render(request, "biometric/biometric_device_form.html")
|
|
biometric_form = BiometricDeviceForm(instance=device)
|
|
if request.method == "POST":
|
|
biometric_form = BiometricDeviceForm(request.POST, instance=device)
|
|
if biometric_form.is_valid():
|
|
biometric_form.save()
|
|
messages.success(request, _("Biometric device updated successfully."))
|
|
context = {
|
|
"biometric_form": biometric_form,
|
|
"device_id": device_id,
|
|
}
|
|
return render(request, "biometric/biometric_device_form.html", context)
|
|
|
|
|
|
@login_required
|
|
@install_required
|
|
@hx_request_required
|
|
@permission_required("biometric.change_biometricdevices")
|
|
def biometric_device_archive(request, device_id):
|
|
"""
|
|
This method is used to archive or un-archive devices
|
|
"""
|
|
previous_data = request.GET.urlencode()
|
|
device_obj = BiometricDevices.find(device_id)
|
|
if not device_obj:
|
|
messages.error(request, _("Biometric device not found."))
|
|
return redirect(f"/biometric/view-biometric-devices/?{previous_data}")
|
|
device_obj.is_active = not device_obj.is_active
|
|
device_obj.save()
|
|
message = _("archived") if not device_obj.is_active else _("un-archived")
|
|
messages.success(request, _("Device is %(message)s") % {"message": message})
|
|
return redirect(f"/biometric/view-biometric-devices/?{previous_data}")
|
|
|
|
|
|
@login_required
|
|
@install_required
|
|
@hx_request_required
|
|
@permission_required("biometric.delete_biometricdevices")
|
|
def biometric_device_delete(request, device_id):
|
|
"""
|
|
Handles the deletion of a biometric device.
|
|
|
|
Parameters:
|
|
- request (HttpRequest): The HTTP request object containing data about the request.
|
|
- device_id (uuid): The ID of the biometric device to be deleted.
|
|
|
|
Returns:
|
|
- HttpResponseRedirect: Redirects to the 'view-biometric-devices' page after deleting the
|
|
biometric device.
|
|
|
|
"""
|
|
previous_data = request.GET.urlencode()
|
|
device_obj = BiometricDevices.find(device_id)
|
|
if not device_obj:
|
|
messages.error(request, _("Biometric device not found."))
|
|
return redirect(f"/biometric/view-biometric-devices/?{previous_data}")
|
|
device_obj.delete()
|
|
messages.success(request, _("Biometric device deleted successfully."))
|
|
return redirect(f"/biometric/view-biometric-devices/?{previous_data}")
|
|
|
|
|
|
def render_connection_response(title, text, icon):
|
|
"""
|
|
Helper function to render the connection
|
|
response from device test connection.
|
|
"""
|
|
context = {
|
|
"title": title,
|
|
"text": text,
|
|
"icon": icon,
|
|
}
|
|
return render_to_string("biometric/test_connection_script.html", context)
|
|
|
|
|
|
def test_zkteco_connection(device):
|
|
"""Test connection for ZKTeco device."""
|
|
conn = None
|
|
port_no = device.port
|
|
machine_ip = device.machine_ip
|
|
password = device.zk_password
|
|
zk_device = ZK(
|
|
machine_ip,
|
|
port=port_no,
|
|
timeout=60,
|
|
password=int(password),
|
|
force_udp=False,
|
|
ommit_ping=False,
|
|
)
|
|
try:
|
|
conn = zk_device.connect()
|
|
conn.test_voice(index=0)
|
|
find_employees_in_zk(device.id)
|
|
return render_connection_response(
|
|
_("Connection Successful"),
|
|
_("ZKTeco test connection successful."),
|
|
"success",
|
|
)
|
|
except zk_exception.ZKErrorResponse:
|
|
return render_connection_response(
|
|
_("Authentication Error"),
|
|
_("Double-check the provided IP, Port, and Password."),
|
|
"warning",
|
|
)
|
|
except Exception:
|
|
logger.error("ZKTeco connection error", exc_info=True)
|
|
return render_connection_response(
|
|
_("Connection unsuccessful"),
|
|
_("Please check the IP, Port, and Password."),
|
|
"warning",
|
|
)
|
|
finally:
|
|
if conn is not None:
|
|
conn.disconnect()
|
|
|
|
|
|
def test_anviz_connection(device):
|
|
"""Test connection for Anviz device."""
|
|
|
|
try:
|
|
from .anviz import CrossChexCloudAPI
|
|
|
|
anviz = CrossChexCloudAPI(
|
|
api_url=device.api_url,
|
|
api_key=device.api_key,
|
|
api_secret=device.api_secret,
|
|
anviz_request_id=device.anviz_request_id,
|
|
)
|
|
test_response = anviz.test_connection()
|
|
|
|
if test_response.get("token"):
|
|
device.api_token = test_response.get("token")
|
|
device.api_expires = test_response.get("expires")
|
|
device.save()
|
|
records = anviz.get_attendance_records()
|
|
|
|
return render_connection_response(
|
|
_("Connection Successful"),
|
|
_("Anviz test connection successful."),
|
|
"success",
|
|
)
|
|
else:
|
|
return render_connection_response(
|
|
_("Connection unsuccessful"),
|
|
_("API credentials might be incorrect."),
|
|
"warning",
|
|
)
|
|
except Exception as error:
|
|
logger.error("Anviz connection error", exc_info=True)
|
|
return render_connection_response(
|
|
_("Connection unsuccessful"), _("API request failed."), "warning"
|
|
)
|
|
|
|
|
|
def test_cosec_connection(device):
|
|
"""Test connection for COSEC device."""
|
|
cosec = COSECBiometric(
|
|
device.machine_ip,
|
|
device.port,
|
|
device.bio_username,
|
|
device.bio_password,
|
|
timeout=10,
|
|
)
|
|
response = cosec.basic_config()
|
|
if response.get("app"):
|
|
find_employees_in_cosec(device.id)
|
|
return render_connection_response(
|
|
_("Connection Successful"),
|
|
_("Matrix test connection successful."),
|
|
"success",
|
|
)
|
|
else:
|
|
return render_connection_response(
|
|
_("Connection unsuccessful"),
|
|
_("Double-check the provided Machine IP, Username, and Password."),
|
|
"warning",
|
|
)
|
|
|
|
|
|
def test_dahua_connection(device):
|
|
"""Test connection for Dahua device."""
|
|
dahua = DahuaAPI(
|
|
ip=device.machine_ip,
|
|
username=device.bio_username,
|
|
password=device.bio_password,
|
|
)
|
|
response = dahua.get_system_info()
|
|
if response.get("status_code") == 200:
|
|
return render_connection_response(
|
|
_("Connection Successful"),
|
|
_("Dahua test connection successful."),
|
|
"success",
|
|
)
|
|
else:
|
|
return render_connection_response(
|
|
_("Connection unsuccessful"),
|
|
_("Double-check the provided Machine IP, Username, and Password."),
|
|
"warning",
|
|
)
|
|
|
|
|
|
def test_etimeoffice_connection(device):
|
|
"""Test connection for e-TimeOffice device."""
|
|
now = datetime.now()
|
|
etimeoffice = ETimeOfficeAPI(
|
|
username=device.bio_username,
|
|
password=device.bio_password,
|
|
)
|
|
|
|
from_date = f"{now.day:02d}/{now.month:02d}/{now.year}_00:00"
|
|
to_date = (
|
|
f"{now.day:02d}/{now.month:02d}/{now.year}_{now.hour:02d}:{now.minute:02d}"
|
|
)
|
|
|
|
try:
|
|
response = etimeoffice.download_punch_data(from_date=from_date, to_date=to_date)
|
|
if response.get("Msg") == "Success":
|
|
return render_connection_response(
|
|
_("Connection Successful"),
|
|
_("e-Time Office test connection successful."),
|
|
"success",
|
|
)
|
|
|
|
error_msg = response.get("Message")
|
|
return render_connection_response(
|
|
_("Connection unsuccessful"),
|
|
_("Double-check the provided API Url, Username, and Password: {}").format(
|
|
error_msg
|
|
),
|
|
"warning",
|
|
)
|
|
|
|
except Exception as e:
|
|
return render_connection_response(
|
|
_("Connection error"),
|
|
_(f"API request failed with exception: {str(e)}"),
|
|
"danger",
|
|
)
|
|
|
|
|
|
@login_required
|
|
@install_required
|
|
@hx_request_required
|
|
@permission_required("biometric.view_biometricdevices")
|
|
def biometric_device_test(request, device_id):
|
|
"""
|
|
Test the connection with the specified biometric device.
|
|
"""
|
|
|
|
# Retrieve device and validate
|
|
device = BiometricDevices.objects.filter(id=device_id).first()
|
|
if not device:
|
|
return HttpResponse("Device not found.", status=404)
|
|
|
|
script = ""
|
|
try:
|
|
if device.machine_type == "zk":
|
|
script = test_zkteco_connection(device)
|
|
elif device.machine_type == "anviz":
|
|
script = test_anviz_connection(device)
|
|
elif device.machine_type == "cosec":
|
|
script = test_cosec_connection(device)
|
|
elif device.machine_type == "dahua":
|
|
script = test_dahua_connection(device)
|
|
elif device.machine_type == "etimeoffice":
|
|
script = test_etimeoffice_connection(device)
|
|
else:
|
|
script = render_connection_response(
|
|
"Connection unsuccessful",
|
|
"Please select a valid biometric device.",
|
|
"warning",
|
|
)
|
|
except Exception as error:
|
|
logger.error("Error in biometric_device_test", exc_info=True)
|
|
script = render_connection_response(
|
|
"Connection unsuccessful", "An unexpected error occurred.", "warning"
|
|
)
|
|
|
|
return HttpResponse(script)
|
|
|
|
|
|
@login_required
|
|
@install_required
|
|
@hx_request_required
|
|
@permission_required("biometric.view_biometricdevices")
|
|
def biometric_device_bulk_fetch_logs(request):
|
|
script = ""
|
|
zk_ids = request.GET.getlist("selected_device_ids")
|
|
zk_devices = BiometricDevices.objects.filter(id__in=zk_ids, machine_type="zk")
|
|
|
|
if not zk_devices:
|
|
messages.error(request, _(""))
|
|
script = render_connection_response(
|
|
_("Biometric device not supported."),
|
|
_(
|
|
"Bulk log fetching is currently available only for ZKTeco / eSSL devices. Support for other biometric systems will be added soon."
|
|
),
|
|
"warning",
|
|
)
|
|
return HttpResponse(script)
|
|
|
|
attendance_count, error_message = zk_biometric_attendance_logs(zk_devices)
|
|
if isinstance(attendance_count, int):
|
|
script = render_connection_response(
|
|
_("Logs Fetched Successfully"),
|
|
_(
|
|
f"Biometric attendance logs fetched successfully. Total records: {attendance_count}"
|
|
),
|
|
"success",
|
|
)
|
|
elif "Authentication" in error_message:
|
|
script = render_connection_response(
|
|
_("Authentication Error"),
|
|
_("Double-check the provided IP, Port, and Password."),
|
|
"warning",
|
|
)
|
|
else:
|
|
script = render_connection_response(
|
|
_("Connection Unsuccessful"),
|
|
_(f"Please check the IP, Port, and Password. Error: {error_message}"),
|
|
"warning",
|
|
)
|
|
return HttpResponse(script)
|
|
|
|
|
|
@login_required
|
|
@install_required
|
|
@hx_request_required
|
|
@permission_required("biometric.view_biometricdevices")
|
|
def biometric_device_fetch_logs(request, device_id=None):
|
|
"""
|
|
Fetches biometric attendance logs from a specified device.
|
|
|
|
This view function connects to a biometric device based on the device type (zk, anviz, cosec, dahua, or etimeoffice)
|
|
and retrieves the attendance logs.
|
|
"""
|
|
device = BiometricDevices.find(device_id)
|
|
if not device:
|
|
return HttpResponse("Device not found.", status=404)
|
|
script = ""
|
|
if device.machine_type == "zk":
|
|
attendance_count, error_message = zk_biometric_attendance_logs(device)
|
|
if isinstance(attendance_count, int):
|
|
script = render_connection_response(
|
|
_("Logs Fetched Successfully"),
|
|
_(
|
|
f"Biometric attendance logs fetched successfully. Total records: {attendance_count}"
|
|
),
|
|
"success",
|
|
)
|
|
elif "Authentication" in error_message:
|
|
script = render_connection_response(
|
|
_("Authentication Error"),
|
|
_("Double-check the provided IP, Port, and Password."),
|
|
"warning",
|
|
)
|
|
else:
|
|
script = render_connection_response(
|
|
_("Connection Unsuccessful"),
|
|
_(f"Please check the IP, Port, and Password. Error: {error_message}"),
|
|
"warning",
|
|
)
|
|
elif device.machine_type == "anviz":
|
|
attendance_count = anviz_biometric_attendance_logs(device)
|
|
if isinstance(attendance_count, int):
|
|
script = render_connection_response(
|
|
_("Logs Fetched Successfully"),
|
|
_(
|
|
f"Biometric attendance logs fetched successfully. Total records: {attendance_count}"
|
|
),
|
|
"success",
|
|
)
|
|
else:
|
|
script = render_connection_response(
|
|
_("Connection unsuccessful"),
|
|
_("API credentials might be incorrect."),
|
|
"warning",
|
|
)
|
|
|
|
elif device.machine_type == "cosec":
|
|
attendance_count = cosec_biometric_attendance_logs(device)
|
|
if isinstance(attendance_count, int):
|
|
script = render_connection_response(
|
|
_("Logs Fetched Successfully"),
|
|
_(
|
|
f"Biometric attendance logs fetched successfully. Total records: {attendance_count}"
|
|
),
|
|
"success",
|
|
)
|
|
else:
|
|
script = render_connection_response(
|
|
_("Connection unsuccessful"),
|
|
_("Double-check the provided Machine IP, Username, and Password."),
|
|
"warning",
|
|
)
|
|
elif device.machine_type == "dahua":
|
|
attendance_count = dahua_biometric_attendance_logs(device)
|
|
if isinstance(attendance_count, int):
|
|
script = render_connection_response(
|
|
_("Logs Fetched Successfully"),
|
|
_(
|
|
f"Biometric attendance logs fetched successfully. Total records: {attendance_count}"
|
|
),
|
|
"success",
|
|
)
|
|
else:
|
|
script = render_connection_response(
|
|
_("Connection unsuccessful"),
|
|
_("Double-check the provided Machine IP, Username, and Password."),
|
|
"warning",
|
|
)
|
|
elif device.machine_type == "etimeoffice":
|
|
attendance_count = etimeoffice_biometric_attendance_logs(device)
|
|
if isinstance(attendance_count, int):
|
|
script = render_connection_response(
|
|
_("Logs Fetched Successfully"),
|
|
_(
|
|
f"Biometric attendance logs fetched successfully. Total records: {attendance_count}"
|
|
),
|
|
"success",
|
|
)
|
|
else:
|
|
script = render_connection_response(
|
|
_("Connection unsuccessful"),
|
|
_("Double-check the provided API Url, Username, and Password"),
|
|
"warning",
|
|
)
|
|
else:
|
|
script = render_connection_response(
|
|
"Connection unsuccessful",
|
|
"Please select a valid biometric device.",
|
|
"warning",
|
|
)
|
|
|
|
return HttpResponse(script)
|
|
|
|
|
|
def zk_employees_fetch(device):
|
|
"""
|
|
Fetch employee data from the specified ZK biometric device.
|
|
|
|
Parameters:
|
|
- device: Biometric device object containing machine IP, port, etc.
|
|
|
|
Returns:
|
|
- list: A list of dictionaries, where each dictionary represents an employee
|
|
with associated data including user ID, employee ID, work email,
|
|
phone number, job position, badge ID, and fingerprint data.
|
|
"""
|
|
zk_device = ZK(
|
|
device.machine_ip,
|
|
port=device.port,
|
|
timeout=1,
|
|
password=int(device.zk_password),
|
|
force_udp=False,
|
|
ommit_ping=False,
|
|
)
|
|
conn = zk_device.connect()
|
|
conn.enable_device()
|
|
users = conn.get_users()
|
|
fingers = conn.get_templates()
|
|
|
|
bio_employees = BiometricEmployees.objects.filter(device_id=device)
|
|
bio_lookup = {bio.user_id: bio for bio in bio_employees}
|
|
|
|
employees = []
|
|
for user in users:
|
|
user_id = user.user_id
|
|
uid = user.uid
|
|
bio_id = bio_lookup.get(user_id)
|
|
|
|
if bio_id:
|
|
employee = bio_id.employee_id
|
|
employee_work_info = EmployeeWorkInformation.objects.filter(
|
|
employee_id=employee
|
|
).first()
|
|
if employee_work_info:
|
|
work_email = (
|
|
employee_work_info.email if employee_work_info.email else None
|
|
)
|
|
phone = employee_work_info.mobile if employee_work_info.mobile else None
|
|
job_position = (
|
|
employee_work_info.job_position_id
|
|
if employee_work_info.job_position_id
|
|
else None
|
|
)
|
|
user.__dict__["work_email"] = work_email
|
|
user.__dict__["phone"] = phone
|
|
user.__dict__["job_position"] = job_position
|
|
else:
|
|
user.__dict__["work_email"] = None
|
|
user.__dict__["phone"] = None
|
|
user.__dict__["job_position"] = None
|
|
user.__dict__["employee"] = employee
|
|
user.__dict__["badge_id"] = employee.badge_id
|
|
finger_print = []
|
|
for finger in fingers:
|
|
if finger.uid == uid:
|
|
finger_print.append(finger.fid)
|
|
if not finger_print:
|
|
finger_print = []
|
|
user.__dict__["finger"] = finger_print
|
|
employees.append(user)
|
|
return employees
|
|
|
|
|
|
def cosec_employee_fetch(device_id):
|
|
"""
|
|
Fetch employee data from the COSEC biometric device associated with the specified device ID.
|
|
|
|
Parameters:
|
|
- device_id: ID of the biometric device.
|
|
|
|
Returns:
|
|
- list: A list of dictionaries, where each dictionary represents an employee with associated
|
|
data including user ID, employee ID, finger count, and card count.
|
|
"""
|
|
users = []
|
|
device = BiometricDevices.objects.get(id=device_id)
|
|
employees = BiometricEmployees.objects.filter(device_id=device)
|
|
cosec = COSECBiometric(
|
|
device.machine_ip, device.port, device.bio_username, device.bio_password
|
|
)
|
|
for employee in employees:
|
|
user = cosec.get_cosec_user(user_id=employee.user_id)
|
|
if user.get("user-id"):
|
|
user["employee_id"] = employee.employee_id
|
|
user_credential = cosec.get_user_credential_count(user_id=employee.user_id)
|
|
user["finger-count"] = user_credential.get("finger-count")
|
|
user["face-count"] = user_credential.get("face-count")
|
|
user["card-count"] = user_credential.get("card-count")
|
|
new_dict = {}
|
|
for key, value in user.items():
|
|
new_key = key.replace("-", "_")
|
|
new_dict[new_key] = value
|
|
users.append(new_dict)
|
|
else:
|
|
BiometricEmployees.objects.get(id=employee.id).delete()
|
|
return users
|
|
|
|
|
|
def find_employees_in_cosec(device_id):
|
|
"""
|
|
Synchronize active employees with a COSEC biometric device.
|
|
|
|
This function retrieves a list of active employees from the database,
|
|
checks their presence on a specified COSEC biometric device, and updates
|
|
the database with employees who are registered on the COSEC device.
|
|
|
|
Args:
|
|
device_id (uuid): The ID of the biometric device to synchronize with.
|
|
"""
|
|
device = BiometricDevices.objects.get(id=device_id)
|
|
employees = Employee.objects.filter(is_active=True).values_list("id", "badge_id")
|
|
cosec = COSECBiometric(
|
|
device.machine_ip, device.port, device.bio_username, device.bio_password
|
|
)
|
|
existing_user_ids = BiometricEmployees.objects.filter(device_id=device).values_list(
|
|
"user_id", flat=True
|
|
)
|
|
biometric_employees_to_create = []
|
|
for employee_id, badge_id in employees:
|
|
if badge_id and badge_id.isalnum() and len(badge_id) <= 15:
|
|
user = cosec.get_cosec_user(user_id=badge_id)
|
|
if user.get("user-id") and user.get("user-id") not in existing_user_ids:
|
|
biometric_employees_to_create.append(
|
|
BiometricEmployees(
|
|
ref_user_id=user.get("ref-user-id"),
|
|
user_id=user.get("user-id"),
|
|
employee_id_id=employee_id,
|
|
device_id_id=device_id,
|
|
)
|
|
)
|
|
BiometricEmployees.objects.bulk_create(biometric_employees_to_create)
|
|
|
|
|
|
def find_employees_in_zk(device_id):
|
|
"""
|
|
Synchronize active employees with a COSEC biometric device.
|
|
|
|
This function retrieves a list of active employees from the database,
|
|
checks their presence on a specified COSEC biometric device, and updates
|
|
the database with employees who are registered on the COSEC device.
|
|
|
|
Args:
|
|
device_id (uuid): The ID of the biometric device to synchronize with.
|
|
"""
|
|
device = BiometricDevices.objects.get(id=device_id)
|
|
employees = Employee.objects.filter(is_active=True).values_list("id", "badge_id")
|
|
existing_user_ids = set(
|
|
BiometricEmployees.objects.filter(device_id=device_id).values_list(
|
|
"user_id", flat=True
|
|
)
|
|
)
|
|
zk_device = ZK(
|
|
device.machine_ip, port=device.port, password=int(device.zk_password), timeout=5
|
|
)
|
|
conn = zk_device.connect()
|
|
zk_users = {user.user_id: user.uid for user in conn.get_users()}
|
|
biometric_employees_to_create = [
|
|
BiometricEmployees(
|
|
uid=zk_users[badge_id],
|
|
user_id=badge_id,
|
|
employee_id_id=employee_id,
|
|
device_id_id=device_id,
|
|
)
|
|
for employee_id, badge_id in employees
|
|
if badge_id and badge_id in zk_users and badge_id not in existing_user_ids
|
|
]
|
|
BiometricEmployees.objects.bulk_create(biometric_employees_to_create)
|
|
conn.disconnect()
|
|
|
|
|
|
@login_required
|
|
@install_required
|
|
@permission_required("biometric.view_biometricemployees")
|
|
def biometric_device_employees(request, device_id, **kwargs):
|
|
"""
|
|
View function to display employees associated with a biometric device.
|
|
|
|
Depending on the machine type of the biometric device (either "zk" or "cosec"),
|
|
this function fetches the relevant employees and renders the appropriate template.
|
|
|
|
Args:
|
|
request (HttpRequest): The HTTP request object.
|
|
device_id (uuid): The ID of the biometric device.
|
|
**kwargs: Additional keyword arguments.
|
|
|
|
Returns:
|
|
HttpResponse: The rendered template response or a redirect to `biometric_devices_view`
|
|
in case of an error.
|
|
"""
|
|
previous_data = request.GET.urlencode()
|
|
device = BiometricDevices.find(device_id)
|
|
if device:
|
|
try:
|
|
if device.machine_type == "zk":
|
|
employee_add_form = EmployeeBiometricAddForm()
|
|
employees = zk_employees_fetch(device)
|
|
employees = paginator_qry(employees, request.GET.get("page"))
|
|
context = {
|
|
"employees": employees,
|
|
"device_id": device_id,
|
|
"form": employee_add_form,
|
|
"pd": previous_data,
|
|
}
|
|
return render(
|
|
request, "biometric/view_employees_biometric.html", context
|
|
)
|
|
if device.machine_type == "cosec":
|
|
employee_add_form = CosecUserAddForm()
|
|
employees = cosec_employee_fetch(device_id)
|
|
employees = biometric_paginator_qry(
|
|
employees, int(request.GET.get("page", 1))
|
|
)
|
|
context = {
|
|
"employees": employees,
|
|
"device_id": device.id,
|
|
"form": employee_add_form,
|
|
"pd": previous_data,
|
|
}
|
|
return render(request, "biometric/view_cosec_employees.html", context)
|
|
if device.machine_type == "dahua":
|
|
employees = BiometricEmployees.objects.filter(device_id=device_id)
|
|
context = {
|
|
"device_id": device.id,
|
|
"employees": employees,
|
|
}
|
|
return render(
|
|
request, "biometric_users/dahua/view_dahua_employees.html", context
|
|
)
|
|
if device.machine_type == "etimeoffice":
|
|
employees = BiometricEmployees.objects.filter(device_id=device_id)
|
|
context = {
|
|
"device_id": device.id,
|
|
"employees": employees,
|
|
}
|
|
return render(
|
|
request,
|
|
"biometric_users/etimeoffice/view_etimeoffice_employees.html",
|
|
context,
|
|
)
|
|
except Exception as error:
|
|
logger.error("An error occurred: ", error)
|
|
messages.info(
|
|
request,
|
|
_(
|
|
"Failed to establish a connection. Please verify the accuracy of the IP\
|
|
Address , Port No. and Password of the device."
|
|
),
|
|
)
|
|
else:
|
|
messages.error(request, _("Biometric device not found"))
|
|
return redirect(biometric_devices_view)
|
|
|
|
|
|
@login_required
|
|
@install_required
|
|
@hx_request_required
|
|
@permission_required("biometric.view_biometricemployees")
|
|
def search_employee_device(request):
|
|
"""
|
|
View function to search for employees associated with a specific biometric device.
|
|
|
|
This function handles searching employees based on their first name and the type of
|
|
biometric device (either "zk" or "cosec"). It then renders the appropriate template
|
|
with the filtered employee list.
|
|
|
|
Args:
|
|
request (HttpRequest): The HTTP request object.
|
|
|
|
Returns:
|
|
HttpResponse: The rendered template response with the context.
|
|
"""
|
|
previous_data = request.GET.urlencode()
|
|
device_id = request.GET.get("device")
|
|
device = BiometricDevices.objects.get(id=device_id)
|
|
search = request.GET.get("search")
|
|
if device.machine_type == "zk":
|
|
employees = zk_employees_fetch(device)
|
|
if search:
|
|
search_employees = BiometricEmployees.objects.filter(
|
|
employee_id__employee_first_name__icontains=search
|
|
)
|
|
search_uids = search_employees.values_list("uid", flat=True)
|
|
employees = [
|
|
employee for employee in employees if employee.uid in search_uids
|
|
]
|
|
employees = paginator_qry(employees, request.GET.get("page"))
|
|
template = "biometric/list_employees_biometric.html"
|
|
context = {
|
|
"employees": employees,
|
|
"device_id": device_id,
|
|
"pd": previous_data,
|
|
}
|
|
elif device.machine_type == "dahua" or device.machine_type == "etimeoffice":
|
|
search_employees = BiometricEmployees.objects.filter(device_id=device)
|
|
if search:
|
|
search_employees = BiometricEmployees.objects.filter(
|
|
employee_id__employee_first_name__icontains=search, device_id=device
|
|
)
|
|
template = (
|
|
"biometric_users/dahua/list_dahua_employees.html"
|
|
if device.machine_type == "dahua"
|
|
else "biometric_users/etimeoffice/list_etimeoffice_employees.html"
|
|
)
|
|
context = {
|
|
"device_id": device.id,
|
|
"employees": search_employees,
|
|
}
|
|
|
|
else:
|
|
employees = cosec_employee_fetch(device_id)
|
|
if search:
|
|
search_employees = BiometricEmployees.objects.filter(
|
|
employee_id__employee_first_name__icontains=search, device_id=device
|
|
)
|
|
else:
|
|
search_employees = BiometricEmployees.objects.filter(device_id=device)
|
|
queryset_user_ids = [employee.user_id for employee in search_employees]
|
|
filtered_employees = [
|
|
employee
|
|
for employee in employees
|
|
if employee["user_id"] in queryset_user_ids
|
|
]
|
|
filtered_employees = biometric_paginator_qry(
|
|
filtered_employees, int(request.GET.get("page", 1))
|
|
)
|
|
template = "biometric/list_employees_cosec_biometric.html"
|
|
context = {
|
|
"employees": filtered_employees,
|
|
"device_id": device_id,
|
|
"pd": previous_data,
|
|
}
|
|
return render(request, template, context)
|
|
|
|
|
|
@login_required
|
|
@install_required
|
|
@permission_required("biometric.delete_biometricemployees")
|
|
def delete_biometric_user(request, uid, device_id):
|
|
"""
|
|
This function connects to the specified biometric device, deletes the user
|
|
identified by the given UID, and removes the corresponding entry from the
|
|
BiometricEmployees table in the local database
|
|
|
|
Args:
|
|
request (HttpRequest): The HTTP request object.
|
|
uid (str): The UID of the user to be deleted from the biometric device.
|
|
device_id (uuid): The ID of the biometric device.
|
|
|
|
Returns:
|
|
HttpResponse: A redirect response to the list of employees for the specified
|
|
biometric device.
|
|
"""
|
|
device = BiometricDevices.objects.get(id=device_id)
|
|
zk_device = ZK(
|
|
device.machine_ip,
|
|
port=device.port,
|
|
timeout=5,
|
|
password=int(device.zk_password),
|
|
force_udp=False,
|
|
ommit_ping=False,
|
|
)
|
|
conn = zk_device.connect()
|
|
conn.delete_user(uid=uid)
|
|
employee_bio = BiometricEmployees.objects.filter(uid=uid).first()
|
|
employee_bio.delete()
|
|
messages.success(
|
|
request,
|
|
_("{} successfully removed from the biometric device.").format(
|
|
employee_bio.employee_id
|
|
),
|
|
)
|
|
redirect_url = f"/biometric/biometric-device-employees/{device_id}/"
|
|
return redirect(redirect_url)
|
|
|
|
|
|
@login_required
|
|
@install_required
|
|
@permission_required("biometric.change_biometricemployees")
|
|
def enable_cosec_face_recognition(request, user_id, device_id):
|
|
"""
|
|
View function to enable face recognition for a user on a COSEC biometric device
|
|
|
|
Args:
|
|
request (HttpRequest): The HTTP request object.
|
|
user_id (str): The ID of the user for whom face recognition is to be enabled.
|
|
device_id (uuid): The ID of the COSEC biometric device.
|
|
|
|
Returns:
|
|
HttpResponse: A redirect response to the list of employees for the specified
|
|
biometric device.
|
|
"""
|
|
device = BiometricDevices.find(device_id)
|
|
if device:
|
|
cosec = COSECBiometric(
|
|
device.machine_ip,
|
|
device.port,
|
|
device.bio_username,
|
|
device.bio_password,
|
|
)
|
|
enable_fr = cosec.enable_user_face_recognition(user_id=user_id, enable_fr=True)
|
|
response_code = enable_fr.get("Response-Code")
|
|
if response_code == "0":
|
|
messages.success(request, _("Face recognition enabled successfully"))
|
|
else:
|
|
messages.error(request, _("Something went wrong when enabling face"))
|
|
else:
|
|
messages.error(request, _("Device not found"))
|
|
return redirect(f"/biometric/biometric-device-employees/{device_id}/")
|
|
|
|
|
|
@login_required
|
|
@install_required
|
|
@hx_request_required
|
|
@permission_required("biometric.change_biometricemployees")
|
|
def edit_cosec_user(request, user_id, device_id):
|
|
"""
|
|
View function to edit the details of a COSEC biometric user.
|
|
|
|
Args:
|
|
request (HttpRequest): The HTTP request object.
|
|
user_id (str): The ID of the user to be edited.
|
|
device_id (uuid): The ID of the COSEC biometric device.
|
|
|
|
Returns:
|
|
HttpResponse: The rendered form template for GET requests, and a response with
|
|
a success message for valid POST requests. Reloads the page after
|
|
successful update.
|
|
|
|
"""
|
|
device = BiometricDevices.objects.get(id=device_id)
|
|
cosec = COSECBiometric(
|
|
device.machine_ip,
|
|
device.port,
|
|
device.bio_username,
|
|
device.bio_password,
|
|
)
|
|
user = cosec.get_cosec_user(user_id)
|
|
if user.get("name"):
|
|
year = int(user["validity-date-yyyy"])
|
|
month = int(user["validity-date-mm"])
|
|
day = int(user["validity-date-dd"])
|
|
date_object = datetime(year, month, day)
|
|
formatted_date = date_object.strftime("%Y-%m-%d")
|
|
initial_data = {
|
|
"name": user["name"],
|
|
"user_active": bool(int(user["user-active"])),
|
|
"vip": bool(int(user["vip"])),
|
|
"validity_enable": bool(int(user["validity-enable"])),
|
|
"validity_end_date": formatted_date,
|
|
}
|
|
|
|
if "by-pass-finger" in user:
|
|
initial_data["by_pass_finger"] = bool(int(user["by-pass-finger"]))
|
|
|
|
form = COSECUserForm(initial=initial_data)
|
|
|
|
if request.method == "POST":
|
|
form = COSECUserForm(request.POST)
|
|
if form.is_valid():
|
|
name = form.cleaned_data["name"]
|
|
user_active = form.cleaned_data["user_active"]
|
|
vip = form.cleaned_data["vip"]
|
|
validity_enable = form.cleaned_data["validity_enable"]
|
|
validity_end_date_str = str(form.cleaned_data["validity_end_date"])
|
|
validity_end_date = datetime.strptime(
|
|
validity_end_date_str, "%Y-%m-%d"
|
|
).date()
|
|
validity_year = validity_end_date.year
|
|
validity_month = validity_end_date.month
|
|
validity_day = validity_end_date.day
|
|
by_pass_finger = form.cleaned_data["by_pass_finger"]
|
|
update_user = cosec.set_cosec_user(
|
|
user_id=user["user-id"],
|
|
ref_user_id=user["ref-user-id"],
|
|
name=name,
|
|
user_active=int(user_active),
|
|
vip=int(vip),
|
|
by_pass_finger=int(by_pass_finger),
|
|
validity_enable=int(validity_enable),
|
|
validity_date_dd=validity_day,
|
|
validity_date_mm=validity_month,
|
|
validity_date_yyyy=validity_year,
|
|
)
|
|
if (
|
|
update_user.get("Response-Code")
|
|
and update_user.get("Response-Code") == "0"
|
|
):
|
|
messages.success(
|
|
request, _("Biometric user data updated successfully")
|
|
)
|
|
return HttpResponse("<script>window.location.reload()</script>")
|
|
if update_user.get("error"):
|
|
error = update_user.get("error")
|
|
if "validity-date-yyyy" in error:
|
|
form.add_error(
|
|
None,
|
|
_(
|
|
"This date cannot be used as the Validity End Date for\
|
|
the COSEC Biometric."
|
|
),
|
|
)
|
|
return render(
|
|
request,
|
|
"biometric/edit_cosec_user.html",
|
|
context={"form": form, "user_id": user_id, "device_id": device_id},
|
|
)
|
|
|
|
|
|
@login_required
|
|
@install_required
|
|
@permission_required("biometric.delete_biometricemployees")
|
|
def delete_horilla_cosec_user(request, user_id, device_id):
|
|
"""
|
|
View function to delete a user from a COSEC biometric device and database.
|
|
|
|
Args:
|
|
request (HttpRequest): The HTTP request object.
|
|
user_id (str): The ID of the user to be deleted from the COSEC biometric device.
|
|
device_id (uuid): The ID of the COSEC biometric device.
|
|
|
|
Returns:
|
|
HttpResponse: A redirect response to the list of employees for the specified
|
|
biometric device.
|
|
"""
|
|
device = BiometricDevices.find(device_id)
|
|
if device:
|
|
employee_bio = BiometricEmployees.objects.filter(
|
|
user_id=user_id, device_id=device
|
|
).first()
|
|
cosec = COSECBiometric(
|
|
device.machine_ip,
|
|
device.port,
|
|
device.bio_username,
|
|
device.bio_password,
|
|
)
|
|
response = cosec.delete_cosec_user(user_id)
|
|
if response.get("Response-Code") and response.get("Response-Code") == "0":
|
|
employee_bio.delete()
|
|
messages.success(
|
|
request,
|
|
_("{} successfully removed from the biometric device.").format(
|
|
employee_bio.employee_id
|
|
),
|
|
)
|
|
else:
|
|
messages.error(request, _("Biometric user not found"))
|
|
else:
|
|
messages.error(request, _("Biometric device not found"))
|
|
redirect_url = (
|
|
f"/biometric/biometric-device-employees/{device_id}/"
|
|
if device
|
|
else "/biometric/view-biometric-devices/"
|
|
)
|
|
return redirect(redirect_url)
|
|
|
|
|
|
@login_required
|
|
@install_required
|
|
@permission_required("biometric.delete_biometricemployees")
|
|
def bio_users_bulk_delete(request):
|
|
"""
|
|
View function to delete multiple users from a ZK biometric device and the local database.
|
|
|
|
Args:
|
|
request (HttpRequest): The HTTP request object.
|
|
|
|
Returns:
|
|
JsonResponse: A JSON response indicating the success of the bulk delete operation.
|
|
|
|
"""
|
|
conn = None
|
|
json_ids = request.POST["ids"]
|
|
device_id = request.POST["deviceId"]
|
|
ids = json.loads(json_ids)
|
|
device = BiometricDevices.objects.get(id=device_id)
|
|
try:
|
|
zk_device = ZK(
|
|
device.machine_ip,
|
|
port=device.port,
|
|
timeout=5,
|
|
password=int(device.zk_password),
|
|
force_udp=False,
|
|
ommit_ping=False,
|
|
)
|
|
conn = zk_device.connect()
|
|
for user_id in ids:
|
|
user_id = int(user_id)
|
|
conn.delete_user(user_id=user_id)
|
|
employee_bio = BiometricEmployees.objects.filter(user_id=user_id).first()
|
|
employee_bio.delete()
|
|
conn.refresh_data()
|
|
messages.success(
|
|
request,
|
|
_("{} successfully removed from the biometric device.").format(
|
|
employee_bio.employee_id
|
|
),
|
|
)
|
|
except Exception as error:
|
|
logger.error("An error occurred: ", error)
|
|
return JsonResponse({"messages": "Success"})
|
|
|
|
|
|
@login_required
|
|
@install_required
|
|
@permission_required("biometric.delete_biometricemployees")
|
|
def cosec_users_bulk_delete(request):
|
|
"""
|
|
View function to delete multiple users from a COSEC biometric device and database.
|
|
|
|
Args:
|
|
request (HttpRequest): The HTTP request object.
|
|
|
|
Returns:
|
|
JsonResponse: A JSON response indicating the success of the bulk delete operation.
|
|
"""
|
|
json_ids = request.POST["ids"]
|
|
device_id = request.POST["deviceId"]
|
|
ids = json.loads(json_ids)
|
|
device = BiometricDevices.objects.get(id=device_id)
|
|
try:
|
|
cosec = COSECBiometric(
|
|
device.machine_ip,
|
|
device.port,
|
|
device.bio_username,
|
|
device.bio_password,
|
|
)
|
|
for user_id in ids:
|
|
cosec.delete_cosec_user(user_id=user_id)
|
|
employee_bio = BiometricEmployees.objects.filter(
|
|
user_id=user_id, device_id=device
|
|
).first()
|
|
if employee_bio:
|
|
employee_bio.delete()
|
|
messages.success(
|
|
request,
|
|
f"{employee_bio.employee_id} "
|
|
+ _("successfully removed from the biometric device."),
|
|
)
|
|
|
|
except Exception as error:
|
|
logger.error("An error occurred: ", error)
|
|
return JsonResponse({"messages": "Success"})
|
|
|
|
|
|
@login_required
|
|
@install_required
|
|
@hx_request_required
|
|
@permission_required("biometric.add_biometricemployees")
|
|
def add_biometric_user(request, device_id):
|
|
"""
|
|
View function to add a new user to a biometric device.
|
|
|
|
This function adds a new user to the specified biometric device and stores their
|
|
information in the database.
|
|
|
|
Args:
|
|
request (HttpRequest): The HTTP request object.
|
|
device_id (uuid): The ID of the biometric device.
|
|
|
|
Returns:
|
|
HttpResponse: A JavaScript script to reload the current page after adding the user.
|
|
|
|
"""
|
|
device = BiometricDevices.objects.get(id=device_id)
|
|
employee_add_form = (
|
|
EmployeeBiometricAddForm()
|
|
if device.machine_type == "zk"
|
|
else CosecUserAddForm()
|
|
)
|
|
if request.method == "POST":
|
|
device = BiometricDevices.objects.get(id=device_id)
|
|
try:
|
|
if device.machine_type == "zk":
|
|
zk_device = ZK(
|
|
device.machine_ip,
|
|
port=device.port,
|
|
timeout=5,
|
|
password=int(device.zk_password),
|
|
force_udp=False,
|
|
ommit_ping=False,
|
|
)
|
|
conn = zk_device.connect()
|
|
conn.enable_device()
|
|
existing_uids = [user.uid for user in conn.get_users()]
|
|
existing_user_ids = [user.user_id for user in conn.get_users()]
|
|
uid = 1
|
|
user_id = 1000
|
|
employee_ids = request.POST.getlist("employee_ids")
|
|
for obj_id in employee_ids:
|
|
employee = Employee.objects.get(id=obj_id)
|
|
existing_biometric_employee = BiometricEmployees.objects.filter(
|
|
employee_id=employee, device_id=device
|
|
).first()
|
|
if existing_biometric_employee is None:
|
|
while uid in existing_uids or user_id in existing_user_ids:
|
|
user_id = int(user_id)
|
|
uid += 1
|
|
user_id += 1
|
|
existing_uids.append(uid)
|
|
existing_user_ids.append(user_id)
|
|
employee_name = employee.get_full_name()
|
|
conn.set_user(
|
|
uid=uid,
|
|
name=employee_name,
|
|
password="",
|
|
group_id="",
|
|
user_id=str(user_id),
|
|
card=0,
|
|
)
|
|
# The ZK Biometric user ID must be a character value
|
|
# that can be converted to an integer.
|
|
BiometricEmployees.objects.create(
|
|
uid=uid,
|
|
user_id=str(user_id),
|
|
employee_id=employee,
|
|
device_id=device,
|
|
)
|
|
messages.success(
|
|
request,
|
|
_("{} added to biometric device successfully").format(
|
|
employee
|
|
),
|
|
)
|
|
else:
|
|
messages.info(
|
|
request,
|
|
_("{} already added to biometric device").format(employee),
|
|
)
|
|
else:
|
|
cosec = COSECBiometric(
|
|
device.machine_ip,
|
|
device.port,
|
|
device.bio_username,
|
|
device.bio_password,
|
|
)
|
|
basic = cosec.basic_config()
|
|
if basic.get("app"):
|
|
employee_ids = request.POST.getlist("employee_ids")
|
|
cosec_users = BiometricEmployees.objects.filter(device_id=device_id)
|
|
existing_ref_user_ids = list(
|
|
cosec_users.values_list("ref_user_id", flat=True)
|
|
)
|
|
for obj_id in employee_ids:
|
|
employee = Employee.objects.get(id=obj_id)
|
|
employee_name = employee.get_full_name()
|
|
user_id = employee.badge_id
|
|
ref_user_id = 100
|
|
while ref_user_id in existing_ref_user_ids:
|
|
ref_user_id += 1
|
|
existing_ref_user_ids.append(ref_user_id)
|
|
user = cosec.set_cosec_user(
|
|
user_id=user_id,
|
|
ref_user_id=ref_user_id,
|
|
name=employee_name,
|
|
user_active=True,
|
|
validity_enable=True,
|
|
validity_date_dd=1,
|
|
validity_date_mm=1,
|
|
validity_date_yyyy=2035,
|
|
)
|
|
response = user.get("Response-Code")
|
|
if response and response == "0":
|
|
BiometricEmployees.objects.create(
|
|
ref_user_id=ref_user_id,
|
|
user_id=user_id,
|
|
employee_id=employee,
|
|
device_id=device,
|
|
)
|
|
except Exception as error:
|
|
if device.machine_type == "zk":
|
|
conn.disable_device()
|
|
logger.error("An error occurred: ", str(error))
|
|
return HttpResponse("<script>window.location.reload()</script>")
|
|
return render(
|
|
request,
|
|
"biometric/add_biometric_user.html",
|
|
context={"form": employee_add_form, "device_id": device_id},
|
|
)
|
|
|
|
|
|
@login_required
|
|
@install_required
|
|
@hx_request_required
|
|
def map_biometric_users(request, device_id):
|
|
"""
|
|
Maps an horilla employee to a biometric user on a specified biometric device.
|
|
"""
|
|
device = BiometricDevices.find(device_id)
|
|
form = MapBioUsers(request.POST or None)
|
|
template = "biometric_users/dahua/map_dahua_users.html"
|
|
|
|
if device.machine_type == "etimeoffice":
|
|
template = "biometric_users/etimeoffice/map_etimeoffice_users.html"
|
|
form.fields["user_id"].label = _("Emp Code")
|
|
|
|
if request.method == "POST" and form.is_valid():
|
|
user_id = form.cleaned_data["user_id"]
|
|
employee = form.cleaned_data["employee_id"]
|
|
if device and employee:
|
|
BiometricEmployees.objects.create(
|
|
user_id=user_id, employee_id=employee, device_id=device
|
|
)
|
|
|
|
messages.success(
|
|
request,
|
|
_("Selected employee successfully mapped to the biometric user"),
|
|
)
|
|
form = MapBioUsers()
|
|
|
|
if device.machine_type == "etimeoffice":
|
|
form.fields["user_id"].label = _("Emp Code")
|
|
|
|
return render(
|
|
request,
|
|
template,
|
|
{"form": form, "device_id": device_id},
|
|
)
|
|
|
|
|
|
@login_required
|
|
@install_required
|
|
@hx_request_required
|
|
def add_dahua_biometric_user(request, device_id):
|
|
"""
|
|
Adds a new employee to a Dahua biometric device.
|
|
|
|
This view handles the process of adding an employee as a user to a Dahua biometric device.
|
|
"""
|
|
device = BiometricDevices.find(device_id)
|
|
form = DahuaUserForm()
|
|
if request.method == "POST":
|
|
form = DahuaUserForm(request.POST)
|
|
if form.is_valid():
|
|
employee_id = request.POST.get("employee")
|
|
card_no = request.POST.get("card_no")
|
|
user_id = request.POST.get("user_id")
|
|
card_status = request.POST.get("card_status")
|
|
card_type = request.POST.get("card_type")
|
|
password = request.POST.get("password")
|
|
valid_date_end = request.POST.get("valid_date_end")
|
|
|
|
try:
|
|
employee = Employee.objects.get(id=employee_id) if employee_id else None
|
|
except Employee.DoesNotExist:
|
|
messages.error(request, _("Employee not found."))
|
|
return render(
|
|
request, "biometric_users/dahua/add_dahua_user.html", context
|
|
)
|
|
|
|
dahua = DahuaAPI(
|
|
ip=device.machine_ip,
|
|
username=device.bio_username,
|
|
password=device.bio_password,
|
|
)
|
|
|
|
response = dahua.enroll_new_user(
|
|
card_name=employee.get_full_name() if employee else "",
|
|
card_no=card_no,
|
|
user_id=user_id,
|
|
card_status=card_status,
|
|
card_type=card_type,
|
|
password=password,
|
|
valid_date_end=valid_date_end,
|
|
)
|
|
|
|
if response.get("status_code") == 200:
|
|
BiometricEmployees.objects.create(
|
|
dahua_card_no=card_no,
|
|
user_id=user_id,
|
|
employee_id=employee,
|
|
device_id=device,
|
|
)
|
|
messages.success(
|
|
request,
|
|
_("{} added to biometric device successfully").format(
|
|
employee.get_full_name() if employee else ""
|
|
),
|
|
)
|
|
form = DahuaUserForm()
|
|
else:
|
|
messages.error(request, _("Failed to add user to biometric device."))
|
|
context = {"form": form, "device_id": device_id}
|
|
return render(request, "biometric_users/dahua/add_dahua_user.html", context)
|
|
|
|
|
|
@login_required
|
|
@hx_request_required
|
|
@install_required
|
|
def find_employee_badge_id(request):
|
|
"""
|
|
Retrieves the badge ID of an employee based on their employee ID.
|
|
"""
|
|
employee_id = request.GET.get("employee")
|
|
user_id = Employee.objects.get(id=employee_id).badge_id if employee_id else ""
|
|
input_field = f"""
|
|
<input type="text" name="user_id" maxlength="50" class="oh-input w-100"
|
|
placeholder="User ID" required="" id="id_user_id" value="{user_id}">
|
|
"""
|
|
return HttpResponse(input_field)
|
|
|
|
|
|
@login_required
|
|
@hx_request_required
|
|
@install_required
|
|
def delete_dahua_user(request, obj_id=None):
|
|
"""
|
|
Deletes a Dahua biometric user or multiple users from a device.
|
|
"""
|
|
script = "<script>window.location.reload();</script>"
|
|
try:
|
|
if request.method == "POST" and obj_id:
|
|
user = BiometricEmployees.objects.get(id=obj_id)
|
|
user.delete()
|
|
messages.success(
|
|
request, _("{} successfully deleted!").format(user.employee_id)
|
|
)
|
|
script = "<script>reloadMessage();</script>"
|
|
if request.method == "DELETE":
|
|
user_ids = request.GET.getlist("ids")
|
|
device_id = request.GET.get("device_id")
|
|
if device_id:
|
|
script = f"""
|
|
<span hx-get="/biometric/biometric-device-employees/{device_id}/"
|
|
hx-target="#dahuUsersList" hx-select="#dahuUsersList" hx-trigger="load delay:200ms"
|
|
hx-swap="outerHTML" hx-on-htmx-before-request="reloadMessage();">
|
|
</span>
|
|
"""
|
|
if user_ids:
|
|
users = BiometricEmployees.objects.filter(user_id__in=user_ids)
|
|
if users:
|
|
count = users.count()
|
|
users.delete()
|
|
messages.success(
|
|
request, _("{} users successfully deleted!").format(count)
|
|
),
|
|
else:
|
|
messages.warning(
|
|
request,
|
|
_("No rows are selected for deleting users from device."),
|
|
)
|
|
except Exception as e:
|
|
messages.error(request, _("An error occurred: {}").format(str(e)))
|
|
return HttpResponse(script)
|
|
|
|
|
|
@login_required
|
|
@install_required
|
|
@hx_request_required
|
|
@permission_required("biometric.delete_biometricemployees")
|
|
def delete_etimeoffice_user(request, obj_id=None):
|
|
"""
|
|
Deletes a user or multiple users from the eTimeOffice biometric system.
|
|
"""
|
|
script = "<script>window.location.href = '/';</script>"
|
|
if request.method == "POST":
|
|
user = BiometricEmployees.objects.get(id=obj_id)
|
|
device_id = user.device_id.id
|
|
user.delete()
|
|
messages.success(
|
|
request, _("{} successfully deleted!").format(user.employee_id)
|
|
)
|
|
script = "<script>reloadMessage();</script>"
|
|
if request.method == "DELETE":
|
|
user_ids = request.GET.getlist("ids")
|
|
device_id = request.GET.get("device_id")
|
|
if device_id:
|
|
script = f"""
|
|
<span hx-get="/biometric/biometric-device-employees/{device_id}/"
|
|
hx-target="#eTimeOfficeUsersList" hx-select="#eTimeOfficeUsersList" hx-trigger="load delay:200ms"
|
|
hx-swap="outerHTML" hx-on-htmx-before-request="reloadMessage();">
|
|
</span>
|
|
"""
|
|
if user_ids:
|
|
users = BiometricEmployees.objects.filter(user_id__in=user_ids)
|
|
if users:
|
|
count = users.count()
|
|
users.delete()
|
|
messages.success(
|
|
request, _("{} users successfully deleted!").format(count)
|
|
),
|
|
else:
|
|
messages.warning(
|
|
request,
|
|
_("No rows are selected for deleting users from device."),
|
|
)
|
|
|
|
return HttpResponse(script)
|
|
|
|
|
|
@login_required
|
|
@install_required
|
|
@hx_request_required
|
|
@permission_required("biometric.change_biometricdevices")
|
|
def biometric_device_live(request):
|
|
"""
|
|
Activate or deactivate live capture mode for a biometric device based on the request parameters.
|
|
|
|
:param request: The Django request object.
|
|
:return: A JsonResponse containing a script to be executed on the client side.
|
|
"""
|
|
is_live = request.GET.get("is_live")
|
|
device_id = request.GET.get("deviceId")
|
|
device = BiometricDevices.objects.get(id=device_id)
|
|
is_live = is_live == "on"
|
|
if is_live:
|
|
port_no = device.port
|
|
machine_ip = device.machine_ip
|
|
password = int(device.zk_password)
|
|
conn = None
|
|
# create ZK instance
|
|
try:
|
|
if device.machine_type == "zk":
|
|
zk_device = ZK(
|
|
machine_ip,
|
|
port=port_no,
|
|
timeout=5,
|
|
password=int(password),
|
|
force_udp=False,
|
|
ommit_ping=False,
|
|
)
|
|
conn = zk_device.connect()
|
|
instance = ZKBioAttendance(machine_ip, port_no, password)
|
|
conn.test_voice(index=14)
|
|
if conn:
|
|
device.is_live = True
|
|
device.is_scheduler = False
|
|
device.save()
|
|
instance.start()
|
|
elif device.machine_type == "cosec":
|
|
cosec = COSECBiometric(
|
|
device.machine_ip,
|
|
device.port,
|
|
device.bio_username,
|
|
device.bio_password,
|
|
timeout=10,
|
|
)
|
|
response = cosec.basic_config()
|
|
if response.get("app"):
|
|
device.is_live = True
|
|
device.is_scheduler = False
|
|
device.save()
|
|
thread = COSECBioAttendanceThread(device.id)
|
|
thread.start()
|
|
BIO_DEVICE_THREADS[device.id] = thread
|
|
else:
|
|
raise TimeoutError
|
|
else:
|
|
pass
|
|
|
|
script = """<script>
|
|
Swal.fire({
|
|
text: "The live capture mode has been activated successfully.",
|
|
icon: "success",
|
|
showConfirmButton: false,
|
|
timer: 1500,
|
|
timerProgressBar: true, // Show a progress bar as the timer counts down
|
|
didClose: () => {
|
|
location.reload(); // Reload the page after the SweetAlert is closed
|
|
},
|
|
});
|
|
</script>
|
|
"""
|
|
except TimeoutError as error:
|
|
device.is_live = False
|
|
device.save()
|
|
logger.error("An error comes in biometric_device_live", error)
|
|
script = """
|
|
<script>
|
|
Swal.fire({
|
|
title : "Connection unsuccessful",
|
|
text: "Please double-check the accuracy of the provided IP Address and Port Number for correctness",
|
|
icon: "warning",
|
|
showConfirmButton: false,
|
|
timer: 3000,
|
|
timerProgressBar: true,
|
|
didClose: () => {
|
|
location.reload();
|
|
},
|
|
});
|
|
</script>
|
|
"""
|
|
finally:
|
|
if conn:
|
|
conn.disconnect()
|
|
else:
|
|
device.is_live = False
|
|
device.save()
|
|
if device.machine_type == "cosec":
|
|
existing_thread = BIO_DEVICE_THREADS.get(device.id)
|
|
if existing_thread:
|
|
existing_thread.stop()
|
|
del BIO_DEVICE_THREADS[device.id]
|
|
|
|
script = """
|
|
<script>
|
|
Swal.fire({
|
|
text: "The live capture mode has been deactivated successfully.",
|
|
icon: "warning",
|
|
showConfirmButton: false,
|
|
timer: 3000,
|
|
timerProgressBar: true,
|
|
didClose: () => {
|
|
location.reload();
|
|
},
|
|
});
|
|
</script>
|
|
"""
|
|
return HttpResponse(script)
|
|
|
|
|
|
def zk_biometric_attendance_logs(device_or_devices):
|
|
"""
|
|
Retrieve and process attendance logs from one or more ZKTeco biometric devices.
|
|
|
|
Handles scenarios where the same user_id may exist across multiple devices for the same employee.
|
|
|
|
:param device_or_devices: A single BiometricDevice instance or a queryset/list of them.
|
|
:return: Tuple (number_of_attendance_processed, error_message or None)
|
|
"""
|
|
if hasattr(device_or_devices, "__iter__") and not isinstance(
|
|
device_or_devices, dict
|
|
):
|
|
devices = list(device_or_devices)
|
|
else:
|
|
devices = [device_or_devices]
|
|
|
|
errors = []
|
|
combined_attendances = []
|
|
patch_direction = {"in": 0, "out": 1}
|
|
|
|
bio_id_map = {
|
|
(bio.device_id_id, bio.user_id): bio
|
|
for bio in BiometricEmployees.objects.filter(device_id__in=devices)
|
|
}
|
|
|
|
for device in devices:
|
|
port_no = device.port
|
|
machine_ip = device.machine_ip
|
|
conn = None
|
|
zk_device = ZK(
|
|
machine_ip,
|
|
port=port_no,
|
|
timeout=5,
|
|
password=int(device.zk_password),
|
|
force_udp=False,
|
|
ommit_ping=False,
|
|
)
|
|
|
|
try:
|
|
conn = zk_device.connect()
|
|
conn.enable_device()
|
|
attendances = conn.get_attendance()
|
|
if not attendances:
|
|
continue
|
|
|
|
last_attendance_datetime = attendances[-1].timestamp
|
|
|
|
if device.last_fetch_date and device.last_fetch_time:
|
|
filtered = [
|
|
att
|
|
for att in attendances
|
|
if (att.timestamp.date() > device.last_fetch_date)
|
|
or (
|
|
att.timestamp.date() == device.last_fetch_date
|
|
and att.timestamp.time() > device.last_fetch_time
|
|
)
|
|
]
|
|
else:
|
|
filtered = attendances
|
|
|
|
# Update last fetch markers
|
|
device.last_fetch_date = last_attendance_datetime.date()
|
|
device.last_fetch_time = last_attendance_datetime.time()
|
|
device.save()
|
|
for attendance in filtered:
|
|
attendance.device = device # Attach device info
|
|
attendance.punch = (
|
|
patch_direction[device.device_direction]
|
|
if device.device_direction in patch_direction
|
|
else attendance.punch
|
|
) # Update punch code based on device direction
|
|
combined_attendances.append(attendance)
|
|
|
|
except zk_exception.ZKErrorResponse as e:
|
|
errors.append(f"[{device.name}] ZKError: {str(e)}")
|
|
except Exception as e:
|
|
logger.error(f"[{device.name}] General Error", exc_info=True)
|
|
errors.append(f"[{device.name}] Error: {str(e)}")
|
|
finally:
|
|
if conn:
|
|
conn.disconnect()
|
|
|
|
# Sort all filtered attendances by time
|
|
combined_attendances.sort(key=lambda a: a.timestamp)
|
|
|
|
for attendance in combined_attendances:
|
|
user_id = attendance.user_id
|
|
punch_code = attendance.punch
|
|
date_time = django_timezone.make_aware(attendance.timestamp)
|
|
date = date_time.date()
|
|
time = date_time.time()
|
|
device_id = attendance.device.id
|
|
bio_id = bio_id_map.get((device_id, user_id))
|
|
if bio_id:
|
|
request_data = Request(
|
|
user=bio_id.employee_id.employee_user_id,
|
|
date=date,
|
|
time=time,
|
|
datetime=date_time,
|
|
)
|
|
try:
|
|
if punch_code in {0, 3, 4}:
|
|
clock_in(request_data)
|
|
elif punch_code in {1, 2, 5}:
|
|
clock_out(request_data)
|
|
except Exception:
|
|
logger.error(
|
|
f"[Device: {attendance.device.name}] Punch processing error",
|
|
exc_info=True,
|
|
)
|
|
|
|
return len(combined_attendances), "; ".join(errors) if errors else None
|
|
|
|
|
|
def zk_biometric_attendance_scheduler(device_id):
|
|
"""
|
|
Scheduler function used for attendance logs
|
|
"""
|
|
device = BiometricDevices.find(device_id)
|
|
if device and device.is_scheduler:
|
|
zk_biometric_attendance_logs(device)
|
|
|
|
|
|
def anviz_biometric_attendance_logs(device):
|
|
"""
|
|
Retrieves attendance records from an Anviz biometric device
|
|
and processes them based on device direction configuration.
|
|
"""
|
|
|
|
current_utc_time = datetime.utcnow()
|
|
|
|
anviz_device = CrossChexCloudAPI(
|
|
api_url=device.api_url,
|
|
api_key=device.api_key,
|
|
api_secret=device.api_secret,
|
|
anviz_request_id=device.anviz_request_id,
|
|
)
|
|
|
|
begin_time = (
|
|
datetime.combine(device.last_fetch_date, device.last_fetch_time)
|
|
if device.last_fetch_date and device.last_fetch_time
|
|
else current_utc_time.replace(hour=0, minute=0, second=0, microsecond=0)
|
|
)
|
|
|
|
attendance_records = anviz_device.get_attendance_records(
|
|
begin_time=begin_time,
|
|
token=device.api_token,
|
|
)
|
|
|
|
# Update last fetch time immediately
|
|
device.last_fetch_date = current_utc_time.date()
|
|
device.last_fetch_time = current_utc_time.time()
|
|
device.save(update_fields=["last_fetch_date", "last_fetch_time"])
|
|
|
|
processed_count = 0
|
|
|
|
for attendance in attendance_records.get("list", []):
|
|
badge_id = attendance["employee"]["workno"]
|
|
punch_code = attendance["checktype"]
|
|
|
|
date_time_utc = datetime.strptime(
|
|
attendance["checktime"], "%Y-%m-%dT%H:%M:%S%z"
|
|
)
|
|
date_time_obj = date_time_utc.astimezone(django_timezone.get_current_timezone())
|
|
|
|
employee = Employee.objects.filter(badge_id=badge_id).first()
|
|
if not employee:
|
|
continue
|
|
|
|
request_data = Request(
|
|
user=employee.employee_user_id,
|
|
date=date_time_obj.date(),
|
|
time=date_time_obj.time(),
|
|
datetime=date_time_obj,
|
|
)
|
|
|
|
try:
|
|
# --------------------------------------------------
|
|
# SYSTEM DIRECTION (auto based on punch code)
|
|
# --------------------------------------------------
|
|
if device.device_direction == "system":
|
|
if punch_code in {0, 128}:
|
|
clock_in(request_data)
|
|
else:
|
|
clock_out(request_data)
|
|
|
|
# --------------------------------------------------
|
|
# FORCE IN DEVICE
|
|
# --------------------------------------------------
|
|
elif device.device_direction == "in":
|
|
clock_in(request_data)
|
|
|
|
# --------------------------------------------------
|
|
# FORCE OUT DEVICE
|
|
# --------------------------------------------------
|
|
elif device.device_direction == "out":
|
|
clock_out(request_data)
|
|
|
|
# --------------------------------------------------
|
|
# ALTERNATE IN / OUT DEVICE
|
|
# --------------------------------------------------
|
|
elif device.device_direction == "alternate":
|
|
last_activity = (
|
|
AttendanceActivity.objects.filter(
|
|
employee_id=employee,
|
|
attendance_date=date_time_obj.date(),
|
|
)
|
|
.order_by("-in_datetime", "-out_datetime")
|
|
.first()
|
|
)
|
|
|
|
# If no record or last record has clock_out → IN
|
|
if not last_activity or last_activity.clock_out:
|
|
clock_in(request_data)
|
|
else:
|
|
clock_out(request_data)
|
|
|
|
processed_count += 1
|
|
|
|
except Exception as error:
|
|
logger.error(
|
|
f"Attendance sync failed for employee {employee.id}",
|
|
exc_info=error,
|
|
)
|
|
|
|
return processed_count
|
|
|
|
|
|
def anviz_biometric_attendance_scheduler(device_id):
|
|
"""
|
|
Schedules the attendance log retrieval for an Anviz biometric device.
|
|
"""
|
|
device = BiometricDevices.find(device_id)
|
|
if device and device.is_scheduler:
|
|
anviz_biometric_attendance_logs(device)
|
|
|
|
|
|
def cosec_biometric_attendance_logs(device):
|
|
"""
|
|
Retrieves and processes attendance logs from a COSEC biometric device.
|
|
"""
|
|
device_args = COSECAttendanceArguments.objects.filter(device_id=device).first()
|
|
last_fetch_roll_ovr_count = (
|
|
int(device_args.last_fetch_roll_ovr_count) if device_args else 0
|
|
)
|
|
last_fetch_seq_number = int(device_args.last_fetch_seq_number) if device_args else 1
|
|
|
|
cosec = COSECBiometric(
|
|
device.machine_ip,
|
|
device.port,
|
|
device.bio_username,
|
|
device.bio_password,
|
|
timeout=10,
|
|
)
|
|
attendances = cosec.get_attendance_events(
|
|
last_fetch_roll_ovr_count, int(last_fetch_seq_number) + 1
|
|
)
|
|
|
|
if not isinstance(attendances, list):
|
|
return
|
|
|
|
for attendance in attendances:
|
|
ref_user_id = attendance["detail-1"]
|
|
employee = BiometricEmployees.objects.filter(ref_user_id=ref_user_id).first()
|
|
if not employee:
|
|
continue
|
|
|
|
date_str = attendance["date"]
|
|
time_str = attendance["time"]
|
|
attendance_date = datetime.strptime(date_str, "%d/%m/%Y").date()
|
|
attendance_time = datetime.strptime(time_str, "%H:%M:%S").time()
|
|
attendance_datetime = datetime.combine(attendance_date, attendance_time)
|
|
punch_code = attendance["detail-2"]
|
|
|
|
request_data = Request(
|
|
user=employee.employee_id.employee_user_id,
|
|
date=attendance_date,
|
|
time=attendance_time,
|
|
datetime=django_timezone.make_aware(attendance_datetime),
|
|
)
|
|
|
|
try:
|
|
if punch_code in ["1", "3", "5", "7", "9", "0"]:
|
|
clock_in(request_data)
|
|
elif punch_code in ["2", "4", "6", "8", "10"]:
|
|
clock_out(request_data)
|
|
else:
|
|
pass
|
|
except Exception as error:
|
|
logger.error("Error processing attendance: ", error)
|
|
|
|
if attendances:
|
|
last_attendance = attendances[-1]
|
|
COSECAttendanceArguments.objects.update_or_create(
|
|
device_id=device,
|
|
defaults={
|
|
"last_fetch_roll_ovr_count": last_attendance["roll-over-count"],
|
|
"last_fetch_seq_number": last_attendance["seq-No"],
|
|
},
|
|
)
|
|
return len(attendances)
|
|
|
|
|
|
def cosec_biometric_attendance_scheduler(device_id):
|
|
"""
|
|
Retrieve and process attendance events from a COSEC biometric device.
|
|
|
|
This function fetches attendance events from the specified COSEC biometric device
|
|
and processes them to record clock-in and clock-out events for employees.
|
|
|
|
Args:
|
|
device_id (uuid): The ID of the COSEC biometric device.
|
|
"""
|
|
device = BiometricDevices.find(device_id)
|
|
if device and device.is_scheduler:
|
|
cosec_biometric_attendance_logs(device)
|
|
|
|
|
|
def dahua_biometric_attendance_logs(device):
|
|
"""
|
|
Retrieves logs from a Dahua biometric device and marks attendance in Horilla.
|
|
|
|
This function fetches biometric logs from the specified device, processes the attendance records,
|
|
and updates the attendance system in Horilla. If an employee has an active clock-in record,
|
|
it marks their clock-out; otherwise, it registers a new clock-in entry.
|
|
|
|
Args:
|
|
device_id (int): The unique identifier of the biometric device.
|
|
|
|
Returns:
|
|
None
|
|
"""
|
|
begin_time = (
|
|
datetime.combine(device.last_fetch_date, device.last_fetch_time)
|
|
+ timedelta(seconds=1)
|
|
if device.last_fetch_date and device.last_fetch_time
|
|
else datetime.combine(datetime.today(), datetime.min.time())
|
|
)
|
|
|
|
dahua = DahuaAPI(
|
|
ip=device.machine_ip, username=device.bio_username, password=device.bio_password
|
|
)
|
|
logs = dahua.get_control_card_rec(start_time=begin_time)
|
|
|
|
if logs.get("status_code") == 200:
|
|
for log in logs.get("records", []):
|
|
user_id = log.get("user_id")
|
|
if not user_id:
|
|
continue
|
|
|
|
employee = BiometricEmployees.objects.filter(
|
|
user_id=user_id, device_id=device
|
|
).first()
|
|
if not employee:
|
|
continue
|
|
|
|
attendance_datetime = log.get("create_time")
|
|
user_tz = pytz.timezone(TIME_ZONE)
|
|
attendance_datetime = attendance_datetime.astimezone(user_tz)
|
|
|
|
last_none_activity = (
|
|
AttendanceActivity.objects.filter(
|
|
employee_id=employee.employee_id,
|
|
clock_out=None,
|
|
)
|
|
.order_by("in_datetime")
|
|
.last()
|
|
)
|
|
|
|
request_data = Request(
|
|
user=employee.employee_id.employee_user_id,
|
|
date=attendance_datetime.date(),
|
|
time=attendance_datetime.time(),
|
|
datetime=attendance_datetime,
|
|
)
|
|
|
|
if last_none_activity:
|
|
clock_out(request_data)
|
|
else:
|
|
clock_in(request_data)
|
|
|
|
if logs.get("records"):
|
|
last_log = logs["records"][-1]
|
|
device.last_fetch_date = last_log["create_time"].date()
|
|
device.last_fetch_time = last_log["create_time"].time()
|
|
device.save()
|
|
return len(logs.get("records", []))
|
|
else:
|
|
return "error"
|
|
|
|
|
|
def dahua_biometric_attendance_scheduler(device_id):
|
|
"""
|
|
Schedules the attendance log retrieval for a Dahua biometric device.
|
|
"""
|
|
device = BiometricDevices.find(device_id)
|
|
if device and device.is_scheduler:
|
|
dahua_biometric_attendance_logs(device)
|
|
|
|
|
|
def etimeoffice_biometric_attendance_logs(device):
|
|
"""
|
|
Retrieves and processes attendance logs from an eTimeOffice biometric device.
|
|
"""
|
|
now = datetime.now()
|
|
etimeoffice = ETimeOfficeAPI(
|
|
username=device.bio_username,
|
|
password=device.bio_password,
|
|
)
|
|
|
|
from_date = (
|
|
f"{(datetime.combine(device.last_fetch_date, device.last_fetch_time) + timedelta(minutes=1)):%d/%m/%Y_%H:%M}"
|
|
if device.last_fetch_date and device.last_fetch_time
|
|
else f"{now:%d/%m/%Y}_00:00"
|
|
)
|
|
to_date = f"{now:%d/%m/%Y_%H:%M}"
|
|
|
|
logs = etimeoffice.download_punch_data(from_date=from_date, to_date=to_date)
|
|
if logs.get("Msg") != "Success":
|
|
return "error"
|
|
|
|
punch_data = logs.get("PunchData", [])
|
|
if not punch_data:
|
|
return 0
|
|
|
|
user_tz = pytz.timezone(TIME_ZONE)
|
|
|
|
employee_map = {
|
|
emp.user_id: emp for emp in BiometricEmployees.objects.filter(device_id=device)
|
|
}
|
|
|
|
for log in reversed(punch_data):
|
|
user_id = log.get("Empcode")
|
|
if not user_id or user_id not in employee_map:
|
|
continue
|
|
|
|
employee = employee_map[user_id]
|
|
attendance_datetime = log["PunchDate"].astimezone(user_tz)
|
|
|
|
request_data = Request(
|
|
user=employee.employee_id.employee_user_id,
|
|
date=attendance_datetime.date(),
|
|
time=attendance_datetime.time(),
|
|
datetime=attendance_datetime,
|
|
)
|
|
|
|
last_none_activity = (
|
|
AttendanceActivity.objects.filter(
|
|
employee_id=employee.employee_id,
|
|
clock_out=None,
|
|
)
|
|
.order_by("in_datetime")
|
|
.last()
|
|
)
|
|
|
|
if last_none_activity:
|
|
clock_out(request_data)
|
|
else:
|
|
clock_in(request_data)
|
|
|
|
last_log = punch_data[0]
|
|
device.last_fetch_date, device.last_fetch_time = (
|
|
last_log["PunchDate"].date(),
|
|
last_log["PunchDate"].time(),
|
|
)
|
|
device.save()
|
|
|
|
return len(punch_data)
|
|
|
|
|
|
def etimeoffice_biometric_attendance_scheduler(device_id):
|
|
"""
|
|
Schedules the attendance log retrieval for an eTimeOffice biometric device.
|
|
"""
|
|
device = BiometricDevices.find(device_id)
|
|
if device and device.is_scheduler:
|
|
etimeoffice_biometric_attendance_logs(device)
|
|
|
|
|
|
try:
|
|
devices = BiometricDevices.objects.all().update(is_live=False)
|
|
for device in BiometricDevices.objects.filter(is_scheduler=True):
|
|
if device:
|
|
if str_time_seconds(device.scheduler_duration) > 0:
|
|
if device.machine_type == "anviz":
|
|
scheduler = BackgroundScheduler()
|
|
scheduler.add_job(
|
|
lambda: anviz_biometric_attendance_scheduler(device.id),
|
|
"interval",
|
|
seconds=str_time_seconds(device.scheduler_duration),
|
|
)
|
|
scheduler.start()
|
|
elif device.machine_type == "zk":
|
|
scheduler = BackgroundScheduler()
|
|
scheduler.add_job(
|
|
lambda: zk_biometric_attendance_scheduler(device.id),
|
|
"interval",
|
|
seconds=str_time_seconds(device.scheduler_duration),
|
|
id=f"biometric_{device.id}",
|
|
)
|
|
scheduler.start()
|
|
elif device.machine_type == "dahua":
|
|
scheduler = BackgroundScheduler()
|
|
scheduler.add_job(
|
|
lambda: dahua_biometric_attendance_scheduler(device.id),
|
|
"interval",
|
|
seconds=str_time_seconds(device.scheduler_duration),
|
|
)
|
|
scheduler.start()
|
|
|
|
elif device.machine_type == "cosec":
|
|
scheduler = BackgroundScheduler()
|
|
scheduler.add_job(
|
|
lambda: cosec_biometric_attendance_scheduler(device.id),
|
|
"interval",
|
|
seconds=str_time_seconds(device.scheduler_duration),
|
|
)
|
|
scheduler.start()
|
|
|
|
elif device.machine_type == "etimeoffice":
|
|
scheduler = BackgroundScheduler()
|
|
scheduler.add_job(
|
|
lambda: etimeoffice_biometric_attendance_scheduler(device.id),
|
|
"interval",
|
|
seconds=str_time_seconds(device.scheduler_duration),
|
|
)
|
|
scheduler.start()
|
|
else:
|
|
pass
|
|
except:
|
|
pass
|