-
-
Notifications
You must be signed in to change notification settings - Fork 118
Expand file tree
/
Copy pathtest_hooks.py
More file actions
156 lines (108 loc) · 4.79 KB
/
test_hooks.py
File metadata and controls
156 lines (108 loc) · 4.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
import asyncio
from typing import Any
import pytest
from taskiq.abc.broker import AsyncBroker
from taskiq.abc.middleware import TaskiqMiddleware
from taskiq.brokers.inmemory_broker import InMemoryBroker
from taskiq.message import TaskiqMessage
from taskiq.result import TaskiqResult
@pytest.mark.anyio
async def test_set_broker() -> None:
class _TestMiddleware(TaskiqMiddleware):
def set_broker(self, broker: "AsyncBroker") -> None:
super().set_broker(broker)
self.test_value = 1
middleware = _TestMiddleware()
broker = InMemoryBroker().with_middlewares(middleware)
assert middleware is broker.middlewares[0]
assert middleware.test_value == 1
@pytest.mark.anyio
async def test_startup_shutdown_in_pair() -> None:
test_list = []
class _TestMiddleware1(TaskiqMiddleware):
def startup(self) -> None:
test_list.append("1up")
def shutdown(self) -> None:
test_list.append("1down")
class _TestMiddleware2(TaskiqMiddleware):
async def startup(self) -> None:
await asyncio.sleep(0)
test_list.append("2up")
async def shutdown(self) -> None:
await asyncio.sleep(0)
test_list.append("2down")
broker = InMemoryBroker().with_middlewares(_TestMiddleware1(), _TestMiddleware2())
await broker.startup()
await broker.shutdown()
assert test_list == ["1up", "2up", "2down", "1down"]
@pytest.mark.anyio
async def test_pre_post_send_in_pair() -> None:
test_list = []
class _TestMiddleware1(TaskiqMiddleware):
def pre_send(self, message: "TaskiqMessage") -> "TaskiqMessage":
test_list.append("1pre")
return message
def post_send(self, message: "TaskiqMessage") -> None:
test_list.append("1post")
class _TestMiddleware2(TaskiqMiddleware):
def pre_send(self, message: "TaskiqMessage") -> "TaskiqMessage":
test_list.append("2pre")
return message
def post_send(self, message: "TaskiqMessage") -> None:
test_list.append("2post")
broker = InMemoryBroker().with_middlewares(_TestMiddleware1(), _TestMiddleware2())
await broker.startup()
await broker.task(lambda: None).kiq()
await broker.shutdown()
assert test_list == ["1pre", "2pre", "2post", "1post"]
@pytest.mark.anyio
async def test_pre_post_execute_in_pair() -> None:
test_list = []
class _TestMiddleware1(TaskiqMiddleware):
def pre_execute(self, message: "TaskiqMessage") -> "TaskiqMessage":
test_list.append("1pre")
return message
def post_execute(self, message: "TaskiqMessage", result: "TaskiqResult[Any]") -> None:
test_list.append("1post")
class _TestMiddleware2(TaskiqMiddleware):
def pre_execute(self, message: "TaskiqMessage") -> "TaskiqMessage":
test_list.append("2pre")
return message
def post_execute(self, message: "TaskiqMessage", result: "TaskiqResult[Any]") -> None:
test_list.append("2post")
broker = InMemoryBroker().with_middlewares(_TestMiddleware1(), _TestMiddleware2())
await broker.startup()
task = await broker.task(lambda: 1).kiq()
await task.wait_result(timeout=2)
await broker.shutdown()
assert test_list == ["1pre", "2pre", "2post", "1post"]
@pytest.mark.anyio
async def test_post_save_inverted() -> None:
test_list = []
class _TestMiddleware1(TaskiqMiddleware):
def post_save(self, message: "TaskiqMessage", result: "TaskiqResult[Any]") -> None:
test_list.append("1save")
class _TestMiddleware2(TaskiqMiddleware):
def post_save(self, message: "TaskiqMessage", result: "TaskiqResult[Any]") -> None:
test_list.append("2save")
broker = InMemoryBroker().with_middlewares(_TestMiddleware1(), _TestMiddleware2())
await broker.startup()
task = await broker.task(lambda: 1).kiq()
await task.wait_result(timeout=2)
await broker.shutdown()
assert test_list == ["2save", "1save"]
@pytest.mark.anyio
async def test_post_on_error_inverted() -> None:
test_list = []
class _TestMiddleware1(TaskiqMiddleware):
def on_error(self, message: "TaskiqMessage", result: "TaskiqResult[Any]", exception: BaseException) -> None:
test_list.append("1error")
class _TestMiddleware2(TaskiqMiddleware):
def on_error(self, message: "TaskiqMessage", result: "TaskiqResult[Any]", exception: BaseException) -> None:
test_list.append("2error")
broker = InMemoryBroker().with_middlewares(_TestMiddleware1(), _TestMiddleware2())
await broker.startup()
task = await broker.task(lambda: (_ for _ in ()).throw(Exception("test"))).kiq()
await task.wait_result(timeout=2)
await broker.shutdown()
assert test_list == ["2error", "1error"]