Files
hrms/base/backends.py
2026-01-16 15:38:54 +01:00

268 lines
7.5 KiB
Python

"""
email_backend.py
This module is used to write email backends
"""
import importlib
import logging
from django.core.cache import cache
from django.core.mail import EmailMessage
from django.core.mail.backends.smtp import EmailBackend
from base.models import DynamicEmailConfiguration, EmailLog
from horilla import settings
from horilla.horilla_middlewares import _thread_locals
logger = logging.getLogger(__name__)
class DefaultHorillaMailBackend(EmailBackend):
def __init__(
self,
host=None,
port=None,
username=None,
password=None,
use_tls=None,
fail_silently=None,
use_ssl=None,
timeout=None,
ssl_keyfile=None,
ssl_certfile=None,
**kwargs,
):
self.configuration = self.get_dynamic_email_config()
ssl_keyfile = (
getattr(self.configuration, "ssl_keyfile", None)
if self.configuration
else ssl_keyfile or getattr(settings, "ssl_keyfile", None)
)
ssl_certfile = (
getattr(self.configuration, "ssl_certfile", None)
if self.configuration
else ssl_certfile or getattr(settings, "ssl_certfile", None)
)
super().__init__(
host=self.dynamic_host,
port=self.dynamic_port,
username=self.dynamic_username,
password=self.dynamic_password,
use_tls=self.dynamic_use_tls,
fail_silently=self.dynamic_fail_silently,
use_ssl=self.dynamic_use_ssl,
timeout=self.dynamic_timeout,
ssl_keyfile=ssl_keyfile,
ssl_certfile=ssl_certfile,
**kwargs,
)
@staticmethod
def get_dynamic_email_config():
request = getattr(_thread_locals, "request", None)
company = None
if request and not request.user.is_anonymous:
company = request.user.employee_get.get_company()
configuration = DynamicEmailConfiguration.objects.filter(
company_id=company
).first()
if configuration is None:
configuration = DynamicEmailConfiguration.objects.filter(
is_primary=True
).first()
if configuration:
display_email_name = (
f"{configuration.display_name} <{configuration.from_email}>"
)
user_id = ""
if request:
if (
configuration.use_dynamic_display_name
and request.user.is_authenticated
):
display_email_name = f"{request.user.employee_get.get_full_name()} <{request.user.employee_get.get_email()}>"
if request.user.is_authenticated:
user_id = request.user.pk
reply_to = [
f"{request.user.employee_get.get_full_name()} <{request.user.employee_get.get_email()}>",
]
cache.set(f"reply_to{request.user.pk}", reply_to)
cache.set(f"dynamic_display_name{user_id}", display_email_name)
return configuration
@property
def dynamic_host(self):
return (
self.configuration.host
if self.configuration
else getattr(settings, "EMAIL_HOST", None)
)
@property
def dynamic_port(self):
return (
self.configuration.port
if self.configuration
else getattr(settings, "EMAIL_PORT", None)
)
@property
def dynamic_username(self):
return (
self.configuration.username
if self.configuration
else getattr(settings, "EMAIL_HOST_USER", None)
)
@property
def dynamic_mail_sent_from(self):
return (
self.configuration.from_email
if self.configuration
else getattr(settings, "DEFAULT_FROM_EMAIL", None)
)
@property
def dynamic_display_name(self):
return self.configuration.display_name if self.configuration else None
@property
def dynamic_from_email_with_display_name(self):
return (
f"{self.dynamic_display_name} <{self.dynamic_mail_sent_from}>"
if self.dynamic_display_name
else self.dynamic_mail_sent_from
)
@property
def dynamic_password(self):
return (
self.configuration.password
if self.configuration
else getattr(settings, "EMAIL_HOST_PASSWORD", None)
)
@property
def dynamic_use_tls(self):
return (
self.configuration.use_tls
if self.configuration
else getattr(settings, "EMAIL_USE_TLS", None)
)
@property
def dynamic_fail_silently(self):
return (
self.configuration.fail_silently
if self.configuration
else getattr(settings, "EMAIL_FAIL_SILENTLY", True)
)
@property
def dynamic_use_ssl(self):
return (
self.configuration.use_ssl
if self.configuration
else getattr(settings, "EMAIL_USE_SSL", None)
)
@property
def dynamic_timeout(self):
return (
self.configuration.timeout
if self.configuration
else getattr(settings, "EMAIL_TIMEOUT", None)
)
EMAIL_BACKEND = getattr(settings, "EMAIL_BACKEND", "")
BACKEND_CLASS: EmailBackend = DefaultHorillaMailBackend
default = "base.backends.ConfiguredEmailBackend"
setattr(BACKEND_CLASS, "send_messages", DefaultHorillaMailBackend.send_messages)
if EMAIL_BACKEND and EMAIL_BACKEND != default:
module_path, class_name = EMAIL_BACKEND.rsplit(".", 1)
module = importlib.import_module(module_path)
BACKEND_CLASS = getattr(module, class_name)
class ConfiguredEmailBackend(BACKEND_CLASS):
def send_messages(self, email_messages):
response = super(BACKEND_CLASS, self).send_messages(email_messages)
for message in email_messages:
email_log = EmailLog(
subject=message.subject,
from_email=self.dynamic_from_email_with_display_name,
to=message.to,
body=message.body,
status="sent" if response else "failed",
)
email_log.save()
return response
if EMAIL_BACKEND != default:
from_mail = getattr(settings, "DEFAULT_FROM_EMAIL", "example@gmail.com")
username = getattr(settings, "EMAIL_HOST_USER", "example@gmail.com")
ConfiguredEmailBackend.dynamic_username = from_mail
ConfiguredEmailBackend.dynamic_from_email_with_display_name = from_mail
__all__ = ["ConfiguredEmailBackend"]
message_init = EmailMessage.__init__
def new_init(
self,
subject="",
body="",
from_email=None,
to=None,
bcc=None,
connection=None,
attachments=None,
headers=None,
cc=None,
reply_to=None,
):
"""
custom __init_method to override
"""
request = getattr(_thread_locals, "request", None)
DefaultHorillaMailBackend()
user_id = ""
if request and request.user and request.user.is_authenticated:
user_id = request.user.pk
reply_to = cache.get(f"reply_to{user_id}") if not reply_to else reply_to
if not from_email:
from_email = cache.get(f"dynamic_display_name{user_id}")
message_init(
self,
subject=subject,
body=body,
from_email=from_email,
to=to,
bcc=bcc,
connection=connection,
attachments=attachments,
headers=headers,
cc=cc,
reply_to=reply_to,
)
EmailMessage.__init__ = new_init