510 lines
18 KiB
Python
510 lines
18 KiB
Python
"""
|
|
Typed criterion inputs for building structured filters.
|
|
|
|
Inspired by Stash's filter architecture: every filterable field uses a typed
|
|
criterion with a value and a CriterionModifier. This separates *what* you're
|
|
filtering from *how* you're comparing, and makes filter serialization trivial.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
from dataclasses import dataclass, field, fields as dc_fields
|
|
from enum import Enum
|
|
from typing import Any, Self, TypeVar
|
|
|
|
from django.db.models import Q
|
|
|
|
# ── Modifier ──────────────────────────────────────────────────────────────
|
|
|
|
|
|
class Modifier(str, Enum):
|
|
"""Comparison operators shared across all criterion types."""
|
|
|
|
EQUALS = "EQUALS"
|
|
NOT_EQUALS = "NOT_EQUALS"
|
|
GREATER_THAN = "GREATER_THAN"
|
|
LESS_THAN = "LESS_THAN"
|
|
BETWEEN = "BETWEEN"
|
|
NOT_BETWEEN = "NOT_BETWEEN"
|
|
INCLUDES = "INCLUDES"
|
|
EXCLUDES = "EXCLUDES"
|
|
INCLUDES_ALL = "INCLUDES_ALL"
|
|
IS_NULL = "IS_NULL"
|
|
NOT_NULL = "NOT_NULL"
|
|
MATCHES_REGEX = "MATCHES_REGEX"
|
|
NOT_MATCHES_REGEX = "NOT_MATCHES_REGEX"
|
|
|
|
@classmethod
|
|
def for_strings(cls) -> list[Self]:
|
|
return [
|
|
cls.EQUALS,
|
|
cls.NOT_EQUALS,
|
|
cls.INCLUDES,
|
|
cls.EXCLUDES,
|
|
cls.MATCHES_REGEX,
|
|
cls.NOT_MATCHES_REGEX,
|
|
cls.IS_NULL,
|
|
cls.NOT_NULL,
|
|
]
|
|
|
|
@classmethod
|
|
def for_numbers(cls) -> list[Self]:
|
|
return [
|
|
cls.EQUALS,
|
|
cls.NOT_EQUALS,
|
|
cls.GREATER_THAN,
|
|
cls.LESS_THAN,
|
|
cls.BETWEEN,
|
|
cls.NOT_BETWEEN,
|
|
cls.IS_NULL,
|
|
cls.NOT_NULL,
|
|
]
|
|
|
|
@classmethod
|
|
def for_dates(cls) -> list[Self]:
|
|
return cls.for_numbers()
|
|
|
|
@classmethod
|
|
def for_multi(cls) -> list[Self]:
|
|
return [
|
|
cls.INCLUDES,
|
|
cls.EXCLUDES,
|
|
cls.INCLUDES_ALL,
|
|
cls.IS_NULL,
|
|
cls.NOT_NULL,
|
|
]
|
|
|
|
|
|
# ── Base criterion ─────────────────────────────────────────────────────────
|
|
|
|
T = TypeVar("T")
|
|
|
|
|
|
@dataclass
|
|
class _Criterion:
|
|
"""Base for all typed criteria."""
|
|
|
|
value: Any = None
|
|
modifier: Modifier = Modifier.EQUALS
|
|
|
|
def to_q(self, field_name: str) -> Q:
|
|
raise NotImplementedError
|
|
|
|
@classmethod
|
|
def from_json(cls, data: dict | None) -> Self | None:
|
|
if data is None or not isinstance(data, dict):
|
|
return None
|
|
kwargs: dict[str, Any] = {}
|
|
for f in dc_fields(cls):
|
|
if f.name in data:
|
|
val = data[f.name]
|
|
# Coerce string modifier to Modifier enum
|
|
if f.name == "modifier" and isinstance(val, str):
|
|
val = Modifier(val)
|
|
kwargs[f.name] = val
|
|
return cls(**kwargs)
|
|
|
|
def to_json(self) -> dict[str, Any]:
|
|
result: dict[str, Any] = {}
|
|
for f in dc_fields(self):
|
|
v = getattr(self, f.name)
|
|
if v is not None and v != f.default:
|
|
result[f.name] = v
|
|
return result
|
|
|
|
|
|
# ── Concrete criteria ──────────────────────────────────────────────────────
|
|
|
|
|
|
@dataclass
|
|
class StringCriterion(_Criterion):
|
|
value: str = ""
|
|
modifier: Modifier = Modifier.EQUALS
|
|
|
|
def to_q(self, field_name: str) -> Q:
|
|
m = self.modifier
|
|
if m == Modifier.EQUALS:
|
|
return Q(**{field_name: self.value})
|
|
if m == Modifier.NOT_EQUALS:
|
|
return ~Q(**{field_name: self.value})
|
|
if m == Modifier.INCLUDES:
|
|
return Q(**{f"{field_name}__icontains": self.value})
|
|
if m == Modifier.EXCLUDES:
|
|
return ~Q(**{f"{field_name}__icontains": self.value})
|
|
if m == Modifier.MATCHES_REGEX:
|
|
return Q(**{f"{field_name}__regex": self.value})
|
|
if m == Modifier.NOT_MATCHES_REGEX:
|
|
return ~Q(**{f"{field_name}__regex": self.value})
|
|
if m == Modifier.IS_NULL:
|
|
return Q(**{f"{field_name}__isnull": True})
|
|
if m == Modifier.NOT_NULL:
|
|
return Q(**{f"{field_name}__isnull": False})
|
|
raise ValueError(f"Unsupported modifier {m} for string field")
|
|
|
|
|
|
@dataclass
|
|
class IntCriterion(_Criterion):
|
|
value: int = 0
|
|
value2: int | None = None
|
|
modifier: Modifier = Modifier.EQUALS
|
|
|
|
def to_q(self, field_name: str) -> Q:
|
|
m = self.modifier
|
|
if m == Modifier.EQUALS:
|
|
return Q(**{field_name: self.value})
|
|
if m == Modifier.NOT_EQUALS:
|
|
return ~Q(**{field_name: self.value})
|
|
if m == Modifier.GREATER_THAN:
|
|
return Q(**{f"{field_name}__gt": self.value})
|
|
if m == Modifier.LESS_THAN:
|
|
return Q(**{f"{field_name}__lt": self.value})
|
|
if m == Modifier.BETWEEN:
|
|
if self.value2 is None:
|
|
raise ValueError("BETWEEN requires value2")
|
|
return Q(
|
|
**{
|
|
f"{field_name}__gte": min(self.value, self.value2),
|
|
f"{field_name}__lte": max(self.value, self.value2),
|
|
}
|
|
)
|
|
if m == Modifier.NOT_BETWEEN:
|
|
if self.value2 is None:
|
|
raise ValueError("NOT_BETWEEN requires value2")
|
|
lo, hi = min(self.value, self.value2), max(self.value, self.value2)
|
|
return Q(**{f"{field_name}__lt": lo}) | Q(**{f"{field_name}__gt": hi})
|
|
if m == Modifier.IS_NULL:
|
|
return Q(**{f"{field_name}__isnull": True})
|
|
if m == Modifier.NOT_NULL:
|
|
return Q(**{f"{field_name}__isnull": False})
|
|
raise ValueError(f"Unsupported modifier {m} for int field")
|
|
|
|
|
|
@dataclass
|
|
class FloatCriterion(_Criterion):
|
|
value: float = 0.0
|
|
value2: float | None = None
|
|
modifier: Modifier = Modifier.EQUALS
|
|
|
|
def to_q(self, field_name: str) -> Q:
|
|
m = self.modifier
|
|
if m == Modifier.EQUALS:
|
|
return Q(**{field_name: self.value})
|
|
if m == Modifier.NOT_EQUALS:
|
|
return ~Q(**{field_name: self.value})
|
|
if m == Modifier.GREATER_THAN:
|
|
return Q(**{f"{field_name}__gt": self.value})
|
|
if m == Modifier.LESS_THAN:
|
|
return Q(**{f"{field_name}__lt": self.value})
|
|
if m == Modifier.BETWEEN:
|
|
if self.value2 is None:
|
|
raise ValueError("BETWEEN requires value2")
|
|
return Q(
|
|
**{
|
|
f"{field_name}__gte": min(self.value, self.value2),
|
|
f"{field_name}__lte": max(self.value, self.value2),
|
|
}
|
|
)
|
|
if m == Modifier.NOT_BETWEEN:
|
|
if self.value2 is None:
|
|
raise ValueError("NOT_BETWEEN requires value2")
|
|
lo, hi = min(self.value, self.value2), max(self.value, self.value2)
|
|
return Q(**{f"{field_name}__lt": lo}) | Q(**{f"{field_name}__gt": hi})
|
|
if m == Modifier.IS_NULL:
|
|
return Q(**{f"{field_name}__isnull": True})
|
|
if m == Modifier.NOT_NULL:
|
|
return Q(**{f"{field_name}__isnull": False})
|
|
raise ValueError(f"Unsupported modifier {m} for float field")
|
|
|
|
|
|
@dataclass
|
|
class DateCriterion(_Criterion):
|
|
value: str = ""
|
|
value2: str | None = None
|
|
modifier: Modifier = Modifier.EQUALS
|
|
|
|
def to_q(self, field_name: str) -> Q:
|
|
m = self.modifier
|
|
if m == Modifier.EQUALS:
|
|
return Q(**{field_name: self.value})
|
|
if m == Modifier.NOT_EQUALS:
|
|
return ~Q(**{field_name: self.value})
|
|
if m == Modifier.GREATER_THAN:
|
|
return Q(**{f"{field_name}__gt": self.value})
|
|
if m == Modifier.LESS_THAN:
|
|
return Q(**{f"{field_name}__lt": self.value})
|
|
if m == Modifier.BETWEEN:
|
|
if self.value2 is None:
|
|
raise ValueError("BETWEEN requires value2")
|
|
return Q(
|
|
**{f"{field_name}__gte": self.value, f"{field_name}__lte": self.value2}
|
|
)
|
|
if m == Modifier.NOT_BETWEEN:
|
|
if self.value2 is None:
|
|
raise ValueError("NOT_BETWEEN requires value2")
|
|
return Q(**{f"{field_name}__lt": self.value}) | Q(
|
|
**{f"{field_name}__gt": self.value2}
|
|
)
|
|
if m == Modifier.IS_NULL:
|
|
return Q(**{f"{field_name}__isnull": True})
|
|
if m == Modifier.NOT_NULL:
|
|
return Q(**{f"{field_name}__isnull": False})
|
|
raise ValueError(f"Unsupported modifier {m} for date field")
|
|
|
|
|
|
@dataclass
|
|
class BoolCriterion(_Criterion):
|
|
value: bool = False
|
|
# Bool only makes sense with EQUALS
|
|
modifier: Modifier = Modifier.EQUALS
|
|
|
|
def to_q(self, field_name: str) -> Q:
|
|
if self.modifier == Modifier.EQUALS:
|
|
return Q(**{field_name: self.value})
|
|
if self.modifier == Modifier.NOT_EQUALS:
|
|
return ~Q(**{field_name: self.value})
|
|
raise ValueError(f"Unsupported modifier {self.modifier} for bool field")
|
|
|
|
|
|
@dataclass
|
|
class _SetCriterion(_Criterion):
|
|
"""Shared base for set-membership criteria (``MultiCriterion`` /
|
|
``ChoiceCriterion``).
|
|
|
|
Two orthogonal channels, mirroring Stash's modifier model:
|
|
|
|
- ``value`` is the *include* set. The ``modifier`` governs how it matches:
|
|
|
|
- ``INCLUDES`` — in ``value`` (match *any*); ``EQUALS`` is an alias.
|
|
- ``INCLUDES_ALL`` — related to *all* of ``value`` (meaningful for
|
|
many-to-many fields, e.g. a purchase's games).
|
|
- ``EXCLUDES`` — in none of ``value`` (match *none*); ``NOT_EQUALS`` is an
|
|
alias.
|
|
|
|
- ``excludes`` is an *always-orthogonal* negative: it contributes
|
|
``AND NOT IN (excludes)`` for every (non-presence) modifier, never
|
|
swapped into the include set. An exclude-only criterion therefore means
|
|
"everything except ``excludes``".
|
|
|
|
Empty lists contribute no constraint. ``IS_NULL`` / ``NOT_NULL`` test
|
|
presence and ignore both lists.
|
|
|
|
The logic lives entirely here so the two subclasses (which differ only in
|
|
their value type) cannot drift.
|
|
"""
|
|
|
|
value: list = field(default_factory=list)
|
|
excludes: list = field(default_factory=list)
|
|
modifier: Modifier = Modifier.INCLUDES
|
|
|
|
def to_q(self, field_name: str) -> Q:
|
|
modifier = self.modifier
|
|
if modifier == Modifier.IS_NULL:
|
|
return Q(**{f"{field_name}__isnull": True})
|
|
if modifier == Modifier.NOT_NULL:
|
|
return Q(**{f"{field_name}__isnull": False})
|
|
# The modifier governs only the include set; ``excludes`` is an orthogonal
|
|
# AND'd negative applied for every (non-presence) modifier.
|
|
q = self._value_q(field_name)
|
|
if self.excludes:
|
|
q &= ~Q(**{f"{field_name}__in": self.excludes})
|
|
return q
|
|
|
|
def _value_q(self, field_name: str) -> Q:
|
|
"""Build the Q for the include (``value``) set, per the modifier."""
|
|
modifier = self.modifier
|
|
if modifier in (Modifier.INCLUDES, Modifier.EQUALS):
|
|
return Q(**{f"{field_name}__in": self.value}) if self.value else Q()
|
|
if modifier in (Modifier.EXCLUDES, Modifier.NOT_EQUALS):
|
|
return ~Q(**{f"{field_name}__in": self.value}) if self.value else Q()
|
|
if modifier == Modifier.INCLUDES_ALL:
|
|
# INCLUDES_ALL ("related to all of these") is only meaningful for
|
|
# many-to-many fields. A naive Q(field=a) & Q(field=b) collapses
|
|
# to a single join requiring one through-row to equal both values
|
|
# (impossible), so the generic criterion layer cannot build a
|
|
# correct Q. M2M callers must supply their own Q builder at the
|
|
# filter level — see PurchaseFilter._games_to_q for the subquery
|
|
# pattern (chained .filter() calls + pk__in).
|
|
assert False, (
|
|
"INCLUDES_ALL requires a filter-level Q builder for M2M fields. "
|
|
"See PurchaseFilter._games_to_q for the chained-subquery pattern."
|
|
)
|
|
raise ValueError(f"Unsupported modifier {modifier} for {type(self).__name__}")
|
|
|
|
@classmethod
|
|
def from_json(cls, data: dict | None) -> Self | None:
|
|
result = super().from_json(data)
|
|
if result is None:
|
|
return None
|
|
# Labels embedded as {id, label} dicts are display-only; strip to bare ids
|
|
# so the querying layer stays clean and typed.
|
|
result.value = [
|
|
item["id"] if isinstance(item, dict) else item for item in result.value
|
|
]
|
|
result.excludes = [
|
|
item["id"] if isinstance(item, dict) else item for item in result.excludes
|
|
]
|
|
return result
|
|
|
|
|
|
@dataclass
|
|
class MultiCriterion(_SetCriterion):
|
|
"""Filter on a many-to-many or ForeignKey relationship by ID list.
|
|
|
|
All modifier logic (including ``INCLUDES_ALL`` and ``EXCLUDES``) lives in
|
|
``_SetCriterion``; this subclass only refines the value type.
|
|
"""
|
|
|
|
value: list[int] = field(default_factory=list)
|
|
excludes: list[int] = field(default_factory=list)
|
|
|
|
|
|
@dataclass
|
|
class ChoiceCriterion(_SetCriterion):
|
|
"""Filter on a choice/enum field with multi-select include/exclude.
|
|
|
|
Used by FilterSelect widgets for status, ownership_type, etc. Shares all
|
|
modifier logic with ``MultiCriterion`` via ``_SetCriterion``.
|
|
"""
|
|
|
|
value: list[str] = field(default_factory=list)
|
|
excludes: list[str] = field(default_factory=list)
|
|
|
|
|
|
# ── OperatorFilter base ────────────────────────────────────────────────────
|
|
|
|
F = TypeVar("F", bound="OperatorFilter")
|
|
|
|
|
|
@dataclass
|
|
class OperatorFilter:
|
|
"""Mixin providing AND/OR/NOT composition for entity filter types.
|
|
|
|
Subclasses should declare nullable references to themselves::
|
|
|
|
@dataclass
|
|
class GameFilter(OperatorFilter):
|
|
AND: "GameFilter | None" = None
|
|
OR: "GameFilter | None" = None
|
|
NOT: "GameFilter | None" = None
|
|
name: StringCriterion | None = None
|
|
...
|
|
"""
|
|
|
|
def sub_filter(self) -> OperatorFilter | None:
|
|
"""Return the first non-None of AND / OR / NOT."""
|
|
for attr in ("AND", "OR", "NOT"):
|
|
if hasattr(self, attr):
|
|
v = getattr(self, attr)
|
|
if v is not None:
|
|
return v
|
|
return None
|
|
|
|
def _criterion_fields(self) -> list[str]:
|
|
"""Return field names that hold a _Criterion instance."""
|
|
names: list[str] = []
|
|
for f in dc_fields(self):
|
|
if f.name in ("AND", "OR", "NOT"):
|
|
continue
|
|
v = getattr(self, f.name)
|
|
if isinstance(v, _Criterion):
|
|
names.append(f.name)
|
|
return names
|
|
|
|
def to_q(self) -> Q:
|
|
"""Build a Django Q object from this filter and its sub-filters."""
|
|
q = Q()
|
|
for field_name in self._criterion_fields():
|
|
c = getattr(self, field_name)
|
|
if c is not None:
|
|
q &= c.to_q(field_name)
|
|
sub = self.sub_filter()
|
|
if sub is not None:
|
|
if getattr(self, "AND", None) is not None:
|
|
q &= sub.to_q()
|
|
elif getattr(self, "OR", None) is not None:
|
|
q |= sub.to_q()
|
|
elif getattr(self, "NOT", None) is not None:
|
|
q &= ~sub.to_q()
|
|
return q
|
|
|
|
@classmethod
|
|
def from_json(cls, data: dict[str, Any] | None) -> Self | None:
|
|
if data is None or not isinstance(data, dict):
|
|
return None
|
|
# Resolve criterion class names to actual types
|
|
criterion_types: dict[str, type[_Criterion]] = {
|
|
"StringCriterion": StringCriterion,
|
|
"IntCriterion": IntCriterion,
|
|
"FloatCriterion": FloatCriterion,
|
|
"DateCriterion": DateCriterion,
|
|
"BoolCriterion": BoolCriterion,
|
|
"MultiCriterion": MultiCriterion,
|
|
"ChoiceCriterion": ChoiceCriterion,
|
|
}
|
|
kwargs: dict[str, Any] = {}
|
|
for f in dc_fields(cls):
|
|
if f.name not in data:
|
|
continue
|
|
raw = data[f.name]
|
|
if raw is None:
|
|
kwargs[f.name] = None
|
|
continue
|
|
# Recurse into sub-filters (AND / OR / NOT)
|
|
if f.name in ("AND", "OR", "NOT"):
|
|
kwargs[f.name] = cls.from_json(raw) if isinstance(raw, dict) else None
|
|
continue
|
|
# Resolve criterion fields from string type annotation
|
|
f_type = f.type
|
|
if isinstance(f_type, str):
|
|
# e.g. "StringCriterion | None" → "StringCriterion"
|
|
f_type = f_type.split("|")[0].strip()
|
|
if isinstance(f_type, str) and f_type in criterion_types:
|
|
criterion_cls = criterion_types[f_type]
|
|
kwargs[f.name] = (
|
|
criterion_cls.from_json(raw) if isinstance(raw, dict) else None
|
|
)
|
|
elif isinstance(f_type, type) and issubclass(f_type, _Criterion):
|
|
kwargs[f.name] = (
|
|
f_type.from_json(raw) if isinstance(raw, dict) else None
|
|
)
|
|
return cls(**kwargs)
|
|
|
|
def to_json(self) -> dict[str, Any]:
|
|
result: dict[str, Any] = {}
|
|
for f in dc_fields(self):
|
|
v = getattr(self, f.name)
|
|
if v is None:
|
|
continue
|
|
if f.name in ("AND", "OR", "NOT"):
|
|
result[f.name] = v.to_json()
|
|
elif isinstance(v, _Criterion):
|
|
j = v.to_json()
|
|
if j:
|
|
result[f.name] = j
|
|
return result
|
|
|
|
|
|
# ── JSON helpers ───────────────────────────────────────────────────────────
|
|
|
|
|
|
def filter_from_json(cls: type[F], json_str: str) -> F | None:
|
|
"""Deserialize a filter from a JSON string.
|
|
|
|
Usage:
|
|
f = filter_from_json(GameFilter, request.GET.get("filter", ""))
|
|
games = Game.objects.filter(f.to_q())
|
|
"""
|
|
if not json_str:
|
|
return None
|
|
try:
|
|
data = json.loads(json_str)
|
|
except json.JSONDecodeError:
|
|
return None
|
|
return cls.from_json(data)
|
|
|
|
|
|
def filter_to_json(f: OperatorFilter) -> str:
|
|
"""Serialize a filter to a JSON string for URL params or storage."""
|
|
return json.dumps(f.to_json())
|