-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtask_base.py
More file actions
58 lines (47 loc) · 1.42 KB
/
task_base.py
File metadata and controls
58 lines (47 loc) · 1.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
"""Task utils.
Copyright (c) 2024 MultiFactor
License: https://github.com/MultiDirectoryLab/MultiDirectory/blob/main/LICENSE
"""
from __future__ import annotations
from functools import wraps
from typing import Any, Awaitable, Callable, Coroutine, Generic, TypeVar
from redis.asyncio import Redis
# Type of a task function: any callable whose result is an awaitable or a
# coroutine. The bound is evaluated at import time (it is not an annotation).
T = TypeVar("T", bound=Callable[..., Awaitable | Coroutine])
class Task(Generic[T]):
    """Pair a task function with its run options."""

    def __init__(
        self,
        f: T,
        repeat: float,
        one_time: bool,
        global_: bool,
    ) -> None:
        """Store the task function and its options.

        :param T f: function awaited (without arguments) on each call
        :param float repeat: repeat time
        :param bool one_time: flag to run task only once
        :param bool global_: flag to run task in lock mode
        """
        self.f, self.repeat = f, repeat
        self.one_time, self.global_ = one_time, global_

    async def __call__(self, storage: Redis) -> None:
        """Await the task function, inside a storage lock when ``global_``.

        :param Redis storage: client used to obtain the lock; the lock is
            named after the task function's ``__name__``
        """
        # Non-global tasks never touch the storage at all.
        if not self.global_:
            await self.f()
            return

        async with storage.lock(self.f.__name__):
            await self.f()
def task_metadata(
    repeat: float,
    one_time: bool = False,
    global_: bool = False,
) -> Callable[[T], Callable[..., Task[T]]]:
    """Build a decorator that binds task options to a function.

    The decorated name becomes a factory: calling it returns a new
    :class:`Task` wrapping the original function.

    :param float repeat: repeat time handed to the Task
    :param bool one_time: flag to run task only once, defaults to False
    :param bool global_: flag to run task in lock mode, defaults to False
    :return: decorator that replaces a function with a Task factory
    """

    def decorator(f: T) -> Callable[..., Task[T]]:
        # NOTE(review): the previous annotations claimed a Task was returned
        # here, but the code returns a factory function. Annotations now
        # match the runtime behaviour; confirm with callers which was meant.
        @wraps(f)
        def wrapper(*args: Any, **kwargs: Any) -> Task[T]:
            # Arguments are accepted but unused; every call builds a fresh
            # Task around the undecorated function.
            return Task(f, repeat, one_time, global_)

        return wrapper

    return decorator