timetracker/games/views/session.py

478 lines
14 KiB
Python

import operator
from functools import reduce
from json import dumps as json_dumps
from json import loads as json_loads
from typing import Any, NotRequired, TypeAlias, TypedDict, TypeGuard
from django.contrib.auth.decorators import login_required
from django.core.paginator import Paginator
from django.db.models.query import QuerySet
from django.db.models.query_utils import Q
from django.http import HttpRequest, HttpResponse
from django.shortcuts import get_object_or_404, redirect, render
from django.template.loader import render_to_string
from django.urls import reverse
from django.utils import timezone
from typing_extensions import TypeGuard
from common.time import format_duration
from common.utils import A, Button, truncate_with_popover
from games.forms import SessionForm
from games.models import Purchase, Session
from games.views.general import (
dateformat,
datetimeformat,
durationformat,
durationformat_manual,
timeformat,
use_custom_redirect,
)
class Filter(TypedDict):
    """A single lookup operator that can be applied to a model field."""

    # Stable identifier; used as the key when filters are (de)serialized.
    filter_id: str
    # Human-readable label for the operator.
    filter_display: str
    # Django lookup suffix that gets appended to a field name, e.g. "__gt".
    filter_string: str
def is_filter(obj: dict[Any, Any]) -> TypeGuard[Filter]:
return (
isinstance(obj, dict)
and "filter_id" in obj
and isinstance(obj["filter_id"], str)
and "filter_display" in obj
and isinstance(obj["filter_display"], str)
and "filter_string" in obj
and isinstance(obj["filter_string"], str)
)
# A list of lookup operators, e.g. every operator offered for one kind of field.
FilterList: TypeAlias = list[Filter]


def is_filterlist(obj: list[Any]) -> TypeGuard[FilterList]:
    """Return True when ``obj`` is a list whose items all pass ``is_filter``.

    An empty list is accepted.
    """
    # A generator (rather than a list) lets ``all`` stop at the first
    # invalid item instead of validating the whole list up front.
    return isinstance(obj, list) and all(is_filter(item) for item in obj)
# Filter configuration for a model: a list of dicts mapping a field name to
# the FilterList of lookups offered for it (see ``session_filters``, where
# each dict carries exactly one field).
ModelFilterSet: TypeAlias = list[dict[str, FilterList]]
class FieldFilter(TypedDict):
filtered_field: str
filtered_value: str
negated: NotRequired[bool]
filter: Filter
def is_fieldfilter(obj: dict) -> TypeGuard[FieldFilter]:
    """Return True when ``obj`` has the required ``FieldFilter`` shape.

    ``filtered_field`` and ``filtered_value`` must be strings and
    ``filter`` must pass ``is_filter``. The optional ``negated`` key is
    not inspected.
    """
    if not isinstance(obj, dict):
        return False
    # ``.get`` returns None for a missing key, which fails the str check.
    if not isinstance(obj.get("filtered_field"), str):
        return False
    if not isinstance(obj.get("filtered_value"), str):
        return False
    return "filter" in obj and is_filter(obj["filter"])
# A list of field-bound filters that are evaluated together.
FilterSet: TypeAlias = list[FieldFilter]


def is_filterset(obj: list) -> TypeGuard[FilterSet]:
    """Return True when ``obj`` is a list whose items all pass ``is_fieldfilter``.

    An empty list is accepted.
    """
    # A generator (rather than a list) lets ``all`` stop at the first
    # invalid item instead of validating the whole list up front.
    return isinstance(obj, list) and all(is_fieldfilter(item) for item in obj)
def _make_filter(filter_id: str, display: str, lookup: str) -> Filter:
    """Assemble a ``Filter`` from its id, display label and lookup suffix."""
    return {
        "filter_id": filter_id,
        "filter_display": display,
        "filter_string": lookup,
    }


# Generic comparison lookups.
iexact_filter: Filter = _make_filter("IEXACT", "Equals (case-insensitive)", "__iexact")
exact_filter: Filter = _make_filter("EXACT", "Equals (case-sensitive)", "__exact")
isnull_filter: Filter = _make_filter("ISNULL", "Is null", "__isnull")

# Substring lookups.
contains_filter: Filter = _make_filter("CONTAINS", "Contains", "__contains")
startswith_filter: Filter = _make_filter("STARTSWITH", "Starts with", "__startswith")
endswith_filter: Filter = _make_filter("ENDSWITH", "Ends with", "__endswith")

# Ordering lookups.
gt_filter: Filter = _make_filter("GT", "Greater than", "__gt")
lt_filter: Filter = _make_filter("LT", "Lesser than", "__lt")

# Lookups that compare only the year component of the field.
year_gt_filter: Filter = _make_filter("YEARGT", "Greater than", "__year__gt")
year_lt_filter: Filter = _make_filter("YEARLT", "Lesser than", "__year__lt")
# NOTE(review): the label below repeats the wording of ``exact_filter``;
# "case-sensitive" looks unintended for a year comparison - confirm.
year_exact_filter: Filter = _make_filter(
    "YEAREXACT", "Equals (case-sensitive)", "__year__exact"
)
# Every lookup operator known to this module.
defined_filters = [
    # generic comparisons
    iexact_filter,
    exact_filter,
    isnull_filter,
    # substring matching
    contains_filter,
    startswith_filter,
    endswith_filter,
    # ordering
    gt_filter,
    lt_filter,
    # year-based comparisons
    year_gt_filter,
    year_lt_filter,
    year_exact_filter,
]

# Index of the operators above keyed by their ``filter_id``. Despite the
# ``_list`` suffix this is a dict.
defined_filters_list = {
    known_filter["filter_id"]: known_filter for known_filter in defined_filters
}
# Lookups used for the "name" and "device" fields in ``session_filters``.
char_filter: FilterList = [
    iexact_filter, isnull_filter, contains_filter, startswith_filter, endswith_filter,
]

# Lookups used for the "note" field in ``session_filters``.
text_filter: FilterList = [isnull_filter, contains_filter]

# Lookups used for the duration fields in ``session_filters``.
num_filter: FilterList = [
    exact_filter,
    gt_filter,
    lt_filter,
]

# Lookups used for the timestamp/created/modified fields in ``session_filters``.
date_filter: FilterList = [
    year_exact_filter, isnull_filter, year_gt_filter, year_lt_filter,
]
# Names of boolean connectives. Note that ``join_filter_with_condition``
# builds its own operator table and does not read this list.
conditions = ["and", "or"]

# Lookups offered for each filterable Session field, one single-key dict
# per field.
session_filters: ModelFilterSet = [
    {field_name: lookups}
    for field_name, lookups in (
        ("name", char_filter),
        ("timestamp_start", date_filter),
        ("timestamp_end", date_filter),
        ("duration_manual", num_filter),
        ("duration_calculated", num_filter),
        ("note", text_filter),
        ("device", char_filter),
        ("created_at", date_filter),
        ("modified_at", date_filter),
    )
]
# --- Example field filters -------------------------------------------------

# name contains "age"
name_contains_age: FieldFilter = FieldFilter(
    filtered_field="name",
    filtered_value="age",
    filter=contains_filter,
)
simple_example_filter: FilterSet = [name_contains_age]

# timestamp_start falls in the year 2024
timestamp_start_year_2024: FieldFilter = FieldFilter(
    filtered_field="timestamp_start",
    filtered_value="2024",
    filter=year_exact_filter,
)

# ownership type of the related purchase equals "ph"
physical_only: FieldFilter = FieldFilter(
    filtered_field="purchase__ownership_type",
    filtered_value="ph",
    filter=exact_filter,
)
def negate_filter(filter: FieldFilter) -> FieldFilter:
    """Return a shallow copy of ``filter`` with ``negated`` set to True.

    The input mapping is left untouched. Calling this on an already
    negated filter keeps it negated (it does not toggle).
    """
    negated_copy: FieldFilter = {**filter}
    negated_copy["negated"] = True
    return negated_copy
# Negated variant of ``physical_only``.
without_physical: FieldFilter = negate_filter(physical_only)
# Example filter sets combining several field filters.
combined_example_filter: FilterSet = [name_contains_age, timestamp_start_year_2024]
combined_with_negated_example_filter: FilterSet = [timestamp_start_year_2024, without_physical]
def string_to_dict(s: str) -> dict[str, str]:
    """Turn a ``"key=value"`` string into a one-entry dict.

    Only the first ``=`` separates key from value, so values may
    themselves contain ``=`` (``"q=a=b"`` -> ``{"q": "a=b"}``).

    Raises:
        ValueError: if ``s`` contains no ``=`` at all.
    """
    # maxsplit=1 keeps any further "=" characters inside the value instead
    # of failing the two-name unpacking below.
    key, value = s.split("=", 1)
    return {key: value}
def create_django_filter_dict(
    filter: Filter, field: str, filtered_value: str
) -> dict[str, str]:
    """
    Build the keyword mapping for a single Django lookup.

    The key is ``field`` followed by the filter's ``filter_string`` and the
    value is ``filtered_value``; unpack the result into a queryset call:
    Model.objects.filter(**return_value)

    Raises ValueError when ``filter`` does not pass ``is_filter``.
    """
    if is_filter(filter):
        lookup_suffix = filter["filter_string"]
        # NOTE(review): the value is passed through as a string for every
        # lookup, including "__isnull" - confirm Django accepts that there.
        return {f"{field}{lookup_suffix}": filtered_value}
    raise ValueError("filter is not of type Filter")
def _field_filter_to_q(field_filter: FieldFilter) -> Q:
    """Convert one FieldFilter into a Q object, inverted when it is negated."""
    q = Q(
        **create_django_filter_dict(
            field_filter["filter"],
            field_filter["filtered_field"],
            field_filter["filtered_value"],
        )
    )
    return ~q if field_filter.get("negated", False) else q


def join_filter_with_condition(filters: FilterSet, condition: str) -> Q:
    """Combine every filter in ``filters`` into a single Q object.

    Args:
        filters: the field filters to combine.
        condition: "AND", "OR" or "XOR" (case-insensitive); the operator
            used to join the individual Q objects.

    Returns:
        The combined Q object. For an empty ``filters`` list an empty
        ``Q()`` is returned.

    Raises:
        ValueError: if ``filters`` does not pass ``is_filterset`` or
            ``condition`` is not one of the supported operators.
    """
    if not is_filterset(filters):
        raise ValueError("filters is not FilterSet")
    # Deliberately not called ``conditions`` so the module-level list of the
    # same name is not shadowed.
    combiners = {"AND": operator.and_, "OR": operator.or_, "XOR": operator.xor}
    condition = condition.upper()
    if condition not in combiners:
        raise ValueError(f"Condition '{condition}' not one of '{combiners.keys()}'.")
    q_objects: list[Q] = [_field_filter_to_q(item) for item in filters]
    if not q_objects:
        # reduce() without an initial value raises TypeError on an empty
        # sequence, so hand back an empty Q instead.
        return Q()
    return reduce(combiners[condition], q_objects)
def apply_filters(
    filters: FilterSet,
    queryset: QuerySet[Any],
) -> QuerySet[Any] | None:
    """Narrow ``queryset`` by ``filters``, joined with AND.

    Args:
        filters: the field filters to apply; must be a list.
        queryset: the queryset to narrow.

    Returns:
        ``queryset`` itself when ``filters`` is empty, otherwise the
        filtered queryset. (The annotation allows None, but no code path
        here returns it.)

    Raises:
        ValueError: if ``filters`` is not a list.
    """
    # Validate the type before touching the value: previously len() ran
    # first, so None raised TypeError and empty non-lists slipped through.
    if not isinstance(filters, list):
        raise ValueError("filters argument not of type list")
    if not filters:
        return queryset
    # TODO: modify FilterSet so it includes the condition to use
    # so we can remove the hard-coding of "AND" here
    return queryset.filter(join_filter_with_condition(filters, "AND"))
def filters_to_string(filters: FilterSet) -> str:
    """Serialize ``filters`` to a JSON array string.

    Each filter becomes an object with the keys ``id`` (the filter's
    ``filter_id``), ``field``, ``value`` and ``negated`` (False when the
    filter carries no ``negated`` key).
    """
    serializable: list[dict[str, str | bool]] = [
        {
            "id": field_filter["filter"]["filter_id"],
            "field": field_filter["filtered_field"],
            "value": field_filter["filtered_value"],
            "negated": field_filter.get("negated", False),
        }
        for field_filter in filters
    ]
    return json_dumps(serializable)
def string_to_filters(filter_string: str) -> FilterSet:
    """Parse the JSON produced by ``filters_to_string`` back into a FilterSet.

    Each item's ``id`` is resolved through ``defined_filters_list``; the
    ``negated`` key is optional and defaults to False.

    Raises:
        ValueError: if the string is not valid JSON (``JSONDecodeError`` is
            a ValueError subclass), does not encode a list, contains an
            item with missing keys or an unknown filter id, or the result
            does not pass ``is_filterset``.
    """
    parsed = json_loads(filter_string)
    if not isinstance(parsed, list):
        # Iterating a JSON object or string would yield keys/characters and
        # fail later with an unrelated TypeError.
        raise ValueError("filter string does not encode a list of filters")
    filters = []
    for item in parsed:
        try:
            filters.append(
                {
                    "filter": defined_filters_list[item["id"]],
                    "filtered_field": item["field"],
                    "filtered_value": item["value"],
                    "negated": item.get("negated", False),
                }
            )
        except (KeyError, TypeError) as err:
            # KeyError: missing key or unknown filter id.
            # TypeError: the item is not a mapping, or the id is unhashable.
            raise ValueError(f"malformed or unknown filter: {item!r}") from err
    if not is_filterset(filters):
        raise ValueError("filters is not of type FilterSet")
    return filters
def _parse_page_size(raw_limit: Any, default: int = 10) -> int:
    """Return ``raw_limit`` as a non-negative int, or ``default`` if unusable."""
    try:
        page_size = int(raw_limit)
    except (TypeError, ValueError):
        return default
    return page_size if page_size >= 0 else default


def _session_row(session: Session) -> list[Any]:
    """Build the table cells for one session, in the order of the list columns."""
    started = session.timestamp_start.strftime(datetimeformat)
    # NOTE(review): start and end are joined without any separator - confirm
    # that the format strings supply one.
    ended = session.timestamp_end.strftime(timeformat) if session.timestamp_end else ""
    return [
        A(
            children=truncate_with_popover(session.purchase.edition.name),
            url=reverse(
                "view_game",
                args=[session.purchase.edition.game.pk],
            ),
        ),
        f"{started}{ended}",
        (
            format_duration(session.duration_calculated, durationformat)
            if session.duration_calculated
            else "-"
        ),
        (
            format_duration(session.duration_manual, durationformat_manual)
            if session.duration_manual
            else "-"
        ),
        session.device,
        session.created_at.strftime(dateformat),
        render_to_string(
            "cotton/button_group_sm.html",
            {
                "buttons": [
                    {
                        "href": reverse("edit_session", args=[session.pk]),
                        "text": "Edit",
                        "color": "gray",
                    },
                    {
                        "href": reverse("delete_session", args=[session.pk]),
                        "text": "Delete",
                        "color": "red",
                    },
                ]
            },
        ),
    ]


@login_required
def list_sessions(request: HttpRequest) -> HttpResponse:
    """Render the session list, newest first.

    Query parameters:
        page: page number to show (default 1).
        limit: sessions per page (default 10); 0 disables pagination and
            an invalid or negative value falls back to the default.
        filters: optional JSON filter set as produced by
            ``filters_to_string``.

    Returns a 400 response when ``filters`` cannot be parsed or applied.
    """
    # Local import: only needed to report bad filter fields as a 400.
    from django.core.exceptions import FieldError

    page_number = request.GET.get("page", 1)
    limit = _parse_page_size(request.GET.get("limit", 10))
    filters = request.GET.get("filters", "")
    sessions = Session.objects.order_by("-timestamp_start")
    if filters != "":
        try:
            sessions = apply_filters(string_to_filters(filters), queryset=sessions)
        except (FieldError, KeyError, TypeError, ValueError):
            # The filter string comes straight from the query string; a
            # malformed one is a client error, not a server error.
            return HttpResponse("Invalid filters parameter.", status=400)
    page_obj = None
    if limit != 0:
        paginator = Paginator(sessions, limit)
        # get_page() is lenient: it falls back to the first or last page
        # for invalid or out-of-range numbers.
        page_obj = paginator.get_page(page_number)
        sessions = page_obj.object_list
    context = {
        "title": "Manage sessions",
        # A Page is falsy when it contains no objects, so an empty page is
        # passed to the template as None (unchanged behavior).
        "page_obj": page_obj or None,
        "elided_page_range": (
            # Use the number get_page() settled on: get_elided_page_range()
            # validates strictly and would raise on the raw query value.
            page_obj.paginator.get_elided_page_range(
                page_obj.number, on_each_side=1, on_ends=1
            )
            if page_obj
            else None
        ),
        "data": {
            "header_action": A([], Button([], "Add session"), url="add_session"),
            "columns": [
                "Name",
                "Date",
                "Duration",
                "Duration (manual)",
                "Device",
                "Created",
                "Actions",
            ],
            "rows": [_session_row(session) for session in sessions],
        },
    }
    # NOTE(review): sessions are rendered with "list_purchases.html" -
    # presumably a shared list template; confirm.
    return render(request, "list_purchases.html", context)
@login_required
def add_session(request: HttpRequest, purchase_id: int = 0) -> HttpResponse:
    """Show the "add session" form and create the session on a valid POST.

    The form is pre-filled with the current time as ``timestamp_start``
    and the purchase of ``Session.objects.last()`` (if any). On GET, a
    non-zero ``purchase_id`` overrides the pre-selected purchase and
    yields a 404 when no such purchase exists.
    """
    initial: dict[str, Any] = {"timestamp_start": timezone.now()}
    last = Session.objects.last()
    if last is not None:
        initial["purchase"] = last.purchase
    if request.method == "POST":
        form = SessionForm(request.POST or None, initial=initial)
        if form.is_valid():
            form.save()
            return redirect("list_sessions")
    else:
        if purchase_id:
            # 404 instead of an unhandled DoesNotExist for an unknown id,
            # matching the other views in this module.
            initial["purchase"] = get_object_or_404(Purchase, id=purchase_id)
        form = SessionForm(initial=initial)
    context = {"title": "Add New Session", "form": form}
    return render(request, "add_session.html", context)
@login_required
@use_custom_redirect
def edit_session(request: HttpRequest, session_id: int) -> HttpResponse:
    """Edit an existing session; responds with 404 for an unknown id.

    A valid submission is saved and redirects to the session list;
    otherwise the form is rendered with the "add_session.html" template.
    """
    existing = get_object_or_404(Session, id=session_id)
    form = SessionForm(request.POST or None, instance=existing)
    if not form.is_valid():
        return render(
            request,
            "add_session.html",
            {"title": "Edit Session", "form": form},
        )
    form.save()
    return redirect("list_sessions")
def clone_session_by_id(session_id: int) -> Session:
    """Save and return a copy of the given session as a new, running session.

    The copy starts now, has no end timestamp and an empty note; all other
    field values are carried over from the source session. Raises Http404
    (via ``get_object_or_404``) when the source does not exist.
    """
    session = get_object_or_404(Session, id=session_id)
    # Django's documented way to copy an instance: clear the primary key
    # and mark the instance as not yet saved, so save() inserts a new row.
    session.pk = None
    session._state.adding = True
    session.timestamp_start = timezone.now()
    session.timestamp_end = None
    session.note = ""
    session.save()
    return session
@login_required
@use_custom_redirect
def new_session_from_existing_session(
    request: HttpRequest, session_id: int, template: str = ""
) -> HttpResponse:
    """Start a new session cloned from an existing one.

    For htmx requests ``template`` is rendered with the new session and
    the ``session_count`` query parameter incremented by one (a missing or
    non-numeric value counts as 0); other requests are redirected to the
    session list.
    """
    # Parse the untrusted counter before cloning, so a malformed value can
    # no longer raise after the new session has already been saved.
    try:
        previous_count = int(request.GET.get("session_count", 0))
    except ValueError:
        previous_count = 0
    session = clone_session_by_id(session_id)
    if request.htmx:
        context = {
            "session": session,
            "session_count": previous_count + 1,
        }
        return render(request, template, context)
    return redirect("list_sessions")
@login_required
@use_custom_redirect
def end_session(
    request: HttpRequest, session_id: int, template: str = ""
) -> HttpResponse:
    """Set the session's end timestamp to now and save it.

    htmx requests get ``template`` rendered with the session and the
    ``session_count`` query parameter passed through unchanged; other
    requests are redirected to the session list. Unknown ids yield 404.
    """
    finished = get_object_or_404(Session, id=session_id)
    finished.timestamp_end = timezone.now()
    finished.save()
    if not request.htmx:
        return redirect("list_sessions")
    return render(
        request,
        template,
        {
            "session": finished,
            "session_count": request.GET.get("session_count", 0),
        },
    )
@login_required
def delete_session(request: HttpRequest, session_id: int = 0) -> HttpResponse:
    """Delete the session with the given id (404 if absent), then go to the list."""
    # NOTE(review): no HTTP-method check here, so any request to this view
    # deletes the session - confirm this is intended.
    get_object_or_404(Session, id=session_id).delete()
    return redirect("list_sessions")