forked from Python-roborock/python-roborock
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutil.py
More file actions
122 lines (91 loc) · 3.84 KB
/
util.py
File metadata and controls
122 lines (91 loc) · 3.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
from __future__ import annotations
import asyncio
import datetime
import functools
import logging
from asyncio import AbstractEventLoop, TimerHandle
from collections.abc import Callable, Coroutine, MutableMapping
from typing import Any, TypeVar
from roborock import RoborockException
T = TypeVar("T")
DEFAULT_TIME_ZONE: datetime.tzinfo | None = datetime.datetime.now().astimezone().tzinfo
def unpack_list(value: list[T], size: int) -> list[T | None]:
return (value + [None] * size)[:size] # type: ignore
def get_running_loop_or_create_one() -> AbstractEventLoop:
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return loop
def parse_datetime_to_roborock_datetime(
start_datetime: datetime.datetime, end_datetime: datetime.datetime
) -> tuple[datetime.datetime, datetime.datetime]:
now = datetime.datetime.now(DEFAULT_TIME_ZONE)
start_datetime = start_datetime.replace(
year=now.year, month=now.month, day=now.day, second=0, microsecond=0, tzinfo=DEFAULT_TIME_ZONE
)
end_datetime = end_datetime.replace(
year=now.year, month=now.month, day=now.day, second=0, microsecond=0, tzinfo=DEFAULT_TIME_ZONE
)
if start_datetime > end_datetime:
end_datetime += datetime.timedelta(days=1)
elif end_datetime < now:
start_datetime += datetime.timedelta(days=1)
end_datetime += datetime.timedelta(days=1)
return start_datetime, end_datetime
def parse_time_to_datetime(
start_time: datetime.time, end_time: datetime.time
) -> tuple[datetime.datetime, datetime.datetime]:
"""Help to handle time data."""
start_datetime = datetime.datetime.now(DEFAULT_TIME_ZONE).replace(
hour=start_time.hour, minute=start_time.minute, second=0, microsecond=0
)
end_datetime = datetime.datetime.now(DEFAULT_TIME_ZONE).replace(
hour=end_time.hour, minute=end_time.minute, second=0, microsecond=0
)
return parse_datetime_to_roborock_datetime(start_datetime, end_datetime)
def run_sync():
loop = get_running_loop_or_create_one()
def decorator(func):
@functools.wraps(func)
def wrapped(*args, **kwargs):
return loop.run_until_complete(func(*args, **kwargs))
return wrapped
return decorator
class RepeatableTask:
def __init__(self, callback: Callable[[], Coroutine], interval: int):
self.callback = callback
self.interval = interval
self._task: TimerHandle | None = None
async def _run_task(self):
response = None
try:
response = await self.callback()
except RoborockException:
pass
loop = asyncio.get_running_loop()
self._task = loop.call_later(self.interval, self._run_task_soon)
return response
def _run_task_soon(self):
asyncio.create_task(self._run_task())
def cancel(self):
if self._task:
self._task.cancel()
async def reset(self):
self.cancel()
return await self._run_task()
class RoborockLoggerAdapter(logging.LoggerAdapter):
def __init__(self, prefix: str, logger: logging.Logger) -> None:
super().__init__(logger, {})
self.prefix = prefix
def process(self, msg: str, kwargs: MutableMapping[str, Any]) -> tuple[str, MutableMapping[str, Any]]:
return f"[{self.prefix}] {msg}", kwargs
counter_map: dict[tuple[int, int], int] = {}
def get_next_int(min_val: int, max_val: int):
"""Gets a random int in the range, precached to help keep it fast."""
if (min_val, max_val) not in counter_map:
# If we have never seen this range, or if the cache is getting low, make a bunch of preshuffled values.
counter_map[(min_val, max_val)] = min_val
counter_map[(min_val, max_val)] += 1
return counter_map[(min_val, max_val)] % max_val + min_val