From 5eee7176d44a39c0213da2fb9752a1c6a1ea2afa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luk=C3=A1=C5=A1=20Kucharczyk?= Date: Wed, 4 Sep 2024 21:58:56 +0200 Subject: [PATCH] add streak-releted basic functionality --- common/time.py | 81 +++++++++++++++++++++++++++++++++++++++++++- common/utils.py | 23 ++++++++++++- streak_bruteforce.py | 27 +++++++++++++++ tests/test_streak.py | 60 ++++++++++++++++++++++++++++++++ 4 files changed, 189 insertions(+), 2 deletions(-) create mode 100644 streak_bruteforce.py create mode 100644 tests/test_streak.py diff --git a/common/time.py b/common/time.py index d35d41a..7cf768a 100644 --- a/common/time.py +++ b/common/time.py @@ -1,8 +1,10 @@ import re -from datetime import datetime, timedelta +from datetime import date, datetime, timedelta from django.utils import timezone +from common.utils import generate_split_ranges + dateformat: str = "%d/%m/%Y" datetimeformat: str = "%d/%m/%Y %H:%M" timeformat: str = "%H:%M" @@ -84,3 +86,80 @@ def local_strftime(datetime: datetime, format: str = datetimeformat) -> str: return timezone.localtime(datetime).strftime(format) +def daterange(start: date, end: date, end_inclusive: bool = False) -> list[date]: + time_between: timedelta = end - start + if (days_between := time_between.days) < 1: + raise ValueError("start and end have to be at least 1 day apart.") + if end_inclusive: + print(f"{end_inclusive=}") + print(f"{days_between=}") + days_between += 1 + print(f"{days_between=}") + return [start + timedelta(x) for x in range(days_between)] + + +def streak(datelist: list[date]) -> dict[str, int | tuple[date, date]]: + if len(datelist) == 1: + return {"days": 1, "dates": (datelist[0], datelist[0])} + else: + print(f"Processing {len(datelist)} dates.") + missing = sorted( + set( + datelist[0] + timedelta(x) + for x in range((datelist[-1] - datelist[0]).days) + ) + - set(datelist) + ) + print(f"{len(missing)} days missing.") + datelist_with_missing = sorted(datelist + missing) + ranges = list(generate_split_ranges(datelist_with_missing, missing)) + print(f"{len(ranges)} ranges calculated.") + longest_consecutive_days = timedelta(0) + longest_range: tuple[date, date] = (date(1970, 1, 1), date(1970, 1, 1)) + for start, end in ranges: + if (current_streak := end - start) > longest_consecutive_days: + longest_consecutive_days = current_streak + longest_range = (start, end) + return {"days": longest_consecutive_days.days + 1, "dates": longest_range} + + +def streak_bruteforce(datelist: list[date]) -> dict[str, int | tuple[date, date]]: + if (datelist_length := len(datelist)) == 0: + raise ValueError("Number of dates in the list is 0.") + datelist.sort() + current_streak = 1 + current_start = datelist[0] + current_end = datelist[0] + current_date = datelist[0] + highest_streak = 1 + highest_streak_daterange = (current_start, current_end) + + def update_highest_streak(): + nonlocal highest_streak, highest_streak_daterange + if current_streak > highest_streak: + highest_streak = current_streak + highest_streak_daterange = (current_start, current_end) + + def reset_streak(): + nonlocal current_start, current_end, current_streak + current_start = current_end = current_date + current_streak = 1 + + def increment_streak(): + nonlocal current_end, current_streak + current_end = current_date + current_streak += 1 + + for i, datelist_item in enumerate(datelist, start=1): + current_date = datelist_item + if current_date == current_start or current_date == current_end: + continue + if current_date - timedelta(1) != current_end and i != datelist_length: + update_highest_streak() + reset_streak() + elif current_date - timedelta(1) == current_end and i == datelist_length: + increment_streak() + update_highest_streak() + else: + increment_streak() + return {"days": highest_streak, "dates": highest_streak_daterange} diff --git a/common/utils.py b/common/utils.py index ea2ea39..8695ebd 100644 --- a/common/utils.py +++ b/common/utils.py @@ -1,6 +1,7 @@ +from datetime import date from random import choices from string import ascii_lowercase -from typing import Any, Callable +from typing import Any, Callable, Generator, TypeVar from django.template.loader import render_to_string from django.urls import NoReverseMatch, reverse @@ -145,3 +146,23 @@ def truncate_with_popover(input_string: str) -> str: def randomid(seed: str = "", length: int = 10) -> str: return seed + "".join(choices(ascii_lowercase, k=length)) + + +T = TypeVar("T", str, int, date) + + +def generate_split_ranges( + value_list: list[T], split_points: list[T] +) -> Generator[tuple[T, T], None, None]: + for x in range(0, len(split_points) + 1): + if x == 0: + start = 0 + elif x >= len(split_points): + start = value_list.index(split_points[x - 1]) + 1 + else: + start = value_list.index(split_points[x - 1]) + 1 + try: + end = value_list.index(split_points[x]) + except IndexError: + end = len(value_list) + yield (value_list[start], value_list[end - 1]) diff --git a/streak_bruteforce.py b/streak_bruteforce.py new file mode 100644 index 0000000..5257808 --- /dev/null +++ b/streak_bruteforce.py @@ -0,0 +1,27 @@ +import os +import time + +import django + +os.environ.setdefault("DJANGO_SETTINGS_MODULE", "timetracker.settings") +django.setup() + +from common.time import streak_bruteforce +from games.models import Session + +all = Session.objects.filter(timestamp_start__gt="1970-01-01") + +data = [] + +for session in all: + current = session.timestamp_start + data.append(current.date()) + +start = time.time_ns() +start_cpu = time.process_time_ns() +print(streak_bruteforce(data)) +end = time.time_ns() +end_cpu = time.process_time_ns() +print( + f"Processed {all.count()} items in {((end - start)/ 1_000_000_000):.10f} seconds and {((end_cpu - start_cpu)/ 1_000_000_000):.10f} seconds of process time." +) diff --git a/tests/test_streak.py b/tests/test_streak.py new file mode 100644 index 0000000..8162798 --- /dev/null +++ b/tests/test_streak.py @@ -0,0 +1,60 @@ +import unittest +from datetime import date + +from common.time import daterange, streak_bruteforce + + +class StreakTest(unittest.TestCase): + streak = streak_bruteforce + + def test_daterange_exclusive(self): + d = daterange(date(2024, 8, 1), date(2024, 8, 3)) + self.assertEqual( + d, + [date(2024, 8, 1), date(2024, 8, 2)], + ) + + def test_daterange_inclusive(self): + d = daterange(date(2024, 8, 1), date(2024, 8, 3), end_inclusive=True) + self.assertEqual( + d, + [date(2024, 8, 1), date(2024, 8, 2), date(2024, 8, 3)], + ) + + def test_1day_streak(self): + self.assertEqual(streak([date(2024, 8, 1)])["days"], 1) + + def test_2day_streak(self): + self.assertEqual(streak([date(2024, 8, 1), date(2024, 8, 2)])["days"], 2) + + def test_31day_streak(self): + self.assertEqual( + streak(daterange(date(2024, 8, 1), date(2024, 8, 31), end_inclusive=True))[ + "days" + ], + 31, + ) + + def test_5day_streak_in_10_days(self): + d = daterange( + date(2024, 8, 1), date(2024, 8, 5), end_inclusive=True + ) + daterange(date(2024, 8, 7), date(2024, 8, 10), end_inclusive=True) + self.assertEqual(streak(d)["days"], 5) + + def test_10day_streak_in_31_days(self): + d = daterange(date(2024, 8, 1), date(2024, 8, 31), end_inclusive=True) + d.remove(date(2024, 8, 8)) + d.remove(date(2024, 8, 15)) + d.remove(date(2024, 8, 21)) + self.assertEqual(streak(d)["days"], 10) + + def test_10day_streak_in_31_days_with_consecutive_missing(self): + d = daterange(date(2024, 8, 1), date(2024, 8, 31), end_inclusive=True) + d.remove(date(2024, 8, 4)) + d.remove(date(2024, 8, 5)) + d.remove(date(2024, 8, 6)) + d.remove(date(2024, 8, 7)) + d.remove(date(2024, 8, 8)) + d.remove(date(2024, 8, 15)) + d.remove(date(2024, 8, 21)) + self.assertEqual(streak(d)["days"], 10)