Files
hrms/biometric/anviz.py
2026-01-16 15:08:17 +01:00

173 lines
5.9 KiB
Python

"""
CrossChexCloudAPI module for Anviz Biometric Integration
This module provides a wrapper for interacting with the CrossChex Cloud API to manage
authentication, attendance data retrieval, and token handling. It allows for secure
communication with the API, including fetching and validating tokens, and retrieving
attendance records .
"""
from datetime import datetime
import requests
class CrossChexCloudAPI:
"""
CrossChexCloudAPI: A class to interact with the CrossChex Cloud API for attendance data
and token management.
"""
def __init__(self, api_url, api_key, api_secret, anviz_request_id):
"""
Initializes the CrossChexCloudAPI object with necessary parameters, such as API URL,
credentials (API key and secret), and request ID for the connection.
"""
self.api_url = api_url
self.api_key = api_key
self.api_secret = api_secret
self.anviz_request_id = anviz_request_id
self.token = None
self.expires = None
self.auth_error = {
"header": {"nameSpace": "System", "name": "Exception"},
"payload": {"type": "AUTH_ERROR", "message": "AUTH_ERROR"},
}
self.expires_error = {
"header": {"nameSpace": "System", "name": "Exception"},
"payload": {"type": "TOKEN_EXPIRES", "message": "TOKEN_EXPIRES"},
}
def _get_timestamp(self):
"""
Generates a UTC timestamp in ISO 8601 format.
"""
return datetime.utcnow().isoformat() + "Z"
def _post(self, data):
"""
Sends a POST request with the given data to the API and handles the response, including
automatic token renewal in case of expiration or authentication error.
"""
response = requests.post(self.api_url, json=data, timeout=5)
response_data = response.json()
if "payload" in response_data:
if response_data["payload"] == self.expires_error:
self.get_token()
response = requests.post(self.api_url, json=data, timeout=5)
response_data = response.json()
elif response_data["payload"] == self.auth_error:
raise Exception("Authentication error: API key or secret is incorrect.")
response.raise_for_status()
return response_data
def _is_token_expired(self):
"""Check if the token is expired."""
if self.expires:
expires_datetime = datetime.fromisoformat(self.expires)
# Remove timezone info to make it offset-naive
expires_datetime = expires_datetime.replace(tzinfo=None)
return datetime.utcnow() > expires_datetime
return True
def get_token(self):
"""Fetch a new token if expired or not present, and store it in the database."""
if self.token is None or self._is_token_expired():
data = {
"header": {
"nameSpace": "authorize.token",
"nameAction": "token",
"version": "1.0",
"requestId": self.anviz_request_id,
"timestamp": self._get_timestamp(),
},
"payload": {"api_key": self.api_key, "api_secret": self.api_secret},
}
response = self._post(data)
self.token = response.get("payload").get("token")
self.expires = response.get("payload").get("expires")
return self.token, self.expires
def test_connection(self):
"""Test connection and fetch the token and expiry."""
token, expires = self.get_token()
return {"token": token, "expires": expires}
def get_attendance_payload(
self, begin_time, end_time, order, page, per_page, token
):
"""Constructs the payload for retrieving attendance records."""
current_utc_time = datetime.utcnow()
begin_time = begin_time or current_utc_time.replace(
hour=0, minute=0, second=0, microsecond=0
)
end_time = end_time or current_utc_time
begin_time_str = begin_time.isoformat() + "+00:00"
end_time_str = end_time.isoformat() + "+00:00"
return {
"header": {
"nameSpace": "attendance.record",
"nameAction": "getrecord",
"version": "1.0",
"requestId": self.anviz_request_id,
"timestamp": self._get_timestamp(),
},
"authorize": {
"type": "token",
"token": token,
},
"payload": {
"begin_time": begin_time_str,
"end_time": end_time_str,
"order": order,
"page": page,
"per_page": per_page,
},
}
def get_attendance_records(
self,
begin_time=None,
end_time=None,
order="asc",
page=1,
per_page=100,
token=None,
):
"""Get attendance records, optimizing token usage and handling pagination."""
all_records = []
token = token or self.get_token()[0]
while True:
payload_data = self.get_attendance_payload(
begin_time=begin_time,
end_time=end_time,
order=order,
page=str(page),
per_page=str(per_page),
token=token,
)
response = self._post(payload_data)
# Safe extraction to avoid KeyError
payload = response.get("payload", {}) if isinstance(response, dict) else {}
records = payload.get("list", [])
all_records.extend(records)
page_count = payload.get("pageCount", 0)
if page >= page_count or not records:
break
page += 1
return {
"token": self.token,
"expires": self.expires,
"list": all_records,
"count": len(all_records),
}