Files
ihrm/whatsapp/encryption.py

286 lines
9.7 KiB
Python
Raw Permalink Normal View History

import base64
import json
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding, rsa
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
# NOTE(review): the private key, passphrase and app secret below are
# hard-coded in source, so anyone with read access to this file can use them.
# Consider loading them from settings or the environment and rotating these
# values.
#
# PEM-encoded private key. The "BEGIN PRIVATE KEY" header carries no
# ENCRYPTED marker, i.e. the key material is stored without a password.
PRIVATE_KEY = """-----BEGIN PRIVATE KEY-----
MIIEuwIBADANBgkqhkiG9w0BAQEFAASCBKUwggShAgEAAoIBAQCitHXNMGC04pSI
di+ynQjLDQd1LrExd1nYMxdOFnX/CnVEJGVEUKPdDm5M300ULugeFPx76MsjQx97
vWAgZDMEA2+WyKFAHqmgRvVoiESZZnduIj2akVjFZvA24sbFpGunaiMPq61kWUBR
0pFRw4pnXEbPQnqu9H5vEJ7W5+G6CDlJD3eeTTkM85/ch1/REJbSDyp+Tozrc0Wa
Mv5xMUbVtSjwxaqwSfdMbAfJxnaaZ5SKwgiUEIsLKsQISWX3+qq6USWOCbJs6oTV
eWgeZfxgOjCUovJsZ2TuJ4aNfMtI+dFm3OMaeB2ypa4F4lWu8pq6FVhTseQntfUu
fQv4P+iRAgMBAAECggEABGOp6ecsNLUIHMZTcxYZbqDjWp3v2c3GdraqIkko1cCK
eVQiBz3Frej9wMUlZy38xRL73Lvi/wiIiOYK+dS6K5mMIR04fGpXWSOQ60kB0MGa
5zW1Q744DttAD7r+ccaFwPZ0C7At9U8TFSIBGZuU2ET9BApfFOkzn/tqzZFj3Yjg
OgWaGCvtOGCjLgjN1CWRTq+U66SUuVEtm8cXX8o4hVGfy2ITdg10xW+88qgLqxLj
/2RKPTixjmlwp1/28Z0rotp4GFUU5yplDq86YkdYNF8wHIPx0NiEUiLmAtfZrpmM
0xyJVJbgWh8QASzOxM1lq8WOWOOhJnVnkowYJsxVgQKBgQDZisbd8n59rJQfDMbK
7/cQ0gedl7No+0taY30hSckeR73yxAvmz83jqiD6qOlbaCbb+Rx1PPKewRZybrQV
n0CdFJ5oB1lbtLY7ftIlGQx2MiI4e+lAiCn+Zo2FnIm+C6LJKWqR18vLlHKebbJK
IAHdW76roZsdqAyBSyn0fh2gXQKBgQC/d+/30lIBafNCc6oDXbZFrH9/sRqCEg1u
jOhgNjwYZ23IkBMghyhD3eAiymvUFYCQ2pUfSMqygBbFUay5h5/PF34vzJmSVwC0
6SVemod/2Z7ZcVUogplSdAW1+V4f/3pyIptgKAAqlsE7lAQJAiyd7FnXx8/Rx7Cw
IVh+Jz11xQKBgBaTOTn1HT1LeH+UYtjSeDAtq46mHH8rfNFfe6/FqXJT/ZlA0P9d
1z7l+9AnUTgkIcw4GMTt0zu4S+0KIfQQd7MVXa7r/FDw+uxHp+UjqVBmuXhlG3qP
5tO4rr0L1pt7N6RqgN2rqEFzIUXhmlvo4GipSasj9SXpt4p/U1ZE9CwdAoGBAKSU
5jMyGMeaWT4Pyl5mWV1+r4IFrHGOLvmOKdk6BWI81cOHBMn7JANiX13IffOqH/9j
xLdFjObu76PhVwWLrTUITrGrv35pRvQ7TKILVtnxKHhk0Pyndj/H93i6x8vdgVVG
piR7fdkeCS+7RdSwh8Wf+oJfASaj7h8YKscV1+C5An9SKKO185jufNw+6cToVvBG
ii91044lvc8XidFgR6t3M9cJS3pHNemIfg2dZCu7i2/UbAIrsMDt4Uv4QGOaRGr/
rZ7JoquIPXVTaUjSJLWOELUi9TDBB1F04duW/xNsKswrFIk+mT0jMO3zB/LyCT5m
BCyI7ou85astK8+e4Q12
-----END PRIVATE KEY-----
"""
# NOTE(review): PASSPHRASE is not referenced anywhere else in the visible
# file — confirm whether it is still needed.
PASSPHRASE = "Horilla"
# NOTE(review): APP_SECRET is not referenced anywhere else in the visible
# file; presumably it is the value callers pass as `app_secret` to
# is_request_signature_valid — confirm.
APP_SECRET = "6c18bf72d5f486479bf66c5807c2b393"
class FlowEndpointException(Exception):
    """Exception that carries a status code next to its message.

    Attributes:
        status_code: The value given by the raiser. Presumably the HTTP
            status the endpoint should answer with — confirm against callers.
    """

    def __init__(self, status_code, message):
        # Record the code first, then let Exception store the message in args.
        self.status_code = status_code
        Exception.__init__(self, message)
def decrypt_request(body, private_pem, passphrase):
    """Decrypt an encrypted request body with an RSA private key.

    The AES key is unwrapped with RSA-OAEP (SHA-256 for both the hash and
    MGF1), then the flow data is decrypted with AES-GCM. The last 16 bytes of
    the decoded flow data are treated as the GCM authentication tag.

    NOTE(review): a second function with the same name is defined further
    down in this file; confirm which definition callers actually use.

    Args:
        body: Mapping holding base64 strings under the keys
            "encrypted_aes_key", "encrypted_flow_data" and "initial_vector".
        private_pem: PEM-encoded private key as ``str``.
        passphrase: Password protecting ``private_pem`` (``str`` or
            ``bytes``). A falsy value means "no password".

    Returns:
        dict with:
            "decrypted_body": the decrypted payload parsed from JSON,
            "aes_key_buffer": the unwrapped AES key bytes,
            "initial_vector_buffer": the decoded IV bytes.

    Raises:
        KeyError: If one of the expected keys is missing from ``body``.
        Exception: Anything raised by base64 decoding, the ``cryptography``
            primitives, or JSON parsing is propagated unchanged.
    """
    # Extract encrypted data from the request body
    encrypted_aes_key = body["encrypted_aes_key"]
    encrypted_flow_data = body["encrypted_flow_data"]
    initial_vector = body["initial_vector"]

    # Previously the passphrase argument was ignored (password was always
    # None), so an encrypted PEM could never be loaded.
    if isinstance(passphrase, str):
        password = passphrase.encode("utf-8")
    else:
        password = passphrase
    pem_bytes = private_pem.encode()
    try:
        private_key = serialization.load_pem_private_key(
            pem_bytes, password=password or None, backend=default_backend()
        )
    except TypeError:
        # cryptography raises TypeError when a password is supplied for an
        # unencrypted key. Retry without one so callers that pass a
        # passphrase together with an unencrypted key keep working.
        private_key = serialization.load_pem_private_key(
            pem_bytes, password=None, backend=default_backend()
        )

    decrypted_aes_key = private_key.decrypt(
        base64.b64decode(encrypted_aes_key),
        padding.OAEP(
            mgf=padding.MGF1(algorithm=hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None,
        ),
    )

    flow_data_buffer = base64.b64decode(encrypted_flow_data)
    iv_buffer = base64.b64decode(initial_vector)

    # The GCM tag is appended to the ciphertext; split it off the end.
    TAG_LENGTH = 16
    encrypted_flow_data_body = flow_data_buffer[:-TAG_LENGTH]
    encrypted_flow_data_tag = flow_data_buffer[-TAG_LENGTH:]

    cipher = Cipher(
        algorithms.AES(decrypted_aes_key),
        modes.GCM(iv_buffer, encrypted_flow_data_tag),
        backend=default_backend(),
    )
    decryptor = cipher.decryptor()
    decrypted_json_string = (
        decryptor.update(encrypted_flow_data_body) + decryptor.finalize()
    )

    return {
        "decrypted_body": json.loads(decrypted_json_string.decode("utf-8")),
        "aes_key_buffer": decrypted_aes_key,
        "initial_vector_buffer": iv_buffer,
    }
def encrypt_response(response, aes_key_buffer, initial_vector_buffer):
    """Encrypt ``response`` with AES-GCM and return it base64-encoded.

    Args:
        response: JSON-serialisable object to send back.
        aes_key_buffer: AES key bytes.
        initial_vector_buffer: IV bytes; every byte is XOR-ed with 0xFF
            before it is used as the GCM nonce.

    Returns:
        str: base64 text of the ciphertext followed by the GCM tag.
    """
    payload = json.dumps(response).encode("utf-8")

    # The response nonce is the bitwise inverse of the supplied IV.
    inverted_iv = bytes(octet ^ 0xFF for octet in initial_vector_buffer)

    encryptor = Cipher(
        algorithms.AES(aes_key_buffer),
        modes.GCM(inverted_iv),
        backend=default_backend(),
    ).encryptor()

    # The tag is read only after finalize() has run.
    sealed = encryptor.update(payload) + encryptor.finalize() + encryptor.tag
    return base64.b64encode(sealed).decode("utf-8")
# -------------------------------------------------views.py ------------------------------------------------
import base64
import hashlib
import hmac
import json
import os
from base64 import b64decode, b64encode
from cryptography.hazmat.primitives.asymmetric.padding import MGF1, OAEP, hashes
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from django.http import HttpResponse, JsonResponse
from django.views.decorators.csrf import csrf_exempt
# Load the private key string
# PRIVATE_KEY = os.environ.get('PRIVATE_KEY')
# Example:
# '''-----BEGIN RSA PRIVATE KEY-----
# MIIE...
# ...
# ...AQAB
# -----END RSA PRIVATE KEY-----'''
# PRIVATE_KEY = settings.PRIVATE_KEY
# @csrf_exempt
# def end_point(request):
# try:
# # Parse the request body
# body = json.loads(request.body)
# # Read the request fields
# encrypted_flow_data_b64 = body["encrypted_flow_data"]
# encrypted_aes_key_b64 = body["encrypted_aes_key"]
# initial_vector_b64 = body["initial_vector"]
# decrypted_data, aes_key, iv = decrypt_request(
# encrypted_flow_data_b64, encrypted_aes_key_b64, initial_vector_b64
# )
# # Return the next screen & data to the client
# if decrypted_data["action"] == "ping":
# response = {"data": {"status": "active"}}
# else:
# response = {"screen": "screen_one", "data": {"some_key": "some_value"}}
# # Return the response as plaintext
# return HttpResponse(
# encrypt_response(response, aes_key, iv), content_type="text/plain"
# )
# except Exception as e:
# print(e)
# return JsonResponse({}, status=500)
@csrf_exempt
def end_point(request):
    """Handle an encrypted WhatsApp Flow request and return an encrypted reply.

    The JSON request body must contain "encrypted_flow_data",
    "encrypted_aes_key" and "initial_vector". After decryption, an
    ``action`` of "ping" is answered with an "active" status; any other
    action is answered with the "screen_one" screen and the leave types.

    NOTE(review): is_request_signature_valid is defined in this file but is
    not called here; confirm whether signature checking happens elsewhere.

    Args:
        request: Django request whose ``body`` holds the JSON payload.

    Returns:
        HttpResponse: ``text/plain`` encrypted response on success.
        JsonResponse: Empty JSON with status 421 when the payload cannot be
            decrypted, or status 500 on any other failure.
    """
    import logging

    from cryptography.exceptions import InvalidTag

    logger = logging.getLogger(__name__)

    try:
        body = json.loads(request.body)
        encrypted_flow_data_b64 = body["encrypted_flow_data"]
        encrypted_aes_key_b64 = body["encrypted_aes_key"]
        initial_vector_b64 = body["initial_vector"]

        try:
            decrypted_data, aes_key, iv = decrypt_request(
                encrypted_flow_data_b64, encrypted_aes_key_b64, initial_vector_b64
            )
        except (ValueError, InvalidTag):
            # The Flows endpoint contract uses 421 for "payload cannot be
            # decrypted" so the client can refresh the public key and retry.
            # ValueError also covers bad base64 and malformed decrypted JSON.
            logger.exception("Unable to decrypt WhatsApp Flow request")
            return JsonResponse({}, status=421)

        if decrypted_data["action"] == "ping":
            response = {"data": {"status": "active"}}
        else:
            leave_types = get_leave_types()
            response = {"screen": "screen_one", "data": {"leave_types": leave_types}}

        return HttpResponse(
            encrypt_response(response, aes_key, iv), content_type="text/plain"
        )
    except Exception:
        # Top-level boundary: keep the traceback in the logs instead of
        # printing only the message.
        logger.exception("Unhandled error in WhatsApp Flow endpoint")
        return JsonResponse({}, status=500)
def get_leave_types():
    """Return leave types as a list of ``{"id": str, "title": str}`` dicts.

    When ``leave.models.LeaveType`` can be imported, its "id" and "name"
    values are read and both are converted to strings, with "name" exposed
    as "title". If the import fails, three hard-coded leave types are
    returned instead.
    """
    try:
        from leave.models import LeaveType

        # Build the final shape directly; ids are stringified for the schema.
        return [
            {"id": str(row["id"]), "title": str(row["name"])}
            for row in LeaveType.objects.values("id", "name")
        ]
    except ImportError:
        # No LeaveType model available: fall back to fixed values.
        defaults = (
            ("1", "Casual Leave"),
            ("2", "Sick Leave"),
            ("3", "Paid Leave"),
        )
        return [{"id": key, "title": label} for key, label in defaults]
def decrypt_request(encrypted_flow_data_b64, encrypted_aes_key_b64, initial_vector_b64):
    """Decrypt an encrypted flow payload using the module-level PRIVATE_KEY.

    Args:
        encrypted_flow_data_b64: base64 text of the AES-GCM ciphertext with
            the 16-byte tag appended.
        encrypted_aes_key_b64: base64 text of the RSA-OAEP-wrapped AES key.
        initial_vector_b64: base64 text of the GCM IV.

    Returns:
        tuple: ``(decrypted_data, aes_key, iv)`` — the payload parsed from
        JSON, the unwrapped AES key bytes and the decoded IV bytes.
    """
    tag_size = 16

    payload = b64decode(encrypted_flow_data_b64)
    iv = b64decode(initial_vector_b64)

    # Unwrap the AES key with RSA-OAEP (SHA-256 for the hash and for MGF1).
    wrapped_key = b64decode(encrypted_aes_key_b64)
    rsa_key = load_pem_private_key(PRIVATE_KEY.encode("utf-8"), password=None)
    oaep = OAEP(
        mgf=MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None
    )
    aes_key = rsa_key.decrypt(wrapped_key, oaep)

    # The trailing bytes are the GCM tag; everything before is ciphertext.
    ciphertext, tag = payload[:-tag_size], payload[-tag_size:]
    gcm = Cipher(algorithms.AES(aes_key), modes.GCM(iv, tag)).decryptor()
    plaintext = gcm.update(ciphertext) + gcm.finalize()

    return json.loads(plaintext.decode("utf-8")), aes_key, iv
def encrypt_response(response, aes_key, iv):
    """Encrypt ``response`` with AES-GCM and return base64 text.

    Args:
        response: JSON-serialisable object to send back.
        aes_key: AES key bytes.
        iv: IV bytes; each byte is XOR-ed with 0xFF to form the nonce used
            for the response.

    Returns:
        str: base64 of the ciphertext followed by the GCM tag.
    """
    # Invert every bit of the IV.
    inverted_iv = bytearray(octet ^ 0xFF for octet in iv)

    encryptor = Cipher(algorithms.AES(aes_key), modes.GCM(inverted_iv)).encryptor()
    plaintext = json.dumps(response).encode("utf-8")

    # The tag is read only after finalize() has run.
    sealed = encryptor.update(plaintext) + encryptor.finalize() + encryptor.tag
    return b64encode(sealed).decode("utf-8")
def is_request_signature_valid(request, app_secret, request_body):
    """Check the ``X-Hub-Signature-256`` header against the request body.

    The expected value is the hex HMAC-SHA256 of ``request_body`` keyed with
    ``app_secret``. The header must have the form ``sha256=<hex digest>``.

    Args:
        request: Object exposing a ``META`` mapping (e.g. a Django request).
        app_secret: Secret used as the HMAC key (``str`` or ``bytes``). When
            falsy, validation is skipped and ``True`` is returned.
        request_body: Raw request payload as ``str`` or bytes-like.

    Returns:
        bool: ``True`` if the signature matches or no secret is configured,
        ``False`` otherwise.
    """
    if not app_secret:
        print(
            "App Secret is not set up. Please Add your app secret in settings to check for request validation"
        )
        return True

    prefix = "sha256="
    signature_header = request.META.get("HTTP_X_HUB_SIGNATURE_256", "")
    if not signature_header.startswith(prefix):
        print("Invalid signature header")
        return False
    signature = signature_header[len(prefix):]

    # The body may arrive as bytes (e.g. request.body); calling .encode() on
    # bytes would raise AttributeError, so only encode text.
    key = app_secret.encode() if isinstance(app_secret, str) else app_secret
    payload = request_body.encode() if isinstance(request_body, str) else request_body
    expected = hmac.new(key, payload, hashlib.sha256).hexdigest()

    # Compare as bytes: compare_digest raises TypeError for str arguments
    # containing non-ASCII characters, which a crafted header could trigger.
    return hmac.compare_digest(signature.encode("utf-8"), expected.encode("utf-8"))
# ----------------------------------------------------------------------------------------------------------