-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathhttp.py
More file actions
157 lines (131 loc) · 5.66 KB
/
http.py
File metadata and controls
157 lines (131 loc) · 5.66 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
"""
HTTP-backed distributed transformer.

Defines ``HTTPTransformer``, a chainable ``Transformer`` that sends chunks of
its input as JSON to a remote worker endpoint over HTTP, and that can also
describe the matching worker route (see ``HTTPTransformer.get_route``).
"""
from collections.abc import Callable
from collections.abc import Iterable
from collections.abc import Iterator
from concurrent.futures import FIRST_COMPLETED
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import wait
import hashlib
import itertools
import pickle
from typing import Any
from typing import TypeVar
from typing import Union
from typing import overload
import requests
from laygo.errors import ErrorHandler
from laygo.helpers import PipelineContext
from laygo.transformers.transformer import ChunkErrorHandler
from laygo.transformers.transformer import PipelineFunction
from laygo.transformers.transformer import Transformer
# Element types of the transformer: it consumes Iterable[In] and yields Iterator[Out].
In = TypeVar("In")
Out = TypeVar("Out")
# Module-level generic helpers. Methods below that use T/U (e.g. ``def map[U]``,
# ``def flatten[T]``) declare their own PEP 695 type parameters, which shadow these.
T = TypeVar("T")
U = TypeVar("U")
class HTTPTransformer(Transformer[In, Out]):
"""
A self-sufficient, chainable transformer that manages its own
distributed execution and worker endpoint definition.
"""
def __init__(self, base_url: str, endpoint: str | None = None, max_workers: int = 8):
super().__init__()
self.base_url = base_url.rstrip("/")
self.endpoint = endpoint
self.max_workers = max_workers
self.session = requests.Session()
self._worker_url: str | None = None
def _finalize_config(self):
"""Determines the final worker URL, generating one if needed."""
if hasattr(self, "_worker_url") and self._worker_url:
return
if self.endpoint:
path = self.endpoint
else:
if not self.transformer:
raise ValueError("Cannot determine endpoint for an empty transformer.")
serialized_logic = pickle.dumps(self.transformer)
hash_id = hashlib.sha1(serialized_logic).hexdigest()[:16]
path = f"/autogen/{hash_id}"
self.endpoint = path.lstrip("/")
self._worker_url = f"{self.base_url}/{self.endpoint}"
# --- Original HTTPTransformer Methods ---
def __call__(self, data: Iterable[In], context: PipelineContext | None = None) -> Iterator[Out]:
"""CLIENT-SIDE: Called by the Pipeline to start distributed processing."""
self._finalize_config()
def process_chunk(chunk: list) -> list:
"""Target for a thread: sends one chunk to the worker."""
try:
response = self.session.post(
self._worker_url, # type: ignore
json=chunk,
timeout=300,
)
response.raise_for_status()
return response.json()
except requests.RequestException as e:
print(f"Error calling worker {self._worker_url}: {e}")
return []
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
chunk_iterator = self._chunk_generator(data)
futures = {executor.submit(process_chunk, chunk) for chunk in itertools.islice(chunk_iterator, self.max_workers)}
while futures:
done, futures = wait(futures, return_when=FIRST_COMPLETED)
for future in done:
yield from future.result()
try:
new_chunk = next(chunk_iterator)
futures.add(executor.submit(process_chunk, new_chunk))
except StopIteration:
continue
def get_route(self):
"""
Function that returns the route for the worker.
This is used to register the worker in a Flask app or similar.
"""
self._finalize_config()
def worker_view_func(chunk: list, context: PipelineContext):
"""The actual worker logic for this transformer."""
return self.transformer(chunk, context)
return (f"/{self.endpoint}", worker_view_func)
# --- Overridden Chaining Methods to Preserve Type ---
def on_error(self, handler: ChunkErrorHandler[In, Out] | ErrorHandler) -> "HTTPTransformer[In, Out]":
super().on_error(handler)
return self
def map[U](self, function: PipelineFunction[Out, U]) -> "HTTPTransformer[In, U]":
super().map(function)
return self # type: ignore
def filter(self, predicate: PipelineFunction[Out, bool]) -> "HTTPTransformer[In, Out]":
super().filter(predicate)
return self
@overload
def flatten[T](self: "HTTPTransformer[In, list[T]]") -> "HTTPTransformer[In, T]": ...
@overload
def flatten[T](self: "HTTPTransformer[In, tuple[T, ...]]") -> "HTTPTransformer[In, T]": ...
@overload
def flatten[T](self: "HTTPTransformer[In, set[T]]") -> "HTTPTransformer[In, T]": ...
# Forgive me for I have sinned, but this is necessary to avoid type errors
# Sinec I'm setting self type in the parent class, overriding it isn't allowed
def flatten[T]( # type: ignore
self: Union["HTTPTransformer[In, list[T]]", "HTTPTransformer[In, tuple[T, ...]]", "HTTPTransformer[In, set[T]]"],
) -> "HTTPTransformer[In, T]":
super().flatten() # type: ignore
return self # type: ignore
def tap(self, function: PipelineFunction[Out, Any]) -> "HTTPTransformer[In, Out]":
super().tap(function)
return self
def apply[T](self, t: Callable[["HTTPTransformer[In, Out]"], "Transformer[In, T]"]) -> "HTTPTransformer[In, T]":
# Note: The type hint for `t` is slightly adjusted to reflect it receives an HTTPTransformer
super().apply(t) # type: ignore
return self # type: ignore
def catch[U](
self,
sub_pipeline_builder: Callable[[Transformer[Out, Out]], Transformer[Out, U]],
on_error: ChunkErrorHandler[Out, U] | None = None,
) -> "HTTPTransformer[In, U]":
super().catch(sub_pipeline_builder, on_error)
return self # type: ignore
def short_circuit(self, function: Callable[[PipelineContext], bool | None]) -> "HTTPTransformer[In, Out]":
super().short_circuit(function)
return self