478 lines
14 KiB
Python
478 lines
14 KiB
Python
import operator
|
|
from functools import reduce
|
|
from json import dumps as json_dumps
|
|
from json import loads as json_loads
|
|
from typing import Any, NotRequired, TypeAlias, TypedDict, TypeGuard
|
|
|
|
from django.contrib.auth.decorators import login_required
|
|
from django.core.paginator import Paginator
|
|
from django.db.models.query import QuerySet
|
|
from django.db.models.query_utils import Q
|
|
from django.http import HttpRequest, HttpResponse
|
|
from django.shortcuts import get_object_or_404, redirect, render
|
|
from django.template.loader import render_to_string
|
|
from django.urls import reverse
|
|
from django.utils import timezone
|
|
from typing_extensions import TypeGuard
|
|
|
|
from common.time import format_duration
|
|
from common.utils import A, Button, truncate_with_popover
|
|
from games.forms import SessionForm
|
|
from games.models import Purchase, Session
|
|
from games.views.general import (
|
|
dateformat,
|
|
datetimeformat,
|
|
durationformat,
|
|
durationformat_manual,
|
|
timeformat,
|
|
use_custom_redirect,
|
|
)
|
|
|
|
|
|
class Filter(TypedDict):
    """Describe one lookup operator that can be applied to a model field."""

    # Stable identifier; used as the lookup key in ``defined_filters_list``
    # and as the "id" entry when filters are serialised to JSON.
    filter_id: str
    # Human-readable label for the operator.
    filter_display: str
    # Django lookup suffix appended to the field name (e.g. "__contains").
    filter_string: str
|
|
|
|
|
|
def is_filter(obj: dict[Any, Any]) -> TypeGuard[Filter]:
    """Return True when *obj* is a dict carrying every ``Filter`` key as a str."""
    if not isinstance(obj, dict):
        return False
    # A missing key yields None from .get(), which is not a str, so one
    # isinstance check covers both "key present" and "value is a str".
    for key in ("filter_id", "filter_display", "filter_string"):
        if not isinstance(obj.get(key), str):
            return False
    return True
|
|
|
|
|
|
# A list of Filter definitions, e.g. the operators offered for one kind of field.
FilterList: TypeAlias = list[Filter]
|
|
|
|
|
|
def is_filterlist(obj: list[Any]) -> TypeGuard[FilterList]:
    """Return True when *obj* is a list whose items all pass ``is_filter``."""
    if not isinstance(obj, list):
        return False
    return all(is_filter(candidate) for candidate in obj)
|
|
|
|
|
|
# Filter configuration for a model: a list of dicts, each mapping a field
# name to the FilterList available for that field (see ``session_filters``).
ModelFilterSet: TypeAlias = list[dict[str, FilterList]]
|
|
|
|
|
|
class FieldFilter(TypedDict):
    """Bind a ``Filter`` to a concrete field and the value to compare against."""

    # Field name the lookup is applied to; may span relations
    # (e.g. "purchase__ownership_type").
    filtered_field: str
    # Value handed to the lookup, kept as a string.
    filtered_value: str
    # Optional flag; when true the resulting condition is inverted.
    negated: NotRequired[bool]
    # The lookup operator to apply.
    filter: Filter
|
|
|
|
|
|
def is_fieldfilter(obj: dict) -> TypeGuard[FieldFilter]:
    """Return True when *obj* has the shape of a ``FieldFilter``.

    Requires ``filtered_field`` and ``filtered_value`` to be strings and
    ``filter`` to pass ``is_filter``. The optional ``negated`` key, when
    present, must be a bool so that the narrowed type really matches the
    ``FieldFilter`` declaration.
    """
    if not isinstance(obj, dict):
        return False
    # .get() returns None for a missing key, which fails the str check.
    if not isinstance(obj.get("filtered_field"), str):
        return False
    if not isinstance(obj.get("filtered_value"), str):
        return False
    # "negated" is NotRequired: absence is fine, a non-bool value is not.
    if "negated" in obj and not isinstance(obj["negated"], bool):
        return False
    return "filter" in obj and is_filter(obj["filter"])
|
|
|
|
|
|
# A list of field filters that are combined into a single query condition.
FilterSet: TypeAlias = list[FieldFilter]
|
|
|
|
|
|
def is_filterset(obj: list) -> TypeGuard[FilterSet]:
    """Return True when *obj* is a list whose items all pass ``is_fieldfilter``."""
    if not isinstance(obj, list):
        return False
    return all(is_fieldfilter(candidate) for candidate in obj)
|
|
|
|
|
|
# Lookup operators. ``filter_string`` is the suffix appended to a field name
# to form the keyword used in a Django filter expression.

# --- text comparisons ---
iexact_filter: Filter = Filter(
    filter_id="IEXACT",
    filter_display="Equals (case-insensitive)",
    filter_string="__iexact",
)
exact_filter: Filter = Filter(
    filter_id="EXACT",
    filter_display="Equals (case-sensitive)",
    filter_string="__exact",
)
isnull_filter: Filter = Filter(
    filter_id="ISNULL",
    filter_display="Is null",
    filter_string="__isnull",
)
contains_filter: Filter = Filter(
    filter_id="CONTAINS",
    filter_display="Contains",
    filter_string="__contains",
)
startswith_filter: Filter = Filter(
    filter_id="STARTSWITH",
    filter_display="Starts with",
    filter_string="__startswith",
)
endswith_filter: Filter = Filter(
    filter_id="ENDSWITH",
    filter_display="Ends with",
    filter_string="__endswith",
)

# --- ordering comparisons ---
gt_filter: Filter = Filter(
    filter_id="GT",
    filter_display="Greater than",
    filter_string="__gt",
)
lt_filter: Filter = Filter(
    filter_id="LT",
    filter_display="Lesser than",
    filter_string="__lt",
)

# --- comparisons on the year part of a date ---
year_gt_filter: Filter = Filter(
    filter_id="YEARGT",
    filter_display="Greater than",
    filter_string="__year__gt",
)
year_lt_filter: Filter = Filter(
    filter_id="YEARLT",
    filter_display="Lesser than",
    filter_string="__year__lt",
)
# NOTE(review): the label below mentions case sensitivity although this is a
# year comparison; looks copied from ``exact_filter`` - confirm before changing.
year_exact_filter: Filter = Filter(
    filter_id="YEAREXACT",
    filter_display="Equals (case-sensitive)",
    filter_string="__year__exact",
)
|
|
|
|
# Every lookup operator known to this module.
defined_filters: FilterList = [
    iexact_filter,
    exact_filter,
    isnull_filter,
    contains_filter,
    startswith_filter,
    endswith_filter,
    gt_filter,
    lt_filter,
    year_gt_filter,
    year_lt_filter,
    year_exact_filter,
]

# Despite the name this is a dict: it maps each ``filter_id`` to its Filter so
# that serialised filters can be resolved by id. The loop variable is not
# called ``list`` so the builtin stays usable inside the comprehension.
defined_filters_list = {
    filter_definition["filter_id"]: filter_definition
    for filter_definition in defined_filters
}
|
|
|
|
# Operators offered per kind of field.

# Short text fields.
char_filter: FilterList = [
    iexact_filter,
    isnull_filter,
    contains_filter,
    startswith_filter,
    endswith_filter,
]

# Long text fields.
text_filter: FilterList = [isnull_filter, contains_filter]

# Numeric fields.
num_filter: FilterList = [
    exact_filter,
    gt_filter,
    lt_filter,
]

# Date fields; the comparisons act on the year part.
date_filter: FilterList = [
    year_exact_filter,
    isnull_filter,
    year_gt_filter,
    year_lt_filter,
]
|
|
|
|
# Names of the conditions used to combine filters
# (``join_filter_with_condition`` matches them case-insensitively).
conditions = ["and", "or"]

# Operators available for each filterable Session field.
session_filters: ModelFilterSet = [
    {"name": char_filter},
    {"timestamp_start": date_filter},
    {"timestamp_end": date_filter},
    {"duration_manual": num_filter},
    {"duration_calculated": num_filter},
    {"note": text_filter},
    {"device": char_filter},
    {"created_at": date_filter},
    {"modified_at": date_filter},
]

# --- example filters ---

# name contains "age"
name_contains_age: FieldFilter = FieldFilter(
    filtered_field="name",
    filtered_value="age",
    filter=contains_filter,
)
simple_example_filter: FilterSet = [name_contains_age]

# timestamp_start falls in the year 2024
timestamp_start_year_2024: FieldFilter = FieldFilter(
    filtered_field="timestamp_start",
    filtered_value="2024",
    filter=year_exact_filter,
)

# related purchase has ownership type "ph"
physical_only: FieldFilter = FieldFilter(
    filtered_field="purchase__ownership_type",
    filtered_value="ph",
    filter=exact_filter,
)
|
|
|
|
|
|
def negate_filter(filter: FieldFilter) -> FieldFilter:
    """Return a copy of *filter* with ``negated`` set to True.

    The input dict is left unmodified.
    """
    negated_copy = filter.copy()
    negated_copy["negated"] = True
    return negated_copy
|
|
|
|
|
|
# Negated variant of ``physical_only``.
without_physical: FieldFilter = negate_filter(physical_only)

# Example sets holding more than one filter.
combined_example_filter: FilterSet = [
    name_contains_age,
    timestamp_start_year_2024,
]
combined_with_negated_example_filter: FilterSet = [
    timestamp_start_year_2024,
    without_physical,
]
|
|
|
|
|
|
def string_to_dict(s: str) -> dict[str, str]:
    """Turn a ``"key=value"`` string into a single-entry dict.

    Only the first ``=`` separates key from value, so the value itself may
    contain ``=`` characters (e.g. ``"url=a?b=1"``).

    Raises:
        ValueError: if *s* contains no ``=`` at all.
    """
    key, separator, value = s.partition("=")
    if not separator:
        raise ValueError(f"expected 'key=value', got {s!r}")
    return {key: value}
|
|
|
|
|
|
def create_django_filter_dict(
    filter: Filter, field: str, filtered_value: str
) -> dict[str, str]:
    """Build the keyword dict for a single Django filter expression.

    The key is *field* followed by the filter's lookup suffix, so the result
    can be unpacked straight into a filter call:
    ``Model.objects.filter(**return_value)``.

    Raises:
        ValueError: if *filter* does not pass ``is_filter``.
    """
    if is_filter(filter):
        lookup_suffix = filter["filter_string"]
        return {f"{field}{lookup_suffix}": filtered_value}
    raise ValueError("filter is not of type Filter")
|
|
|
|
|
|
def join_filter_with_condition(filters: FilterSet, condition: str) -> Q:
    """Combine *filters* into one ``Q`` object using *condition*.

    Args:
        filters: field filters to combine; entries flagged ``negated`` are
            inverted before being combined.
        condition: "AND", "OR" or "XOR" (matched case-insensitively).

    Returns:
        The combined ``Q`` object. An empty *filters* list yields an empty
        ``Q()``, which places no restriction on a queryset.

    Raises:
        ValueError: if *filters* is not a FilterSet or *condition* is unknown.
    """
    if not is_filterset(filters):
        raise ValueError("filters is not FilterSet")
    # Named ``operators`` so the module-level ``conditions`` list is not shadowed.
    operators = {"AND": operator.and_, "OR": operator.or_, "XOR": operator.xor}
    condition = condition.upper()
    if condition not in operators:
        raise ValueError(f"Condition '{condition}' not one of '{operators.keys()}'.")
    q_objects: list[Q] = []
    for filter_item in filters:
        q = Q(
            **create_django_filter_dict(
                filter_item["filter"],
                filter_item["filtered_field"],
                filter_item["filtered_value"],
            )
        )
        if filter_item.get("negated", False):
            q = ~q
        q_objects.append(q)
    if not q_objects:
        # reduce() without an initial value raises TypeError on an empty list.
        return Q()
    return reduce(operators[condition], q_objects)
|
|
|
|
|
|
def apply_filters(
    filters: FilterSet,
    queryset: QuerySet[Any],
) -> QuerySet[Any] | None:
    """Return *queryset* narrowed by every filter in *filters*.

    An empty list returns *queryset* unchanged. All filters are currently
    combined with "AND".

    Raises:
        ValueError: if *filters* is not a list, or its items are not valid
            field filters.
    """
    # Validate the type before touching the value, so that e.g. None is
    # reported as the intended ValueError instead of a TypeError from len().
    if not isinstance(filters, list):
        raise ValueError("filters argument not of type list")
    if not filters:
        return queryset
    # TODO: modify FilterSet so it includes the condition to use
    # so we can remove the hard-coding of "AND" here
    return queryset.filter(join_filter_with_condition(filters, "AND"))
|
|
|
|
|
|
def filters_to_string(filters: FilterSet) -> str:
    """Serialise *filters* to a JSON string.

    Each filter becomes an object with the keys ``id`` (the filter's
    ``filter_id``), ``field``, ``value`` and ``negated`` (False when the
    filter carries no ``negated`` key).
    """
    serialised: list[dict[str, str | bool]] = [
        {
            "id": field_filter["filter"]["filter_id"],
            "field": field_filter["filtered_field"],
            "value": field_filter["filtered_value"],
            "negated": field_filter.get("negated", False),
        }
        for field_filter in filters
    ]
    return json_dumps(serialised)
|
|
|
|
|
|
def string_to_filters(filter_string: str) -> FilterSet:
    """Parse a JSON string (as produced by ``filters_to_string``) into a FilterSet.

    Each JSON object must provide ``id`` (a key of ``defined_filters_list``),
    ``field`` and ``value``; ``negated`` is optional and defaults to False.

    Raises:
        ValueError: if the string is not valid JSON (``JSONDecodeError`` is a
            ``ValueError`` subclass), the payload is not a list of objects, an
            entry lacks a required key or names an unknown filter id, or the
            result does not pass ``is_filterset``.
    """
    # NOTE(review): ``field`` is taken from the input as-is and is not checked
    # against an allow-list such as ``session_filters``; ``list_sessions``
    # feeds this function from the request query string.
    obj = json_loads(filter_string)
    if not isinstance(obj, list):
        raise ValueError("filters is not of type FilterSet")
    filters = []
    for item in obj:
        if not isinstance(item, dict):
            raise ValueError("filters is not of type FilterSet")
        try:
            filters.append(
                {
                    "filter": defined_filters_list[item["id"]],
                    "filtered_field": item["field"],
                    "filtered_value": item["value"],
                    "negated": item.get("negated", False),
                }
            )
        except (KeyError, TypeError) as err:
            # KeyError: missing key or unknown filter id;
            # TypeError: unhashable value used as the filter id.
            raise ValueError(f"invalid filter entry: {err!r}") from err
    if not is_filterset(filters):
        raise ValueError("filters is not of type FilterSet")
    return filters
|
|
|
|
|
|
def _parse_limit(raw_limit: Any, default: int = 10) -> int:
    """Convert the ``limit`` query value to a non-negative int, else *default*."""
    try:
        limit = int(raw_limit)
    except (TypeError, ValueError):
        return default
    return limit if limit >= 0 else default


def _session_row(session: Session) -> list[Any]:
    """Build the table cells shown for one session in the session list."""
    # NOTE(review): every row walks session.purchase.edition.game; if these are
    # foreign keys, select_related on the queryset would avoid per-row queries.
    start = session.timestamp_start.strftime(datetimeformat)
    end = (
        f" — {session.timestamp_end.strftime(timeformat)}"
        if session.timestamp_end
        else ""
    )
    return [
        A(
            children=truncate_with_popover(session.purchase.edition.name),
            url=reverse(
                "view_game",
                args=[session.purchase.edition.game.pk],
            ),
        ),
        f"{start}{end}",
        (
            format_duration(session.duration_calculated, durationformat)
            if session.duration_calculated
            else "-"
        ),
        (
            format_duration(session.duration_manual, durationformat_manual)
            if session.duration_manual
            else "-"
        ),
        session.device,
        session.created_at.strftime(dateformat),
        render_to_string(
            "cotton/button_group_sm.html",
            {
                "buttons": [
                    {
                        "href": reverse("edit_session", args=[session.pk]),
                        "text": "Edit",
                        "color": "gray",
                    },
                    {
                        "href": reverse("delete_session", args=[session.pk]),
                        "text": "Delete",
                        "color": "red",
                    },
                ]
            },
        ),
    ]


@login_required
def list_sessions(request: HttpRequest) -> HttpResponse:
    """Render the session table, newest first, with optional filters and paging.

    Query parameters:
        page: page number to show (default 1).
        limit: rows per page (default 10); 0 disables pagination. A
            non-numeric or negative value falls back to the default.
        filters: JSON string understood by ``string_to_filters``; empty or
            absent means no filtering.
    """
    page_number = request.GET.get("page", 1)
    limit = _parse_limit(request.GET.get("limit", 10))
    filters = request.GET.get("filters", "")
    sessions = Session.objects.order_by("-timestamp_start")
    if filters != "":
        filter_obj = string_to_filters(filters)
        sessions = apply_filters(filter_obj, queryset=sessions)

    page_obj = None
    if limit != 0:
        paginator = Paginator(sessions, limit)
        page_obj = paginator.get_page(page_number)
        sessions = page_obj.object_list

    elided_page_range = None
    # A page without rows is falsy, so an empty result exposes neither
    # page_obj nor a page range to the template.
    if page_obj:
        # Use the number get_page() settled on: get_elided_page_range()
        # validates its argument and would raise on the raw query value
        # (e.g. "abc" or an out-of-range page) that get_page() tolerated.
        elided_page_range = page_obj.paginator.get_elided_page_range(
            page_obj.number, on_each_side=1, on_ends=1
        )

    context = {
        "title": "Manage sessions",
        "page_obj": page_obj or None,
        "elided_page_range": elided_page_range,
        "data": {
            "header_action": A([], Button([], "Add session"), url="add_session"),
            "columns": [
                "Name",
                "Date",
                "Duration",
                "Duration (manual)",
                "Device",
                "Created",
                "Actions",
            ],
            "rows": [_session_row(session) for session in sessions],
        },
    }
    return render(request, "list_purchases.html", context)
|
|
|
|
|
|
@login_required
def add_session(request: HttpRequest, purchase_id: int = 0) -> HttpResponse:
    """Show the "add session" form and create the session on a valid POST.

    The form is pre-filled with the current time and, when at least one
    session exists, the purchase of ``Session.objects.last()``. On GET a
    non-zero *purchase_id* overrides that purchase; an unknown id results
    in a 404 response.
    """
    initial: dict[str, Any] = {"timestamp_start": timezone.now()}

    last = Session.objects.last()
    if last is not None:
        initial["purchase"] = last.purchase

    if request.method == "POST":
        form = SessionForm(request.POST or None, initial=initial)
        if form.is_valid():
            form.save()
            return redirect("list_sessions")
        # An invalid form falls through and is re-rendered with its errors.
    elif purchase_id:
        # 404 instead of an unhandled DoesNotExist, matching the other views.
        purchase = get_object_or_404(Purchase, id=purchase_id)
        form = SessionForm(initial={**initial, "purchase": purchase})
    else:
        form = SessionForm(initial=initial)

    context = {"title": "Add New Session", "form": form}
    return render(request, "add_session.html", context)
|
|
|
|
|
|
@login_required
@use_custom_redirect
def edit_session(request: HttpRequest, session_id: int) -> HttpResponse:
    """Edit an existing session; responds with 404 when the id is unknown.

    A valid submission is saved and redirects to the session list; otherwise
    the form is rendered (with errors, if data was submitted).
    """
    session = get_object_or_404(Session, id=session_id)
    # ``request.POST or None`` leaves the form unbound when nothing was posted.
    form = SessionForm(request.POST or None, instance=session)
    if not form.is_valid():
        return render(
            request,
            "add_session.html",
            {"title": "Edit Session", "form": form},
        )
    form.save()
    return redirect("list_sessions")
|
|
|
|
|
|
def clone_session_by_id(session_id: int) -> Session:
    """Save and return a fresh copy of the session with id *session_id*.

    The copy starts now, has no end timestamp and an empty note; every other
    attribute is carried over from the loaded session. Raises Http404 when
    no such session exists.
    """
    duplicate = get_object_or_404(Session, id=session_id)
    # Clearing the primary key makes save() insert a new row rather than
    # update the one that was loaded.
    duplicate.pk = None
    duplicate.timestamp_start = timezone.now()
    duplicate.timestamp_end = None
    duplicate.note = ""
    duplicate.save()
    return duplicate
|
|
|
|
|
|
@login_required
@use_custom_redirect
def new_session_from_existing_session(
    request: HttpRequest, session_id: int, template: str = ""
) -> HttpResponse:
    """Clone a session and respond with a rendered fragment or a redirect.

    For htmx requests *template* is rendered with the new session and the
    ``session_count`` query value incremented by one; any other request is
    redirected to the session list.
    """
    # The clone is created before the request type is inspected.
    session = clone_session_by_id(session_id)
    if not request.htmx:
        return redirect("list_sessions")
    next_count = int(request.GET.get("session_count", 0)) + 1
    return render(
        request,
        template,
        {"session": session, "session_count": next_count},
    )
|
|
|
|
|
|
@login_required
@use_custom_redirect
def end_session(
    request: HttpRequest, session_id: int, template: str = ""
) -> HttpResponse:
    """Set the session's end timestamp to now and save it.

    For htmx requests *template* is rendered with the session and the
    ``session_count`` query value passed through unchanged; any other request
    is redirected to the session list. Responds with 404 for an unknown id.
    """
    session = get_object_or_404(Session, id=session_id)
    session.timestamp_end = timezone.now()
    session.save()
    if not request.htmx:
        return redirect("list_sessions")
    return render(
        request,
        template,
        {
            "session": session,
            "session_count": request.GET.get("session_count", 0),
        },
    )
|
|
|
|
|
|
@login_required
def delete_session(request: HttpRequest, session_id: int = 0) -> HttpResponse:
    """Delete the session with id *session_id* and return to the session list.

    Responds with 404 when no such session exists.
    """
    # NOTE(review): the deletion runs for any HTTP method, including GET (the
    # session list links here with a plain href) - consider requiring POST.
    get_object_or_404(Session, id=session_id).delete()
    return redirect("list_sessions")
|