Filters (backend and frontend) #74
|
@ -1,12 +1,19 @@
|
|||
from typing import Any
|
||||
import operator
|
||||
from functools import reduce
|
||||
from json import dumps as json_dumps
|
||||
from json import loads as json_loads
|
||||
from typing import Any, NotRequired, TypeAlias, TypedDict, TypeGuard
|
||||
|
||||
from django.contrib.auth.decorators import login_required
|
||||
from django.core.paginator import Paginator
|
||||
from django.db.models.query import QuerySet
|
||||
from django.db.models.query_utils import Q
|
||||
from django.http import HttpRequest, HttpResponse
|
||||
from django.shortcuts import get_object_or_404, redirect, render
|
||||
from django.template.loader import render_to_string
|
||||
from django.urls import reverse
|
||||
from django.utils import timezone
|
||||
from typing_extensions import TypeGuard
|
||||
|
||||
from common.time import format_duration
|
||||
from common.utils import A, Button, truncate_with_popover
|
||||
|
@ -22,12 +29,283 @@ from games.views.general import (
|
|||
)
|
||||
|
||||
|
||||
class Filter(TypedDict):
|
||||
filter_id: str
|
||||
filter_display: str
|
||||
filter_string: str
|
||||
|
||||
|
||||
def is_filter(obj: dict[Any, Any]) -> TypeGuard[Filter]:
|
||||
return (
|
||||
isinstance(obj, dict)
|
||||
and "filter_id" in obj
|
||||
and isinstance(obj["filter_id"], str)
|
||||
and "filter_display" in obj
|
||||
and isinstance(obj["filter_display"], str)
|
||||
and "filter_string" in obj
|
||||
and isinstance(obj["filter_string"], str)
|
||||
)
|
||||
|
||||
|
||||
FilterList: TypeAlias = list[Filter]
|
||||
|
||||
|
||||
def is_filterlist(obj: list[Any]) -> TypeGuard[FilterList]:
|
||||
return isinstance(obj, list) and all([is_filter(item) for item in obj])
|
||||
|
||||
|
||||
ModelFilterSet: TypeAlias = list[dict[str, FilterList]]
|
||||
|
||||
|
||||
class FieldFilter(TypedDict):
|
||||
filtered_field: str
|
||||
filtered_value: str
|
||||
negated: NotRequired[bool]
|
||||
filter: Filter
|
||||
|
||||
|
||||
def is_fieldfilter(obj: dict) -> TypeGuard[FieldFilter]:
|
||||
return (
|
||||
isinstance(obj, dict)
|
||||
and "filtered_field" in obj
|
||||
and isinstance(obj["filtered_field"], str)
|
||||
and "filtered_value" in obj
|
||||
and isinstance(obj["filtered_value"], str)
|
||||
and "filter" in obj
|
||||
and is_filter(obj["filter"])
|
||||
)
|
||||
|
||||
|
||||
FilterSet: TypeAlias = list[FieldFilter]
|
||||
|
||||
|
||||
def is_filterset(obj: list) -> TypeGuard[FilterSet]:
|
||||
return isinstance(obj, list) and all([is_fieldfilter(item) for item in obj])
|
||||
|
||||
|
||||
iexact_filter: Filter = {
|
||||
"filter_id": "IEXACT",
|
||||
"filter_display": "Equals (case-insensitive)",
|
||||
"filter_string": "__iexact",
|
||||
}
|
||||
exact_filter: Filter = {
|
||||
"filter_id": "EXACT",
|
||||
"filter_display": "Equals (case-sensitive)",
|
||||
"filter_string": "__exact",
|
||||
}
|
||||
isnull_filter: Filter = {
|
||||
"filter_id": "ISNULL",
|
||||
"filter_display": "Is null",
|
||||
"filter_string": "__isnull",
|
||||
}
|
||||
contains_filter: Filter = {
|
||||
"filter_id": "CONTAINS",
|
||||
"filter_display": "Contains",
|
||||
"filter_string": "__contains",
|
||||
}
|
||||
startswith_filter: Filter = {
|
||||
"filter_id": "STARTSWITH",
|
||||
"filter_display": "Starts with",
|
||||
"filter_string": "__startswith",
|
||||
}
|
||||
endswith_filter: Filter = {
|
||||
"filter_id": "ENDSWITH",
|
||||
"filter_display": "Ends with",
|
||||
"filter_string": "__endswith",
|
||||
}
|
||||
gt_filter: Filter = {
|
||||
"filter_id": "GT",
|
||||
"filter_display": "Greater than",
|
||||
"filter_string": "__gt",
|
||||
}
|
||||
lt_filter: Filter = {
|
||||
"filter_id": "LT",
|
||||
"filter_display": "Lesser than",
|
||||
"filter_string": "__lt",
|
||||
}
|
||||
year_gt_filter: Filter = {
|
||||
"filter_id": "YEARGT",
|
||||
"filter_display": "Greater than",
|
||||
"filter_string": "__year__gt",
|
||||
}
|
||||
year_lt_filter: Filter = {
|
||||
"filter_id": "YEARLT",
|
||||
"filter_display": "Lesser than",
|
||||
"filter_string": "__year__lt",
|
||||
}
|
||||
year_exact_filter: Filter = {
|
||||
"filter_id": "YEAREXACT",
|
||||
"filter_display": "Equals (case-sensitive)",
|
||||
"filter_string": "__year__exact",
|
||||
}
|
||||
|
||||
defined_filters = [
|
||||
iexact_filter,
|
||||
exact_filter,
|
||||
isnull_filter,
|
||||
contains_filter,
|
||||
startswith_filter,
|
||||
endswith_filter,
|
||||
gt_filter,
|
||||
lt_filter,
|
||||
year_gt_filter,
|
||||
year_lt_filter,
|
||||
year_exact_filter,
|
||||
]
|
||||
|
||||
defined_filters_list = {list["filter_id"]: list for list in defined_filters}
|
||||
|
||||
char_filter: FilterList = [
|
||||
iexact_filter,
|
||||
isnull_filter,
|
||||
contains_filter,
|
||||
startswith_filter,
|
||||
endswith_filter,
|
||||
]
|
||||
text_filter: FilterList = [
|
||||
isnull_filter,
|
||||
contains_filter,
|
||||
]
|
||||
num_filter: FilterList = [exact_filter, gt_filter, lt_filter]
|
||||
date_filter: FilterList = [
|
||||
year_exact_filter,
|
||||
isnull_filter,
|
||||
year_gt_filter,
|
||||
year_lt_filter,
|
||||
]
|
||||
|
||||
conditions = ["and", "or"]
|
||||
session_filters: ModelFilterSet = [
|
||||
{"name": char_filter},
|
||||
{"timestamp_start": date_filter},
|
||||
{"timestamp_end": date_filter},
|
||||
{"duration_manual": num_filter},
|
||||
{"duration_calculated": num_filter},
|
||||
{"note": text_filter},
|
||||
{"device": char_filter},
|
||||
{"created_at": date_filter},
|
||||
{"modified_at": date_filter},
|
||||
]
|
||||
name_contains_age: FieldFilter = {
|
||||
"filtered_field": "name",
|
||||
"filtered_value": "age",
|
||||
"filter": contains_filter,
|
||||
}
|
||||
simple_example_filter: FilterSet = [name_contains_age]
|
||||
timestamp_start_year_2024: FieldFilter = {
|
||||
"filtered_field": "timestamp_start",
|
||||
"filtered_value": "2024",
|
||||
"filter": year_exact_filter,
|
||||
}
|
||||
physical_only: FieldFilter = {
|
||||
"filtered_field": "purchase__ownership_type",
|
||||
"filtered_value": "ph",
|
||||
"filter": exact_filter,
|
||||
}
|
||||
|
||||
|
||||
def negate_filter(filter: FieldFilter) -> FieldFilter:
|
||||
return {**filter, "negated": True}
|
||||
|
||||
|
||||
without_physical: FieldFilter = negate_filter(physical_only)
|
||||
combined_example_filter: FilterSet = [name_contains_age, timestamp_start_year_2024]
|
||||
combined_with_negated_example_filter = [timestamp_start_year_2024, without_physical]
|
||||
|
||||
|
||||
def string_to_dict(s: str) -> dict[str, str]:
|
||||
key, value = s.split("=")
|
||||
return {key: value}
|
||||
|
||||
|
||||
def create_django_filter_dict(
|
||||
filter: Filter, field: str, filtered_value: str
|
||||
) -> dict[str, str]:
|
||||
"""
|
||||
Creates a dict that can be used with the Django
|
||||
filter function by unpacking it:
|
||||
Model.objects.filter(**return_value)
|
||||
"""
|
||||
if not is_filter(filter):
|
||||
raise ValueError("filter is not of type Filter")
|
||||
return {f"{field}{filter["filter_string"]}": filtered_value}
|
||||
|
||||
|
||||
def join_filter_with_condition(filters: FilterSet, condition: str):
|
||||
if not is_filterset(filters):
|
||||
raise ValueError("filters is not FilterSet")
|
||||
conditions = {"AND": operator.and_, "OR": operator.or_, "XOR": operator.xor}
|
||||
condition = condition.upper()
|
||||
if condition not in conditions:
|
||||
raise ValueError(f"Condition '{condition}' not one of '{conditions.keys()}'.")
|
||||
q_objects: list[Q] = []
|
||||
for filter_item in filters:
|
||||
q = Q(
|
||||
**create_django_filter_dict(
|
||||
filter_item["filter"],
|
||||
filter_item["filtered_field"],
|
||||
filter_item["filtered_value"],
|
||||
)
|
||||
)
|
||||
if filter_item.get("negated", False):
|
||||
q = ~q
|
||||
q_objects.append(q)
|
||||
return reduce(conditions[condition], q_objects)
|
||||
|
||||
|
||||
def apply_filters(
|
||||
filters: FilterSet,
|
||||
queryset: QuerySet[Any],
|
||||
) -> QuerySet[Any] | None:
|
||||
if len(filters) == 0:
|
||||
return queryset
|
||||
if type(filters) is not list:
|
||||
raise ValueError("filters argument not of type list")
|
||||
# TODO: modify FilterSet so it includes the condition to use
|
||||
# so we can remove the hard-coding of "AND" here
|
||||
return queryset.filter(join_filter_with_condition(filters, "AND"))
|
||||
|
||||
|
||||
def filters_to_string(filters: FilterSet) -> str:
|
||||
constructed_filters: list[dict[str, str | bool]] = []
|
||||
for filter in filters:
|
||||
constructed_filters.append(
|
||||
{
|
||||
"id": filter["filter"]["filter_id"],
|
||||
"field": filter["filtered_field"],
|
||||
"value": filter["filtered_value"],
|
||||
"negated": filter.get("negated", False),
|
||||
}
|
||||
)
|
||||
return json_dumps(constructed_filters)
|
||||
|
||||
|
||||
def string_to_filters(filter_string: str) -> FilterSet:
|
||||
obj = json_loads(filter_string)
|
||||
filters = [
|
||||
{
|
||||
"filter": defined_filters_list[item["id"]],
|
||||
"filtered_field": item["field"],
|
||||
"filtered_value": item["value"],
|
||||
"negated": item.get("negated", False),
|
||||
}
|
||||
for item in obj
|
||||
]
|
||||
if not is_filterset(filters):
|
||||
raise ValueError("filters is not of type FilterSet")
|
||||
return filters
|
||||
|
||||
|
||||
@login_required
|
||||
def list_sessions(request: HttpRequest) -> HttpResponse:
|
||||
context: dict[Any, Any] = {}
|
||||
page_number = request.GET.get("page", 1)
|
||||
limit = request.GET.get("limit", 10)
|
||||
filters = request.GET.get("filters", "")
|
||||
sessions = Session.objects.order_by("-timestamp_start")
|
||||
if filters != "":
|
||||
filter_obj = string_to_filters(filters)
|
||||
sessions = apply_filters(filter_obj, queryset=sessions)
|
||||
page_obj = None
|
||||
if int(limit) != 0:
|
||||
paginator = Paginator(sessions, limit)
|
||||
|
|
Loading…
Reference in New Issue