From c2f1d8fe0af687ff1565b54b3abe93a40ee4a50a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luk=C3=A1=C5=A1=20Kucharczyk?= Date: Tue, 3 Sep 2024 15:25:57 +0200 Subject: [PATCH] add backend functionality --- games/views/session.py | 280 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 279 insertions(+), 1 deletion(-) diff --git a/games/views/session.py b/games/views/session.py index 0e85639..b859580 100644 --- a/games/views/session.py +++ b/games/views/session.py @@ -1,12 +1,19 @@ -from typing import Any +import operator +from functools import reduce +from json import dumps as json_dumps +from json import loads as json_loads +from typing import Any, NotRequired, TypeAlias, TypedDict, TypeGuard from django.contrib.auth.decorators import login_required from django.core.paginator import Paginator +from django.db.models.query import QuerySet +from django.db.models.query_utils import Q from django.http import HttpRequest, HttpResponse from django.shortcuts import get_object_or_404, redirect, render from django.template.loader import render_to_string from django.urls import reverse from django.utils import timezone +from typing_extensions import TypeGuard from common.time import format_duration from common.utils import A, Button, truncate_with_popover @@ -22,12 +29,283 @@ from games.views.general import ( ) +class Filter(TypedDict): + filter_id: str + filter_display: str + filter_string: str + + +def is_filter(obj: dict[Any, Any]) -> TypeGuard[Filter]: + return ( + isinstance(obj, dict) + and "filter_id" in obj + and isinstance(obj["filter_id"], str) + and "filter_display" in obj + and isinstance(obj["filter_display"], str) + and "filter_string" in obj + and isinstance(obj["filter_string"], str) + ) + + +FilterList: TypeAlias = list[Filter] + + +def is_filterlist(obj: list[Any]) -> TypeGuard[FilterList]: + return isinstance(obj, list) and all([is_filter(item) for item in obj]) + + +ModelFilterSet: TypeAlias = list[dict[str, FilterList]] + + +class FieldFilter(TypedDict): + filtered_field: str + filtered_value: str + negated: NotRequired[bool] + filter: Filter + + +def is_fieldfilter(obj: dict) -> TypeGuard[FieldFilter]: + return ( + isinstance(obj, dict) + and "filtered_field" in obj + and isinstance(obj["filtered_field"], str) + and "filtered_value" in obj + and isinstance(obj["filtered_value"], str) + and "filter" in obj + and is_filter(obj["filter"]) + ) + + +FilterSet: TypeAlias = list[FieldFilter] + + +def is_filterset(obj: list) -> TypeGuard[FilterSet]: + return isinstance(obj, list) and all([is_fieldfilter(item) for item in obj]) + + +iexact_filter: Filter = { + "filter_id": "IEXACT", + "filter_display": "Equals (case-insensitive)", + "filter_string": "__iexact", +} +exact_filter: Filter = { + "filter_id": "EXACT", + "filter_display": "Equals (case-sensitive)", + "filter_string": "__exact", +} +isnull_filter: Filter = { + "filter_id": "ISNULL", + "filter_display": "Is null", + "filter_string": "__isnull", +} +contains_filter: Filter = { + "filter_id": "CONTAINS", + "filter_display": "Contains", + "filter_string": "__contains", +} +startswith_filter: Filter = { + "filter_id": "STARTSWITH", + "filter_display": "Starts with", + "filter_string": "__startswith", +} +endswith_filter: Filter = { + "filter_id": "ENDSWITH", + "filter_display": "Ends with", + "filter_string": "__endswith", +} +gt_filter: Filter = { + "filter_id": "GT", + "filter_display": "Greater than", + "filter_string": "__gt", +} +lt_filter: Filter = { + "filter_id": "LT", + "filter_display": "Lesser than", + "filter_string": "__lt", +} +year_gt_filter: Filter = { + "filter_id": "YEARGT", + "filter_display": "Greater than", + "filter_string": "__year__gt", +} +year_lt_filter: Filter = { + "filter_id": "YEARLT", + "filter_display": "Lesser than", + "filter_string": "__year__lt", +} +year_exact_filter: Filter = { + "filter_id": "YEAREXACT", + "filter_display": "Equals (case-sensitive)", + "filter_string": "__year__exact", +} + +defined_filters = [ + iexact_filter, + exact_filter, + isnull_filter, + contains_filter, + startswith_filter, + endswith_filter, + gt_filter, + lt_filter, + year_gt_filter, + year_lt_filter, + year_exact_filter, +] + +defined_filters_list = {list["filter_id"]: list for list in defined_filters} + +char_filter: FilterList = [ + iexact_filter, + isnull_filter, + contains_filter, + startswith_filter, + endswith_filter, +] +text_filter: FilterList = [ + isnull_filter, + contains_filter, +] +num_filter: FilterList = [exact_filter, gt_filter, lt_filter] +date_filter: FilterList = [ + year_exact_filter, + isnull_filter, + year_gt_filter, + year_lt_filter, +] + +conditions = ["and", "or"] +session_filters: ModelFilterSet = [ + {"name": char_filter}, + {"timestamp_start": date_filter}, + {"timestamp_end": date_filter}, + {"duration_manual": num_filter}, + {"duration_calculated": num_filter}, + {"note": text_filter}, + {"device": char_filter}, + {"created_at": date_filter}, + {"modified_at": date_filter}, +] +name_contains_age: FieldFilter = { + "filtered_field": "name", + "filtered_value": "age", + "filter": contains_filter, +} +simple_example_filter: FilterSet = [name_contains_age] +timestamp_start_year_2024: FieldFilter = { + "filtered_field": "timestamp_start", + "filtered_value": "2024", + "filter": year_exact_filter, +} +physical_only: FieldFilter = { + "filtered_field": "purchase__ownership_type", + "filtered_value": "ph", + "filter": exact_filter, +} + + +def negate_filter(filter: FieldFilter) -> FieldFilter: + return {**filter, "negated": True} + + +without_physical: FieldFilter = negate_filter(physical_only) +combined_example_filter: FilterSet = [name_contains_age, timestamp_start_year_2024] +combined_with_negated_example_filter = [timestamp_start_year_2024, without_physical] + + +def string_to_dict(s: str) -> dict[str, str]: + key, value = s.split("=") + return {key: value} + + +def create_django_filter_dict( + filter: Filter, field: str, filtered_value: str +) -> dict[str, str]: + """ + Creates a dict that can be used with the Django + filter function by unpacking it: + Model.objects.filter(**return_value) + """ + if not is_filter(filter): + raise ValueError("filter is not of type Filter") + return {f"{field}{filter["filter_string"]}": filtered_value} + + +def join_filter_with_condition(filters: FilterSet, condition: str): + if not is_filterset(filters): + raise ValueError("filters is not FilterSet") + conditions = {"AND": operator.and_, "OR": operator.or_, "XOR": operator.xor} + condition = condition.upper() + if condition not in conditions: + raise ValueError(f"Condition '{condition}' not one of '{conditions.keys()}'.") + q_objects: list[Q] = [] + for filter_item in filters: + q = Q( + **create_django_filter_dict( + filter_item["filter"], + filter_item["filtered_field"], + filter_item["filtered_value"], + ) + ) + if filter_item.get("negated", False): + q = ~q + q_objects.append(q) + return reduce(conditions[condition], q_objects) + + +def apply_filters( + filters: FilterSet, + queryset: QuerySet[Any], +) -> QuerySet[Any] | None: + if len(filters) == 0: + return queryset + if type(filters) is not list: + raise ValueError("filters argument not of type list") + # TODO: modify FilterSet so it includes the condition to use + # so we can remove the hard-coding of "AND" here + return queryset.filter(join_filter_with_condition(filters, "AND")) + + +def filters_to_string(filters: FilterSet) -> str: + constructed_filters: list[dict[str, str | bool]] = [] + for filter in filters: + constructed_filters.append( + { + "id": filter["filter"]["filter_id"], + "field": filter["filtered_field"], + "value": filter["filtered_value"], + "negated": filter.get("negated", False), + } + ) + return json_dumps(constructed_filters) + + +def string_to_filters(filter_string: str) -> FilterSet: + obj = json_loads(filter_string) + filters = [ + { + "filter": defined_filters_list[item["id"]], + "filtered_field": item["field"], + "filtered_value": item["value"], + "negated": item.get("negated", False), + } + for item in obj + ] + if not is_filterset(filters): + raise ValueError("filters is not of type FilterSet") + return filters + + @login_required def list_sessions(request: HttpRequest) -> HttpResponse: context: dict[Any, Any] = {} page_number = request.GET.get("page", 1) limit = request.GET.get("limit", 10) + filters = request.GET.get("filters", "") sessions = Session.objects.order_by("-timestamp_start") + if filters != "": + filter_obj = string_to_filters(filters) + sessions = apply_filters(filter_obj, queryset=sessions) page_obj = None if int(limit) != 0: paginator = Paginator(sessions, limit)