-
-
Notifications
You must be signed in to change notification settings - Fork 118
Expand file tree
/
Copy pathsimple_retry_middleware.py
More file actions
84 lines (68 loc) · 2.56 KB
/
simple_retry_middleware.py
File metadata and controls
84 lines (68 loc) · 2.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
from logging import getLogger
from typing import Any
from taskiq.abc.middleware import TaskiqMiddleware
from taskiq.exceptions import NoResultError
from taskiq.kicker import AsyncKicker
from taskiq.message import TaskiqMessage
from taskiq.result import TaskiqResult
# Module-level logger shared by the retry middleware defined in this file.
logger = getLogger("taskiq.retry_middleware")
class SimpleRetryMiddleware(TaskiqMiddleware):
    """
    Middleware that re-sends a failed task to the broker.

    Retrying is opt-in per message through the ``retry_on_error`` label and
    bounded through the ``max_retries`` label. When a label is absent the
    corresponding constructor default is used instead.
    """

    def __init__(
        self,
        default_retry_count: int = 3,
        default_retry_label: bool = False,
        no_result_on_retry: bool = True,
    ) -> None:
        """
        Remember the defaults applied to messages without retry labels.

        :param default_retry_count: limit used when the message carries
            no ``max_retries`` label.
        :param default_retry_label: retry decision used when the message
            carries no ``retry_on_error`` label.
        :param no_result_on_retry: when true, the error stored in the result
            of a re-sent task is replaced with ``NoResultError``.
        """
        self.no_result_on_retry = no_result_on_retry
        self.default_retry_label = default_retry_label
        self.default_retry_count = default_retry_count

    async def on_error(
        self,
        message: "TaskiqMessage",
        result: "TaskiqResult[Any]",
        exception: BaseException,
    ) -> None:
        """
        Re-send the failed task if retrying is enabled and still allowed.

        The ``_retries`` label of the message (0 when missing) is incremented
        by one; the task is sent again only while that incremented value is
        strictly below the limit taken from the ``max_retries`` label or
        from ``default_retry_count``. Otherwise a warning is logged.

        :param message: message whose execution raised the error.
        :param result: execution result; its ``error`` is overwritten with
            ``NoResultError`` after a re-send when ``no_result_on_retry``
            is set.
        :param exception: exception raised by the task.
        """
        # NoResultError is treated as a valid outcome, never retried.
        if isinstance(exception, NoResultError):
            return

        # Resolve the retry flag: a missing label falls back to the default,
        # a string label is enabled only when it spells "true" in any case,
        # and any other value is used as-is for the truthiness check.
        label_value = message.labels.get("retry_on_error")
        if label_value is None:
            should_retry = self.default_retry_label
        elif isinstance(label_value, str):
            should_retry = label_value.lower() == "true"
        else:
            should_retry = label_value

        if not should_retry:
            return

        # NOTE(review): the kicker is handed the very same dict object as
        # ``message.labels``; if AsyncKicker stores and updates it in place,
        # the ``_retries`` update below is also visible on the message.
        # Confirm against the AsyncKicker implementation.
        kicker: AsyncKicker[Any, Any] = AsyncKicker(
            task_name=message.task_name,
            broker=self.broker,
            labels=message.labels,
        )
        kicker = kicker.with_task_id(message.task_id)

        # Count this failure on top of the retries recorded so far.
        previous_retries = int(message.labels.get("_retries", 0))
        attempt = previous_retries + 1
        # The return value is ignored, so this relies on ``with_labels``
        # updating the kicker itself.
        kicker.with_labels(_retries=attempt)

        limit = int(message.labels.get("max_retries", self.default_retry_count))
        if attempt >= limit:
            logger.warning(
                "Task '%s' invocation failed. Maximum retries count is reached.",
                message.task_name,
            )
            return

        logger.info(
            "Task '%s' invocation failed. Retrying.",
            message.task_name,
        )
        await kicker.kiq(*message.args, **message.kwargs)

        if self.no_result_on_retry:
            result.error = NoResultError()