import importlib from collections import defaultdict from django import forms from django.apps import apps from django.contrib import messages from django.contrib.admin.utils import NestedObjects from django.core.cache import cache as CACHE from django.db import router from django.http import HttpResponse, JsonResponse from django.shortcuts import render from django.utils.decorators import method_decorator from django.views import View from django.views.decorators.csrf import csrf_protect from base.methods import eval_validate from horilla.signals import post_generic_delete, pre_generic_delete from horilla_views import models from horilla_views.cbv_methods import get_short_uuid, login_required, merge_dicts from horilla_views.forms import SavedFilterForm from horilla_views.generic.cbv.views import HorillaFormView, HorillaListView # Create your views here. @method_decorator(login_required, name="dispatch") class ToggleColumn(View): """ ToggleColumn """ def get(self, *args, **kwargs): """ method to toggle columns """ query_dict = self.request.GET path = query_dict["path"] query_dict = dict(query_dict) del query_dict["path"] hidden_fields = [key for key, value in query_dict.items() if value[0]] existing_instance = models.ToggleColumn.objects.filter( user_id=self.request.user, path=path ).first() instance = models.ToggleColumn() if not existing_instance else existing_instance instance.path = path instance.excluded_columns = hidden_fields instance.save() return HttpResponse("success") @method_decorator(login_required, name="dispatch") class ReloadField(View): """ ReloadField """ def get(self, request, *args, **kwargs): """ Http method to reload dynamic create fields """ class_path = request.GET["form_class_path"] reload_field = request.GET["dynamic_field"] module_name, class_name = class_path.rsplit(".", 1) module = importlib.import_module(module_name) parent_form = getattr(module, class_name)() dynamic_cache = CACHE.get(request.session.session_key + "cbv" + reload_field) onchange = CACHE.get( request.session.session_key + "cbv" + reload_field + "onchange" ) if not onchange: onchange = "" model: models.HorillaModel = dynamic_cache["model"] value = dynamic_cache.get("value", "") cache_field = dynamic_cache["dynamic_field"] if cache_field != reload_field: cache_field = reload_field field = parent_form.fields[cache_field] queryset = model.objects.all() queryset = field.queryset choices = [(instance.id, instance) for instance in queryset] choices.insert(0, ("", "Select option")) choices.append(("dynamic_create", "Dynamic create")) form_field = forms.ChoiceField if isinstance(field, forms.ModelMultipleChoiceField): form_field = forms.MultipleChoiceField dynamic_initial = request.GET.get("dynamic_initial", []) value = eval_validate(f"""[{dynamic_cache["value"]},{dynamic_initial}]""") else: if not value and self.request.GET.get("dynamic_initial"): value = eval_validate(self.request.GET.get("dynamic_initial")) parent_form.fields[cache_field] = form_field( choices=choices, label=field.label, required=field.required, ) parent_form.fields[cache_field].widget.option_template_name = ( "horilla_widgets/select_option.html", ) parent_form.fields[cache_field].widget.attrs = field.widget.attrs parent_form.fields[cache_field].initial = value parent_form.fields[cache_field].widget.attrs["onchange"] = onchange field = parent_form[cache_field] dynamic_id: str = get_short_uuid(4) return render( request, "generic/reload_select_field.html", {"field": field, "dynamic_id": dynamic_id}, ) @method_decorator(login_required, name="dispatch") class ActiveTab(View): def get(self, *args, **kwargs): """ CBV method to handle active tab """ path = self.request.GET.get("path") target = self.request.GET.get("target") if path and target and self.request.user: existing_instance = models.ActiveTab.objects.filter( created_by=self.request.user, path=path ).first() instance = ( models.ActiveTab() if not existing_instance else existing_instance ) instance.path = path instance.tab_target = target instance.save() return JsonResponse({"message": "Success"}) @method_decorator(login_required, name="dispatch") class ActiveGroup(View): def get(self, *args, **kwargs): """ ActiveGroup """ path = self.request.GET.get("path") target = self.request.GET.get("target") group_field = self.request.GET.get("field") if path and target and group_field and self.request.user: existing_instance = models.ActiveGroup.objects.filter( created_by=self.request.user, path=path, group_by_field=group_field, ).first() instance = ( models.ActiveGroup() if not existing_instance else existing_instance ) instance.path = path instance.group_by_field = group_field instance.group_target = target instance.save() return JsonResponse({"message": "Success"}) @method_decorator(login_required, name="dispatch") class SavedFilter(HorillaFormView): """ SavedFilter """ model = models.SavedFilter form_class = SavedFilterForm new_display_title = "Save Applied Filter" template_name = "generic/saved_filter_form.html" form_disaply_attr = "Blah" def form_valid(self, form: SavedFilterForm) -> HttpResponse: referrer = self.request.POST.get("referrer", "") path = self.request.POST.get("path", "/") result_dict = {key: value[0] for key, value in self.request.GET.lists()} if form.is_valid(): instance: models.SavedFilter = form.save(commit=False) if not instance.pk: instance.path = path instance.referrer = referrer instance.filter = result_dict instance.urlencode = self.request.GET.urlencode() instance.save() messages.success(self.request, "Filter Saved") return self.HttpResponse() return super().form_valid(form) def get_context_data(self, **kwargs) -> dict: context = super().get_context_data(**kwargs) referrer = self.request.GET.get("referrer", "") if referrer: # Remove the protocol and domain part referrer = "/" + "/".join(referrer.split("/")[3:]) context["path"] = self.request.GET.get("path", "") context["referrer"] = referrer return context @method_decorator(login_required, name="dispatch") class DeleteSavedFilter(View): """ Delete saved filter """ def get(self, *args, **kwargs): pk = kwargs["pk"] models.SavedFilter.objects.filter(created_by=self.request.user, pk=pk).delete() return HttpResponse("") @method_decorator(login_required, name="dispatch") class ActiveView(View): """ ActiveView CBV """ def get(self, *args, **kwargs): path = self.request.GET["path"] view_type = self.request.GET["view"] active_view = models.ActiveView.objects.filter( path=path, created_by=self.request.user ).first() active_view = active_view if active_view else models.ActiveView() active_view.path = path active_view.type = view_type active_view.save() return HttpResponse("") @method_decorator(login_required, name="dispatch") @method_decorator(csrf_protect, name="dispatch") class SearchInIds(View): """ Search in ids view """ def get(self, *args, **kwargs): """ Search in instance ids method """ cache_key = f"{self.request.session.session_key}search_in_instance_ids" context: dict = CACHE.get(cache_key) context["instances"] = context["filter_class"](self.request.GET).qs return render(self.request, "generic/filter_result.html", context) @method_decorator(login_required, name="dispatch") class LastAppliedFilter(View): """ Class view to handle last applied filter """ def get(self, *args, **kwargs): """ Get method """ nav_path = self.request.GET.get( "nav_url", ) if nav_path: CACHE.set( self.request.session.session_key + "last-applied-filter" + nav_path, self.request.GET, timeout=600, ) return HttpResponse("success") class DynamiListView(HorillaListView): """ DynamicListView for Generic Delete """ instances = [] def get_queryset(self): search = self.request.GET.get("search", "") def _search_filter(instance): return search in str(instance).lower() return filter(_search_filter, self.instances) class HorillaDeleteConfirmationView(View): """ Generic Delete Confirmation View """ confirmation_target = "deleteConfirmationBody" def get(self, *args, **kwargs): """ GET method """ from horilla.urls import path, urlpatterns pk = self.request.GET["pk"] app, MODEL_NAME = self.request.GET["model"].split(".") if not self.request.user.has_perm(app + ".delete_" + MODEL_NAME.lower()): return render(self.request, "no_perm.html") model = apps.get_model(app, MODEL_NAME) delete_object = model.objects.get(pk=pk) objs = [delete_object] using = router.db_for_write(delete_object._meta.model) collector = NestedObjects(using=using, origin=objs) collector.collect(objs) MODEL_MAP = {} PROTECTED_MODEL_MAP = {} DYNAMIC_PATH_MAP = {} MODEL_RELATED_FIELD_MAP = {} MODEL_RELATED_PROTECTED_FIELD_MAP = {} def format_callback(instance, protected=False): if not MODEL_RELATED_FIELD_MAP.get(instance._meta.model): MODEL_RELATED_FIELD_MAP[instance._meta.model] = [] MODEL_RELATED_PROTECTED_FIELD_MAP[instance._meta.model] = [] def find_related_field(obj, related_instance): for field in obj._meta.get_fields(): # Check if the field is a foreign key (or related model) if isinstance( field, (models.models.ForeignKey, models.models.OneToOneField) ): # Get the field value field_value = getattr(obj, field.name) # If the field value matches the related instance, return the field name if field_value == related_instance: if "PROTECT" in field.remote_field.on_delete.__name__: MODEL_RELATED_PROTECTED_FIELD_MAP[ instance._meta.model ].append((field.name, field.verbose_name)) MODEL_RELATED_FIELD_MAP[instance._meta.model].append( field.name ) find_related_field(instance, delete_object) app_label = instance._meta.app_label app_label = apps.get_app_config(app_label).verbose_name model = instance._meta.model model.verbose_name = model.__name__.split("_")[0] model_map = PROTECTED_MODEL_MAP if protected else MODEL_MAP if app_label not in model_map: model_map[app_label] = {} if model not in model_map[app_label]: model_map[app_label][model] = [] DYNAMIC_PATH_MAP[model.verbose_name] = ( f"{get_short_uuid(prefix='generic-delete',length=10)}" ) class DynamiListView(HorillaListView): """ DynamicListView for Generic Delete """ instances = [] columns = [ ( "Record", "dynamic_display_name_generic_delete", ), ] records_per_page = 5 selected_instances_key_id = "storedIds" + app_label def dynamic_display_name_generic_delete(self): is_protected = False classname = self.__class__.__name__ app_label = self._meta.app_label app_verbose_name = apps.get_app_config(app_label).verbose_name protected = PROTECTED_MODEL_MAP.get(app_verbose_name, {}).get( self._meta.model, [] ) ids = [instance.pk for instance in protected] if self.pk in ids: is_protected = True if "_" in classname: field_name = classname.split("_", 1)[1] classname = classname.split("_")[0] object_field_name = classname.lower() model = apps.get_model(app_label, classname) field = model._meta.get_field(field_name) return f""" {getattr(self, object_field_name)} (In {field.verbose_name}) """ indication = f""" {self} """ if is_protected: verbose_names = [ str(i[1]) for i in list( set( MODEL_RELATED_PROTECTED_FIELD_MAP.get( self._meta.model, "" ) ) ) ] indication = ( indication + f""" (Record in {",".join(verbose_names)}) """ ) return indication def __init__(self, **kwargs): super().__init__(**kwargs) self._saved_filters = self.request.GET def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) context["search_url"] = "/" + self.search_url return context def get_queryset(self): search = self.request.GET.get("search", "") def _search_filter(instance): return search in str(instance).lower() self.instances = list( set( ( self.instances + MODEL_MAP.get( apps.get_app_config( self.model._meta.app_label ).verbose_name, {}, ).get(self.model, []) + PROTECTED_MODEL_MAP.get( apps.get_app_config( self.model._meta.app_label ).verbose_name, {}, ).get(self.model, []) ) ) ) queryset = self.model.objects.filter( pk__in=[ instance.pk for instance in filter(_search_filter, self.instances) ] ) return queryset model.dynamic_display_name_generic_delete = ( DynamiListView.dynamic_display_name_generic_delete ) DynamiListView.model = model if "_" in model.__name__: DynamiListView.bulk_update_fields = [MODEL_NAME.lower()] else: DynamiListView.bulk_update_fields = MODEL_RELATED_FIELD_MAP.get( model, [] ) DynamiListView.instances = model_map[app_label][model] DynamiListView.search_url = DYNAMIC_PATH_MAP[model.verbose_name] DynamiListView.selected_instances_key_id = ( DynamiListView.selected_instances_key_id + model.verbose_name ) urlpatterns.append( path( DynamiListView.search_url, DynamiListView.as_view(), name=DynamiListView.search_url, ) ) model_map[app_label][model].append(instance) return instance _to_delete = collector.nested(format_callback) protected = [ format_callback(obj, protected=True) for obj in collector.protected ] model_count = { model._meta.verbose_name_plural: len(objs) for model, objs in collector.model_objs.items() } protected_model_count = defaultdict(int) for obj in collector.protected: model = type(obj) protected_model_count[model._meta.verbose_name_plural] += 1 protected_model_count = dict(protected_model_count) context = { "model_map": merge_dicts(MODEL_MAP, PROTECTED_MODEL_MAP), "dynamic_list_path": DYNAMIC_PATH_MAP, "delete_object": delete_object, "protected": protected, "model_count_sum": sum(model_count.values()), "related_objects_count": model_count, "protected_objects_count": protected_model_count, } for key, value in self.get_context_data().items(): context[key] = value return render(self.request, "generic/delete_confirmation.html", context) def post(self, *args, **kwargs): """ Post method to handle the delete """ pk = self.request.GET["pk"] app, MODEL_NAME = self.request.GET["model"].split(".") if not self.request.user.has_perm(app + ".delete_" + MODEL_NAME.lower()): return render(self.request, "no_perm.html") model = apps.get_model(app, MODEL_NAME) delete_object = model.objects.get(pk=pk) objs = [delete_object] using = router.db_for_write(delete_object._meta.model) collector = NestedObjects(using=using, origin=objs) collector.collect(objs) def delete_callback(instance, protected=False): try: if self.request.user.has_perm( f"{instance._meta.app_label}.delete_{instance._meta.model.__name__.lower()}" ): pre_generic_delete.send( sender=instance._meta.model, instance=instance, args=args, view_instance=self, kwargs=kwargs, ) instance.delete() post_generic_delete.send( sender=instance._meta.model, instance=instance, args=args, view_instance=self, kwargs=kwargs, ) messages.success(self.request, f"Deleted {instance}") else: messages.info( self.request, f"You don't have permission to delete {instance}" ) except: messages.error(self.request, f"Cannot delete : {instance}") # deleting protected objects for obj in collector.protected: delete_callback(obj, protected=True) # deleting related objects collector.nested(delete_callback) return HttpResponse( """ """ ) def get_context_data(self, **kwargs) -> dict: context = {} context["confirmation_target"] = self.confirmation_target return context