-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathtest_task.py
More file actions
182 lines (146 loc) · 5.55 KB
/
test_task.py
File metadata and controls
182 lines (146 loc) · 5.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
import unittest
import contextlib
from typing import Any, Mapping, Iterator
from unittest import mock
import fakeredis
from django_lightweight_queue import task
from django_lightweight_queue.types import QueueName, WorkerNumber
from django_lightweight_queue.utils import get_path, get_backend
from django_lightweight_queue.backends.redis import RedisBackend
from . import settings
# Name of the queue that the dummy task below is registered against.
QUEUE = QueueName('dummy-queue')


# Deliberately empty task: the tests in this module only inspect the job
# record produced by calling it, never the result of running it.
@task(str(QUEUE))
def dummy_task(num: int) -> None:
    return None
class TaskTests(unittest.TestCase):
    """Checks the jobs that end up on the backend when ``dummy_task`` is called.

    ``setUp`` builds a ``RedisBackend`` while ``redis.StrictRedis`` is patched
    to ``fakeredis.FakeStrictRedis`` and arranges for ``get_backend`` to hand
    that instance out for the duration of each test.
    """

    # Have unittest append custom assertion messages to the default failure
    # text instead of replacing it.
    longMessage = True

    prefix = settings.LIGHTWEIGHT_QUEUE_REDIS_PREFIX

    @contextlib.contextmanager
    def mock_workers(self, workers: Mapping[str, int]) -> Iterator[None]:
        """Patch the worker configuration for the body of a ``with`` block.

        While active, ``django_lightweight_queue.utils._accepting_implied_queues``
        is replaced with ``False`` and the entries of ``workers`` are patched
        into ``Settings.WORKERS`` via ``mock.patch.dict``.
        """
        with mock.patch(
            'django_lightweight_queue.utils._accepting_implied_queues',
            new=False,
        ):
            with mock.patch.dict(
                'django_lightweight_queue.app_settings.Settings.WORKERS',
                workers,
            ):
                yield

    @staticmethod
    def _expected_job_dict(*args: Any, **kwargs: Any) -> Mapping[str, Any]:
        """Build the ``as_dict()`` payload expected for one ``dummy_task`` call.

        The positional and keyword arguments mirror the ones the task was
        enqueued with; ``created_time`` is matched with ``mock.ANY``.
        """
        return {
            'path': 'tests.test_task.dummy_task',
            'args': list(args),
            'kwargs': kwargs,
            'timeout': None,
            'sigkill_on_stop': False,
            'created_time': mock.ANY,
        }

    def setUp(self) -> None:
        """Create the fake-redis backend and route ``get_backend`` to it."""
        super().setUp()

        get_backend.cache_clear()

        with mock.patch('redis.StrictRedis', fakeredis.FakeStrictRedis):
            self.backend = RedisBackend()

        # Mock get_backend. The 'task' decorator class has the same name as
        # the module that contains it, and it is the symbol exposed at
        # django_lightweight_queue.task, so it cannot be mocked in the normal
        # way. Instead, point the BACKEND setting at a dummy path and
        # intercept that dummy value in get_path (which get_backend calls).
        def fake_get_path(path: str) -> Any:
            if path != 'test-backend':
                return get_path(path)
            return lambda: self.backend

        backend_setting_patch = mock.patch(
            'django_lightweight_queue.app_settings.Settings.BACKEND',
            new='test-backend',
        )
        backend_setting_patch.start()
        self.addCleanup(backend_setting_patch.stop)

        get_path_patch = mock.patch(
            'django_lightweight_queue.utils.get_path',
            side_effect=fake_get_path,
        )
        get_path_patch.start()
        self.addCleanup(get_path_patch.stop)

    def tearDown(self) -> None:
        """Clear the ``get_backend`` cache once the test has finished."""
        super().tearDown()
        get_backend.cache_clear()

    def test_enqueues_job(self) -> None:
        # Calling the task directly should put exactly that call on QUEUE.
        self.assertEqual(0, self.backend.length(QUEUE))

        dummy_task(42)

        job = self.backend.dequeue(QUEUE, WorkerNumber(0), 1)
        # Plain assert to placate mypy
        assert job is not None, "Failed to get a job after enqueuing one"

        self.assertEqual(self._expected_job_dict(42), job.as_dict())

    def test_enqueues_job_queue_override(self) -> None:
        # The django_lightweight_queue_queue keyword should redirect the job
        # to the named queue, leaving the task's own queue empty.
        other_queue = QueueName('other-queue')
        self.assertEqual(0, self.backend.length(QUEUE))
        self.assertEqual(0, self.backend.length(other_queue))

        dummy_task(42, django_lightweight_queue_queue=other_queue)

        self.assertIsNone(self.backend.dequeue(QUEUE, WorkerNumber(0), 1))

        job = self.backend.dequeue(other_queue, WorkerNumber(0), 1)
        # Plain assert to placate mypy
        assert job is not None, "Failed to get a job after enqueuing one"

        self.assertEqual(self._expected_job_dict(42), job.as_dict())

    def test_bulk_enqueues_jobs(self) -> None:
        # Jobs added through bulk_enqueue() should come back in the order they
        # were added, with positional and keyword arguments kept distinct.
        self.assertEqual(0, self.backend.length(QUEUE))

        with dummy_task.bulk_enqueue() as enqueue:
            enqueue(13)
            enqueue(num=42)

        first_job = self.backend.dequeue(QUEUE, WorkerNumber(0), 1)
        # Plain assert to placate mypy
        assert first_job is not None, "Failed to get a job after enqueuing one"

        self.assertEqual(
            self._expected_job_dict(13),
            first_job.as_dict(),
            "First job",
        )

        second_job = self.backend.dequeue(QUEUE, WorkerNumber(0), 1)
        # Plain assert to placate mypy
        assert second_job is not None, "Failed to get a job after enqueuing one"

        self.assertEqual(
            self._expected_job_dict(num=42),
            second_job.as_dict(),
            "Second job",
        )

    def test_bulk_enqueues_jobs_batch_size_boundary(self) -> None:
        # Enqueue one more job than the batch size so that the bulk enqueue
        # has to cope with a full batch plus a remainder.
        self.assertEqual(0, self.backend.length(QUEUE), "Should initially be empty")

        with dummy_task.bulk_enqueue(batch_size=3) as enqueue:
            for num in (1, 2, 3, 4):
                enqueue(num)

        dequeued = [
            self.backend.dequeue(QUEUE, WorkerNumber(0), 1)
            for _ in range(4)
        ]

        self.assertEqual(0, self.backend.length(QUEUE), "Should be empty after dequeuing all jobs")

        actual_args = [job.args for job in dequeued if job is not None]

        self.assertEqual(
            [[1], [2], [3], [4]],
            actual_args,
            "Wrong jobs bulk enqueued",
        )