-
Notifications
You must be signed in to change notification settings - Fork 853
Expand file tree
/
Copy pathcontext_managers.py
More file actions
88 lines (68 loc) · 2.7 KB
/
context_managers.py
File metadata and controls
88 lines (68 loc) · 2.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import functools
import inspect
import sys
from timeit import default_timer
from types import TracebackType
from typing import (
    Callable, Literal, Optional, Tuple, Type, TYPE_CHECKING, TypeVar, Union,
)

if sys.version_info >= (3, 10):
    from typing import ParamSpec
else:
    from typing_extensions import ParamSpec

if TYPE_CHECKING:
    from . import Counter
TParam = ParamSpec("TParam")
TResult = TypeVar("TResult")
class ExceptionCounter:
def __init__(self, counter: "Counter", exception: Union[Type[BaseException], Tuple[Type[BaseException], ...]]) -> None:
self._counter = counter
self._exception = exception
def __enter__(self) -> None:
pass
def __exit__(self, typ: Optional[Type[BaseException]], value: Optional[BaseException], traceback: Optional[TracebackType]) -> Literal[False]:
if isinstance(value, self._exception):
self._counter.inc()
return False
def __call__(self, f: Callable[TParam, TResult]) -> Callable[TParam, TResult]:
@functools.wraps(f)
def wrapped(*args: TParam.args, **kwargs: TParam.kwargs) -> TResult:
with self:
return f(*args, **kwargs)
return wrapped # type: ignore
class InprogressTracker:
def __init__(self, gauge):
self._gauge = gauge
def __enter__(self):
self._gauge.inc()
def __exit__(self, typ, value, traceback):
self._gauge.dec()
def __call__(self, f: Callable[TParam, TResult]) -> Callable[TParam, TResult]:
@functools.wraps(f)
def wrapped(*args: TParam.args, **kwargs: TParam.kwargs) -> TResult:
with self:
return f(*args, **kwargs)
return wrapped # type: ignore
class Timer:
def __init__(self, metric, callback_name):
self._metric = metric
self._callback_name = callback_name
def _new_timer(self):
return self.__class__(self._metric, self._callback_name)
def __enter__(self):
self._start = default_timer()
return self
def __exit__(self, typ, value, traceback):
# Time can go backwards.
duration = max(default_timer() - self._start, 0)
callback = getattr(self._metric, self._callback_name)
callback(duration)
def labels(self, *args, **kw):
self._metric = self._metric.labels(*args, **kw)
def __call__(self, f: Callable[TParam, TResult]) -> Callable[TParam, TResult]:
@functools.wraps(f)
def wrapped(*args: TParam.args, **kwargs: TParam.kwargs) -> TResult:
# Obtaining new instance of timer every time
# ensures thread safety and reentrancy.
with self._new_timer():
return f(*args, **kwargs)
return wrapped # type: ignore