diff --git a/biometric/__init__.py b/biometric/__init__.py new file mode 100644 index 0000000..049a679 --- /dev/null +++ b/biometric/__init__.py @@ -0,0 +1,7 @@ +""" +biometric app + +This app contains modules for handling biometric devices. +""" + +from . import settings diff --git a/biometric/admin.py b/biometric/admin.py new file mode 100644 index 0000000..e1d5a55 --- /dev/null +++ b/biometric/admin.py @@ -0,0 +1,16 @@ +""" +Register models with the Django admin site. + +This section of code registers the BiometricDevices and BiometricEmployees models +with the Django admin site, allowing them to be managed via the admin interface. + +""" + +from django.contrib import admin + +from .models import BiometricDevices, BiometricEmployees, COSECAttendanceArguments + +# Register your models here. +admin.site.register(BiometricDevices) +admin.site.register(BiometricEmployees) +admin.site.register(COSECAttendanceArguments) diff --git a/biometric/anviz.py b/biometric/anviz.py new file mode 100644 index 0000000..302fd32 --- /dev/null +++ b/biometric/anviz.py @@ -0,0 +1,172 @@ +""" +CrossChexCloudAPI module for Anviz Biometric Integration + +This module provides a wrapper for interacting with the CrossChex Cloud API to manage +authentication, attendance data retrieval, and token handling. It allows for secure +communication with the API, including fetching and validating tokens, and retrieving +attendance records . +""" + +from datetime import datetime + +import requests + + +class CrossChexCloudAPI: + """ + CrossChexCloudAPI: A class to interact with the CrossChex Cloud API for attendance data + and token management. + """ + + def __init__(self, api_url, api_key, api_secret, anviz_request_id): + """ + Initializes the CrossChexCloudAPI object with necessary parameters, such as API URL, + credentials (API key and secret), and request ID for the connection. + """ + self.api_url = api_url + self.api_key = api_key + self.api_secret = api_secret + self.anviz_request_id = anviz_request_id + self.token = None + self.expires = None + self.auth_error = { + "header": {"nameSpace": "System", "name": "Exception"}, + "payload": {"type": "AUTH_ERROR", "message": "AUTH_ERROR"}, + } + self.expires_error = { + "header": {"nameSpace": "System", "name": "Exception"}, + "payload": {"type": "TOKEN_EXPIRES", "message": "TOKEN_EXPIRES"}, + } + + def _get_timestamp(self): + """ + Generates a UTC timestamp in ISO 8601 format. + """ + return datetime.utcnow().isoformat() + "Z" + + def _post(self, data): + """ + Sends a POST request with the given data to the API and handles the response, including + automatic token renewal in case of expiration or authentication error. + """ + response = requests.post(self.api_url, json=data, timeout=5) + response_data = response.json() + + if "payload" in response_data: + if response_data["payload"] == self.expires_error: + self.get_token() + response = requests.post(self.api_url, json=data, timeout=5) + response_data = response.json() + elif response_data["payload"] == self.auth_error: + raise Exception("Authentication error: API key or secret is incorrect.") + + response.raise_for_status() + return response_data + + def _is_token_expired(self): + """Check if the token is expired.""" + if self.expires: + expires_datetime = datetime.fromisoformat(self.expires) + # Remove timezone info to make it offset-naive + expires_datetime = expires_datetime.replace(tzinfo=None) + return datetime.utcnow() > expires_datetime + return True + + def get_token(self): + """Fetch a new token if expired or not present, and store it in the database.""" + if self.token is None or self._is_token_expired(): + data = { + "header": { + "nameSpace": "authorize.token", + "nameAction": "token", + "version": "1.0", + "requestId": self.anviz_request_id, + "timestamp": self._get_timestamp(), + }, + "payload": {"api_key": self.api_key, "api_secret": self.api_secret}, + } + response = self._post(data) + self.token = response.get("payload").get("token") + self.expires = response.get("payload").get("expires") + + return self.token, self.expires + + def test_connection(self): + """Test connection and fetch the token and expiry.""" + token, expires = self.get_token() + return {"token": token, "expires": expires} + + def get_attendance_payload( + self, begin_time, end_time, order, page, per_page, token + ): + """Constructs the payload for retrieving attendance records.""" + current_utc_time = datetime.utcnow() + begin_time = begin_time or current_utc_time.replace( + hour=0, minute=0, second=0, microsecond=0 + ) + end_time = end_time or current_utc_time + begin_time_str = begin_time.isoformat() + "+00:00" + end_time_str = end_time.isoformat() + "+00:00" + return { + "header": { + "nameSpace": "attendance.record", + "nameAction": "getrecord", + "version": "1.0", + "requestId": self.anviz_request_id, + "timestamp": self._get_timestamp(), + }, + "authorize": { + "type": "token", + "token": token, + }, + "payload": { + "begin_time": begin_time_str, + "end_time": end_time_str, + "order": order, + "page": page, + "per_page": per_page, + }, + } + + def get_attendance_records( + self, + begin_time=None, + end_time=None, + order="asc", + page=1, + per_page=100, + token=None, + ): + """Get attendance records, optimizing token usage and handling pagination.""" + all_records = [] + token = token or self.get_token()[0] + + while True: + payload_data = self.get_attendance_payload( + begin_time=begin_time, + end_time=end_time, + order=order, + page=str(page), + per_page=str(per_page), + token=token, + ) + response = self._post(payload_data) + + # Safe extraction to avoid KeyError + payload = response.get("payload", {}) if isinstance(response, dict) else {} + records = payload.get("list", []) + + all_records.extend(records) + + page_count = payload.get("pageCount", 0) + if page >= page_count or not records: + break + + page += 1 + + return { + "token": self.token, + "expires": self.expires, + "list": all_records, + "count": len(all_records), + } diff --git a/biometric/apps.py b/biometric/apps.py new file mode 100644 index 0000000..89bbf6c --- /dev/null +++ b/biometric/apps.py @@ -0,0 +1,34 @@ +""" +Django application configuration for the biometric app. +""" + +from django.apps import AppConfig + + +class BiometricConfig(AppConfig): + """ + This class defines the configuration for the biometric Django app. It sets the + default auto field to use a BigAutoField for model primary keys. + + Attributes: + default_auto_field (str): The default auto field to use for model primary keys. + name (str): The name of the Django app, which is 'biometric'. + """ + + default_auto_field = "django.db.models.BigAutoField" + name = "biometric" + + def ready(self): + from django.urls import include, path + + from horilla.horilla_settings import APPS + from horilla.urls import urlpatterns + + APPS.append("biometric") + urlpatterns.append( + path("biometric/", include("biometric.urls")), + ) + + from biometric import sidebar + + super().ready() diff --git a/biometric/context_processors.py b/biometric/context_processors.py new file mode 100644 index 0000000..7565bd4 --- /dev/null +++ b/biometric/context_processors.py @@ -0,0 +1,35 @@ +""" +Utility functions related to biometric attendance. + +This file contains utility functions related to biometric attendance, +including a function to check if the biometric system is installed. + +Functions: + biometric_is_installed(request): Checks if the biometric system is installed. +""" + +from base.models import BiometricAttendance + + +def biometric_is_installed(_request): + """ + Check if the biometric system is installed. + + This function checks if the biometric system is installed by querying the + BiometricAttendance model. If no BiometricAttendance object exists, it + creates one with 'is_installed' set to False. + + Args: + request: The HTTP request object. + + Returns: + dict: A dictionary containing a single key-value pair indicating whether + the biometric system is installed. The key is 'is_installed', and the value + is a boolean indicating the installation status. + """ + instance = BiometricAttendance.objects.first() + if not instance: + BiometricAttendance.objects.create(is_installed=False) + instance = BiometricAttendance.objects.first() + is_installed = instance.is_installed + return {"is_installed": is_installed} diff --git a/biometric/cosec.py b/biometric/cosec.py new file mode 100644 index 0000000..e27e307 --- /dev/null +++ b/biometric/cosec.py @@ -0,0 +1,744 @@ +""" +This module provides a Python interface to interact with a COSEC biometric device. +It allows users to perform various operations such as configuring device settings, +managing users, retrieving attendance events, etc. +""" + +import xml.etree.ElementTree as ET +from base64 import b64encode + +import requests + +cosec_api_response_codes = { + "0": "Successful", + "1": "Failed - Invalid Login Credentials", + "2": "Date and time manual set failed", + "3": "Invalid Date/Time", + "4": "Maximum users are already configured.", + "5": "Image size is too big.", + "6": "Image format not supported", + "7": "Card 1 and card 2 are identical", + "8": "Card ID exists", + "9": "Finger print template/ Palm template/ Face template\ + already exists/ Face Image already exists", + "10": "No Record Found", + "11": "Template size/ format mismatch", + "12": "FP Memory full", + "13": "User id not found", + "14": "Credential limit reached", + "15": "Reader mismatch/ Reader not configured", + "16": "Device Busy", + "17": "Internal process error ", + "18": "PIN already exists", + "19": "Credential not found", + "20": "Memory Card Not Found", + "21": "Reference User ID exists", + "22": "Wrong Selection", + "23": "Palm template mode mismatch", + "24": "Feature not enabled in the configuration", + "25": "Message already exists for same user for same date", + "26": "Invalid smart card format/Parameters not applicable as per card type defined.", + "27": "Time Out", + "28": "Read/Write failed", + "29": "Wrong Card Type", + "30": "key mismatch", + "31": "invalid card", + "32": "Scan failed", + "33": "Invalid value", + "34": "Credential does not match", + "35": "Failure", + "36": "Face Not Detected", + "37": "User Conflict", + "38": "Enroll Conflict", + "39": "Face Mask Detected", + "40": "Full Face Not Visible", + "41": "Face Not Straight", +} + +true_false_arguments = [ + "enroll-on-device", + "week-day0", + "week-day1", + "week-day2", + "week-day3", + "week-day4", + "week-day5", + "week-day6", + "alarm", + "tamper-alarm", + "auto-alarm-ack", + "thresh-temp-exceeded", + "allow-exit-when-locked", + "auto-relock", + "asc-active", + "door-sense-active", + "exit-switch", + "aux-output-enable", + "greeting-msg-enable", + "buzzer-mute", + "enable", + "enable-signal-wait", + "read-csn", +] + + +class COSECBiometric: + """ + A Python interface to interact with a COSEC biometric device. + + This class provides methods for configuring device settings, managing users, + retrieving attendance events, and other operations. + + Usage: + 1. Instantiate the COSECBiometric class with the required parameters: + IP address, port, username, and password. + 2. Use the provided methods to perform specific actions on the biometric device. + """ + + def __init__(self, machine_ip, port, username, password, timeout=60): + """ + Initialize the COSECBiometric object with the specified parameters. + + Args: + machine_ip (str): The IP address of the COSEC biometric device. + port (int): The port number of the COSEC biometric device. + username (str): The username for accessing the biometric device. + password (str): The password for accessing the biometric device. + timeout (int, optional): The timeout for HTTP requests (default is 60 seconds). + """ + self.__ip = machine_ip + self.__port = port + self.__timeout = timeout + self.__username = username + self.__password = password + self.__header = {"Authorization": self.__generate_auth_header()} + self.__base_url = f"http://{self.__ip}/device.cgi" + self.__user_fields = [] + + def __generate_auth_header(self): + """ + Generate the Authorization header for making authenticated requests to the COSEC + biometric device. + + This method creates an Authorization header using the provided username and + password, encoded in Base64 format as per the Basic authentication scheme. + + Returns: + str: The Authorization header value in the format 'Basic '. + """ + credentials = f"{self.__username}:{self.__password}".encode() + return "Basic " + b64encode(credentials).decode() + + def __send_request(self, url): + """ + This method sends an HTTP GET request to the specified URL with the + appropriate headers, and then parses the response to handle different scenarios + such as timeouts, access errors, unsupported content types, and valid responses. + """ + try: + # Some Device API uses HTTPDigestAuth for authentication + # response = requests.get( + # url + "&format=xml", + # timeout=self.__timeout, + # auth=HTTPDigestAuth(self.__username, self.__password) + # ) + response = requests.get( + url + "&format=xml", headers=self.__header, timeout=self.__timeout + ) + return self.__parse_response(response) + except requests.Timeout: + return {"Timeout": "Request Timeout"} + + def __parse_response(self, response): + """ + Parse the response received from the COSEC biometric device. + + This method parses the HTTP response received from the COSEC biometric device, + handles different scenarios such as HTTP status codes, content types, + and response formats, and extracts relevant data from the response. + + Args: + response (requests.Response): The HTTP response object received from + the device. + + Returns: + dict: A dictionary representing the parsed response. If the response status + code is 200 (OK), and the content type is XML, the dictionary may contain + response data. If there is an error or unsupported content, the dictionary + will contain an appropriate error message. + """ + if response.status_code != 200: + return {"Error": "Access Error"} + if response.headers.get("Content-Type") != "text/xml": + return {"Error": "Unsupported content"} + + response_data = response.content.decode("utf-8") + root = ET.fromstring(response_data) + text_content = root.text.strip() + if not text_content: + events = root.findall("Events") + if events: + parsed_response = [] + for event in root.findall("Events"): + event_dict = {} + for elem in event: + event_dict[elem.tag] = elem.text + if event_dict.get("event-id") == "101": + parsed_response.append(event_dict) + else: + parsed_response = {elem.tag: elem.text for elem in root} + if parsed_response.get("Response-Code"): + message = cosec_api_response_codes.get( + parsed_response["Response-Code"] + ) + parsed_response["message"] = message + else: + parsed_response = {} + parsed_response["error"] = text_content + return parsed_response + + def __authenticate_arguments(self, url, kwargs): + """ + Authenticate and validate the arguments before sending a request to the COSEC + biometric device. + + This method verifies that the provided arguments are supported by the + specified URL endpoint and raises a ValueError if any unsupported arguments + are found. + + Args: + url (str): The URL endpoint for the request. + kwargs (dict): A dictionary containing the arguments to be authenticated. + + Raises: + ValueError: If any of the provided arguments are not supported by the specified + URL endpoint. + """ + if url == "special-function": + url = f"{self.__base_url}/{url}?action=get&sp-fn-index=1" + elif url == "smart-card-format": + url = f"{self.__base_url}/{url}?action=get&card-type=1&index=1" + else: + url = f"{self.__base_url}/{url}?action=get" + response = self.__send_request(url) + supported_args = response.keys() + unsupported_args = [arg for arg in kwargs.keys() if arg not in supported_args] + + if unsupported_args: + unsupported_args_str = ", ".join(unsupported_args) + raise ValueError( + f"The following argument(s) are not supported\ + : {unsupported_args_str}. Supported arguments are: {', '.join(supported_args)}" + ) + + def basic_config(self, action="get", **kwargs): + """ + Configure or retrieve basic settings of the COSEC biometric device. + + This method allows the user to configure or retrieve basic settings of + the COSEC biometric device, such as device name, IP settings, time settings, + etc. + """ + url = f"{self.__base_url}/device-basic-config?action={action}" + + if action == "set": + self.__authenticate_arguments("device-basic-config", kwargs) + url += "&" + "&".join([f"{key}={value}" for key, value in kwargs.items()]) + + return self.__send_request(url) + + def finger_reader_parameter_configuration(self, action="get", **kwargs): + """ + Configure or retrieve parameters related to the finger reader of the COSEC + biometric device. + + This method allows the user to configure or retrieve parameters related to + the finger reader of the COSEC biometric device, such as sensitivity, timeout, + template format, etc. + """ + url = f"{self.__base_url}/finger-parameter?action={action}" + + if action == "set": + self.__authenticate_arguments("finger-parameter", kwargs) + url += "&" + "&".join([f"{key}={value}" for key, value in kwargs.items()]) + + return self.__send_request(url) + + def enrollment_configuration(self, action="get", **kwargs): + """ + Configure or retrieve enrollment options of the COSEC biometric device. + + This method allows the user to configure or retrieve enrollment options + of the COSEC biometric device, such as enabling or disabling self-enrollment, + setting enrollment timeout, template format, etc. + """ + url = f"{self.__base_url}/enroll-options?action={action}" + + if action == "set": + self.__authenticate_arguments("enroll-options", kwargs) + url += "&" + "&".join( + [ + f"{key}={int(value) if key in true_false_arguments else value}" + for key, value in kwargs.items() + ] + ) + + return self.__send_request(url) + + def access_settings_configuration(self, action="get", **kwargs): + """ + Configure or retrieve access settings of the COSEC biometric device. + + This method allows the user to configure or retrieve access settings of the + COSEC biometric device, such as door access control, alarm settings, + exit switches, etc + """ + url = f"{self.__base_url}/access-setting?action={action}" + + if action == "set": + self.__authenticate_arguments("access-setting", kwargs) + url += "&" + "&".join( + [ + f"{key}={int(value) if key in true_false_arguments else value}" + for key, value in kwargs.items() + ] + ) + + return self.__send_request(url) + + def alarm_configuration(self, action="get", **kwargs): + """ + Configure or retrieve alarm settings of the COSEC biometric device. + + This method allows the user to configure or retrieve alarm settings of + the COSEC biometric device, such as enabling or disabling alarms, setting + alarm thresholds, configuring alarm acknowledgements, etc. + """ + + url = f"{self.__base_url}/alarm?action={action}" + if action == "set": + self.__authenticate_arguments("alarm", kwargs) + url += "&" + "&".join( + [ + f"{key}={int(value) if key in true_false_arguments else value}" + for key, value in kwargs.items() + ] + ) + return self.__send_request(url) + + def date_and_time_configuration(self, action="get", **kwargs): + """ + Configure or retrieve date and time settings of the COSEC biometric device. + + This method allows the user to configure or retrieve date and time settings + of the COSEC biometric device, such as setting the current date and time, + configuring time zones, enabling daylight saving time, etc. + """ + url = f"{self.__base_url}/date-time?action={action}" + if action == "set": + self.__authenticate_arguments("date-time", kwargs) + url += "&" + "&".join( + [ + f"{key}={int(value) if key in true_false_arguments else value}" + for key, value in kwargs.items() + ] + ) + return self.__send_request(url) + + def door_features_configuration(self, action="get", **kwargs): + """ + Configure or retrieve door features settings of the COSEC biometric device. + + This method allows the user to configure or retrieve door features settings + of the COSEC biometric device, such as enabling or disabling door senses, + setting door open durations, configuring auxiliary outputs, etc. + """ + url = f"{self.__base_url}/door-feature?action={action}" + if action == "set": + self.__authenticate_arguments("door-feature", kwargs) + url += "&" + "&".join( + [ + f"{key}={int(value) if key in true_false_arguments else value}" + for key, value in kwargs.items() + ] + ) + return self.__send_request(url) + + def system_timer_configuration(self, action="get", **kwargs): + """ + Configure or retrieve system timer settings of the COSEC biometric device. + + This method allows the user to configure or retrieve system timer settings + of the COSEC biometric device, such as setting the system idle timeout, + configuring system heartbeat intervals, etc. + """ + url = f"{self.__base_url}/system-timer?action={action}" + if action == "set": + self.__authenticate_arguments("system-timer", kwargs) + url += "&" + "&".join( + [ + f"{key}={int(value) if key in true_false_arguments else value}" + for key, value in kwargs.items() + ] + ) + return self.__send_request(url) + + def special_function_configuration(self, action="get", sp_fn_index="1", **kwargs): + """ + Configure special functions on the COSEC biometric device. + + This method allows configuring special functions on the COSEC biometric device, + such as enabling or disabling + specific functionalities + """ + url = f"{self.__base_url}/special-function?action={action}&sp-fn-index={sp_fn_index}" + if action == "set": + self.__authenticate_arguments("special-function", kwargs) + url += "&" + "&".join( + [ + f"{key}={int(value) if key in true_false_arguments else value}" + for key, value in kwargs.items() + ] + ) + return self.__send_request(url) + + def wiegand_interface(self, action="get", **kwargs): + """ + Configure or retrieve Wiegand interface settings of the COSEC biometric device. + + This method allows the user to configure or retrieve Wiegand interface settings + of the COSEC biometric device,such as setting up Wiegand card readers, configuring + Wiegand data formats, etc. + """ + url = f"{self.__base_url}/wiegand-interface?action={action}" + if action == "set": + self.__authenticate_arguments("wiegand-interface", kwargs) + url += "&" + "&".join( + [ + f"{key}={int(value) if key in true_false_arguments else value}" + for key, value in kwargs.items() + ] + ) + return self.__send_request(url) + + def smart_card_format(self, action="get", card_type="1", index="1", **kwargs): + """ + Configure or retrieve smart card format settings of the COSEC biometric device. + + This method allows the user to configure or retrieve smart card format settings + of the COSEC biometric device, + such as setting up card types and their corresponding formats. + """ + url = ( + f"{self.__base_url}/smart-card-format?action={action}" + f"&card-type={card_type}&index={index}" + ) + + if action == "set": + self.__authenticate_arguments("smart-card-format", kwargs) + url += "&" + "&".join( + [ + f"{key}={int(value) if key in true_false_arguments else value}" + for key, value in kwargs.items() + ] + ) + return self.__send_request(url) + + def get_cosec_user(self, user_id): + """ + Retrieve user information from the COSEC biometric device. + + This method retrieves user information, such as user details, + credentials, and access rights, from the COSEC biometric device based + on the provided user ID. + """ + url = f"{self.__base_url}/users?action=get&user-id={user_id}" + return self.__send_request(url) + + def check_user_url_arguments(self, user_id, url_arguments): + """ + Check if the provided URL arguments are supported for the COSEC + biometric device's user configuration. + + This method verifies if the provided URL arguments are supported + for configuring or retrieving user settings on the COSEC biometric + device. It compares the provided arguments with the supported fields + retrieved from the device for user configuration. + """ + user = self.get_cosec_user(user_id) + if not user.get("Response-Code"): + self.__user_fields = list(user.keys()) + + else: + ref_user_id = 1 + code = "13" + while code != "0": + url = ( + f"{self.__base_url}/users?action=set&user-id={user_id}" + f"&ref-user-id={ref_user_id}&format=xml" + ) + + response = requests.get( + url, headers=self.__header, timeout=self.__timeout + ) + if response.status_code == 200: + response_data = response.content.decode("utf-8") + root = ET.fromstring(response_data) + code = root.find("Response-Code").text + if code == "0": + user = self.get_cosec_user(user_id) + self.__user_fields = list(user.keys()) + self.delete_cosec_user(user_id) + break + ref_user_id += 1 + fields = "" + for arg in url_arguments: + if arg.split("=")[0] not in self.__user_fields: + fields += arg.split("=")[0] + " , " + if fields: + raise ValueError( + f"{fields} argument is not support on this biometric device API" + ) + + def set_cosec_user( + self, + user_id, + ref_user_id, + name=None, + user_active=None, + vip=None, + validity_enable=None, + validity_time_hh=None, + validity_time_mm=None, + validity_date_dd=None, + validity_date_mm=None, + validity_date_yyyy=None, + user_pin=None, + card1=None, + card2=None, + by_pass_finger=None, + dob_enable=None, + dob_dd=None, + dob_mm=None, + dob_yyyy=None, + by_pass_palm=None, + user_group=None, + self_enrollment_enable=None, + enable_fr=None, + ): + """ + Set or update user information on the COSEC biometric device. + + This method allows setting or updating user information on the COSEC + biometric device, including user details, access rights, credentials, + validity periods, etc. + """ + if not user_id or not ref_user_id: + raise ValueError( + "Both user_id and ref_id are mandatory for create & edit a user" + ) + # user_id : Mandatory To set or retrieve the alphanumeric user ID for the selected user. + # Note: If a set request is sent against an existing user ID, then configuration for this + # user will be updated with the new values. + # ref_user_id : Mandatory for the set action.Maximum 8 digits.To select the numeric user + # ID on which the specified operation is to be done. + url_arguments = [] + url = f"{self.__base_url}/users?action=set" + url_arguments.append(f"user-id={user_id}") + url_arguments.append(f"ref-user-id={ref_user_id}") + + if name: + # Truncate name if it exceeds 15 characters + truncated_name = name[:15] if len(name) > 15 else name + url_arguments.append(f"name={truncated_name}") + + if user_active is not None: + # To activate or deactivate a user. + if user_active not in [True, False]: + raise ValueError("user_active must be either True, False, or None") + url_arguments.append(f"user-active={int(user_active)}") + + if vip is not None: + # To define a user as VIP. + # Note: A VIP user is a user with the special privilege to access a particular door. + if vip not in [True, False]: + raise ValueError("vip must be either True, False, or None") + url_arguments.append(f"vip={int(vip)}") + + if validity_enable is not None: + # To enable/disable the user validity. + if validity_enable not in [True, False]: + raise ValueError("validity_enable must be either True, False, or None") + url_arguments.append(f"validity-enable={int(validity_enable)}") + + if validity_date_dd and validity_date_mm and validity_date_yyyy: + # To define the end date for user validity.Valid Values : validity_date_dd = 1-31 & + # validity_date_mm = 1-12 validity_date_yyyy = based on device model + url_arguments.append(f"validity-date-dd={validity_date_dd}") + url_arguments.append(f"validity-date-mm={validity_date_mm}") + url_arguments.append(f"validity-date-yyyy={validity_date_yyyy}") + + if validity_time_hh and validity_time_mm: + # To define the end time for user validity.Valid Values : validity_time_hh = 00-23 + # & validity_time_mm = 00-59 + url_arguments.append(f"validity-time-hh={validity_time_hh}") + url_arguments.append(f"validity-time-mm={validity_time_mm}") + + if user_pin: + # 1 to 6 Digits . To set the user PIN or get the event from user PIN. + # Note: The user-pin can be set to a blank value. + url_arguments.append(f"user-pin={user_pin}") + + if by_pass_finger is not None: + # To enable/disable the bypass finger option. + url_arguments.append(f"by-pass-finger={by_pass_finger}") + + if by_pass_palm is not None: + # To enable/disable the bypass palm option. + if by_pass_palm not in [True, False]: + raise ValueError("by_pass_palm must be either True, False, or None") + url_arguments.append(f"by-pass-palm={int(by_pass_palm)}") + + if card1: + # Values : 64 Bits (8 bytes) (max value - 18446744073709551615). + # Defines the value of access card 1 and 2. + url_arguments.append(f"card1={card1}") + if card2: + url_arguments.append(f"card2={card2}") + + if dob_enable is not None: + # To enable/disable the display of a birthday message. + if dob_enable not in [True, False]: + raise ValueError("dob_enable must be either True, False, or None") + url_arguments.append(f"dob-enable={int(dob_enable)}") + + if dob_dd and dob_mm and dob_yyyy: + # To set or delete the date of birth for a user Valid Values : + # dob_dd = 1-31 & dob_mm = 1-12 dob_yyyy = 1990-2037 + url_arguments.append(f"dob-dd={dob_dd}") + url_arguments.append(f"dob-mm={dob_mm}") + url_arguments.append(f"dob-yyyy={dob_yyyy}") + + if user_group: + # To set the user group number. + # Note: A user can be assigned to any user group ranging from 1 to 999. + # User group number can be set/update via “Set” action. + # To remove a user from an assigned user group, user group should be set to 0. + url_arguments.append(f"user-group={user_group}") + + if self_enrollment_enable is not None: + # To enable/disable self-enrollment for user + if self_enrollment_enable not in [True, False]: + raise ValueError( + "self_enrollment_enable must be either True, False, or None" + ) + url_arguments.append( + f"self-enrollment-enable={int(self_enrollment_enable)}" + ) + + if enable_fr is not None: + # To enable/disable face recognition for a user + if enable_fr not in [True, False]: + raise ValueError("enable_fr must be either True, False, or None") + url_arguments.append(f"enable-fr={int(enable_fr)}") + self.check_user_url_arguments(user_id, url_arguments) + url += "&" + "&".join(url_arguments) + return self.__send_request(url) + + def delete_cosec_user(self, user_id): + """ + Delete a user from the COSEC biometric device. + + This method deletes a user with the specified user ID from the COSEC + biometric device. + """ + url = f"{self.__base_url}/users?action=delete&user-id={user_id}" + return self.__send_request(url) + + def enable_user_face_recognition(self, user_id, enable_fr=True): + """ + Enable or disable face recognition for a user in cosec biometric device. + """ + url = ( + f"{self.__base_url}/users?action=set&user-id={user_id}" + f"&enable-fr={int(enable_fr)}&format=xml" + ) + return self.__send_request(url) + + def get_user_credential(self, user_id, credential_type=1, finger_index=1): + """ + Retrieve the credential of a user from the COSEC biometric device. + + This method retrieves the credential of a user, such as fingerprint, + card, palm template, face template, or face image, from the COSEC biometric + device based on the provided user ID + and credential type. + """ + # type values: 1 = Finger , 2 = Card , 3 = Palm , 4 = Palm template with + # guide mode , 5 = Face Template , 6 = Face Image + if not isinstance(credential_type, int) or not isinstance(finger_index, int): + raise ValueError("type and finger_index arguments value must be integers") + + if credential_type < 1 or credential_type > 6: + raise ValueError("Type must be between 1 and 6") + + if finger_index < 1 or finger_index > 10: + raise ValueError("Finger index must be between 1 and 10") + + url = ( + f"{self.__base_url}/credential?action=get&type={credential_type}" + f"&user-id={user_id}&finger-index={finger_index}" + ) + return self.__send_request(url) + + def get_user_credential_count(self, user_id): + """ + Retrieve the credential of a user from the COSEC biometric device. + + This method retrieves the count of credentials of a user, such as fingerprint, + card, palm template, face template, or face image, from the COSEC biometric + device based on the provided user ID + and credential type. + """ + url = f"{self.__base_url}/command?action=getcount&user-id={user_id}" + return self.__send_request(url) + + def delete_cosec_user_credential(self, user_id, credential_type): + """ + Delete a specific type of credential associated + with a user from the COSEC biometric device. + + This method deletes a specific type of credential + associated with a user, such as fingerprint, + card, palm template, face template, or face image, + from the COSEC biometric device. + """ + # type values: 0 = All , 1 = Finger , 2 = Card , 3 = Palm , 4 = Palm template + # with guide mode , 5 = Face Template , 6 = Face Image + # type= 5 and 6 are applicable only for ARGO FACE. + if type < 0 or type > 6: + raise ValueError("Type must be between 0 and 6") + url = f"{self.__base_url}/credential?action=delete&user-id={user_id}&type={credential_type}" + return self.__send_request(url) + + def get_user_count(self): + """ + Retrieve the total number of users configured on the COSEC biometric device. + + This method retrieves the total number of users configured on the COSEC biometric device. + """ + url = f"{self.__base_url}/command?action=getusercount" + return self.__send_request(url) + + def get_attendance_events(self, roll_over_count=0, seq_num=1, no_of_events=100): + """ + Retrieve attendance events from the COSEC biometric device. + + This method retrieves attendance events, such as punch-in and punch-out records, + from the COSEC biometric device. + """ + url = ( + f"{self.__base_url}/events?action=getevent&roll-over-count={roll_over_count}" + f"&seq-number={seq_num}&no-of-events={no_of_events}" + ) + return self.__send_request(url) diff --git a/biometric/dahua.py b/biometric/dahua.py new file mode 100644 index 0000000..6c61506 --- /dev/null +++ b/biometric/dahua.py @@ -0,0 +1,521 @@ +""" +DahuaAPI module for interacting with Dahua biometric and access control devices. + +This module provides a set of methods for managing and configuring Dahua devices, +including retrieving system information, managing users, setting up network configurations, +and interacting with attendance logs. It communicates with Dahua devices via HTTP requests +and supports basic operations such as system reboot, setting time, and language configuration. +""" + +import re +from collections import defaultdict +from datetime import datetime +from typing import Any, Dict + +import requests +from requests.auth import HTTPDigestAuth + +key_map = { + "AttendanceState": "attendance_state", + "CardID": "card_id", + "CardName": "card_name", + "CardNo": "card_no", + "CardType": "card_type", + "CreateTime": "create_time", + "CreateTimeRealUTC": "create_time_real_utc", + "Door": "door", + "ErrorCode": "error_code", + "FaceIndex": "face_index", + "FacilityCode": "facility_code", + "HatColor": "hat_color", + "HatType": "hat_type", + "Mask": "mask", + "Method": "method", + "Notes": "notes", + "Password": "password", + "ReaderID": "reader_id", + "RecNo": "rec_no", + "RemainingTimes": "remaining_times", + "ReservedInt": "reserved_int", + "ReservedString": "reserved_string", + "RoomNumber": "room_number", + "Status": "status", + "Type": "type", + "URL": "url", + "UserID": "user_id", + "UserType": "user_type", + "VTONumber": "vto_number", +} + + +def convert_logs_to_list(logs): + """ + Converts a dictionary of logs into a list of records. + + This function processes a dictionary containing log data, identifies records by + the keys starting with "records[", and converts the corresponding values into + a list of dictionaries with more readable keys. It also handles timestamp fields + by converting them to `datetime` objects. + + Args: + logs (dict): The dictionary containing the log data to be converted. + + Returns: + list: A list of dictionaries representing individual log records, with + formatted keys and values. Each record is a dictionary with keys + mapped according to the `key_map` and timestamps converted to `datetime`. + """ + records_list = [] + record_dict = defaultdict(dict) + previous_key = None + + for key, value in logs.items(): + if key.startswith("records["): + parts = key.split(".") + current_key = parts[0] + + if previous_key and previous_key != current_key: + records_list.append(dict(record_dict)) + record_dict.clear() + time_keys = ["CreateTime", "CreateTimeRealUTC"] + if parts[-1] in time_keys: + value = datetime.fromtimestamp(int(value)) + + record_dict[key_map.get(parts[-1])] = value + previous_key = current_key + if record_dict: + records_list.append(dict(record_dict)) + + return records_list + + +class DahuaAPI: + """ + A class for interacting with Dahua biometric and access control devices. + + This class provides methods to interact with Dahua devices, including retrieving + system information, configuring device settings (network, language, general, etc.), + managing users, and processing logs related to attendance and access control. + The class communicates with the Dahua device via HTTP requests and supports + actions like enrolling users, rebooting the device, and fetching various device logs. + """ + + def __init__(self, ip: str, username: str, password: str): + # self.base_url = f"http://{ip}/cgi-bin/" + self.base_url = f"{ip}/cgi-bin/" + self.auth = HTTPDigestAuth(username, password) + self.session = requests.Session() + self.session.auth = self.auth + + def parse_response(self, response): + """ + Parses the response from the Dahua API request. + + This method processes the HTTP response from the Dahua device API. It decodes + the content of the response, checks the status code, and returns a structured + result. If the response is successful (status code 200), it attempts to parse + the content as a dictionary of key-value pairs. If the response is an error, + it returns a relevant error message + """ + content = response.content.decode("utf-8").strip() + status_code = response.status_code + + if status_code == 200: + if "\r\n" not in content and "=" not in content: + return {"result": content, "status_code": status_code} + + try: + content_dict = dict( + line.split("=", 1) for line in content.split("\r\n") if "=" in line + ) + content_dict["status_code"] = status_code + return content_dict + except Exception: + return { + "result": f"Invalid parameter {content}", + "status_code": status_code, + } + + if status_code == 400: + return { + "result": "Error: Bad Request. Check the parameters.", + "status_code": status_code, + } + + return {"result": content, "status_code": status_code} + + def _get(self, endpoint: str, params: Dict[str, Any] = None): + url = f"{self.base_url}{endpoint}" + response = self.session.get(url, params=params) + return self.parse_response(response) + + def _post(self, endpoint: str, data: Dict[str, Any]): + url = f"{self.base_url}{endpoint}" + response = self.session.post(url, data=data) + return self.parse_response(response) + + def get_system_info(self): + """Get system information.""" + endpoint = "magicBox.cgi?action=getSystemInfo" + return self._get(endpoint) + + def get_serial_number(self): + """Get the device serial number.""" + endpoint = "magicBox.cgi?action=getSerialNo" + return self._get(endpoint) + + def get_hardware_version(self): + """Get the hardware version.""" + endpoint = "magicBox.cgi?action=getHardwareVersion" + return self._get(endpoint) + + def get_device_type(self): + """Get the device type.""" + endpoint = "magicBox.cgi?action=getDeviceType" + return self._get(endpoint) + + def get_basic_config(self): + """i""" + endpoint = "configManager.cgi?action=getConfig&name=Network" + return self._get(endpoint) + + def set_basic_config(self, params: Dict[str, Any]): + """Set basic network configuration.""" + endpoint = "configManager.cgi?action=setConfig" + return self._post(endpoint, data=params) + + def get_general_config(self): + """Get general system configuration.""" + endpoint = "configManager.cgi?action=getConfig&name=General" + return self._get(endpoint) + + def set_general_config(self, params: Dict[str, Any]): + """Set general system configuration.""" + endpoint = "configManager.cgi?action=setConfig" + return self._get(endpoint, params=params) + + def get_system_time(self): + """Get the current system time.""" + endpoint = "global.cgi?action=getCurrentTime" + return self._get(endpoint) + + def set_system_time(self, date: str, time: str): + """ + Set the system time. + Date format : YYYY-MM-DD + Time format : HH-MM-SS + Combines date and time with %20 between them. + %20 character used for add space + """ + # Validate the date format (YYYY-MM-DD) + date_pattern = r"^\d{4}-\d{2}-\d{2}$" + if not re.match(date_pattern, date): + raise ValueError("Invalid date format. Expected YYYY-MM-DD.") + + # Validate the time format (HH:MM:SS) + time_pattern = r"^\d{2}:\d{2}:\d{2}$" + if not re.match(time_pattern, time): + raise ValueError("Invalid time format. Expected HH:MM:SS.") + + device_datetime = f"{date}%20{time}" + endpoint = f"global.cgi?action=setCurrentTime&time={device_datetime}" + + return self._get(endpoint) + + def reboot_device(self): + """Reboot the device.""" + endpoint = "configManager.cgi?action=reboot" + return self._post(endpoint, data={}) + + def shutdown_device(self): + """Shut down the device.""" + endpoint = "configManager.cgi?action=shutdown" + return self._post(endpoint, data={}) + + def get_language_caps(self): + """Get supported language capabilities.""" + endpoint = "magicBox.cgi?action=getLanguageCaps" + return self._get(endpoint) + + def get_language_config(self): + """Get current language configuration.""" + endpoint = "magicBox.cgi?action=getLanguageConfig" + return self._get(endpoint) + + def set_language_config(self, language: str): + """Set the device language.""" + endpoint = "magicBox.cgi?action=setLanguageConfig" + data = {"Language": language} + return self._post(endpoint, data=data) + + def get_locales_config(self): + """Get current locales configuration.""" + endpoint = "magicBox.cgi?action=getLocalesConfig" + return self._get(endpoint) + + def set_locales_config(self, params: Dict[str, Any]): + """Set locales configuration.""" + endpoint = "magicBox.cgi?action=setLocalesConfig" + return self._post(endpoint, data=params) + + def get_group_info(self, name): + """ + Retrieves information about a specific user group. + """ + if not name: + raise ValueError("The 'name' parameter is required and cannot be empty.") + endpoint = f"userManager.cgi?action=getGroupInfoAll&name={name}" + return self._get(endpoint) + + def get_group_info_all(self): + """ + Retrieves information about all user groups. + """ + endpoint = "userManager.cgi?action=getGroupInfoAll" + return self._get(endpoint) + + def enroll_new_user( + self, + card_name: str, + card_no: str, + user_id: str, + card_status: int = 0, + card_type: int = 0, + password: str = "", + doors: list[int] = None, + time_sections: list[int] = None, + vto_position: str = "", + valid_date_start: str = "", + valid_date_end: str = "", + is_valid: bool = True, + ): + """ + Enroll a new user with access control card details. + + Args: + card_name (str): Card name, up to 32 characters. + card_no (str): Card number,Must be unique. + user_id (str): User ID,Must be unique.. + card_status (int): Card status, default is 0 (Normal). + card_type (int): Card type, default is 0 (Ordinary card). + password (str): Card password (default is an empty string). + doors (list[int]): Door permissions (default is None). + time_sections (list[int]): Time sections corresponding to + door permissions (default is None). + vto_position (str): Door number linked with indoor monitor (default is an empty string). + valid_date_start (str): Start time of the validity period, + format "yyyyMMdd hhmmss" (default is an empty string). + valid_date_end (str): End time of the validity period, + format "yyyyMMdd hhmmss" (default is an empty string). + is_valid (bool): Validity of the card, default is True. + + Returns: + Response from the server. + """ + endpoint = "recordUpdater.cgi?action=insert&name=AccessControlCard" + endpoint += f"&CardName={card_name}" + endpoint += f"&CardNo={card_no}" + endpoint += f"&UserID={user_id}" + endpoint += f"&CardStatus={card_status}" + endpoint += f"&CardType={card_type}" + + if password: + endpoint += f"&Password={password}" + if doors: + for index, door in enumerate(doors): + endpoint += f"&Doors[{index}]={door}" + if time_sections: + for index, section in enumerate(time_sections): + endpoint += f"&TimeSections[{index}]={section}" + if vto_position: + endpoint += f"&VTOPosition={vto_position}" + if valid_date_start: + endpoint += f"&ValidDateStart={valid_date_start}" + if valid_date_end: + endpoint += f"&ValidDateEnd={valid_date_end}" + endpoint += f"&IsValid={'true' if is_valid else 'false'}" + + return self._get(endpoint) + + def get_user_info_all(self): + """ + Retrieves information about all users. + """ + endpoint = "userManager.cgi?action=getUserInfoAll" + return self._get(endpoint) + + def get_user_info(self, username: str): + """ + Retrieves information about a specific user. + """ + if not username: + raise ValueError("The 'name' parameter is required and cannot be empty.") + endpoint = f"userManager.cgi?action=getUserInfo&name={username}" + return self._get(endpoint) + + def add_user( + self, + username: str, + password: str, + group: str, + sharable: bool, + reserved: bool, + memo: str = "", + ): + """ + Add a new user. + + Args: + username (str): The username of the new user. + password (str): The password for the new user. + group (str): The group of the new user, either "admin" or "user". + sharable (bool): Whether the user can have multi-point login. + reserved (bool): Whether the user is reserved and cannot be deleted. + memo (str): An optional memo for the user. + + Returns: + Response from the server. + """ + all_groups_info = self.get_group_info_all() + name_list = [ + value for key, value in all_groups_info.items() if key.endswith(".Name") + ] + if group not in name_list: + raise ValueError(f"Invalid group. It must be comes in {name_list}.") + + endpoint = ( + f"userManager.cgi?action=addUser&" + f"user.Name={username}&" + f"user.Password={password}&" + f"user.Group={group}&" + f"user.Sharable={'true' if sharable else 'false'}&" + f"user.Reserved={'true' if reserved else 'false'}&" + f"user.Memo={memo}" + ) + + return self._get(endpoint) + + def delete_user(self, username: str): + """Delete an existing user.""" + endpoint = f"userManager.cgi?action=deleteUser&name={username}" + return self._get(endpoint) + + def fetch_attendance_logs(self): + """Fetch attendance logs using the API.""" + endpoint = "log.cgi?action=doFind" + params = { + "name": "AttendanceLog", + "SessionID": "session-id", + "count": 100, + "offset": 0, + } + return self._get(endpoint, params=params) + + def get_logs(self, log_type: str = "SystemLog"): + """Fetch logs from the device.""" + endpoint = "log.cgi?action=doFind" + params = { + "name": log_type, + "SessionID": "session-id", + "count": 100, + "offset": 0, + } + return self._get(endpoint, params=params) + + def get_record_config(self): + """Get record configuration.""" + endpoint = "configManager.cgi?action=getConfig&name=Record" + return self._get(endpoint) + + def set_record_config(self, params: Dict[str, Any]): + """Set record configuration.""" + endpoint = "configManager.cgi?action=setConfig" + return self._post(endpoint, data=params) + + def get_record_mode_config(self): + """Get record mode configuration.""" + endpoint = "configManager.cgi?action=getConfig&name=RecordMode" + return self._get(endpoint) + + def set_record_mode_config(self, params: Dict[str, Any]): + """Set record mode configuration.""" + endpoint = "configManager.cgi?action=setConfig" + return self._post(endpoint, data=params) + + def get_snapshot_config(self): + """Get snapshot configuration.""" + endpoint = "configManager.cgi?action=getConfig&name=Snap" + return self._get(endpoint) + + def set_snapshot_config(self, params: Dict[str, Any]): + """Set snapshot configuration.""" + endpoint = "configManager.cgi?action=setConfig" + return self._post(endpoint, data=params) + + def get_control_card_rec( + self, + card_no: str = None, + start_time: datetime = None, + end_time: datetime = None, + ): + """ + Get offline records from device. + start_time and end_time must be Python datetime objects, used to convert to Unix timestamp. + """ + endpoint = "recordFinder.cgi?action=find&name=AccessControlCardRec" + + if start_time: + if isinstance(start_time, datetime): + start_time = int(start_time.timestamp()) + else: + raise ValueError("start_time must be a Python datetime object") + + if end_time: + if isinstance(end_time, datetime): + end_time = int(end_time.timestamp()) + else: + raise ValueError("end_time must be a Python datetime object") + + if card_no: + endpoint += f"&condition.CardNo={card_no}" + if start_time: + endpoint += f"&StartTime={start_time}" + if end_time: + endpoint += f"&EndTime={end_time}" + + card_records = self._get(endpoint) + + records = convert_logs_to_list(card_records) + logs = { + "records": records, + "found": card_records.get("found"), + "status_code": card_records.get("status_code"), + } + + return logs + + +# Example usage +# dahua = DahuaAPI(ip="192.168.100.195", username="admin", password="User@123") +# result = dahua.get_system_time() +# result1 = dahua.get_control_card_rec(start_time="1736418923") +# result1 = dahua.get_snapshot_config() +# result1 = dahua.delete_user(username="TestUser") +# result2 = dahua.get_user_info_all() +# result = dahua.get_user_info(name="TestUser") +# result1 = dahua.add_user( +# username="TestUser", +# password="Test@!12", +# group="admin", +# sharable=True, +# reserved=False, +# memo="TestUser Group", +# ) +# response = dahua.enroll_new_user( +# card_name="Nikhil Ravi", # Card name +# card_no="987564", # Card number +# user_id="CTS437", # User ID +# card_status=0, # Card status (Normal) +# card_type=0, # Card type (Ordinary card) +# password="Nikh@!12", # Password for card + password +# ) diff --git a/biometric/etimeoffice.py b/biometric/etimeoffice.py new file mode 100644 index 0000000..68d2a8b --- /dev/null +++ b/biometric/etimeoffice.py @@ -0,0 +1,122 @@ +""" +ETimeOfficeAPI Class for interacting with the ETimeOffice API. + +This module provides an interface to interact with the ETimeOffice API. +It includes methods to fetch punch data, validate dates, and convert response data +into Python datetime objects for further processing. +""" + +from datetime import datetime + +import requests +from requests.auth import HTTPBasicAuth + + +class ETimeOfficeAPI: + """ + A client for interacting with the ETimeOffice API to fetch punch data and related information. + """ + + def __init__(self, username, password, base_url="https://api.etimeoffice.com/api/"): + """ + Initialize the ETimeOfficeAPI client. + """ + self.username = username + self.password = password + self.base_url = base_url.rstrip("/") + "/" + + def _is_valid_date(self, date_str, with_time=True): + """ + Validate date format. + """ + try: + if with_time: + datetime.strptime(date_str, "%d/%m/%Y_%H:%M") + else: + datetime.strptime(date_str, "%d/%m/%Y") + return True + except ValueError: + return False + + def _convert_punch_dates(self, response_data): + """ + Convert punch data strings into datetime objects. + """ + if not response_data.get("Error", True): + if "PunchData" in response_data: + for punch in response_data["PunchData"]: + try: + punch["PunchDate"] = datetime.strptime( + punch["PunchDate"], "%d/%m/%Y %H:%M:%S" + ) + except ValueError: + pass + if "InOutPunchData" in response_data: + for punch in response_data["InOutPunchData"]: + try: + punch["DateString"] = datetime.strptime( + punch["DateString"], "%d/%m/%Y" + ).date() + except ValueError: + pass + try: + punch["INTime"] = datetime.strptime( + punch["INTime"], "%H:%M" + ).time() + except: + pass + try: + punch["OUTTime"] = datetime.strptime( + punch["OUTTime"], "%H:%M" + ).time() + except: + pass + return response_data + + def _fetch_data(self, endpoint, emp_code, from_date, to_date, with_time=True): + """ + Make an authenticated API request and parse punch data. + """ + if not ( + self._is_valid_date(from_date, with_time) + and self._is_valid_date(to_date, with_time) + ): + return { + "Error": True, + "Msg": "Error: Invalid date format. Expected format: " + + ("DD/MM/YYYY_HH:MM" if with_time else "DD/MM/YYYY"), + } + + url = f"{self.base_url}{endpoint}?Empcode={emp_code}&FromDate={from_date}&ToDate={to_date}" + response = requests.get( + url, auth=HTTPBasicAuth(self.username, self.password), timeout=20 + ) + return self._convert_punch_dates(response.json()) + + def download_punch_data(self, from_date, to_date, emp_code="ALL"): + """ + Download punch data with timestamps. + """ + return self._fetch_data( + "DownloadPunchData", emp_code, from_date, to_date, with_time=True + ) + + def download_punch_data_mcid(self, from_date, to_date, emp_code="ALL"): + """ + Download punch data with MCID (Machine Code ID). + """ + return self._fetch_data( + "DownloadPunchDataMCID", emp_code, from_date, to_date, with_time=True + ) + + def download_in_out_punch_data(self, from_date, to_date, emp_code="ALL"): + """ + Download in and out punch data (without timestamps). + """ + return self._fetch_data( + "DownloadInOutPunchData", emp_code, from_date, to_date, with_time=False + ) + + +# api = ETimeOfficeAPI(username={corporateid}:{usename}:{password}:true",password="") +# response = api.download_punch_data(from_date="25/03/2025_00:00",to_date="25/03/2025_12:22") diff --git a/biometric/filters.py b/biometric/filters.py new file mode 100644 index 0000000..c3b0f19 --- /dev/null +++ b/biometric/filters.py @@ -0,0 +1,37 @@ +# pylint: disable=too-few-public-methods +""" +Module for defining filters related to biometric devices. + +This module contains the definition of the BiometricDeviceFilter class, +which is used to filter instances of BiometricDevices +""" + +import django_filters + +from base.filters import FilterSet +from biometric.models import BiometricDevices + + +class BiometricDeviceFilter(FilterSet): + """ + Filter class for querying biometric devices. + + This class defines filters for querying instances of BiometricDevices + based on various criteria such as name, machine type, and activity status. + """ + + search = django_filters.CharFilter(field_name="name", lookup_expr="icontains") + + class Meta: + """ + Meta class to add additional options + """ + + model = BiometricDevices + fields = [ + "name", + "machine_type", + "is_active", + "is_scheduler", + "is_live", + ] diff --git a/biometric/forms.py b/biometric/forms.py new file mode 100644 index 0000000..31d2222 --- /dev/null +++ b/biometric/forms.py @@ -0,0 +1,362 @@ +# pylint: disable=too-few-public-methods +""" +Module containing forms for managing biometric devices and associated data. + +This module provides Django forms for creating and managing biometric devices, +employee biometric data, COSEC users, and related configurations. +""" + +from django import forms +from django.db.models import Q +from django.utils.translation import gettext_lazy as _ + +from base.forms import Form, ModelForm +from base.methods import reload_queryset +from employee.models import Employee +from horilla.horilla_middlewares import _thread_locals +from horilla_widgets.forms import default_select_option_template + +from .models import BiometricDevices, BiometricEmployees + + +class BiometricDeviceForm(ModelForm): + """ + Form for creating and updating biometric device configurations. + + This form is used to create and update biometric device configurations. + It includes fields for specifying the device name, IP address, TCP communication port, + and other relevant settings. Additionally, it excludes fields related to scheduler + settings and device activation status. + """ + + class Meta: + """ + Meta class to add additional options + """ + + model = BiometricDevices + fields = "__all__" + exclude = [ + "is_scheduler", + "scheduler_duration", + "last_fetch_date", + "last_fetch_time", + "is_active", + ] + widgets = { + "machine_type": forms.Select( + attrs={ + "id": "machineTypeInput", + "onchange": "machineTypeChange($(this))", + } + ), + "bio_password": forms.TextInput( + attrs={ + "class": "oh-input oh-input--password w-100", + "type": "password", + } + ), + } + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + company_widget = self.fields["company_id"].widget + if isinstance(company_widget, forms.Select): + company_widget.option_template_name = default_select_option_template + + +class BiometricDeviceSchedulerForm(ModelForm): + """ + Form for updating the scheduler duration of a biometric device. + + This form is used to update the scheduler duration of a biometric + device to fetch attendance data. + It includes a field for entering the scheduler duration in the format HH:MM. + """ + + class Meta: + """ + Meta class to add additional options + """ + + model = BiometricDevices + fields = ["scheduler_duration"] + labels = { + "scheduler_duration": _("Enter the duration in the format HH:MM"), + } + + +class EmployeeBiometricAddForm(Form): + """ + Form for adding employees to a biometric device. + + This form allows administrators to add employees to a biometric device + for biometric authentication. It includes a field for selecting employees from + a queryset and ensures that only active employees not already associated with + a 'zk' type biometric device are available for selection. + """ + + employee_ids = forms.ModelMultipleChoiceField( + queryset=Employee.objects.all(), + widget=forms.SelectMultiple(), + label=_("Employees"), + ) + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.request = getattr(_thread_locals, "request") + self.device_id = ( + self.request.resolver_match.kwargs.get("device_id", None) + if self.request.resolver_match + else None + ) + self.device = BiometricDevices.find(self.device_id) + zk_employee_ids = BiometricEmployees.objects.filter( + device_id=self.device + ).values_list("employee_id", flat=True) + self.fields["employee_ids"].queryset = Employee.objects.filter( + is_active=True + ).exclude(id__in=zk_employee_ids) + + +class CosecUserAddForm(Form): + """ + Form for adding users to a COSEC biometric device. + + This form allows administrators to add multiple users to a COSEC biometric device + for biometric authentication. It includes a field for selecting users from + a queryset and ensures that only active users not already associated with + a 'cosec' type biometric device are available for selection. + """ + + employee_ids = forms.ModelMultipleChoiceField( + queryset=Employee.objects.all(), + widget=forms.SelectMultiple(), + label=_("Employees"), + ) + + def __init__(self, *args, device_id=None, **kwargs): + super().__init__(*args, **kwargs) + cosec_employee_ids = BiometricEmployees.objects.filter( + device_id=device_id + ).values_list("employee_id", flat=True) + self.fields["employee_ids"].queryset = Employee.objects.filter( + is_active=True + ).exclude(id__in=cosec_employee_ids) + + +class COSECUserForm(Form): + """ + Form for adding or updating users in a COSEC biometric device. + + This form allows administrators to add or update users in a COSEC biometric + device. It includes fields for specifying the user's name, whether the user + is active, whether the user is a VIP user, whether validity is enabled for + the user, the validity end date, and whether to bypass finger-based + authentication for the user. It provides validation to ensure that the + name does not exceed 15 characters and that the validity end date is + provided when validity is enabled. + """ + + name = forms.CharField( + label=_("Employee Name"), + help_text=_("15 characters max."), + widget=forms.TextInput(attrs={"class": "oh-input w-100"}), + ) + user_active = forms.BooleanField( + initial=False, + required=False, + widget=forms.CheckboxInput(), + ) + vip = forms.BooleanField( + initial=False, + required=False, + widget=forms.CheckboxInput(), + ) + validity_enable = forms.BooleanField( + initial=False, + required=False, + widget=forms.CheckboxInput(), + ) + validity_end_date = forms.DateField( + required=False, + widget=forms.DateInput( + attrs={"type": "date", "class": "oh-input w-100 form-control"} + ), + ) + by_pass_finger = forms.BooleanField( + initial=False, + required=False, + widget=forms.CheckboxInput(), + ) + + def clean(self): + cleaned_data = super().clean() + if len(cleaned_data["name"]) > 15: + raise forms.ValidationError( + "Maximum 15 characters allowed for Name in COSEC Biometric Device" + ) + if cleaned_data["validity_enable"]: + if cleaned_data.get("validity_end_date") is None: + raise forms.ValidationError( + "When the Validity field is enabled, a Validity End Date is required." + ) + + +class DahuaUserForm(Form): + """ + This form is used to map a Horilla employee to a user entry on a Dahua biometric device. + """ + + CARD_STATUS_CHOICES = [ + (0, "Normal"), + (1 << 0, "Reported for loss"), + (1 << 1, "Canceled"), + (1 << 2, "Frozen"), + (1 << 3, "Arrearage"), + (1 << 4, "Overdue"), + (1 << 5, "Pre-arrearage (The door still can be unlocked with a voice prompt)"), + ] + CARD_TYPE_CHOICES = [ + (0, "Ordinary card"), + (1, "VIP card"), + (2, "Guest card"), + (3, "Patrol card"), + (4, "Blocklist card"), + (5, "Duress card"), + ] + employee = forms.ModelChoiceField( + queryset=Employee.objects.all(), + widget=forms.Select(), + label=_("Employee"), + ) + card_no = forms.CharField(max_length=50, required=True, label=_("Card Number")) + user_id = forms.CharField(max_length=50, required=True, label=_("User ID")) + card_status = forms.ChoiceField( + choices=CARD_STATUS_CHOICES, + required=False, + label=_("Card Status"), + initial=0, + ) + card_type = forms.ChoiceField( + choices=CARD_TYPE_CHOICES, required=False, label=_("Card Type") + ) + password = forms.CharField(max_length=50, required=False, label=_("Password")) + forms.DateTimeField( + widget=forms.DateTimeInput( + attrs={"class": "oh-input w-100", "type": "datetime-local"} + ), + ) + valid_date_start = forms.DateTimeField( + required=False, + label=_("Valid Date Start"), + widget=forms.DateTimeInput( + attrs={"class": "oh-input w-100", "type": "datetime-local"} + ), + ) + valid_date_end = forms.DateTimeField( + required=False, + label=_("Valid Date End"), + widget=forms.DateTimeInput( + attrs={"class": "oh-input w-100", "type": "datetime-local"} + ), + ) + + def __init__(self, *args, **kwargs): + self.request = getattr(_thread_locals, "request") + self.device_id = ( + self.request.resolver_match.kwargs.get("device_id", None) + if self.request.resolver_match + else None + ) + super().__init__(*args, **kwargs) + reload_queryset(self.fields) + self.fields["employee"].widget.attrs.update( + { + "hx-include": "#dahuaBiometricUserForm", + "hx-target": "#id_user_id", + "hx-swap": "outerHTML", + "hx-trigger": "change", + "hx-get": "/biometric/find-employee-badge-id", + } + ) + + def clean(self): + cleaned_data = super().clean() + device = None + error_fields = {} + card_no = cleaned_data.get("card_no") + user_id = cleaned_data.get("user_id") + + if self.device_id: + device = BiometricDevices.find(self.device_id) + + if card_no and device: + if BiometricEmployees.objects.filter( + dahua_card_no=card_no, device_id=device + ).exists(): + error_fields["card_no"] = _("This Card Number already exists.") + + if user_id and device: + if BiometricEmployees.objects.filter( + user_id=user_id, device_id=device + ).exists(): + error_fields["user_id"] = _("This User ID already exists.") + + if error_fields: + raise forms.ValidationError(error_fields) + + return cleaned_data + + +class MapBioUsers(ModelForm): + """ + Form for mapping biometric users to Horilla employees. + + This form is used to associate a biometric user (from a biometric device) with + an employee in the Horilla system. + """ + + class Meta: + """ + Meta class to add additional options + """ + + model = BiometricEmployees + fields = ["employee_id", "user_id"] + + def __init__(self, *args, **kwargs): + self.request = getattr(_thread_locals, "request") + self.device_id = ( + self.request.resolver_match.kwargs.get("device_id", None) + if self.request.resolver_match + else None + ) + super().__init__(*args, **kwargs) + if self.device_id: + already_mapped_employees = BiometricEmployees.objects.filter( + device_id=self.device_id + ).values_list("employee_id", flat=True) + self.fields["employee_id"].queryset = Employee.objects.exclude( + Q(id__in=already_mapped_employees) | Q(is_active=False) + ) + self.fields["user_id"].required = True + + def clean(self): + cleaned_data = super().clean() + user_id = cleaned_data.get("user_id") + user_id_label = self.fields["user_id"].label or "User ID" + if self.device_id and user_id: + if BiometricEmployees.objects.filter( + user_id=user_id, device_id=self.device_id + ).exists(): + raise forms.ValidationError( + { + "user_id": _( + "This biometric %(label)s is already mapped with an employee" + ) + % {"label": user_id_label} + } + ) + + return cleaned_data diff --git a/biometric/models.py b/biometric/models.py new file mode 100644 index 0000000..40016df --- /dev/null +++ b/biometric/models.py @@ -0,0 +1,302 @@ +# pylint: disable=too-few-public-methods +""" +This module contains Django models for managing biometric devices +and employee attendance within a company. +""" + +import uuid + +import requests +from django.core.exceptions import ValidationError +from django.core.validators import MaxValueValidator +from django.db import models +from django.utils.translation import gettext_lazy as _ + +from base.horilla_company_manager import HorillaCompanyManager +from base.models import Company +from employee.models import Employee +from horilla.models import HorillaModel + + +def validate_schedule_time_format(value): + """ + this method is used to validate the format of duration like fields. + """ + if len(value) > 6: + raise ValidationError(_("Invalid format, it should be HH:MM format")) + try: + hour, minute = value.split(":") + hour = int(hour) + minute = int(minute) + if len(str(hour)) > 3 or minute not in range(60): + raise ValidationError(_("Invalid time")) + if hour == 0 and minute == 0: + raise ValidationError(_("Both hour and minute cannot be zero")) + except ValueError as error: + raise ValidationError(_("Invalid format, it should be HH:MM format")) from error + + +class BiometricDevices(HorillaModel): + """ + Model: BiometricDevices + + Represents a biometric device used for attendance tracking within a + company. Each device can be of different types such as ZKTeco Biometric, + Anviz Biometric, or Matrix COSEC Biometric.The model includes fields for + device details, authentication credentials, scheduling information, and + company association. + """ + + BIO_DEVICE_TYPE = [ + ("zk", _("ZKTeco / eSSL Biometric")), + ("anviz", _("Anviz Biometric")), + ("cosec", _("Matrix COSEC Biometric")), + ("dahua", _("Dahua Biometric")), + ("etimeoffice", _("e-Time Office")), + ] + BIO_DEVICE_DIRECTION = [ + ("in", _("In Device")), + ("out", _("Out Device")), + ("alternate", _("Alternate In/Out Device")), + ("system", _("System Direction(In/Out) Device")), + ] + id = models.UUIDField(default=uuid.uuid4, primary_key=True, editable=False) + name = models.CharField(max_length=100, verbose_name=_("Name")) + machine_type = models.CharField( + max_length=18, choices=BIO_DEVICE_TYPE, null=True, verbose_name=_("Device Type") + ) + machine_ip = models.CharField( + max_length=150, null=True, blank=True, default="", verbose_name=_("Machine IP") + ) + port = models.IntegerField(null=True, blank=True, verbose_name=_("Port No")) + zk_password = models.CharField( + max_length=100, null=True, blank=True, default="0", verbose_name=_("Password") + ) + bio_username = models.CharField( + max_length=100, null=True, blank=True, default="", verbose_name=_("Username") + ) + bio_password = models.CharField( + max_length=100, null=True, blank=True, verbose_name=_("Password") + ) + anviz_request_id = models.CharField( + max_length=200, null=True, blank=True, verbose_name=_("Request ID") + ) + api_url = models.CharField( + max_length=200, null=True, blank=True, verbose_name=_("API Url") + ) + api_key = models.CharField( + max_length=100, null=True, blank=True, verbose_name=_("API Key") + ) + api_secret = models.CharField( + max_length=100, null=True, blank=True, verbose_name=_("API Secret") + ) + api_token = models.CharField(max_length=500, null=True, blank=True) + api_expires = models.CharField(max_length=100, null=True, blank=True) + is_live = models.BooleanField(default=False, verbose_name=_("Is Live")) + is_scheduler = models.BooleanField(default=False, verbose_name=_("Is Scheduled")) + scheduler_duration = models.CharField( + null=True, + default="00:00", + max_length=10, + validators=[validate_schedule_time_format], + ) + last_fetch_date = models.DateField(null=True, blank=True) + last_fetch_time = models.TimeField(null=True, blank=True) + device_direction = models.CharField( + max_length=50, + choices=BIO_DEVICE_DIRECTION, + default="system", + verbose_name=_("Device Direction"), + ) + company_id = models.ForeignKey( + Company, + null=True, + editable=True, + on_delete=models.PROTECT, + verbose_name=_("Company"), + ) + + objects = HorillaCompanyManager() + + def __str__(self): + return f"{self.name} - {self.machine_type}" + + def clean(self, *args, **kwargs): + super().clean(*args, **kwargs) + required_fields = {} + + if self.machine_type in ("zk", "cosec", "dahua"): + if not self.machine_ip: + required_fields["machine_ip"] = _( + "The Machine IP is required for the selected biometric device." + ) + if not self.port: + required_fields["port"] = _( + "The Port Number is required for the selected biometric device." + ) + + if self.machine_type == "zk": + if not self.zk_password: + required_fields["zk_password"] = _( + "The password is required for ZKTeco Biometric Device." + ) + else: + try: + int(self.zk_password) + except ValueError: + required_fields["zk_password"] = _( + "The password must be an integer (numeric) value for\ + ZKTeco Biometric Device." + ) + + if self.machine_type in ("cosec", "dahua"): + if not self.bio_username: + required_fields["bio_username"] = _( + "The Username is required for the selected biometric device." + ) + if not self.bio_password: + required_fields["bio_password"] = _( + "The Password is required for the selected biometric device." + ) + + if self.machine_type == "anviz": + if not self.anviz_request_id: + required_fields["anviz_request_id"] = _( + "The Request ID is required for the Anviz Biometric Device." + ) + if not self.api_url: + required_fields["api_url"] = _( + "The API URL is required for Anviz Biometric Device." + ) + if not self.api_key: + required_fields["api_key"] = _( + "The API Key is required for Anviz Biometric Device." + ) + if not self.api_secret: + required_fields["api_secret"] = _( + "The API Secret is required for Anviz Biometric Device." + ) + if self.anviz_request_id and self.api_key and self.api_secret: + payload = { + "header": { + "nameSpace": "authorize.token", + "nameAction": "token", + "version": "1.0", + "requestId": self.anviz_request_id, + "timestamp": "2022-10-21T07:39:07+00:00", + }, + "payload": {"api_key": self.api_key, "api_secret": self.api_secret}, + } + error = { + "header": {"nameSpace": "System", "name": "Exception"}, + "payload": {"type": "AUTH_ERROR", "message": "AUTH_ERROR"}, + } + try: + response = requests.post( + self.api_url, + json=payload, + timeout=10, + ) + if response.status_code != 200: + raise ValidationError( + {f"API call failed with status code {response.status_code}"} + ) + + api_response = response.json() + if api_response == error: + raise ValidationError( + { + "api_url": _( + "Authentication failed. Please check your API Url\ + , API Key and API Secret." + ) + } + ) + + payload = api_response["payload"] + api_token = payload["token"] + api_expires = payload["expires"] + self.api_token = api_token + self.api_expires = api_expires + except Exception as exc: + raise ValidationError( + { + "api_url": _( + "Authentication failed. Please check your API Url , API Key\ + and API Secret." + ) + } + ) from exc + if required_fields: + raise ValidationError(required_fields) + + class Meta: + """ + Meta class to add additional options + """ + + verbose_name = _("Biometric Device") + verbose_name_plural = _("Biometric Devices") + + +class BiometricEmployees(models.Model): + """ + Model: BiometricEmployees + + Description: + Represents the association between employees and biometric devices for + attendance tracking within a company.Each entry in this model maps an + employee to a specific biometric device. + """ + + id = models.UUIDField(default=uuid.uuid4, primary_key=True, editable=False) + uid = models.IntegerField(null=True, blank=True) + ref_user_id = models.IntegerField( + null=True, blank=True, validators=[MaxValueValidator(99999999)] + ) + user_id = models.CharField(max_length=100, verbose_name=_("User ID")) + dahua_card_no = models.CharField(max_length=100, null=True, blank=True) + employee_id = models.ForeignKey( + Employee, on_delete=models.CASCADE, verbose_name=_("Employee") + ) + device_id = models.ForeignKey( + BiometricDevices, on_delete=models.CASCADE, null=True, blank=True + ) + objects = models.Manager() + + def __str__(self): + return f"{self.employee_id} - {self.user_id} - {self.device_id}" + + class Meta: + """ + Meta class to add additional options + """ + + verbose_name = _("Employee in Biometric Device") + verbose_name_plural = _("Employees in Biometric Device") + + +class COSECAttendanceArguments(models.Model): + """ + Model: COSECAttendanceArguments + + Description: + Represents arguments related to attendance fetching for COSEC biometric + devices within a company.This model stores information such as the last + fetched roll-over count and sequence number for COSEC devices. + """ + + id = models.UUIDField(default=uuid.uuid4, primary_key=True, editable=False) + last_fetch_roll_ovr_count = models.CharField(max_length=100, null=True) + last_fetch_seq_number = models.CharField(max_length=100, null=True) + device_id = models.ForeignKey( + BiometricDevices, on_delete=models.CASCADE, null=True, blank=True + ) + objects = models.Manager() + + class Meta: + verbose_name = _("COSEC Attendance Arguments") + verbose_name_plural = _("COSEC Attendance Arguments") + + def __str__(self): + return f"{self.device_id} - {self.last_fetch_roll_ovr_count} - {self.last_fetch_seq_number}" diff --git a/biometric/settings.py b/biometric/settings.py new file mode 100644 index 0000000..15ffc5c --- /dev/null +++ b/biometric/settings.py @@ -0,0 +1,12 @@ +""" +This module extends the Django settings related to templates to include a +custom context processor for biometric functionality. +It imports the `TEMPLATES` setting from `horilla.settings` and appends a +custom context processor path to it. +""" + +from horilla.settings import TEMPLATES + +TEMPLATES[0]["OPTIONS"]["context_processors"].append( + "biometric.context_processors.biometric_is_installed", +) diff --git a/biometric/sidebar.py b/biometric/sidebar.py new file mode 100644 index 0000000..7bad1b4 --- /dev/null +++ b/biometric/sidebar.py @@ -0,0 +1,43 @@ +""" +Biometric App sidebar configuration +""" + +from django.urls import reverse_lazy +from django.utils.translation import gettext_lazy as trans + +from attendance.sidebar import SUBMENUS +from base.context_processors import biometric_app_exists +from biometric.context_processors import biometric_is_installed + +biometric_submenu = { + "menu": trans("Biometric Devices"), + "redirect": reverse_lazy("view-biometric-devices"), + "accessibility": "biometric.sidebar.biometric_device_accessibility", +} + +SUBMENUS.insert(1, biometric_submenu) + + +def biometric_device_accessibility(request, submenu, user_perms, *args, **kwargs): + """ + Determine if the biometric device submenu should be accessible to the user. + + This function checks if the biometric app exists, if the user has the + necessary permissions to view biometric devices, and if the biometric + system is installed. + + Args: + request: The HTTP request object. + submenu: The submenu being accessed. + user_perms: The permissions of the user. + *args: Additional arguments. + **kwargs: Additional keyword arguments. + + Returns: + bool: True if the submenu should be accessible, False otherwise. + """ + return ( + biometric_app_exists(None).get("biometric_app_exists") + and request.user.has_perm("biometric.view_biometricdevices") + and biometric_is_installed(None)["is_installed"] + ) diff --git a/biometric/urls.py b/biometric/urls.py new file mode 100644 index 0000000..240bbf0 --- /dev/null +++ b/biometric/urls.py @@ -0,0 +1,156 @@ +""" +Module: urls + +Description: +This module defines URL patterns for routing HTTP requests to views +in the biometric management application. +It imports the `path` function from `django.urls` for defining URL +patterns and imports views for handling requests. +Additionally, it imports the `BiometricDevices` model for use in URL patterns +that require device IDs. + +""" + +from django.urls import path + +from . import views +from .models import BiometricDevices + +urlpatterns = [ + path( + "view-biometric-devices/", + views.biometric_devices_view, + name="view-biometric-devices", + ), + path( + "biometric-device-live-capture", + views.biometric_device_live, + name="biometric-device-live-capture", + ), + path( + "biometric-device-schedule//", + views.biometric_device_schedule, + name="biometric-device-schedule", + ), + path( + "biometric-device-unschedule//", + views.biometric_device_unschedule, + name="biometric-device-unschedule", + ), + path( + "biometric-device-test//", + views.biometric_device_test, + name="biometric-device-test", + ), + path( + "biometric-device-fetch-logs//", + views.biometric_device_fetch_logs, + name="biometric-device-fetch-logs", + ), + path( + "biometric-device-bulk-fetch-logs/", + views.biometric_device_bulk_fetch_logs, + name="biometric-device-bulk-fetch-logs", + ), + path( + "biometric-device-add", + views.biometric_device_add, + name="biometric-device-add", + ), + path( + "biometric-device-edit//", + views.biometric_device_edit, + name="biometric-device-edit", + ), + path( + "biometric-device-delete//", + views.biometric_device_delete, + name="biometric-device-delete", + ), + path( + "biometric-device-archive//", + views.biometric_device_archive, + name="biometric-device-archive", + ), + path( + "biometric-device-employees//", + views.biometric_device_employees, + name="biometric-device-employees", + kwargs={"model": BiometricDevices}, + ), + path( + "search-employee-in-device", + views.search_employee_device, + name="search-employee-in-device", + ), + path( + "find-employee-badge-id", + views.find_employee_badge_id, + name="find-employee-badge-id", + ), + path( + "add-biometric-user//", + views.add_biometric_user, + name="add-biometric-user", + ), + path( + "map-biometric-users//", + views.map_biometric_users, + name="map-biometric-users", + ), + path( + "add-dahua-biometric-user//", + views.add_dahua_biometric_user, + name="add-dahua-biometric-user", + ), + path( + "delete-dahua-user/", + views.delete_dahua_user, + name="delete-dahua-user", + ), + path( + "delete-dahua-user", + views.delete_dahua_user, + name="delete-dahua-user", + ), + path( + "delete-etimeoffice-user", + views.delete_etimeoffice_user, + name="delete-etimeoffice-user", + ), + path( + "delete-etimeoffice-user/", + views.delete_etimeoffice_user, + name="delete-etimeoffice-user", + ), + path( + "enable-cosec-face-recognition///", + views.enable_cosec_face_recognition, + name="enable-cosec-face-recognition", + ), + path( + "edit-cosec-user///", + views.edit_cosec_user, + name="edit-cosec-user", + ), + path( + "delete-biometric-user///", + views.delete_biometric_user, + name="delete-biometric-user", + ), + path( + "delete-cosec-user///", + views.delete_horilla_cosec_user, + name="delete-cosec-user", + ), + path( + "biometric-users-bulk-delete", + views.bio_users_bulk_delete, + name="biometric-users-bulk-delete", + ), + path( + "cosec-users-bulk-delete", + views.cosec_users_bulk_delete, + name="cosec-users-bulk-delete", + ), +] diff --git a/biometric/views.py b/biometric/views.py new file mode 100644 index 0000000..cb752a8 --- /dev/null +++ b/biometric/views.py @@ -0,0 +1,2683 @@ +""" +Module for managing biometric devices and employee attendance. + +Includes classes and functions for adding, editing, and deleting biometric devices, +as well as scheduling attendance capture. Also provides views for managing employees, +registered on biometric devices. +""" + +import json +import logging +from datetime import datetime, timedelta +from threading import Event, Thread +from urllib.parse import parse_qs, unquote + +import pytz +from apscheduler.schedulers.background import BackgroundScheduler +from django.contrib import messages +from django.http import HttpResponse, JsonResponse +from django.shortcuts import redirect, render +from django.template.loader import render_to_string +from django.utils import timezone as django_timezone +from django.utils.translation import gettext as __ +from django.utils.translation import gettext_lazy as _ +from zk import ZK +from zk import exception as zk_exception + +from attendance.methods.utils import Request +from attendance.models import AttendanceActivity +from attendance.views.clock_in_out import clock_in, clock_out +from base.methods import get_key_instances, get_pagination +from employee.models import Employee, EmployeeWorkInformation +from horilla.decorators import ( + hx_request_required, + install_required, + login_required, + permission_required, +) +from horilla.filters import HorillaPaginator +from horilla.horilla_settings import BIO_DEVICE_THREADS +from horilla.settings import TIME_ZONE + +from .anviz import CrossChexCloudAPI +from .cosec import COSECBiometric +from .dahua import DahuaAPI +from .etimeoffice import ETimeOfficeAPI +from .filters import BiometricDeviceFilter +from .forms import ( + BiometricDeviceForm, + BiometricDeviceSchedulerForm, + CosecUserAddForm, + COSECUserForm, + DahuaUserForm, + EmployeeBiometricAddForm, + MapBioUsers, +) +from .models import BiometricDevices, BiometricEmployees, COSECAttendanceArguments + +logger = logging.getLogger(__name__) + + +def str_time_seconds(time): + """ + this method is used reconvert time in H:M formate string back to seconds and return it + args: + time : time in H:M format + """ + + ftr = [3600, 60, 1] + return sum(a * b for a, b in zip(ftr, map(int, time.split(":")))) + + +def paginator_qry(qryset, page_number): + """ + This method is used to paginate query set + """ + paginator = HorillaPaginator(qryset, get_pagination()) + qryset = paginator.get_page(page_number) + return qryset + + +def biometric_paginator_qry(data_list, page_number, per_page=25): + """ + This function is used to paginate a list of dictionaries. + """ + start_index = (page_number - 1) * per_page + end_index = page_number * per_page + paginated_data = {} + paginated_data["users"] = data_list[start_index:end_index] + + total_items = len(data_list) + total_pages = (total_items + per_page - 1) // per_page + has_previous = page_number > 1 + has_next = page_number < total_pages + + paginated_data["paginator"] = { + "number": page_number, + "previous_page_number": page_number - 1 if has_previous else None, + "next_page_number": page_number + 1 if has_next else None, + "num_pages": total_pages, + "has_previous": has_previous, + "has_next": has_next, + } + return paginated_data + + +def biometric_set_time(conn): + """ + Sets the time on the biometric device using the provided connection. + + Parameters: + - conn: The connection to the biometric device. + + Returns: + None + """ + new_time = datetime.today() + conn.set_time(new_time) + + +class ZKBioAttendance(Thread): + """ + Represents a thread for capturing live attendance data from a ZKTeco biometric device. + + Attributes: + - machine_ip: The IP address of the ZKTeco biometric device. + - port_no: The port number for communication with the ZKTeco biometric device. + - conn: The connection object to the ZKTeco biometric device. + - _stop_event: Event flag to signal thread termination. + + Methods: + - run(): Overrides the run method of the Thread class to capture live attendance data. + - stop(): Sets the _stop_event to signal the thread to stop gracefully. + """ + + def __init__(self, machine_ip, port_no, password): + super().__init__() + self.machine_ip = machine_ip + self.port_no = port_no + self.password = int(password) + self._stop_event = Event() # Initialize stop event + self.conn = None + + def run(self): + try: + zk_device = ZK( + self.machine_ip, + port=self.port_no, + timeout=5, + password=self.password, + force_udp=False, + ommit_ping=False, + ) + patch_direction = {"in": 0, "out": 1} + conn = zk_device.connect() + self.conn = conn + if conn: + device = BiometricDevices.objects.filter( + machine_ip=self.machine_ip, port=self.port_no + ).first() + if device and device.is_live: + while not self._stop_event.is_set(): + attendances = conn.live_capture() + for attendance in attendances: + if attendance: + user_id = attendance.user_id + punch_code = ( + patch_direction[device.device_direction] + if device.device_direction in patch_direction + else attendance.punch + ) + date_time = django_timezone.make_aware( + attendance.timestamp + ) + # date_time = attendance.timestamp + date = date_time.date() + time = date_time.time() + device.last_fetch_date = date + device.last_fetch_time = time + device.save() + bio_id = BiometricEmployees.objects.filter( + user_id=user_id, device_id=device + ).first() + if bio_id: + if punch_code in {0, 3, 4}: + try: + clock_in( + Request( + user=bio_id.employee_id.employee_user_id, + date=date, + time=time, + datetime=date_time, + ) + ) + except Exception as error: + logger.error( + "Got an error in clock_in %s", error + ) + + continue + else: + try: + clock_out( + Request( + user=bio_id.employee_id.employee_user_id, + date=date, + time=time, + datetime=date_time, + ) + ) + except Exception as error: + logger.error( + "Got an error in clock_out", error + ) + continue + else: + continue + except ConnectionResetError as error: + ZKBioAttendance(self.machine_ip, self.port_no, self.password).start() + + def stop(self): + """To stop the ZK live capture mode""" + self.conn.end_live_capture = True + + +class COSECBioAttendanceThread(Thread): + """ + A thread class that handles the real-time retrieval and processing of + biometric attendance data from a COSEC biometric device. + + Attributes: + device_id (int): The ID of the biometric device to interact with. + _stop_event (threading.Event): An event to signal when to stop the thread. + + Methods: + run(): + Continuously fetches attendance data from the COSEC device, processes + it, and updates the last fetched sequence and rollover count. + + stop(): + Signals the thread to stop by setting the _stop_event. + """ + + def __init__(self, device_id): + super().__init__() + self.device_id = device_id + self._stop_event = Event() + + def run(self): + try: + device = BiometricDevices.objects.get(id=self.device_id) + if not device.is_live: + return + + device_args = COSECAttendanceArguments.objects.filter( + device_id=device + ).first() + last_fetch_roll_ovr_count = ( + int(device_args.last_fetch_roll_ovr_count) if device_args else 0 + ) + last_fetch_seq_number = ( + int(device_args.last_fetch_seq_number) if device_args else 1 + ) + + cosec = COSECBiometric( + device.machine_ip, + device.port, + device.bio_username, + device.bio_password, + timeout=10, + ) + while not self._stop_event.is_set(): + attendances = cosec.get_attendance_events( + last_fetch_roll_ovr_count, int(last_fetch_seq_number) + 1 + ) + if not isinstance(attendances, list): + self._stop_event.wait(5) + continue + + for attendance in attendances: + ref_user_id = attendance["detail-1"] + employee = BiometricEmployees.objects.filter( + ref_user_id=ref_user_id + ).first() + if not employee: + continue + + date_str = attendance["date"] + time_str = attendance["time"] + attendance_date = datetime.strptime(date_str, "%d/%m/%Y").date() + attendance_time = datetime.strptime(time_str, "%H:%M:%S").time() + attendance_datetime = datetime.combine( + attendance_date, attendance_time + ) + punch_code = attendance["detail-2"] + + request_data = Request( + user=employee.employee_id.employee_user_id, + date=attendance_date, + time=attendance_time, + datetime=django_timezone.make_aware(attendance_datetime), + ) + try: + if punch_code in ["1", "3", "5", "7", "9", "0"]: + clock_in(request_data) + elif punch_code in ["2", "4", "6", "8", "10"]: + clock_out(request_data) + except Exception as error: + logger.error("Error processing attendance: ", error) + + if attendances: + last_attendance = attendances[-1] + last_fetch_seq_number = last_attendance["seq-No"] + last_fetch_roll_ovr_count = last_attendance["roll-over-count"] + COSECAttendanceArguments.objects.update_or_create( + device_id=device, + defaults={ + "last_fetch_roll_ovr_count": last_fetch_roll_ovr_count, + "last_fetch_seq_number": last_fetch_seq_number, + }, + ) + # Sleep to prevent overwhelming the device with requests + self._stop_event.wait(2) + + except Exception as error: + device = BiometricDevices.objects.get(id=self.device_id) + device.is_live = False + device.save() + logger.error("Error in COSECBioAttendanceThread: ", error) + + def stop(self): + """Set the stop event to signal the thread to stop gracefully.""" + self._stop_event.set() + + +@login_required +@install_required +@permission_required("biometric.view_biometricdevices") +def biometric_devices_view(request): + """ + Renders and filters the list of biometric devices based on query parameters. + + Handles both initial page load and HTMX-based filter/search requests. + + Template: + - "biometric/view_biometric_devices.html" + + Context: + - biometric_form (BiometricDeviceForm): Form for adding new biometric devices. + - devices (QuerySet): Filtered and paginated queryset of biometric devices. + - f (BiometricDeviceFilter): Filter form. + - pd (str): URL-encoded query params for HTMX push. + - filter_dict (dict): Parsed filter query params. + """ + previous_data = request.GET.urlencode() + is_active = request.GET.get("is_active") + + # Apply filters + filter_form = BiometricDeviceFilter(request.GET) + biometric_devices = filter_form.qs.order_by("-created_at") + + # Default to is_active=True if not specified or "unknown" + if not is_active or is_active == "unknown": + biometric_devices = biometric_devices.filter(is_active=True) + + # Paginate + biometric_devices = paginator_qry(biometric_devices, request.GET.get("page")) + + # Parse filters for reuse + data_dict = parse_qs(previous_data) + get_key_instances(BiometricDevices, data_dict) + + # Render + return render( + request, + "biometric/view_biometric_devices.html", + { + "biometric_form": BiometricDeviceForm(), + "devices": biometric_devices, + "f": filter_form, + "pd": previous_data, + "filter_dict": data_dict, + }, + ) + + +@login_required +@install_required +@permission_required("biometric.change_biometricdevices") +def biometric_device_schedule(request, device_id): + """ + Handles scheduling of attendance capture from a biometric device. + + Parameters: + - request (HttpRequest): The HTTP request object. + - device_id (uuid): The ID of the biometric device for which scheduling is being done. + + Returns: + - HttpResponse: HTML response indicating success or failure of the scheduling operation. + """ + device = BiometricDevices.objects.get(id=device_id) + initial_data = {"scheduler_duration": device.scheduler_duration} + scheduler_form = BiometricDeviceSchedulerForm(initial=initial_data) + context = { + "scheduler_form": scheduler_form, + "device_id": device_id, + } + if request.method == "POST": + scheduler_form = BiometricDeviceSchedulerForm(request.POST) + if scheduler_form.is_valid(): + if device.machine_type == "zk": + try: + port_no = device.port + machine_ip = device.machine_ip + password = device.zk_password + conn = None + zk_device = ZK( + machine_ip, + port=port_no, + timeout=5, + password=int(password), + force_udp=False, + ommit_ping=False, + ) + conn = zk_device.connect() + conn.test_voice(index=0) + duration = request.POST.get("scheduler_duration") + device = BiometricDevices.objects.get(id=device_id) + device.scheduler_duration = duration + device.is_scheduler = True + device.is_live = False + device.save() + scheduler = BackgroundScheduler() + scheduler.add_job( + lambda: zk_biometric_attendance_scheduler(device.id), + "interval", + seconds=str_time_seconds(device.scheduler_duration), + ) + scheduler.start() + return HttpResponse("") + except Exception as error: + logger.error("An error comes in biometric_device_schedule ", error) + script = """ + + """ + return HttpResponse(script) + elif device.machine_type == "anviz": + duration = request.POST.get("scheduler_duration") + device.is_scheduler = True + device.scheduler_duration = duration + device.save() + scheduler = BackgroundScheduler() + scheduler.add_job( + lambda: anviz_biometric_attendance_scheduler(device.id), + "interval", + seconds=str_time_seconds(device.scheduler_duration), + ) + scheduler.start() + return HttpResponse("") + elif device.machine_type == "dahua": + duration = request.POST.get("scheduler_duration") + device.is_scheduler = True + device.is_live = False + device.scheduler_duration = duration + device.save() + scheduler = BackgroundScheduler() + scheduler.add_job( + lambda: dahua_biometric_attendance_scheduler(device.id), + "interval", + seconds=str_time_seconds(device.scheduler_duration), + ) + scheduler.start() + return HttpResponse("") + elif device.machine_type == "cosec": + duration = request.POST.get("scheduler_duration") + device.is_scheduler = True + device.is_live = False + device.scheduler_duration = duration + device.save() + scheduler = BackgroundScheduler() + existing_thread = BIO_DEVICE_THREADS.get(device.id) + if existing_thread: + existing_thread.stop() + del BIO_DEVICE_THREADS[device.id] + scheduler.add_job( + lambda: cosec_biometric_attendance_scheduler(device.id), + "interval", + seconds=str_time_seconds(device.scheduler_duration), + ) + scheduler.start() + return HttpResponse("") + elif device.machine_type == "etimeoffice": + duration = request.POST.get("scheduler_duration") + device.is_scheduler = True + device.is_live = False + device.scheduler_duration = duration + device.save() + scheduler = BackgroundScheduler() + scheduler.add_job( + lambda: etimeoffice_biometric_attendance_scheduler(device.id), + "interval", + seconds=str_time_seconds(device.scheduler_duration), + ) + scheduler.start() + return HttpResponse("") + else: + return HttpResponse("") + + context["scheduler_form"] = scheduler_form + response = render(request, "biometric/scheduler_device_form.html", context) + return HttpResponse( + response.content.decode("utf-8") + + "" + ) + return render(request, "biometric/scheduler_device_form.html", context) + + +@login_required +@install_required +@hx_request_required +@permission_required("biometric.change_biometricdevices") +def biometric_device_unschedule(request, device_id): + """ + Handles unschedule of attendance capture for a biometric device. + + Parameters: + - request (HttpRequest): The HTTP request object. + - device_id (uuid): The ID of the biometric device for which unscheduling is being done. + + Returns: + - HttpResponseRedirect: Redirects to the biometric devices view after unscheduling. + """ + previous_data = request.GET.urlencode() + device = BiometricDevices.objects.get(id=device_id) + device.is_scheduler = False + device.save() + messages.success(request, _("Biometric device unscheduled successfully")) + return redirect(f"/biometric/view-biometric-devices/?{previous_data}") + + +@login_required +@install_required +@hx_request_required +@permission_required("biometric.add_biometricdevices") +def biometric_device_add(request): + """ + Handles the addition of a new biometric device. + + Parameters: + - request (HttpRequest): The HTTP request object containing data about the request. + + Returns: + - HttpResponse: Renders the 'add_biometric_device.html' template with the biometric device form. + """ + previous_data = unquote(request.GET.urlencode()) + previous_data = ( + previous_data[3:] if previous_data.startswith("pd=") else previous_data + ) + biometric_form = BiometricDeviceForm() + if request.method == "POST": + biometric_form = BiometricDeviceForm(request.POST) + if biometric_form.is_valid(): + biometric_form.save() + messages.success(request, _("Biometric device added successfully.")) + biometric_form = BiometricDeviceForm() + context = {"biometric_form": biometric_form, "pd": previous_data} + return render(request, "biometric/biometric_device_form.html", context) + + +@login_required +@install_required +@hx_request_required +@permission_required("biometric.change_biometricdevices") +def biometric_device_edit(request, device_id): + """ + Handles the editing of an existing biometric device. + + Parameters: + - request (HttpRequest): The HTTP request object containing data about the request. + - device_id (uuid): The ID of the biometric device to be edited. + + Returns: + - HttpResponse: Renders the 'edit_biometric_device.html' template with the biometric + device form pre-filled with existing data. + """ + device = BiometricDevices.find(device_id) + if not device: + messages.error(request, _("Biometric device not found.")) + return render(request, "biometric/biometric_device_form.html") + biometric_form = BiometricDeviceForm(instance=device) + if request.method == "POST": + biometric_form = BiometricDeviceForm(request.POST, instance=device) + if biometric_form.is_valid(): + biometric_form.save() + messages.success(request, _("Biometric device updated successfully.")) + context = { + "biometric_form": biometric_form, + "device_id": device_id, + } + return render(request, "biometric/biometric_device_form.html", context) + + +@login_required +@install_required +@hx_request_required +@permission_required("biometric.change_biometricdevices") +def biometric_device_archive(request, device_id): + """ + This method is used to archive or un-archive devices + """ + previous_data = request.GET.urlencode() + device_obj = BiometricDevices.find(device_id) + if not device_obj: + messages.error(request, _("Biometric device not found.")) + return redirect(f"/biometric/view-biometric-devices/?{previous_data}") + device_obj.is_active = not device_obj.is_active + device_obj.save() + message = _("archived") if not device_obj.is_active else _("un-archived") + messages.success(request, _("Device is %(message)s") % {"message": message}) + return redirect(f"/biometric/view-biometric-devices/?{previous_data}") + + +@login_required +@install_required +@hx_request_required +@permission_required("biometric.delete_biometricdevices") +def biometric_device_delete(request, device_id): + """ + Handles the deletion of a biometric device. + + Parameters: + - request (HttpRequest): The HTTP request object containing data about the request. + - device_id (uuid): The ID of the biometric device to be deleted. + + Returns: + - HttpResponseRedirect: Redirects to the 'view-biometric-devices' page after deleting the + biometric device. + + """ + previous_data = request.GET.urlencode() + device_obj = BiometricDevices.find(device_id) + if not device_obj: + messages.error(request, _("Biometric device not found.")) + return redirect(f"/biometric/view-biometric-devices/?{previous_data}") + device_obj.delete() + messages.success(request, _("Biometric device deleted successfully.")) + return redirect(f"/biometric/view-biometric-devices/?{previous_data}") + + +def render_connection_response(title, text, icon): + """ + Helper function to render the connection + response from device test connection. + """ + context = { + "title": title, + "text": text, + "icon": icon, + } + return render_to_string("biometric/test_connection_script.html", context) + + +def test_zkteco_connection(device): + """Test connection for ZKTeco device.""" + conn = None + port_no = device.port + machine_ip = device.machine_ip + password = device.zk_password + zk_device = ZK( + machine_ip, + port=port_no, + timeout=60, + password=int(password), + force_udp=False, + ommit_ping=False, + ) + try: + conn = zk_device.connect() + conn.test_voice(index=0) + find_employees_in_zk(device.id) + return render_connection_response( + _("Connection Successful"), + _("ZKTeco test connection successful."), + "success", + ) + except zk_exception.ZKErrorResponse: + return render_connection_response( + _("Authentication Error"), + _("Double-check the provided IP, Port, and Password."), + "warning", + ) + except Exception: + logger.error("ZKTeco connection error", exc_info=True) + return render_connection_response( + _("Connection unsuccessful"), + _("Please check the IP, Port, and Password."), + "warning", + ) + finally: + if conn is not None: + conn.disconnect() + + +def test_anviz_connection(device): + """Test connection for Anviz device.""" + + try: + from .anviz import CrossChexCloudAPI + + anviz = CrossChexCloudAPI( + api_url=device.api_url, + api_key=device.api_key, + api_secret=device.api_secret, + anviz_request_id=device.anviz_request_id, + ) + test_response = anviz.test_connection() + + if test_response.get("token"): + device.api_token = test_response.get("token") + device.api_expires = test_response.get("expires") + device.save() + records = anviz.get_attendance_records() + + return render_connection_response( + _("Connection Successful"), + _("Anviz test connection successful."), + "success", + ) + else: + return render_connection_response( + _("Connection unsuccessful"), + _("API credentials might be incorrect."), + "warning", + ) + except Exception as error: + logger.error("Anviz connection error", exc_info=True) + return render_connection_response( + _("Connection unsuccessful"), _("API request failed."), "warning" + ) + + +def test_cosec_connection(device): + """Test connection for COSEC device.""" + cosec = COSECBiometric( + device.machine_ip, + device.port, + device.bio_username, + device.bio_password, + timeout=10, + ) + response = cosec.basic_config() + if response.get("app"): + find_employees_in_cosec(device.id) + return render_connection_response( + _("Connection Successful"), + _("Matrix test connection successful."), + "success", + ) + else: + return render_connection_response( + _("Connection unsuccessful"), + _("Double-check the provided Machine IP, Username, and Password."), + "warning", + ) + + +def test_dahua_connection(device): + """Test connection for Dahua device.""" + dahua = DahuaAPI( + ip=device.machine_ip, + username=device.bio_username, + password=device.bio_password, + ) + response = dahua.get_system_info() + if response.get("status_code") == 200: + return render_connection_response( + _("Connection Successful"), + _("Dahua test connection successful."), + "success", + ) + else: + return render_connection_response( + _("Connection unsuccessful"), + _("Double-check the provided Machine IP, Username, and Password."), + "warning", + ) + + +def test_etimeoffice_connection(device): + """Test connection for e-TimeOffice device.""" + now = datetime.now() + etimeoffice = ETimeOfficeAPI( + username=device.bio_username, + password=device.bio_password, + ) + + from_date = f"{now.day:02d}/{now.month:02d}/{now.year}_00:00" + to_date = ( + f"{now.day:02d}/{now.month:02d}/{now.year}_{now.hour:02d}:{now.minute:02d}" + ) + + try: + response = etimeoffice.download_punch_data(from_date=from_date, to_date=to_date) + if response.get("Msg") == "Success": + return render_connection_response( + _("Connection Successful"), + _("e-Time Office test connection successful."), + "success", + ) + + error_msg = response.get("Message") + return render_connection_response( + _("Connection unsuccessful"), + _("Double-check the provided API Url, Username, and Password: {}").format( + error_msg + ), + "warning", + ) + + except Exception as e: + return render_connection_response( + _("Connection error"), + _(f"API request failed with exception: {str(e)}"), + "danger", + ) + + +@login_required +@install_required +@hx_request_required +@permission_required("biometric.view_biometricdevices") +def biometric_device_test(request, device_id): + """ + Test the connection with the specified biometric device. + """ + + # Retrieve device and validate + device = BiometricDevices.objects.filter(id=device_id).first() + if not device: + return HttpResponse("Device not found.", status=404) + + script = "" + try: + if device.machine_type == "zk": + script = test_zkteco_connection(device) + elif device.machine_type == "anviz": + script = test_anviz_connection(device) + elif device.machine_type == "cosec": + script = test_cosec_connection(device) + elif device.machine_type == "dahua": + script = test_dahua_connection(device) + elif device.machine_type == "etimeoffice": + script = test_etimeoffice_connection(device) + else: + script = render_connection_response( + "Connection unsuccessful", + "Please select a valid biometric device.", + "warning", + ) + except Exception as error: + logger.error("Error in biometric_device_test", exc_info=True) + script = render_connection_response( + "Connection unsuccessful", "An unexpected error occurred.", "warning" + ) + + return HttpResponse(script) + + +@login_required +@install_required +@hx_request_required +@permission_required("biometric.view_biometricdevices") +def biometric_device_bulk_fetch_logs(request): + script = "" + zk_ids = request.GET.getlist("selected_device_ids") + zk_devices = BiometricDevices.objects.filter(id__in=zk_ids, machine_type="zk") + + if not zk_devices: + messages.error(request, _("")) + script = render_connection_response( + _("Biometric device not supported."), + _( + "Bulk log fetching is currently available only for ZKTeco / eSSL devices. Support for other biometric systems will be added soon." + ), + "warning", + ) + return HttpResponse(script) + + attendance_count, error_message = zk_biometric_attendance_logs(zk_devices) + if isinstance(attendance_count, int): + script = render_connection_response( + _("Logs Fetched Successfully"), + _( + f"Biometric attendance logs fetched successfully. Total records: {attendance_count}" + ), + "success", + ) + elif "Authentication" in error_message: + script = render_connection_response( + _("Authentication Error"), + _("Double-check the provided IP, Port, and Password."), + "warning", + ) + else: + script = render_connection_response( + _("Connection Unsuccessful"), + _(f"Please check the IP, Port, and Password. Error: {error_message}"), + "warning", + ) + return HttpResponse(script) + + +@login_required +@install_required +@hx_request_required +@permission_required("biometric.view_biometricdevices") +def biometric_device_fetch_logs(request, device_id=None): + """ + Fetches biometric attendance logs from a specified device. + + This view function connects to a biometric device based on the device type (zk, anviz, cosec, dahua, or etimeoffice) + and retrieves the attendance logs. + """ + device = BiometricDevices.find(device_id) + if not device: + return HttpResponse("Device not found.", status=404) + script = "" + if device.machine_type == "zk": + attendance_count, error_message = zk_biometric_attendance_logs(device) + if isinstance(attendance_count, int): + script = render_connection_response( + _("Logs Fetched Successfully"), + _( + f"Biometric attendance logs fetched successfully. Total records: {attendance_count}" + ), + "success", + ) + elif "Authentication" in error_message: + script = render_connection_response( + _("Authentication Error"), + _("Double-check the provided IP, Port, and Password."), + "warning", + ) + else: + script = render_connection_response( + _("Connection Unsuccessful"), + _(f"Please check the IP, Port, and Password. Error: {error_message}"), + "warning", + ) + elif device.machine_type == "anviz": + attendance_count = anviz_biometric_attendance_logs(device) + if isinstance(attendance_count, int): + script = render_connection_response( + _("Logs Fetched Successfully"), + _( + f"Biometric attendance logs fetched successfully. Total records: {attendance_count}" + ), + "success", + ) + else: + script = render_connection_response( + _("Connection unsuccessful"), + _("API credentials might be incorrect."), + "warning", + ) + + elif device.machine_type == "cosec": + attendance_count = cosec_biometric_attendance_logs(device) + if isinstance(attendance_count, int): + script = render_connection_response( + _("Logs Fetched Successfully"), + _( + f"Biometric attendance logs fetched successfully. Total records: {attendance_count}" + ), + "success", + ) + else: + script = render_connection_response( + _("Connection unsuccessful"), + _("Double-check the provided Machine IP, Username, and Password."), + "warning", + ) + elif device.machine_type == "dahua": + attendance_count = dahua_biometric_attendance_logs(device) + if isinstance(attendance_count, int): + script = render_connection_response( + _("Logs Fetched Successfully"), + _( + f"Biometric attendance logs fetched successfully. Total records: {attendance_count}" + ), + "success", + ) + else: + script = render_connection_response( + _("Connection unsuccessful"), + _("Double-check the provided Machine IP, Username, and Password."), + "warning", + ) + elif device.machine_type == "etimeoffice": + attendance_count = etimeoffice_biometric_attendance_logs(device) + if isinstance(attendance_count, int): + script = render_connection_response( + _("Logs Fetched Successfully"), + _( + f"Biometric attendance logs fetched successfully. Total records: {attendance_count}" + ), + "success", + ) + else: + script = render_connection_response( + _("Connection unsuccessful"), + _("Double-check the provided API Url, Username, and Password"), + "warning", + ) + else: + script = render_connection_response( + "Connection unsuccessful", + "Please select a valid biometric device.", + "warning", + ) + + return HttpResponse(script) + + +def zk_employees_fetch(device): + """ + Fetch employee data from the specified ZK biometric device. + + Parameters: + - device: Biometric device object containing machine IP, port, etc. + + Returns: + - list: A list of dictionaries, where each dictionary represents an employee + with associated data including user ID, employee ID, work email, + phone number, job position, badge ID, and fingerprint data. + """ + zk_device = ZK( + device.machine_ip, + port=device.port, + timeout=1, + password=int(device.zk_password), + force_udp=False, + ommit_ping=False, + ) + conn = zk_device.connect() + conn.enable_device() + users = conn.get_users() + fingers = conn.get_templates() + + bio_employees = BiometricEmployees.objects.filter(device_id=device) + bio_lookup = {bio.user_id: bio for bio in bio_employees} + + employees = [] + for user in users: + user_id = user.user_id + uid = user.uid + bio_id = bio_lookup.get(user_id) + + if bio_id: + employee = bio_id.employee_id + employee_work_info = EmployeeWorkInformation.objects.filter( + employee_id=employee + ).first() + if employee_work_info: + work_email = ( + employee_work_info.email if employee_work_info.email else None + ) + phone = employee_work_info.mobile if employee_work_info.mobile else None + job_position = ( + employee_work_info.job_position_id + if employee_work_info.job_position_id + else None + ) + user.__dict__["work_email"] = work_email + user.__dict__["phone"] = phone + user.__dict__["job_position"] = job_position + else: + user.__dict__["work_email"] = None + user.__dict__["phone"] = None + user.__dict__["job_position"] = None + user.__dict__["employee"] = employee + user.__dict__["badge_id"] = employee.badge_id + finger_print = [] + for finger in fingers: + if finger.uid == uid: + finger_print.append(finger.fid) + if not finger_print: + finger_print = [] + user.__dict__["finger"] = finger_print + employees.append(user) + return employees + + +def cosec_employee_fetch(device_id): + """ + Fetch employee data from the COSEC biometric device associated with the specified device ID. + + Parameters: + - device_id: ID of the biometric device. + + Returns: + - list: A list of dictionaries, where each dictionary represents an employee with associated + data including user ID, employee ID, finger count, and card count. + """ + users = [] + device = BiometricDevices.objects.get(id=device_id) + employees = BiometricEmployees.objects.filter(device_id=device) + cosec = COSECBiometric( + device.machine_ip, device.port, device.bio_username, device.bio_password + ) + for employee in employees: + user = cosec.get_cosec_user(user_id=employee.user_id) + if user.get("user-id"): + user["employee_id"] = employee.employee_id + user_credential = cosec.get_user_credential_count(user_id=employee.user_id) + user["finger-count"] = user_credential.get("finger-count") + user["face-count"] = user_credential.get("face-count") + user["card-count"] = user_credential.get("card-count") + new_dict = {} + for key, value in user.items(): + new_key = key.replace("-", "_") + new_dict[new_key] = value + users.append(new_dict) + else: + BiometricEmployees.objects.get(id=employee.id).delete() + return users + + +def find_employees_in_cosec(device_id): + """ + Synchronize active employees with a COSEC biometric device. + + This function retrieves a list of active employees from the database, + checks their presence on a specified COSEC biometric device, and updates + the database with employees who are registered on the COSEC device. + + Args: + device_id (uuid): The ID of the biometric device to synchronize with. + """ + device = BiometricDevices.objects.get(id=device_id) + employees = Employee.objects.filter(is_active=True).values_list("id", "badge_id") + cosec = COSECBiometric( + device.machine_ip, device.port, device.bio_username, device.bio_password + ) + existing_user_ids = BiometricEmployees.objects.filter(device_id=device).values_list( + "user_id", flat=True + ) + biometric_employees_to_create = [] + for employee_id, badge_id in employees: + if badge_id and badge_id.isalnum() and len(badge_id) <= 15: + user = cosec.get_cosec_user(user_id=badge_id) + if user.get("user-id") and user.get("user-id") not in existing_user_ids: + biometric_employees_to_create.append( + BiometricEmployees( + ref_user_id=user.get("ref-user-id"), + user_id=user.get("user-id"), + employee_id_id=employee_id, + device_id_id=device_id, + ) + ) + BiometricEmployees.objects.bulk_create(biometric_employees_to_create) + + +def find_employees_in_zk(device_id): + """ + Synchronize active employees with a COSEC biometric device. + + This function retrieves a list of active employees from the database, + checks their presence on a specified COSEC biometric device, and updates + the database with employees who are registered on the COSEC device. + + Args: + device_id (uuid): The ID of the biometric device to synchronize with. + """ + device = BiometricDevices.objects.get(id=device_id) + employees = Employee.objects.filter(is_active=True).values_list("id", "badge_id") + existing_user_ids = set( + BiometricEmployees.objects.filter(device_id=device_id).values_list( + "user_id", flat=True + ) + ) + zk_device = ZK( + device.machine_ip, port=device.port, password=int(device.zk_password), timeout=5 + ) + conn = zk_device.connect() + zk_users = {user.user_id: user.uid for user in conn.get_users()} + biometric_employees_to_create = [ + BiometricEmployees( + uid=zk_users[badge_id], + user_id=badge_id, + employee_id_id=employee_id, + device_id_id=device_id, + ) + for employee_id, badge_id in employees + if badge_id and badge_id in zk_users and badge_id not in existing_user_ids + ] + BiometricEmployees.objects.bulk_create(biometric_employees_to_create) + conn.disconnect() + + +@login_required +@install_required +@permission_required("biometric.view_biometricemployees") +def biometric_device_employees(request, device_id, **kwargs): + """ + View function to display employees associated with a biometric device. + + Depending on the machine type of the biometric device (either "zk" or "cosec"), + this function fetches the relevant employees and renders the appropriate template. + + Args: + request (HttpRequest): The HTTP request object. + device_id (uuid): The ID of the biometric device. + **kwargs: Additional keyword arguments. + + Returns: + HttpResponse: The rendered template response or a redirect to `biometric_devices_view` + in case of an error. + """ + previous_data = request.GET.urlencode() + device = BiometricDevices.find(device_id) + if device: + try: + if device.machine_type == "zk": + employee_add_form = EmployeeBiometricAddForm() + employees = zk_employees_fetch(device) + employees = paginator_qry(employees, request.GET.get("page")) + context = { + "employees": employees, + "device_id": device_id, + "form": employee_add_form, + "pd": previous_data, + } + return render( + request, "biometric/view_employees_biometric.html", context + ) + if device.machine_type == "cosec": + employee_add_form = CosecUserAddForm() + employees = cosec_employee_fetch(device_id) + employees = biometric_paginator_qry( + employees, int(request.GET.get("page", 1)) + ) + context = { + "employees": employees, + "device_id": device.id, + "form": employee_add_form, + "pd": previous_data, + } + return render(request, "biometric/view_cosec_employees.html", context) + if device.machine_type == "dahua": + employees = BiometricEmployees.objects.filter(device_id=device_id) + context = { + "device_id": device.id, + "employees": employees, + } + return render( + request, "biometric_users/dahua/view_dahua_employees.html", context + ) + if device.machine_type == "etimeoffice": + employees = BiometricEmployees.objects.filter(device_id=device_id) + context = { + "device_id": device.id, + "employees": employees, + } + return render( + request, + "biometric_users/etimeoffice/view_etimeoffice_employees.html", + context, + ) + except Exception as error: + logger.error("An error occurred: ", error) + messages.info( + request, + _( + "Failed to establish a connection. Please verify the accuracy of the IP\ + Address , Port No. and Password of the device." + ), + ) + else: + messages.error(request, _("Biometric device not found")) + return redirect(biometric_devices_view) + + +@login_required +@install_required +@hx_request_required +@permission_required("biometric.view_biometricemployees") +def search_employee_device(request): + """ + View function to search for employees associated with a specific biometric device. + + This function handles searching employees based on their first name and the type of + biometric device (either "zk" or "cosec"). It then renders the appropriate template + with the filtered employee list. + + Args: + request (HttpRequest): The HTTP request object. + + Returns: + HttpResponse: The rendered template response with the context. + """ + previous_data = request.GET.urlencode() + device_id = request.GET.get("device") + device = BiometricDevices.objects.get(id=device_id) + search = request.GET.get("search") + if device.machine_type == "zk": + employees = zk_employees_fetch(device) + if search: + search_employees = BiometricEmployees.objects.filter( + employee_id__employee_first_name__icontains=search + ) + search_uids = search_employees.values_list("uid", flat=True) + employees = [ + employee for employee in employees if employee.uid in search_uids + ] + employees = paginator_qry(employees, request.GET.get("page")) + template = "biometric/list_employees_biometric.html" + context = { + "employees": employees, + "device_id": device_id, + "pd": previous_data, + } + elif device.machine_type == "dahua" or device.machine_type == "etimeoffice": + search_employees = BiometricEmployees.objects.filter(device_id=device) + if search: + search_employees = BiometricEmployees.objects.filter( + employee_id__employee_first_name__icontains=search, device_id=device + ) + template = ( + "biometric_users/dahua/list_dahua_employees.html" + if device.machine_type == "dahua" + else "biometric_users/etimeoffice/list_etimeoffice_employees.html" + ) + context = { + "device_id": device.id, + "employees": search_employees, + } + + else: + employees = cosec_employee_fetch(device_id) + if search: + search_employees = BiometricEmployees.objects.filter( + employee_id__employee_first_name__icontains=search, device_id=device + ) + else: + search_employees = BiometricEmployees.objects.filter(device_id=device) + queryset_user_ids = [employee.user_id for employee in search_employees] + filtered_employees = [ + employee + for employee in employees + if employee["user_id"] in queryset_user_ids + ] + filtered_employees = biometric_paginator_qry( + filtered_employees, int(request.GET.get("page", 1)) + ) + template = "biometric/list_employees_cosec_biometric.html" + context = { + "employees": filtered_employees, + "device_id": device_id, + "pd": previous_data, + } + return render(request, template, context) + + +@login_required +@install_required +@permission_required("biometric.delete_biometricemployees") +def delete_biometric_user(request, uid, device_id): + """ + This function connects to the specified biometric device, deletes the user + identified by the given UID, and removes the corresponding entry from the + BiometricEmployees table in the local database + + Args: + request (HttpRequest): The HTTP request object. + uid (str): The UID of the user to be deleted from the biometric device. + device_id (uuid): The ID of the biometric device. + + Returns: + HttpResponse: A redirect response to the list of employees for the specified + biometric device. + """ + device = BiometricDevices.objects.get(id=device_id) + zk_device = ZK( + device.machine_ip, + port=device.port, + timeout=5, + password=int(device.zk_password), + force_udp=False, + ommit_ping=False, + ) + conn = zk_device.connect() + conn.delete_user(uid=uid) + employee_bio = BiometricEmployees.objects.filter(uid=uid).first() + employee_bio.delete() + messages.success( + request, + _("{} successfully removed from the biometric device.").format( + employee_bio.employee_id + ), + ) + redirect_url = f"/biometric/biometric-device-employees/{device_id}/" + return redirect(redirect_url) + + +@login_required +@install_required +@permission_required("biometric.change_biometricemployees") +def enable_cosec_face_recognition(request, user_id, device_id): + """ + View function to enable face recognition for a user on a COSEC biometric device + + Args: + request (HttpRequest): The HTTP request object. + user_id (str): The ID of the user for whom face recognition is to be enabled. + device_id (uuid): The ID of the COSEC biometric device. + + Returns: + HttpResponse: A redirect response to the list of employees for the specified + biometric device. + """ + device = BiometricDevices.find(device_id) + if device: + cosec = COSECBiometric( + device.machine_ip, + device.port, + device.bio_username, + device.bio_password, + ) + enable_fr = cosec.enable_user_face_recognition(user_id=user_id, enable_fr=True) + response_code = enable_fr.get("Response-Code") + if response_code == "0": + messages.success(request, _("Face recognition enabled successfully")) + else: + messages.error(request, _("Something went wrong when enabling face")) + else: + messages.error(request, _("Device not found")) + return redirect(f"/biometric/biometric-device-employees/{device_id}/") + + +@login_required +@install_required +@hx_request_required +@permission_required("biometric.change_biometricemployees") +def edit_cosec_user(request, user_id, device_id): + """ + View function to edit the details of a COSEC biometric user. + + Args: + request (HttpRequest): The HTTP request object. + user_id (str): The ID of the user to be edited. + device_id (uuid): The ID of the COSEC biometric device. + + Returns: + HttpResponse: The rendered form template for GET requests, and a response with + a success message for valid POST requests. Reloads the page after + successful update. + + """ + device = BiometricDevices.objects.get(id=device_id) + cosec = COSECBiometric( + device.machine_ip, + device.port, + device.bio_username, + device.bio_password, + ) + user = cosec.get_cosec_user(user_id) + if user.get("name"): + year = int(user["validity-date-yyyy"]) + month = int(user["validity-date-mm"]) + day = int(user["validity-date-dd"]) + date_object = datetime(year, month, day) + formatted_date = date_object.strftime("%Y-%m-%d") + initial_data = { + "name": user["name"], + "user_active": bool(int(user["user-active"])), + "vip": bool(int(user["vip"])), + "validity_enable": bool(int(user["validity-enable"])), + "validity_end_date": formatted_date, + } + + if "by-pass-finger" in user: + initial_data["by_pass_finger"] = bool(int(user["by-pass-finger"])) + + form = COSECUserForm(initial=initial_data) + + if request.method == "POST": + form = COSECUserForm(request.POST) + if form.is_valid(): + name = form.cleaned_data["name"] + user_active = form.cleaned_data["user_active"] + vip = form.cleaned_data["vip"] + validity_enable = form.cleaned_data["validity_enable"] + validity_end_date_str = str(form.cleaned_data["validity_end_date"]) + validity_end_date = datetime.strptime( + validity_end_date_str, "%Y-%m-%d" + ).date() + validity_year = validity_end_date.year + validity_month = validity_end_date.month + validity_day = validity_end_date.day + by_pass_finger = form.cleaned_data["by_pass_finger"] + update_user = cosec.set_cosec_user( + user_id=user["user-id"], + ref_user_id=user["ref-user-id"], + name=name, + user_active=int(user_active), + vip=int(vip), + by_pass_finger=int(by_pass_finger), + validity_enable=int(validity_enable), + validity_date_dd=validity_day, + validity_date_mm=validity_month, + validity_date_yyyy=validity_year, + ) + if ( + update_user.get("Response-Code") + and update_user.get("Response-Code") == "0" + ): + messages.success( + request, _("Biometric user data updated successfully") + ) + return HttpResponse("") + if update_user.get("error"): + error = update_user.get("error") + if "validity-date-yyyy" in error: + form.add_error( + None, + _( + "This date cannot be used as the Validity End Date for\ + the COSEC Biometric." + ), + ) + return render( + request, + "biometric/edit_cosec_user.html", + context={"form": form, "user_id": user_id, "device_id": device_id}, + ) + + +@login_required +@install_required +@permission_required("biometric.delete_biometricemployees") +def delete_horilla_cosec_user(request, user_id, device_id): + """ + View function to delete a user from a COSEC biometric device and database. + + Args: + request (HttpRequest): The HTTP request object. + user_id (str): The ID of the user to be deleted from the COSEC biometric device. + device_id (uuid): The ID of the COSEC biometric device. + + Returns: + HttpResponse: A redirect response to the list of employees for the specified + biometric device. + """ + device = BiometricDevices.find(device_id) + if device: + employee_bio = BiometricEmployees.objects.filter( + user_id=user_id, device_id=device + ).first() + cosec = COSECBiometric( + device.machine_ip, + device.port, + device.bio_username, + device.bio_password, + ) + response = cosec.delete_cosec_user(user_id) + if response.get("Response-Code") and response.get("Response-Code") == "0": + employee_bio.delete() + messages.success( + request, + _("{} successfully removed from the biometric device.").format( + employee_bio.employee_id + ), + ) + else: + messages.error(request, _("Biometric user not found")) + else: + messages.error(request, _("Biometric device not found")) + redirect_url = ( + f"/biometric/biometric-device-employees/{device_id}/" + if device + else "/biometric/view-biometric-devices/" + ) + return redirect(redirect_url) + + +@login_required +@install_required +@permission_required("biometric.delete_biometricemployees") +def bio_users_bulk_delete(request): + """ + View function to delete multiple users from a ZK biometric device and the local database. + + Args: + request (HttpRequest): The HTTP request object. + + Returns: + JsonResponse: A JSON response indicating the success of the bulk delete operation. + + """ + conn = None + json_ids = request.POST["ids"] + device_id = request.POST["deviceId"] + ids = json.loads(json_ids) + device = BiometricDevices.objects.get(id=device_id) + try: + zk_device = ZK( + device.machine_ip, + port=device.port, + timeout=5, + password=int(device.zk_password), + force_udp=False, + ommit_ping=False, + ) + conn = zk_device.connect() + for user_id in ids: + user_id = int(user_id) + conn.delete_user(user_id=user_id) + employee_bio = BiometricEmployees.objects.filter(user_id=user_id).first() + employee_bio.delete() + conn.refresh_data() + messages.success( + request, + _("{} successfully removed from the biometric device.").format( + employee_bio.employee_id + ), + ) + except Exception as error: + logger.error("An error occurred: ", error) + return JsonResponse({"messages": "Success"}) + + +@login_required +@install_required +@permission_required("biometric.delete_biometricemployees") +def cosec_users_bulk_delete(request): + """ + View function to delete multiple users from a COSEC biometric device and database. + + Args: + request (HttpRequest): The HTTP request object. + + Returns: + JsonResponse: A JSON response indicating the success of the bulk delete operation. + """ + json_ids = request.POST["ids"] + device_id = request.POST["deviceId"] + ids = json.loads(json_ids) + device = BiometricDevices.objects.get(id=device_id) + try: + cosec = COSECBiometric( + device.machine_ip, + device.port, + device.bio_username, + device.bio_password, + ) + for user_id in ids: + cosec.delete_cosec_user(user_id=user_id) + employee_bio = BiometricEmployees.objects.filter( + user_id=user_id, device_id=device + ).first() + if employee_bio: + employee_bio.delete() + messages.success( + request, + f"{employee_bio.employee_id} " + + _("successfully removed from the biometric device."), + ) + + except Exception as error: + logger.error("An error occurred: ", error) + return JsonResponse({"messages": "Success"}) + + +@login_required +@install_required +@hx_request_required +@permission_required("biometric.add_biometricemployees") +def add_biometric_user(request, device_id): + """ + View function to add a new user to a biometric device. + + This function adds a new user to the specified biometric device and stores their + information in the database. + + Args: + request (HttpRequest): The HTTP request object. + device_id (uuid): The ID of the biometric device. + + Returns: + HttpResponse: A JavaScript script to reload the current page after adding the user. + + """ + device = BiometricDevices.objects.get(id=device_id) + employee_add_form = ( + EmployeeBiometricAddForm() + if device.machine_type == "zk" + else CosecUserAddForm() + ) + if request.method == "POST": + device = BiometricDevices.objects.get(id=device_id) + try: + if device.machine_type == "zk": + zk_device = ZK( + device.machine_ip, + port=device.port, + timeout=5, + password=int(device.zk_password), + force_udp=False, + ommit_ping=False, + ) + conn = zk_device.connect() + conn.enable_device() + existing_uids = [user.uid for user in conn.get_users()] + existing_user_ids = [user.user_id for user in conn.get_users()] + uid = 1 + user_id = 1000 + employee_ids = request.POST.getlist("employee_ids") + for obj_id in employee_ids: + employee = Employee.objects.get(id=obj_id) + existing_biometric_employee = BiometricEmployees.objects.filter( + employee_id=employee, device_id=device + ).first() + if existing_biometric_employee is None: + while uid in existing_uids or user_id in existing_user_ids: + user_id = int(user_id) + uid += 1 + user_id += 1 + existing_uids.append(uid) + existing_user_ids.append(user_id) + employee_name = employee.get_full_name() + conn.set_user( + uid=uid, + name=employee_name, + password="", + group_id="", + user_id=str(user_id), + card=0, + ) + # The ZK Biometric user ID must be a character value + # that can be converted to an integer. + BiometricEmployees.objects.create( + uid=uid, + user_id=str(user_id), + employee_id=employee, + device_id=device, + ) + messages.success( + request, + _("{} added to biometric device successfully").format( + employee + ), + ) + else: + messages.info( + request, + _("{} already added to biometric device").format(employee), + ) + else: + cosec = COSECBiometric( + device.machine_ip, + device.port, + device.bio_username, + device.bio_password, + ) + basic = cosec.basic_config() + if basic.get("app"): + employee_ids = request.POST.getlist("employee_ids") + cosec_users = BiometricEmployees.objects.filter(device_id=device_id) + existing_ref_user_ids = list( + cosec_users.values_list("ref_user_id", flat=True) + ) + for obj_id in employee_ids: + employee = Employee.objects.get(id=obj_id) + employee_name = employee.get_full_name() + user_id = employee.badge_id + ref_user_id = 100 + while ref_user_id in existing_ref_user_ids: + ref_user_id += 1 + existing_ref_user_ids.append(ref_user_id) + user = cosec.set_cosec_user( + user_id=user_id, + ref_user_id=ref_user_id, + name=employee_name, + user_active=True, + validity_enable=True, + validity_date_dd=1, + validity_date_mm=1, + validity_date_yyyy=2035, + ) + response = user.get("Response-Code") + if response and response == "0": + BiometricEmployees.objects.create( + ref_user_id=ref_user_id, + user_id=user_id, + employee_id=employee, + device_id=device, + ) + except Exception as error: + if device.machine_type == "zk": + conn.disable_device() + logger.error("An error occurred: ", str(error)) + return HttpResponse("") + return render( + request, + "biometric/add_biometric_user.html", + context={"form": employee_add_form, "device_id": device_id}, + ) + + +@login_required +@install_required +@hx_request_required +def map_biometric_users(request, device_id): + """ + Maps an horilla employee to a biometric user on a specified biometric device. + """ + device = BiometricDevices.find(device_id) + form = MapBioUsers(request.POST or None) + template = "biometric_users/dahua/map_dahua_users.html" + + if device.machine_type == "etimeoffice": + template = "biometric_users/etimeoffice/map_etimeoffice_users.html" + form.fields["user_id"].label = _("Emp Code") + + if request.method == "POST" and form.is_valid(): + user_id = form.cleaned_data["user_id"] + employee = form.cleaned_data["employee_id"] + if device and employee: + BiometricEmployees.objects.create( + user_id=user_id, employee_id=employee, device_id=device + ) + + messages.success( + request, + _("Selected employee successfully mapped to the biometric user"), + ) + form = MapBioUsers() + + if device.machine_type == "etimeoffice": + form.fields["user_id"].label = _("Emp Code") + + return render( + request, + template, + {"form": form, "device_id": device_id}, + ) + + +@login_required +@install_required +@hx_request_required +def add_dahua_biometric_user(request, device_id): + """ + Adds a new employee to a Dahua biometric device. + + This view handles the process of adding an employee as a user to a Dahua biometric device. + """ + device = BiometricDevices.find(device_id) + form = DahuaUserForm() + if request.method == "POST": + form = DahuaUserForm(request.POST) + if form.is_valid(): + employee_id = request.POST.get("employee") + card_no = request.POST.get("card_no") + user_id = request.POST.get("user_id") + card_status = request.POST.get("card_status") + card_type = request.POST.get("card_type") + password = request.POST.get("password") + valid_date_end = request.POST.get("valid_date_end") + + try: + employee = Employee.objects.get(id=employee_id) if employee_id else None + except Employee.DoesNotExist: + messages.error(request, _("Employee not found.")) + return render( + request, "biometric_users/dahua/add_dahua_user.html", context + ) + + dahua = DahuaAPI( + ip=device.machine_ip, + username=device.bio_username, + password=device.bio_password, + ) + + response = dahua.enroll_new_user( + card_name=employee.get_full_name() if employee else "", + card_no=card_no, + user_id=user_id, + card_status=card_status, + card_type=card_type, + password=password, + valid_date_end=valid_date_end, + ) + + if response.get("status_code") == 200: + BiometricEmployees.objects.create( + dahua_card_no=card_no, + user_id=user_id, + employee_id=employee, + device_id=device, + ) + messages.success( + request, + _("{} added to biometric device successfully").format( + employee.get_full_name() if employee else "" + ), + ) + form = DahuaUserForm() + else: + messages.error(request, _("Failed to add user to biometric device.")) + context = {"form": form, "device_id": device_id} + return render(request, "biometric_users/dahua/add_dahua_user.html", context) + + +@login_required +@hx_request_required +@install_required +def find_employee_badge_id(request): + """ + Retrieves the badge ID of an employee based on their employee ID. + """ + employee_id = request.GET.get("employee") + user_id = Employee.objects.get(id=employee_id).badge_id if employee_id else "" + input_field = f""" + + """ + return HttpResponse(input_field) + + +@login_required +@hx_request_required +@install_required +def delete_dahua_user(request, obj_id=None): + """ + Deletes a Dahua biometric user or multiple users from a device. + """ + script = "" + try: + if request.method == "POST" and obj_id: + user = BiometricEmployees.objects.get(id=obj_id) + user.delete() + messages.success( + request, _("{} successfully deleted!").format(user.employee_id) + ) + script = "" + if request.method == "DELETE": + user_ids = request.GET.getlist("ids") + device_id = request.GET.get("device_id") + if device_id: + script = f""" + + + """ + if user_ids: + users = BiometricEmployees.objects.filter(user_id__in=user_ids) + if users: + count = users.count() + users.delete() + messages.success( + request, _("{} users successfully deleted!").format(count) + ), + else: + messages.warning( + request, + _("No rows are selected for deleting users from device."), + ) + except Exception as e: + messages.error(request, _("An error occurred: {}").format(str(e))) + return HttpResponse(script) + + +@login_required +@install_required +@hx_request_required +@permission_required("biometric.delete_biometricemployees") +def delete_etimeoffice_user(request, obj_id=None): + """ + Deletes a user or multiple users from the eTimeOffice biometric system. + """ + script = "" + if request.method == "POST": + user = BiometricEmployees.objects.get(id=obj_id) + device_id = user.device_id.id + user.delete() + messages.success( + request, _("{} successfully deleted!").format(user.employee_id) + ) + script = "" + if request.method == "DELETE": + user_ids = request.GET.getlist("ids") + device_id = request.GET.get("device_id") + if device_id: + script = f""" + + + """ + if user_ids: + users = BiometricEmployees.objects.filter(user_id__in=user_ids) + if users: + count = users.count() + users.delete() + messages.success( + request, _("{} users successfully deleted!").format(count) + ), + else: + messages.warning( + request, + _("No rows are selected for deleting users from device."), + ) + + return HttpResponse(script) + + +@login_required +@install_required +@hx_request_required +@permission_required("biometric.change_biometricdevices") +def biometric_device_live(request): + """ + Activate or deactivate live capture mode for a biometric device based on the request parameters. + + :param request: The Django request object. + :return: A JsonResponse containing a script to be executed on the client side. + """ + is_live = request.GET.get("is_live") + device_id = request.GET.get("deviceId") + device = BiometricDevices.objects.get(id=device_id) + is_live = is_live == "on" + if is_live: + port_no = device.port + machine_ip = device.machine_ip + password = int(device.zk_password) + conn = None + # create ZK instance + try: + if device.machine_type == "zk": + zk_device = ZK( + machine_ip, + port=port_no, + timeout=5, + password=int(password), + force_udp=False, + ommit_ping=False, + ) + conn = zk_device.connect() + instance = ZKBioAttendance(machine_ip, port_no, password) + conn.test_voice(index=14) + if conn: + device.is_live = True + device.is_scheduler = False + device.save() + instance.start() + elif device.machine_type == "cosec": + cosec = COSECBiometric( + device.machine_ip, + device.port, + device.bio_username, + device.bio_password, + timeout=10, + ) + response = cosec.basic_config() + if response.get("app"): + device.is_live = True + device.is_scheduler = False + device.save() + thread = COSECBioAttendanceThread(device.id) + thread.start() + BIO_DEVICE_THREADS[device.id] = thread + else: + raise TimeoutError + else: + pass + + script = """ + """ + except TimeoutError as error: + device.is_live = False + device.save() + logger.error("An error comes in biometric_device_live", error) + script = """ + + """ + finally: + if conn: + conn.disconnect() + else: + device.is_live = False + device.save() + if device.machine_type == "cosec": + existing_thread = BIO_DEVICE_THREADS.get(device.id) + if existing_thread: + existing_thread.stop() + del BIO_DEVICE_THREADS[device.id] + + script = """ + + """ + return HttpResponse(script) + + +def zk_biometric_attendance_logs(device_or_devices): + """ + Retrieve and process attendance logs from one or more ZKTeco biometric devices. + + Handles scenarios where the same user_id may exist across multiple devices for the same employee. + + :param device_or_devices: A single BiometricDevice instance or a queryset/list of them. + :return: Tuple (number_of_attendance_processed, error_message or None) + """ + if hasattr(device_or_devices, "__iter__") and not isinstance( + device_or_devices, dict + ): + devices = list(device_or_devices) + else: + devices = [device_or_devices] + + errors = [] + combined_attendances = [] + patch_direction = {"in": 0, "out": 1} + + bio_id_map = { + (bio.device_id_id, bio.user_id): bio + for bio in BiometricEmployees.objects.filter(device_id__in=devices) + } + + for device in devices: + port_no = device.port + machine_ip = device.machine_ip + conn = None + zk_device = ZK( + machine_ip, + port=port_no, + timeout=5, + password=int(device.zk_password), + force_udp=False, + ommit_ping=False, + ) + + try: + conn = zk_device.connect() + conn.enable_device() + attendances = conn.get_attendance() + if not attendances: + continue + + last_attendance_datetime = attendances[-1].timestamp + + if device.last_fetch_date and device.last_fetch_time: + filtered = [ + att + for att in attendances + if (att.timestamp.date() > device.last_fetch_date) + or ( + att.timestamp.date() == device.last_fetch_date + and att.timestamp.time() > device.last_fetch_time + ) + ] + else: + filtered = attendances + + # Update last fetch markers + device.last_fetch_date = last_attendance_datetime.date() + device.last_fetch_time = last_attendance_datetime.time() + device.save() + for attendance in filtered: + attendance.device = device # Attach device info + attendance.punch = ( + patch_direction[device.device_direction] + if device.device_direction in patch_direction + else attendance.punch + ) # Update punch code based on device direction + combined_attendances.append(attendance) + + except zk_exception.ZKErrorResponse as e: + errors.append(f"[{device.name}] ZKError: {str(e)}") + except Exception as e: + logger.error(f"[{device.name}] General Error", exc_info=True) + errors.append(f"[{device.name}] Error: {str(e)}") + finally: + if conn: + conn.disconnect() + + # Sort all filtered attendances by time + combined_attendances.sort(key=lambda a: a.timestamp) + + for attendance in combined_attendances: + user_id = attendance.user_id + punch_code = attendance.punch + date_time = django_timezone.make_aware(attendance.timestamp) + date = date_time.date() + time = date_time.time() + device_id = attendance.device.id + bio_id = bio_id_map.get((device_id, user_id)) + if bio_id: + request_data = Request( + user=bio_id.employee_id.employee_user_id, + date=date, + time=time, + datetime=date_time, + ) + try: + if punch_code in {0, 3, 4}: + clock_in(request_data) + elif punch_code in {1, 2, 5}: + clock_out(request_data) + except Exception: + logger.error( + f"[Device: {attendance.device.name}] Punch processing error", + exc_info=True, + ) + + return len(combined_attendances), "; ".join(errors) if errors else None + + +def zk_biometric_attendance_scheduler(device_id): + """ + Scheduler function used for attendance logs + """ + device = BiometricDevices.find(device_id) + if device and device.is_scheduler: + zk_biometric_attendance_logs(device) + + +def anviz_biometric_attendance_logs(device): + """ + Retrieves attendance records from an Anviz biometric device + and processes them based on device direction configuration. + """ + + current_utc_time = datetime.utcnow() + + anviz_device = CrossChexCloudAPI( + api_url=device.api_url, + api_key=device.api_key, + api_secret=device.api_secret, + anviz_request_id=device.anviz_request_id, + ) + + begin_time = ( + datetime.combine(device.last_fetch_date, device.last_fetch_time) + if device.last_fetch_date and device.last_fetch_time + else current_utc_time.replace(hour=0, minute=0, second=0, microsecond=0) + ) + + attendance_records = anviz_device.get_attendance_records( + begin_time=begin_time, + token=device.api_token, + ) + + # Update last fetch time immediately + device.last_fetch_date = current_utc_time.date() + device.last_fetch_time = current_utc_time.time() + device.save(update_fields=["last_fetch_date", "last_fetch_time"]) + + processed_count = 0 + + for attendance in attendance_records.get("list", []): + badge_id = attendance["employee"]["workno"] + punch_code = attendance["checktype"] + + date_time_utc = datetime.strptime( + attendance["checktime"], "%Y-%m-%dT%H:%M:%S%z" + ) + date_time_obj = date_time_utc.astimezone(django_timezone.get_current_timezone()) + + employee = Employee.objects.filter(badge_id=badge_id).first() + if not employee: + continue + + request_data = Request( + user=employee.employee_user_id, + date=date_time_obj.date(), + time=date_time_obj.time(), + datetime=date_time_obj, + ) + + try: + # -------------------------------------------------- + # SYSTEM DIRECTION (auto based on punch code) + # -------------------------------------------------- + if device.device_direction == "system": + if punch_code in {0, 128}: + clock_in(request_data) + else: + clock_out(request_data) + + # -------------------------------------------------- + # FORCE IN DEVICE + # -------------------------------------------------- + elif device.device_direction == "in": + clock_in(request_data) + + # -------------------------------------------------- + # FORCE OUT DEVICE + # -------------------------------------------------- + elif device.device_direction == "out": + clock_out(request_data) + + # -------------------------------------------------- + # ALTERNATE IN / OUT DEVICE + # -------------------------------------------------- + elif device.device_direction == "alternate": + last_activity = ( + AttendanceActivity.objects.filter( + employee_id=employee, + attendance_date=date_time_obj.date(), + ) + .order_by("-in_datetime", "-out_datetime") + .first() + ) + + # If no record or last record has clock_out → IN + if not last_activity or last_activity.clock_out: + clock_in(request_data) + else: + clock_out(request_data) + + processed_count += 1 + + except Exception as error: + logger.error( + f"Attendance sync failed for employee {employee.id}", + exc_info=error, + ) + + return processed_count + + +def anviz_biometric_attendance_scheduler(device_id): + """ + Schedules the attendance log retrieval for an Anviz biometric device. + """ + device = BiometricDevices.find(device_id) + if device and device.is_scheduler: + anviz_biometric_attendance_logs(device) + + +def cosec_biometric_attendance_logs(device): + """ + Retrieves and processes attendance logs from a COSEC biometric device. + """ + device_args = COSECAttendanceArguments.objects.filter(device_id=device).first() + last_fetch_roll_ovr_count = ( + int(device_args.last_fetch_roll_ovr_count) if device_args else 0 + ) + last_fetch_seq_number = int(device_args.last_fetch_seq_number) if device_args else 1 + + cosec = COSECBiometric( + device.machine_ip, + device.port, + device.bio_username, + device.bio_password, + timeout=10, + ) + attendances = cosec.get_attendance_events( + last_fetch_roll_ovr_count, int(last_fetch_seq_number) + 1 + ) + + if not isinstance(attendances, list): + return + + for attendance in attendances: + ref_user_id = attendance["detail-1"] + employee = BiometricEmployees.objects.filter(ref_user_id=ref_user_id).first() + if not employee: + continue + + date_str = attendance["date"] + time_str = attendance["time"] + attendance_date = datetime.strptime(date_str, "%d/%m/%Y").date() + attendance_time = datetime.strptime(time_str, "%H:%M:%S").time() + attendance_datetime = datetime.combine(attendance_date, attendance_time) + punch_code = attendance["detail-2"] + + request_data = Request( + user=employee.employee_id.employee_user_id, + date=attendance_date, + time=attendance_time, + datetime=django_timezone.make_aware(attendance_datetime), + ) + + try: + if punch_code in ["1", "3", "5", "7", "9", "0"]: + clock_in(request_data) + elif punch_code in ["2", "4", "6", "8", "10"]: + clock_out(request_data) + else: + pass + except Exception as error: + logger.error("Error processing attendance: ", error) + + if attendances: + last_attendance = attendances[-1] + COSECAttendanceArguments.objects.update_or_create( + device_id=device, + defaults={ + "last_fetch_roll_ovr_count": last_attendance["roll-over-count"], + "last_fetch_seq_number": last_attendance["seq-No"], + }, + ) + return len(attendances) + + +def cosec_biometric_attendance_scheduler(device_id): + """ + Retrieve and process attendance events from a COSEC biometric device. + + This function fetches attendance events from the specified COSEC biometric device + and processes them to record clock-in and clock-out events for employees. + + Args: + device_id (uuid): The ID of the COSEC biometric device. + """ + device = BiometricDevices.find(device_id) + if device and device.is_scheduler: + cosec_biometric_attendance_logs(device) + + +def dahua_biometric_attendance_logs(device): + """ + Retrieves logs from a Dahua biometric device and marks attendance in Horilla. + + This function fetches biometric logs from the specified device, processes the attendance records, + and updates the attendance system in Horilla. If an employee has an active clock-in record, + it marks their clock-out; otherwise, it registers a new clock-in entry. + + Args: + device_id (int): The unique identifier of the biometric device. + + Returns: + None + """ + begin_time = ( + datetime.combine(device.last_fetch_date, device.last_fetch_time) + + timedelta(seconds=1) + if device.last_fetch_date and device.last_fetch_time + else datetime.combine(datetime.today(), datetime.min.time()) + ) + + dahua = DahuaAPI( + ip=device.machine_ip, username=device.bio_username, password=device.bio_password + ) + logs = dahua.get_control_card_rec(start_time=begin_time) + + if logs.get("status_code") == 200: + for log in logs.get("records", []): + user_id = log.get("user_id") + if not user_id: + continue + + employee = BiometricEmployees.objects.filter( + user_id=user_id, device_id=device + ).first() + if not employee: + continue + + attendance_datetime = log.get("create_time") + user_tz = pytz.timezone(TIME_ZONE) + attendance_datetime = attendance_datetime.astimezone(user_tz) + + last_none_activity = ( + AttendanceActivity.objects.filter( + employee_id=employee.employee_id, + clock_out=None, + ) + .order_by("in_datetime") + .last() + ) + + request_data = Request( + user=employee.employee_id.employee_user_id, + date=attendance_datetime.date(), + time=attendance_datetime.time(), + datetime=attendance_datetime, + ) + + if last_none_activity: + clock_out(request_data) + else: + clock_in(request_data) + + if logs.get("records"): + last_log = logs["records"][-1] + device.last_fetch_date = last_log["create_time"].date() + device.last_fetch_time = last_log["create_time"].time() + device.save() + return len(logs.get("records", [])) + else: + return "error" + + +def dahua_biometric_attendance_scheduler(device_id): + """ + Schedules the attendance log retrieval for a Dahua biometric device. + """ + device = BiometricDevices.find(device_id) + if device and device.is_scheduler: + dahua_biometric_attendance_logs(device) + + +def etimeoffice_biometric_attendance_logs(device): + """ + Retrieves and processes attendance logs from an eTimeOffice biometric device. + """ + now = datetime.now() + etimeoffice = ETimeOfficeAPI( + username=device.bio_username, + password=device.bio_password, + ) + + from_date = ( + f"{(datetime.combine(device.last_fetch_date, device.last_fetch_time) + timedelta(minutes=1)):%d/%m/%Y_%H:%M}" + if device.last_fetch_date and device.last_fetch_time + else f"{now:%d/%m/%Y}_00:00" + ) + to_date = f"{now:%d/%m/%Y_%H:%M}" + + logs = etimeoffice.download_punch_data(from_date=from_date, to_date=to_date) + if logs.get("Msg") != "Success": + return "error" + + punch_data = logs.get("PunchData", []) + if not punch_data: + return 0 + + user_tz = pytz.timezone(TIME_ZONE) + + employee_map = { + emp.user_id: emp for emp in BiometricEmployees.objects.filter(device_id=device) + } + + for log in reversed(punch_data): + user_id = log.get("Empcode") + if not user_id or user_id not in employee_map: + continue + + employee = employee_map[user_id] + attendance_datetime = log["PunchDate"].astimezone(user_tz) + + request_data = Request( + user=employee.employee_id.employee_user_id, + date=attendance_datetime.date(), + time=attendance_datetime.time(), + datetime=attendance_datetime, + ) + + last_none_activity = ( + AttendanceActivity.objects.filter( + employee_id=employee.employee_id, + clock_out=None, + ) + .order_by("in_datetime") + .last() + ) + + if last_none_activity: + clock_out(request_data) + else: + clock_in(request_data) + + last_log = punch_data[0] + device.last_fetch_date, device.last_fetch_time = ( + last_log["PunchDate"].date(), + last_log["PunchDate"].time(), + ) + device.save() + + return len(punch_data) + + +def etimeoffice_biometric_attendance_scheduler(device_id): + """ + Schedules the attendance log retrieval for an eTimeOffice biometric device. + """ + device = BiometricDevices.find(device_id) + if device and device.is_scheduler: + etimeoffice_biometric_attendance_logs(device) + + +try: + devices = BiometricDevices.objects.all().update(is_live=False) + for device in BiometricDevices.objects.filter(is_scheduler=True): + if device: + if str_time_seconds(device.scheduler_duration) > 0: + if device.machine_type == "anviz": + scheduler = BackgroundScheduler() + scheduler.add_job( + lambda: anviz_biometric_attendance_scheduler(device.id), + "interval", + seconds=str_time_seconds(device.scheduler_duration), + ) + scheduler.start() + elif device.machine_type == "zk": + scheduler = BackgroundScheduler() + scheduler.add_job( + lambda: zk_biometric_attendance_scheduler(device.id), + "interval", + seconds=str_time_seconds(device.scheduler_duration), + id=f"biometric_{device.id}", + ) + scheduler.start() + elif device.machine_type == "dahua": + scheduler = BackgroundScheduler() + scheduler.add_job( + lambda: dahua_biometric_attendance_scheduler(device.id), + "interval", + seconds=str_time_seconds(device.scheduler_duration), + ) + scheduler.start() + + elif device.machine_type == "cosec": + scheduler = BackgroundScheduler() + scheduler.add_job( + lambda: cosec_biometric_attendance_scheduler(device.id), + "interval", + seconds=str_time_seconds(device.scheduler_duration), + ) + scheduler.start() + + elif device.machine_type == "etimeoffice": + scheduler = BackgroundScheduler() + scheduler.add_job( + lambda: etimeoffice_biometric_attendance_scheduler(device.id), + "interval", + seconds=str_time_seconds(device.scheduler_duration), + ) + scheduler.start() + else: + pass +except: + pass