-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_http_transformer.py
More file actions
59 lines (48 loc) · 2.25 KB
/
test_http_transformer.py
File metadata and controls
59 lines (48 loc) · 2.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# Pipeline, PipelineContext, and HTTPTransformer are all imported from the `laygo` package.
import requests_mock
from laygo import HTTPTransformer
from laygo import Pipeline
from laygo import PipelineContext
class TestHTTPTransformer:
    """Test suite for the HTTPTransformer class."""

    def test_distributed_transformer_with_mock(self):
        """Run an HTTPTransformer through a Pipeline against a mocked worker.

        The mocked endpoint answers every POST by running the transformer's own
        worker view function (obtained from ``get_route``) on the posted chunk.
        The test therefore checks the client side: the pipeline must actually
        call the endpoint and assemble the worker's responses into the result.
        """
        # 1. Define the transformer's properties.
        base_url = "http://mock-worker.com"
        endpoint = "/process/data"
        worker_url = f"{base_url}{endpoint}"

        # 2. Define the transformer and its logic using the chainable API:
        #    double every item, then keep only the values greater than 10.
        http_transformer = (
            HTTPTransformer(base_url=base_url, endpoint=endpoint)
            .map(lambda x: x * 2)
            .filter(lambda x: x > 10)
        )
        # A small chunk_size is intended to make the client split the 10 input
        # items across several requests instead of sending them all at once.
        http_transformer.chunk_size = 4

        # 3. Get the worker's logic from the transformer itself. Only the view
        #    function is needed here; the first element of the route is unused.
        _, worker_view_func = http_transformer.get_route()

        # 4. Configure the mock endpoint to use the real worker logic.
        def mock_response(request, context):
            """Answer a mocked POST by running the worker view function.

            ``context`` is the response context supplied by requests_mock and
            is not used; the worker logic receives a fresh ``PipelineContext``.
            """
            input_chunk = request.json()
            return worker_view_func(chunk=input_chunk, context=PipelineContext())

        with requests_mock.Mocker() as m:
            m.post(worker_url, json=mock_response)

            # 5. Run the standard Pipeline with the configured transformer.
            initial_data = list(range(10))  # [0, 1, 2, ..., 9]
            pipeline = Pipeline(initial_data).apply(http_transformer)
            result = pipeline.to_list()

            # Without this check the test would also pass if the transformer
            # processed the data locally and never contacted the worker.
            assert m.called, "HTTPTransformer never called the mocked worker endpoint"

        # 6. Assert the final result. Order is not asserted, only the contents.
        expected_result = [12, 14, 16, 18]
        assert sorted(result) == sorted(expected_result)