-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathparallel.py
More file actions
249 lines (220 loc) · 8.1 KB
/
parallel.py
File metadata and controls
249 lines (220 loc) · 8.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
"""Parallel transformer implementation using multiple processes and loky."""
from collections import deque
from collections.abc import Callable
from collections.abc import Iterable
from collections.abc import Iterator
from collections.abc import MutableMapping
from concurrent.futures import FIRST_COMPLETED
from concurrent.futures import Future
from concurrent.futures import wait
import copy
import itertools
import multiprocessing as mp
from multiprocessing.managers import DictProxy
from typing import Any
from typing import Union
from typing import overload
from loky import ProcessPoolExecutor
from loky import get_reusable_executor
from laygo.errors import ErrorHandler
from laygo.helpers import PipelineContext
from laygo.transformers.transformer import ChunkErrorHandler
from laygo.transformers.transformer import InternalTransformer
from laygo.transformers.transformer import PipelineFunction
from laygo.transformers.transformer import Transformer
def _process_chunk_for_multiprocessing[In, Out](
transformer: InternalTransformer[In, Out],
shared_context: MutableMapping[str, Any],
chunk: list[In],
) -> list[Out]:
"""
Top-level function to process a single chunk.
'loky' will use cloudpickle to serialize the 'transformer' object.
"""
return transformer(chunk, shared_context) # type: ignore
def createParallelTransformer[T](
_type_hint: type[T],
max_workers: int = 4,
ordered: bool = True,
chunk_size: int | None = None,
) -> "ParallelTransformer[T, T]":
"""Create a new identity parallel transformer with an explicit type hint."""
return ParallelTransformer[T, T](
max_workers=max_workers,
ordered=ordered,
chunk_size=chunk_size,
transformer=None,
)
class ParallelTransformer[In, Out](Transformer[In, Out]):
"""
A transformer that executes operations concurrently using multiple processes.
It uses 'loky' to support dynamically created transformation logic.
"""
def __init__(
self,
max_workers: int = 4,
ordered: bool = True,
chunk_size: int | None = None,
transformer: InternalTransformer[In, Out] | None = None,
):
super().__init__(chunk_size, transformer)
self.max_workers = max_workers
self.ordered = ordered
@classmethod
def from_transformer[T, U](
cls,
transformer: Transformer[T, U],
chunk_size: int | None = None,
max_workers: int = 4,
ordered: bool = True,
) -> "ParallelTransformer[T, U]":
return cls(
chunk_size=chunk_size or transformer.chunk_size,
transformer=copy.deepcopy(transformer.transformer), # type: ignore
max_workers=max_workers,
ordered=ordered,
)
def __call__(self, data: Iterable[In], context: PipelineContext | None = None) -> Iterator[Out]:
"""
Executes the transformer on data concurrently. It uses the shared
context provided by the Pipeline, if available.
"""
run_context = context if context is not None else self.context
# Detect if the context is already managed by the Pipeline.
is_managed_context = isinstance(run_context, DictProxy)
if is_managed_context:
# Use the existing shared context and lock from the Pipeline.
shared_context = run_context
yield from self._execute_with_context(data, shared_context)
# The context is live, so no need to update it here.
# The Pipeline's __exit__ will handle final state.
else:
# Fallback for standalone use: create a temporary manager.
with mp.Manager() as manager:
initial_ctx_data = dict(run_context)
shared_context = manager.dict(initial_ctx_data)
if "lock" not in shared_context:
shared_context["lock"] = manager.Lock()
yield from self._execute_with_context(data, shared_context)
# Copy results back to the original non-shared context.
final_context_state = dict(shared_context)
final_context_state.pop("lock", None)
run_context.update(final_context_state)
def _execute_with_context(self, data: Iterable[In], shared_context: MutableMapping[str, Any]) -> Iterator[Out]:
"""Helper to run the execution logic with a given context."""
executor = get_reusable_executor(max_workers=self.max_workers)
chunks_to_process = self._chunk_generator(data)
gen_func = self._ordered_generator if self.ordered else self._unordered_generator
processed_chunks_iterator = gen_func(chunks_to_process, executor, shared_context)
for result_chunk in processed_chunks_iterator:
yield from result_chunk
# ... The rest of the file remains the same ...
def _ordered_generator(
self,
chunks_iter: Iterator[list[In]],
executor: ProcessPoolExecutor,
shared_context: MutableMapping[str, Any],
) -> Iterator[list[Out]]:
"""Generate results in their original order."""
futures: deque[Future[list[Out]]] = deque()
for _ in range(self.max_workers + 1):
try:
chunk = next(chunks_iter)
futures.append(
executor.submit(
_process_chunk_for_multiprocessing,
self.transformer,
shared_context,
chunk,
)
)
except StopIteration:
break
while futures:
yield futures.popleft().result()
try:
chunk = next(chunks_iter)
futures.append(
executor.submit(
_process_chunk_for_multiprocessing,
self.transformer,
shared_context,
chunk,
)
)
except StopIteration:
continue
def _unordered_generator(
self,
chunks_iter: Iterator[list[In]],
executor: ProcessPoolExecutor,
shared_context: MutableMapping[str, Any],
) -> Iterator[list[Out]]:
"""Generate results as they complete."""
futures = {
executor.submit(
_process_chunk_for_multiprocessing,
self.transformer,
shared_context,
chunk,
)
for chunk in itertools.islice(chunks_iter, self.max_workers + 1)
}
while futures:
done, futures = wait(futures, return_when=FIRST_COMPLETED)
for future in done:
yield future.result()
try:
chunk = next(chunks_iter)
futures.add(
executor.submit(
_process_chunk_for_multiprocessing,
self.transformer,
shared_context,
chunk,
)
)
except StopIteration:
continue
def on_error(self, handler: ChunkErrorHandler[In, Out] | ErrorHandler) -> "ParallelTransformer[In, Out]":
super().on_error(handler)
return self
def map[U](self, function: PipelineFunction[Out, U]) -> "ParallelTransformer[In, U]":
super().map(function)
return self # type: ignore
def filter(self, predicate: PipelineFunction[Out, bool]) -> "ParallelTransformer[In, Out]":
super().filter(predicate)
return self
@overload
def flatten[T](self: "ParallelTransformer[In, list[T]]") -> "ParallelTransformer[In, T]": ...
@overload
def flatten[T](self: "ParallelTransformer[In, tuple[T, ...]]") -> "ParallelTransformer[In, T]": ...
@overload
def flatten[T](self: "ParallelTransformer[In, set[T]]") -> "ParallelTransformer[In, T]": ...
def flatten[T]( # type: ignore
self: Union[
"ParallelTransformer[In, list[T]]",
"ParallelTransformer[In, tuple[T, ...]]",
"ParallelTransformer[In, set[T]]",
],
) -> "ParallelTransformer[In, T]":
super().flatten() # type: ignore
return self # type: ignore
def tap(self, function: PipelineFunction[Out, Any]) -> "ParallelTransformer[In, Out]":
super().tap(function)
return self
def apply[T](
self, t: Callable[["ParallelTransformer[In, Out]"], "Transformer[In, T]"]
) -> "ParallelTransformer[In, T]":
super().apply(t) # type: ignore
return self # type: ignore
def catch[U](
self,
sub_pipeline_builder: Callable[[Transformer[Out, Out]], Transformer[Out, U]],
on_error: ChunkErrorHandler[Out, U] | None = None,
) -> "ParallelTransformer[In, U]":
super().catch(sub_pipeline_builder, on_error)
return self # type: ignore
def short_circuit(self, function: Callable[[PipelineContext], bool | None]) -> "ParallelTransformer[In, Out]":
super().short_circuit(function)
return self