import operator from functools import reduce from json import dumps as json_dumps from json import loads as json_loads from typing import Any, NotRequired, TypeAlias, TypedDict, TypeGuard from django.contrib.auth.decorators import login_required from django.core.paginator import Paginator from django.db.models.query import QuerySet from django.db.models.query_utils import Q from django.http import HttpRequest, HttpResponse from django.shortcuts import get_object_or_404, redirect, render from django.template.loader import render_to_string from django.urls import reverse from django.utils import timezone from typing_extensions import TypeGuard from common.time import format_duration from common.utils import A, Button, truncate_with_popover from games.forms import SessionForm from games.models import Purchase, Session from games.views.general import ( dateformat, datetimeformat, durationformat, durationformat_manual, timeformat, use_custom_redirect, ) class Filter(TypedDict): filter_id: str filter_display: str filter_string: str def is_filter(obj: dict[Any, Any]) -> TypeGuard[Filter]: return ( isinstance(obj, dict) and "filter_id" in obj and isinstance(obj["filter_id"], str) and "filter_display" in obj and isinstance(obj["filter_display"], str) and "filter_string" in obj and isinstance(obj["filter_string"], str) ) FilterList: TypeAlias = list[Filter] def is_filterlist(obj: list[Any]) -> TypeGuard[FilterList]: return isinstance(obj, list) and all([is_filter(item) for item in obj]) ModelFilterSet: TypeAlias = list[dict[str, FilterList]] class FieldFilter(TypedDict): filtered_field: str filtered_value: str negated: NotRequired[bool] filter: Filter def is_fieldfilter(obj: dict) -> TypeGuard[FieldFilter]: return ( isinstance(obj, dict) and "filtered_field" in obj and isinstance(obj["filtered_field"], str) and "filtered_value" in obj and isinstance(obj["filtered_value"], str) and "filter" in obj and is_filter(obj["filter"]) ) FilterSet: TypeAlias = list[FieldFilter] def is_filterset(obj: list) -> TypeGuard[FilterSet]: return isinstance(obj, list) and all([is_fieldfilter(item) for item in obj]) iexact_filter: Filter = { "filter_id": "IEXACT", "filter_display": "Equals (case-insensitive)", "filter_string": "__iexact", } exact_filter: Filter = { "filter_id": "EXACT", "filter_display": "Equals (case-sensitive)", "filter_string": "__exact", } isnull_filter: Filter = { "filter_id": "ISNULL", "filter_display": "Is null", "filter_string": "__isnull", } contains_filter: Filter = { "filter_id": "CONTAINS", "filter_display": "Contains", "filter_string": "__contains", } startswith_filter: Filter = { "filter_id": "STARTSWITH", "filter_display": "Starts with", "filter_string": "__startswith", } endswith_filter: Filter = { "filter_id": "ENDSWITH", "filter_display": "Ends with", "filter_string": "__endswith", } gt_filter: Filter = { "filter_id": "GT", "filter_display": "Greater than", "filter_string": "__gt", } lt_filter: Filter = { "filter_id": "LT", "filter_display": "Lesser than", "filter_string": "__lt", } year_gt_filter: Filter = { "filter_id": "YEARGT", "filter_display": "Greater than", "filter_string": "__year__gt", } year_lt_filter: Filter = { "filter_id": "YEARLT", "filter_display": "Lesser than", "filter_string": "__year__lt", } year_exact_filter: Filter = { "filter_id": "YEAREXACT", "filter_display": "Equals (case-sensitive)", "filter_string": "__year__exact", } defined_filters = [ iexact_filter, exact_filter, isnull_filter, contains_filter, startswith_filter, endswith_filter, gt_filter, lt_filter, year_gt_filter, year_lt_filter, year_exact_filter, ] defined_filters_list = {list["filter_id"]: list for list in defined_filters} char_filter: FilterList = [ iexact_filter, isnull_filter, contains_filter, startswith_filter, endswith_filter, ] text_filter: FilterList = [ isnull_filter, contains_filter, ] num_filter: FilterList = [exact_filter, gt_filter, lt_filter] date_filter: FilterList = [ year_exact_filter, isnull_filter, year_gt_filter, year_lt_filter, ] conditions = ["and", "or"] session_filters: ModelFilterSet = [ {"name": char_filter}, {"timestamp_start": date_filter}, {"timestamp_end": date_filter}, {"duration_manual": num_filter}, {"duration_calculated": num_filter}, {"note": text_filter}, {"device": char_filter}, {"created_at": date_filter}, {"modified_at": date_filter}, ] name_contains_age: FieldFilter = { "filtered_field": "name", "filtered_value": "age", "filter": contains_filter, } simple_example_filter: FilterSet = [name_contains_age] timestamp_start_year_2024: FieldFilter = { "filtered_field": "timestamp_start", "filtered_value": "2024", "filter": year_exact_filter, } physical_only: FieldFilter = { "filtered_field": "purchase__ownership_type", "filtered_value": "ph", "filter": exact_filter, } def negate_filter(filter: FieldFilter) -> FieldFilter: return {**filter, "negated": True} without_physical: FieldFilter = negate_filter(physical_only) combined_example_filter: FilterSet = [name_contains_age, timestamp_start_year_2024] combined_with_negated_example_filter = [timestamp_start_year_2024, without_physical] def string_to_dict(s: str) -> dict[str, str]: key, value = s.split("=") return {key: value} def create_django_filter_dict( filter: Filter, field: str, filtered_value: str ) -> dict[str, str]: """ Creates a dict that can be used with the Django filter function by unpacking it: Model.objects.filter(**return_value) """ if not is_filter(filter): raise ValueError("filter is not of type Filter") return {f"{field}{filter["filter_string"]}": filtered_value} def join_filter_with_condition(filters: FilterSet, condition: str): if not is_filterset(filters): raise ValueError("filters is not FilterSet") conditions = {"AND": operator.and_, "OR": operator.or_, "XOR": operator.xor} condition = condition.upper() if condition not in conditions: raise ValueError(f"Condition '{condition}' not one of '{conditions.keys()}'.") q_objects: list[Q] = [] for filter_item in filters: q = Q( **create_django_filter_dict( filter_item["filter"], filter_item["filtered_field"], filter_item["filtered_value"], ) ) if filter_item.get("negated", False): q = ~q q_objects.append(q) return reduce(conditions[condition], q_objects) def apply_filters( filters: FilterSet, queryset: QuerySet[Any], ) -> QuerySet[Any] | None: if len(filters) == 0: return queryset if type(filters) is not list: raise ValueError("filters argument not of type list") # TODO: modify FilterSet so it includes the condition to use # so we can remove the hard-coding of "AND" here return queryset.filter(join_filter_with_condition(filters, "AND")) def filters_to_string(filters: FilterSet) -> str: constructed_filters: list[dict[str, str | bool]] = [] for filter in filters: constructed_filters.append( { "id": filter["filter"]["filter_id"], "field": filter["filtered_field"], "value": filter["filtered_value"], "negated": filter.get("negated", False), } ) return json_dumps(constructed_filters) def string_to_filters(filter_string: str) -> FilterSet: obj = json_loads(filter_string) filters = [ { "filter": defined_filters_list[item["id"]], "filtered_field": item["field"], "filtered_value": item["value"], "negated": item.get("negated", False), } for item in obj ] if not is_filterset(filters): raise ValueError("filters is not of type FilterSet") return filters @login_required def list_sessions(request: HttpRequest) -> HttpResponse: context: dict[Any, Any] = {} page_number = request.GET.get("page", 1) limit = request.GET.get("limit", 10) filters = request.GET.get("filters", "") sessions = Session.objects.order_by("-timestamp_start") if filters != "": filter_obj = string_to_filters(filters) sessions = apply_filters(filter_obj, queryset=sessions) page_obj = None if int(limit) != 0: paginator = Paginator(sessions, limit) page_obj = paginator.get_page(page_number) sessions = page_obj.object_list context = { "title": "Manage sessions", "page_obj": page_obj or None, "elided_page_range": ( page_obj.paginator.get_elided_page_range( page_number, on_each_side=1, on_ends=1 ) if page_obj else None ), "data": { "header_action": A([], Button([], "Add session"), url="add_session"), "columns": [ "Name", "Date", "Duration", "Duration (manual)", "Device", "Created", "Actions", ], "rows": [ [ A( children=truncate_with_popover(session.purchase.edition.name), url=reverse( "view_game", args=[session.purchase.edition.game.pk], ), ), f"{session.timestamp_start.strftime(datetimeformat)}{f" — {session.timestamp_end.strftime(timeformat)}" if session.timestamp_end else ""}", ( format_duration(session.duration_calculated, durationformat) if session.duration_calculated else "-" ), ( format_duration(session.duration_manual, durationformat_manual) if session.duration_manual else "-" ), session.device, session.created_at.strftime(dateformat), render_to_string( "cotton/button_group_sm.html", { "buttons": [ { "href": reverse("edit_session", args=[session.pk]), "text": "Edit", "color": "gray", }, { "href": reverse( "delete_session", args=[session.pk] ), "text": "Delete", "color": "red", }, ] }, ), ] for session in sessions ], }, } return render(request, "list_purchases.html", context) @login_required def add_session(request: HttpRequest, purchase_id: int = 0) -> HttpResponse: context = {} initial: dict[str, Any] = {"timestamp_start": timezone.now()} last = Session.objects.last() if last != None: initial["purchase"] = last.purchase if request.method == "POST": form = SessionForm(request.POST or None, initial=initial) if form.is_valid(): form.save() return redirect("list_sessions") else: if purchase_id: purchase = Purchase.objects.get(id=purchase_id) form = SessionForm( initial={ **initial, "purchase": purchase, } ) else: form = SessionForm(initial=initial) context["title"] = "Add New Session" context["form"] = form return render(request, "add_session.html", context) @login_required @use_custom_redirect def edit_session(request: HttpRequest, session_id: int) -> HttpResponse: context = {} session = get_object_or_404(Session, id=session_id) form = SessionForm(request.POST or None, instance=session) if form.is_valid(): form.save() return redirect("list_sessions") context["title"] = "Edit Session" context["form"] = form return render(request, "add_session.html", context) def clone_session_by_id(session_id: int) -> Session: session = get_object_or_404(Session, id=session_id) clone = session clone.pk = None clone.timestamp_start = timezone.now() clone.timestamp_end = None clone.note = "" clone.save() return clone @login_required @use_custom_redirect def new_session_from_existing_session( request: HttpRequest, session_id: int, template: str = "" ) -> HttpResponse: session = clone_session_by_id(session_id) if request.htmx: context = { "session": session, "session_count": int(request.GET.get("session_count", 0)) + 1, } return render(request, template, context) return redirect("list_sessions") @login_required @use_custom_redirect def end_session( request: HttpRequest, session_id: int, template: str = "" ) -> HttpResponse: session = get_object_or_404(Session, id=session_id) session.timestamp_end = timezone.now() session.save() if request.htmx: context = { "session": session, "session_count": request.GET.get("session_count", 0), } return render(request, template, context) return redirect("list_sessions") @login_required def delete_session(request: HttpRequest, session_id: int = 0) -> HttpResponse: session = get_object_or_404(Session, id=session_id) session.delete() return redirect("list_sessions")