2025-06-11 14:36:06 +05:30
|
|
|
import base64
|
2025-06-11 15:04:41 +05:30
|
|
|
import json
|
|
|
|
|
|
2025-06-11 14:36:06 +05:30
|
|
|
from cryptography.hazmat.backends import default_backend
|
2025-06-11 15:04:41 +05:30
|
|
|
from cryptography.hazmat.primitives import hashes, serialization
|
|
|
|
|
from cryptography.hazmat.primitives.asymmetric import padding, rsa
|
2025-06-11 14:36:06 +05:30
|
|
|
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
|
|
|
|
|
|
|
|
|
PRIVATE_KEY = """-----BEGIN PRIVATE KEY-----
|
|
|
|
|
MIIEuwIBADANBgkqhkiG9w0BAQEFAASCBKUwggShAgEAAoIBAQCitHXNMGC04pSI
|
|
|
|
|
di+ynQjLDQd1LrExd1nYMxdOFnX/CnVEJGVEUKPdDm5M300ULugeFPx76MsjQx97
|
|
|
|
|
vWAgZDMEA2+WyKFAHqmgRvVoiESZZnduIj2akVjFZvA24sbFpGunaiMPq61kWUBR
|
|
|
|
|
0pFRw4pnXEbPQnqu9H5vEJ7W5+G6CDlJD3eeTTkM85/ch1/REJbSDyp+Tozrc0Wa
|
|
|
|
|
Mv5xMUbVtSjwxaqwSfdMbAfJxnaaZ5SKwgiUEIsLKsQISWX3+qq6USWOCbJs6oTV
|
|
|
|
|
eWgeZfxgOjCUovJsZ2TuJ4aNfMtI+dFm3OMaeB2ypa4F4lWu8pq6FVhTseQntfUu
|
|
|
|
|
fQv4P+iRAgMBAAECggEABGOp6ecsNLUIHMZTcxYZbqDjWp3v2c3GdraqIkko1cCK
|
|
|
|
|
eVQiBz3Frej9wMUlZy38xRL73Lvi/wiIiOYK+dS6K5mMIR04fGpXWSOQ60kB0MGa
|
|
|
|
|
5zW1Q744DttAD7r+ccaFwPZ0C7At9U8TFSIBGZuU2ET9BApfFOkzn/tqzZFj3Yjg
|
|
|
|
|
OgWaGCvtOGCjLgjN1CWRTq+U66SUuVEtm8cXX8o4hVGfy2ITdg10xW+88qgLqxLj
|
|
|
|
|
/2RKPTixjmlwp1/28Z0rotp4GFUU5yplDq86YkdYNF8wHIPx0NiEUiLmAtfZrpmM
|
|
|
|
|
0xyJVJbgWh8QASzOxM1lq8WOWOOhJnVnkowYJsxVgQKBgQDZisbd8n59rJQfDMbK
|
|
|
|
|
7/cQ0gedl7No+0taY30hSckeR73yxAvmz83jqiD6qOlbaCbb+Rx1PPKewRZybrQV
|
|
|
|
|
n0CdFJ5oB1lbtLY7ftIlGQx2MiI4e+lAiCn+Zo2FnIm+C6LJKWqR18vLlHKebbJK
|
|
|
|
|
IAHdW76roZsdqAyBSyn0fh2gXQKBgQC/d+/30lIBafNCc6oDXbZFrH9/sRqCEg1u
|
|
|
|
|
jOhgNjwYZ23IkBMghyhD3eAiymvUFYCQ2pUfSMqygBbFUay5h5/PF34vzJmSVwC0
|
|
|
|
|
6SVemod/2Z7ZcVUogplSdAW1+V4f/3pyIptgKAAqlsE7lAQJAiyd7FnXx8/Rx7Cw
|
|
|
|
|
IVh+Jz11xQKBgBaTOTn1HT1LeH+UYtjSeDAtq46mHH8rfNFfe6/FqXJT/ZlA0P9d
|
|
|
|
|
1z7l+9AnUTgkIcw4GMTt0zu4S+0KIfQQd7MVXa7r/FDw+uxHp+UjqVBmuXhlG3qP
|
|
|
|
|
5tO4rr0L1pt7N6RqgN2rqEFzIUXhmlvo4GipSasj9SXpt4p/U1ZE9CwdAoGBAKSU
|
|
|
|
|
5jMyGMeaWT4Pyl5mWV1+r4IFrHGOLvmOKdk6BWI81cOHBMn7JANiX13IffOqH/9j
|
|
|
|
|
xLdFjObu76PhVwWLrTUITrGrv35pRvQ7TKILVtnxKHhk0Pyndj/H93i6x8vdgVVG
|
|
|
|
|
piR7fdkeCS+7RdSwh8Wf+oJfASaj7h8YKscV1+C5An9SKKO185jufNw+6cToVvBG
|
|
|
|
|
ii91044lvc8XidFgR6t3M9cJS3pHNemIfg2dZCu7i2/UbAIrsMDt4Uv4QGOaRGr/
|
|
|
|
|
rZ7JoquIPXVTaUjSJLWOELUi9TDBB1F04duW/xNsKswrFIk+mT0jMO3zB/LyCT5m
|
|
|
|
|
BCyI7ou85astK8+e4Q12
|
|
|
|
|
-----END PRIVATE KEY-----
|
|
|
|
|
"""
|
|
|
|
|
# NOTE(review): secret hard-coded in source. Presumably the passphrase that
# belongs to PRIVATE_KEY -- confirm, and consider loading it from settings or
# the environment instead of committing it.
PASSPHRASE = "Horilla"
|
|
|
|
|
# NOTE(review): credential hard-coded in source. Presumably the app secret
# meant to be passed to is_request_signature_valid() -- confirm, and consider
# loading it from settings or the environment instead of committing it.
APP_SECRET = "6c18bf72d5f486479bf66c5807c2b393"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class FlowEndpointException(Exception):
    """Exception that carries a status code alongside its message.

    Args:
        status_code: Value stored unchanged on the instance as ``status_code``.
        message: Text forwarded to ``Exception.__init__`` (so it becomes
            ``args[0]`` and the ``str()`` of the exception).
    """

    def __init__(self, status_code, message):
        # The two statements are independent; the attribute is simply
        # recorded before the base class is initialised.
        self.status_code = status_code
        super().__init__(message)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def decrypt_request(body, private_pem, passphrase):
    """Decrypt an encrypted request body using an RSA private key.

    The AES key is RSA-OAEP (SHA-256) decrypted with ``private_pem``; the
    flow data is then AES-GCM decrypted with that key, using the trailing
    16 bytes of the payload as the authentication tag.

    Args:
        body: Mapping holding base64 text under ``"encrypted_aes_key"``,
            ``"encrypted_flow_data"`` and ``"initial_vector"``.
        private_pem: PEM-encoded private key as ``str``.
        passphrase: Password for ``private_pem`` (``str`` or ``bytes``).
            It is only applied when the PEM text is marked as encrypted;
            for an unencrypted key it is ignored, exactly as before.

    Returns:
        dict with ``"decrypted_body"`` (the parsed JSON payload),
        ``"aes_key_buffer"`` (the decrypted AES key) and
        ``"initial_vector_buffer"`` (the decoded IV).
    """
    # Extract encrypted data from the request body
    encrypted_aes_key = body["encrypted_aes_key"]
    encrypted_flow_data = body["encrypted_flow_data"]
    initial_vector = body["initial_vector"]

    # Previously ``password=None`` was passed unconditionally, so the
    # ``passphrase`` argument was dead and encrypted PEMs could never load.
    # Both encrypted PKCS#8 ("BEGIN ENCRYPTED PRIVATE KEY") and legacy
    # OpenSSL ("Proc-Type: 4,ENCRYPTED") PEMs contain the marker below.
    # Supplying a password for an unencrypted key makes the loader raise,
    # hence the check instead of always forwarding the passphrase.
    password = None
    if passphrase and "ENCRYPTED" in private_pem:
        if isinstance(passphrase, str):
            password = passphrase.encode("utf-8")
        else:
            password = bytes(passphrase)

    private_key = serialization.load_pem_private_key(
        private_pem.encode(), password=password, backend=default_backend()
    )

    decrypted_aes_key = private_key.decrypt(
        base64.b64decode(encrypted_aes_key),
        padding.OAEP(
            mgf=padding.MGF1(algorithm=hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None,
        ),
    )

    flow_data_buffer = base64.b64decode(encrypted_flow_data)
    iv_buffer = base64.b64decode(initial_vector)

    # The GCM authentication tag is appended to the ciphertext.
    tag_length = 16
    encrypted_flow_data_body = flow_data_buffer[:-tag_length]
    encrypted_flow_data_tag = flow_data_buffer[-tag_length:]

    cipher = Cipher(
        algorithms.AES(decrypted_aes_key),
        modes.GCM(iv_buffer, encrypted_flow_data_tag),
        backend=default_backend(),
    )
    decryptor = cipher.decryptor()
    decrypted_json_string = (
        decryptor.update(encrypted_flow_data_body) + decryptor.finalize()
    )

    return {
        "decrypted_body": json.loads(decrypted_json_string.decode("utf-8")),
        "aes_key_buffer": decrypted_aes_key,
        "initial_vector_buffer": iv_buffer,
    }
|
|
|
|
|
|
2025-06-11 15:04:41 +05:30
|
|
|
|
2025-06-11 14:36:06 +05:30
|
|
|
def encrypt_response(response, aes_key_buffer, initial_vector_buffer):
    """Serialise ``response`` to JSON and AES-GCM encrypt it.

    Args:
        response: JSON-serialisable object to send back.
        aes_key_buffer: AES key bytes used for the cipher.
        initial_vector_buffer: IV bytes; every byte is XOR-ed with 0xFF
            before being used as the GCM IV for the response.

    Returns:
        Base64 text of the ciphertext followed by the GCM tag.
    """
    plaintext = json.dumps(response).encode("utf-8")

    # Bitwise-invert the IV for the response direction.
    inverted_iv = bytes(octet ^ 0xFF for octet in initial_vector_buffer)

    encryptor = Cipher(
        algorithms.AES(aes_key_buffer),
        modes.GCM(inverted_iv),
        backend=default_backend(),
    ).encryptor()

    # Left-to-right evaluation keeps the order update -> finalize -> tag;
    # the tag is only read after finalize() has run.
    sealed = encryptor.update(plaintext) + encryptor.finalize() + encryptor.tag
    return base64.b64encode(sealed).decode("utf-8")
|
|
|
|
|
|
2025-06-11 15:04:41 +05:30
|
|
|
|
2025-06-11 14:36:06 +05:30
|
|
|
# -------------------------------------------------views.py ------------------------------------------------
|
|
|
|
|
|
|
|
|
|
import base64
|
|
|
|
|
import hashlib
|
|
|
|
|
import hmac
|
|
|
|
|
import json
|
|
|
|
|
import os
|
|
|
|
|
from base64 import b64decode, b64encode
|
2025-06-11 15:04:41 +05:30
|
|
|
|
|
|
|
|
from cryptography.hazmat.primitives.asymmetric.padding import MGF1, OAEP, hashes
|
|
|
|
|
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
2025-06-11 14:36:06 +05:30
|
|
|
from cryptography.hazmat.primitives.serialization import load_pem_private_key
|
2025-06-11 15:04:41 +05:30
|
|
|
from django.http import HttpResponse, JsonResponse
|
2025-06-11 14:36:06 +05:30
|
|
|
from django.views.decorators.csrf import csrf_exempt
|
|
|
|
|
|
|
|
|
|
# Load the private key string
|
|
|
|
|
# PRIVATE_KEY = os.environ.get('PRIVATE_KEY')
|
|
|
|
|
# Example:
|
|
|
|
|
# '''-----BEGIN RSA PRIVATE KEY-----
|
|
|
|
|
# MIIE...
|
|
|
|
|
# ...
|
|
|
|
|
# ...AQAB
|
|
|
|
|
# -----END RSA PRIVATE KEY-----'''
|
|
|
|
|
|
|
|
|
|
# PRIVATE_KEY = settings.PRIVATE_KEY
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# @csrf_exempt
|
|
|
|
|
# def end_point(request):
|
|
|
|
|
# try:
|
|
|
|
|
# # Parse the request body
|
|
|
|
|
# body = json.loads(request.body)
|
|
|
|
|
|
|
|
|
|
# # Read the request fields
|
|
|
|
|
# encrypted_flow_data_b64 = body["encrypted_flow_data"]
|
|
|
|
|
# encrypted_aes_key_b64 = body["encrypted_aes_key"]
|
|
|
|
|
# initial_vector_b64 = body["initial_vector"]
|
|
|
|
|
|
|
|
|
|
# decrypted_data, aes_key, iv = decrypt_request(
|
|
|
|
|
# encrypted_flow_data_b64, encrypted_aes_key_b64, initial_vector_b64
|
|
|
|
|
# )
|
|
|
|
|
|
|
|
|
|
# # Return the next screen & data to the client
|
|
|
|
|
# if decrypted_data["action"] == "ping":
|
|
|
|
|
# response = {"data": {"status": "active"}}
|
|
|
|
|
|
|
|
|
|
# else:
|
|
|
|
|
# response = {"screen": "screen_one", "data": {"some_key": "some_value"}}
|
|
|
|
|
|
|
|
|
|
# # Return the response as plaintext
|
|
|
|
|
# return HttpResponse(
|
|
|
|
|
# encrypt_response(response, aes_key, iv), content_type="text/plain"
|
|
|
|
|
# )
|
|
|
|
|
# except Exception as e:
|
|
|
|
|
# print(e)
|
|
|
|
|
# return JsonResponse({}, status=500)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@csrf_exempt
def end_point(request):
    """Handle an encrypted Flow request and return the encrypted reply.

    The JSON body must carry ``encrypted_flow_data``, ``encrypted_aes_key``
    and ``initial_vector``. After decryption, a ``"ping"`` action gets an
    ``{"data": {"status": "active"}}`` reply; any other action gets
    ``screen_one`` populated with the leave types.

    Returns:
        * 200 ``text/plain`` with the encrypted response on success.
        * 421 with an empty body when the payload cannot be decrypted, so
          that the client can refresh its public key and retry.
        * 500 with an empty JSON object for any other failure.

    NOTE(review): ``is_request_signature_valid`` is defined in this file but
    is not called here; confirm whether signature checking should be wired in.
    """
    # Function-scope imports keep this block self-contained.
    import logging

    from cryptography.exceptions import InvalidTag

    logger = logging.getLogger(__name__)

    try:
        body = json.loads(request.body)

        encrypted_flow_data_b64 = body["encrypted_flow_data"]
        encrypted_aes_key_b64 = body["encrypted_aes_key"]
        initial_vector_b64 = body["initial_vector"]

        try:
            decrypted_data, aes_key, iv = decrypt_request(
                encrypted_flow_data_b64, encrypted_aes_key_b64, initial_vector_b64
            )
        except (ValueError, InvalidTag):
            # ValueError covers bad base64, RSA decryption failure and
            # undecodable or invalid JSON plaintext; InvalidTag is the GCM
            # authentication failure.
            logger.exception("Unable to decrypt Flow request")
            return HttpResponse(status=421)

        if decrypted_data["action"] == "ping":
            response = {"data": {"status": "active"}}
        else:
            leave_types = get_leave_types()
            response = {"screen": "screen_one", "data": {"leave_types": leave_types}}

        return HttpResponse(
            encrypt_response(response, aes_key, iv), content_type="text/plain"
        )
    except Exception:
        # Top-level boundary: log with traceback instead of print(e), which
        # lost the stack and bypassed the logging configuration.
        logger.exception("Unhandled error in Flow endpoint")
        return JsonResponse({}, status=500)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_leave_types():
    """Return leave types as a list of ``{"id": ..., "title": ...}`` dicts.

    When ``leave.models.LeaveType`` can be imported, its ``id`` and ``name``
    values are read and both are converted to strings, with ``name`` exposed
    under the key ``"title"``. If an ``ImportError`` is raised, a fixed
    three-entry list is returned instead.
    """
    try:
        # Preferred source: the LeaveType model.
        from leave.models import LeaveType

        # Build the output rows directly: string id first, then the name
        # re-keyed as "title".
        return [
            {"id": str(row["id"]), "title": str(row["name"])}
            for row in LeaveType.objects.values("id", "name")
        ]
    except ImportError:
        # Fallback: hardcoded values when the model is unavailable.
        return [
            {"id": "1", "title": "Casual Leave"},
            {"id": "2", "title": "Sick Leave"},
            {"id": "3", "title": "Paid Leave"},
        ]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def decrypt_request(encrypted_flow_data_b64, encrypted_aes_key_b64, initial_vector_b64):
    """Decrypt a Flow payload with the module-level ``PRIVATE_KEY``.

    Args:
        encrypted_flow_data_b64: Base64 of the AES-GCM ciphertext with the
            16-byte authentication tag appended.
        encrypted_aes_key_b64: Base64 of the RSA-OAEP (SHA-256) encrypted
            AES key.
        initial_vector_b64: Base64 of the GCM IV.

    Returns:
        Tuple ``(decrypted_data, aes_key, iv)``: the parsed JSON payload,
        the decrypted AES key and the decoded IV.
    """
    gcm_tag_size = 16

    payload = b64decode(encrypted_flow_data_b64)
    nonce = b64decode(initial_vector_b64)

    # Unwrap the AES session key with the RSA private key.
    wrapped_key = b64decode(encrypted_aes_key_b64)
    rsa_key = load_pem_private_key(PRIVATE_KEY.encode("utf-8"), password=None)
    oaep = OAEP(
        mgf=MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None
    )
    session_key = rsa_key.decrypt(wrapped_key, oaep)

    # Split the trailing tag off the ciphertext and decrypt the Flow data.
    ciphertext = payload[:-gcm_tag_size]
    tag = payload[-gcm_tag_size:]
    gcm = Cipher(algorithms.AES(session_key), modes.GCM(nonce, tag)).decryptor()
    plaintext = gcm.update(ciphertext) + gcm.finalize()

    return json.loads(plaintext.decode("utf-8")), session_key, nonce
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def encrypt_response(response, aes_key, iv):
    """AES-GCM encrypt ``response`` (as JSON) for the reply.

    Args:
        response: JSON-serialisable object.
        aes_key: AES key bytes.
        iv: IV bytes; each byte is XOR-ed with 0xFF to form the IV used
            for encryption.

    Returns:
        Base64 text of ciphertext followed by the GCM tag.
    """
    # Flip the initialization vector
    flipped_iv = bytearray(octet ^ 0xFF for octet in iv)

    # Encrypt the response data
    encryptor = Cipher(algorithms.AES(aes_key), modes.GCM(flipped_iv)).encryptor()
    plaintext = json.dumps(response).encode("utf-8")
    ciphertext = encryptor.update(plaintext)
    ciphertext += encryptor.finalize()

    # The tag is read only after finalize() has completed.
    return b64encode(ciphertext + encryptor.tag).decode("utf-8")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def is_request_signature_valid(request, app_secret, request_body):
    """Check the ``X-Hub-Signature-256`` header against an HMAC of the body.

    Args:
        request: Object exposing a ``META`` mapping; the header is read from
            ``META["HTTP_X_HUB_SIGNATURE_256"]``.
        app_secret: HMAC key (``str`` or ``bytes``). When falsy, validation
            is skipped and ``True`` is returned after printing a warning.
        request_body: Raw request body, as ``str`` or ``bytes``.

    Returns:
        ``True`` if the header carries ``sha256=<hex digest>`` matching the
        HMAC-SHA256 of ``request_body`` keyed with ``app_secret`` (or if no
        secret is configured); ``False`` otherwise.
    """
    if not app_secret:
        print(
            "App Secret is not set up. Please Add your app secret in settings to check for request validation"
        )
        return True

    prefix = "sha256="
    signature_header = request.META.get("HTTP_X_HUB_SIGNATURE_256", "")
    if not signature_header.startswith(prefix):
        print("Invalid signature header")
        return False

    signature = signature_header[len(prefix):]

    # Accept bytes as well as str: calling .encode() on a bytes body raised
    # AttributeError.
    if isinstance(request_body, str):
        body_bytes = request_body.encode("utf-8")
    else:
        body_bytes = bytes(request_body)
    if isinstance(app_secret, str):
        secret_bytes = app_secret.encode("utf-8")
    else:
        secret_bytes = bytes(app_secret)

    expected = hmac.new(secret_bytes, body_bytes, hashlib.sha256).hexdigest()

    # Compare as bytes: compare_digest() raises TypeError for str arguments
    # containing non-ASCII characters, and the header is attacker-controlled.
    return hmac.compare_digest(signature.encode("utf-8"), expected.encode("ascii"))
|
|
|
|
|
|
2025-06-11 15:04:41 +05:30
|
|
|
|
|
|
|
|
# ----------------------------------------------------------------------------------------------------------
|