From cec80ffba3ba91e1e56142b584ddcbc6d4df7cb5 Mon Sep 17 00:00:00 2001 From: Sreekanth Date: Sat, 7 Mar 2026 13:35:54 +0530 Subject: [PATCH 1/3] fix: use keyed_window.keys for task lookup in accumulator task manager payload.keys is empty on CLOSE operations since no data is attached, causing "accumulator task not found" errors. keyed_window.keys is the authoritative key identity populated for all operation types. Also updates tests to set keyedWindow.keys independently from payload.keys to match real platform behavior. Signed-off-by: Sreekanth --- .../accumulator/servicer/task_manager.py | 12 ++++++--- .../accumulator/test_async_accumulator.py | 25 +++++++++++++++---- 2 files changed, 28 insertions(+), 9 deletions(-) diff --git a/packages/pynumaflow/pynumaflow/accumulator/servicer/task_manager.py b/packages/pynumaflow/pynumaflow/accumulator/servicer/task_manager.py index 77c3b294..0f30623f 100644 --- a/packages/pynumaflow/pynumaflow/accumulator/servicer/task_manager.py +++ b/packages/pynumaflow/pynumaflow/accumulator/servicer/task_manager.py @@ -104,8 +104,9 @@ async def close_task(self, req): 3. Wait for all the results from the task to be written to the global result queue 4. Remove the task from the tracker """ - d = req.payload - keys = d.keys + # Use keyed_window.keys for task lookup since payload.keys may be empty + # (e.g., CLOSE operations don't carry data, so payload.keys is not populated). + keys = req.keyed_window.keys unified_key = build_unique_key_name(keys) curr_task = self.tasks.get(unified_key, None) @@ -127,7 +128,9 @@ async def create_task(self, req): it creates a new task or appends the request to the existing task. """ d = req.payload - keys = d.keys + # Use keyed_window.keys for task lookup — the authoritative key identity + # for the window, consistent across all operation types (OPEN, APPEND, CLOSE). + keys = req.keyed_window.keys unified_key = build_unique_key_name(keys) curr_task = self.tasks.get(unified_key, None) @@ -178,7 +181,8 @@ async def send_datum_to_task(self, req): If the task does not exist, create it. """ d = req.payload - keys = d.keys + # Use keyed_window.keys for task lookup to match the key used in create_task/close_task. + keys = req.keyed_window.keys unified_key = build_unique_key_name(keys) result = self.tasks.get(unified_key, None) if not result: diff --git a/packages/pynumaflow/tests/accumulator/test_async_accumulator.py b/packages/pynumaflow/tests/accumulator/test_async_accumulator.py index e0927f8e..fa8f0d60 100644 --- a/packages/pynumaflow/tests/accumulator/test_async_accumulator.py +++ b/packages/pynumaflow/tests/accumulator/test_async_accumulator.py @@ -30,9 +30,11 @@ def request_generator(count, request, resetkey: bool = False, send_close: bool = False): for i in range(count): if resetkey: - # Clear previous keys and add new ones + # Update keys on both payload and keyedWindow to match real platform behavior del request.payload.keys[:] request.payload.keys.extend([f"key-{i}"]) + del request.operation.keyedWindow.keys[:] + request.operation.keyedWindow.keys.extend([f"key-{i}"]) # Set operation based on index - first is OPEN, rest are APPEND if i == 0: @@ -52,9 +54,11 @@ def request_generator(count, request, resetkey: bool = False, send_close: bool = def request_generator_append_only(count, request, resetkey: bool = False): for i in range(count): if resetkey: - # Clear previous keys and add new ones + # Update keys on both payload and keyedWindow to match real platform behavior del request.payload.keys[:] request.payload.keys.extend([f"key-{i}"]) + del request.operation.keyedWindow.keys[:] + request.operation.keyedWindow.keys.extend([f"key-{i}"]) # Set operation to APPEND for all requests request.operation.event = accumulator_pb2.AccumulatorRequest.WindowOperation.Event.APPEND @@ -64,9 +68,11 @@ def request_generator_append_only(count, request, resetkey: bool = False): def request_generator_mixed(count, request, resetkey: bool = False): for i in range(count): if resetkey: - # Clear previous keys and add new ones + # Update keys on both payload and keyedWindow to match real platform behavior del request.payload.keys[:] request.payload.keys.extend([f"key-{i}"]) + del request.operation.keyedWindow.keys[:] + request.operation.keyedWindow.keys.extend([f"key-{i}"]) if i % 2 == 0: # Set operation to APPEND for even requests @@ -107,7 +113,12 @@ def start_request() -> accumulator_pb2.AccumulatorRequest: def start_request_without_open() -> accumulator_pb2.AccumulatorRequest: event_time_timestamp, watermark_timestamp = get_time_args() - + window = accumulator_pb2.KeyedWindow( + start=mock_interval_window_start(), + end=mock_interval_window_end(), + slot="slot-0", + keys=["test_key"], + ) payload = accumulator_pb2.Payload( keys=["test_key"], value=mock_message(), @@ -115,9 +126,13 @@ def start_request_without_open() -> accumulator_pb2.AccumulatorRequest: watermark=watermark_timestamp, id="test_id", ) - + operation = accumulator_pb2.AccumulatorRequest.WindowOperation( + event=accumulator_pb2.AccumulatorRequest.WindowOperation.Event.APPEND, + keyedWindow=window, + ) request = accumulator_pb2.AccumulatorRequest( payload=payload, + operation=operation, ) return request From e415c01441a9d39ba39a0a9d3cc74cb36090e733 Mon Sep 17 00:00:00 2001 From: Sreekanth Date: Sun, 8 Mar 2026 20:02:34 +0530 Subject: [PATCH 2/3] cleanup Signed-off-by: Sreekanth --- .../pynumaflow/accumulator/_dtypes.py | 6 +++--- .../accumulator/servicer/task_manager.py | 18 +++++++++--------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/packages/pynumaflow/pynumaflow/accumulator/_dtypes.py b/packages/pynumaflow/pynumaflow/accumulator/_dtypes.py index 658c76a4..62f388c7 100644 --- a/packages/pynumaflow/pynumaflow/accumulator/_dtypes.py +++ b/packages/pynumaflow/pynumaflow/accumulator/_dtypes.py @@ -17,9 +17,9 @@ class WindowOperation(IntEnum): Enumerate the type of Window operation received. """ - OPEN = (0,) - CLOSE = (1,) - APPEND = (2,) + OPEN = 0 + CLOSE = 1 + APPEND = 2 @dataclass(init=False, slots=True) diff --git a/packages/pynumaflow/pynumaflow/accumulator/servicer/task_manager.py b/packages/pynumaflow/pynumaflow/accumulator/servicer/task_manager.py index 0f30623f..a70fa913 100644 --- a/packages/pynumaflow/pynumaflow/accumulator/servicer/task_manager.py +++ b/packages/pynumaflow/pynumaflow/accumulator/servicer/task_manager.py @@ -13,7 +13,7 @@ Datum, _AccumulatorBuilderClass, AccumulatorAsyncCallable, - WindowOperation, + WindowOperation, AccumulatorRequest, ) from pynumaflow.proto.accumulator import accumulator_pb2 from pynumaflow.shared.asynciter import NonBlockingIterator @@ -93,7 +93,7 @@ async def stream_send_eof(self): for unified_key in task_keys: await self.tasks[unified_key].iterator.put(STREAM_EOF) - async def close_task(self, req): + async def close_task(self, req: AccumulatorRequest): """ Closes a running accumulator task for a given key. Based on the request we compute the unique key, and then @@ -121,7 +121,7 @@ async def close_task(self, req): # Put the exception in the result queue await self.global_result_queue.put(err) - async def create_task(self, req): + async def create_task(self, req: AccumulatorRequest): """ Creates a new accumulator task for the given request. Based on the request we compute a unique key, and then @@ -141,7 +141,7 @@ async def create_task(self, req): # Create a new result queue for the current task # We create a new result queue for each task, so that # the results of the accumulator operation can be sent to the - # the global result queue, which in turn sends the results + # global result queue, which in turn sends the results # to the client. res_queue = NonBlockingIterator() @@ -175,7 +175,7 @@ async def create_task(self, req): # Put the request in the iterator await curr_task.iterator.put(d) - async def send_datum_to_task(self, req): + async def send_datum_to_task(self, req: AccumulatorRequest): """ Appends the request to the existing window reduce task. If the task does not exist, create it. @@ -220,7 +220,7 @@ async def __invoke_accumulator( await self.global_result_queue.put(err) async def process_input_stream( - self, request_iterator: AsyncIterable[accumulator_pb2.AccumulatorRequest] + self, request_iterator: AsyncIterable[AccumulatorRequest] ): # Start iterating through the request iterator and create tasks # based on the operation type received. @@ -230,15 +230,15 @@ async def process_input_stream( request_count += 1 # check whether the request is an open, append, or close operation match request.operation: - case int(WindowOperation.OPEN): + case WindowOperation.OPEN: # create a new task for the open operation and # put the request in the task iterator await self.create_task(request) - case int(WindowOperation.APPEND): + case WindowOperation.APPEND: # append the task data to the existing task # if the task does not exist, create a new task await self.send_datum_to_task(request) - case int(WindowOperation.CLOSE): + case WindowOperation.CLOSE: # close the current task for req await self.close_task(request) case _: From 3368ae4c928790c53cc08345ee350d6ac009f0ce Mon Sep 17 00:00:00 2001 From: Sreekanth Date: Sun, 8 Mar 2026 20:07:40 +0530 Subject: [PATCH 3/3] code formatting Signed-off-by: Sreekanth --- .../pynumaflow/accumulator/servicer/task_manager.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/packages/pynumaflow/pynumaflow/accumulator/servicer/task_manager.py b/packages/pynumaflow/pynumaflow/accumulator/servicer/task_manager.py index a70fa913..a9758bf7 100644 --- a/packages/pynumaflow/pynumaflow/accumulator/servicer/task_manager.py +++ b/packages/pynumaflow/pynumaflow/accumulator/servicer/task_manager.py @@ -13,7 +13,8 @@ Datum, _AccumulatorBuilderClass, AccumulatorAsyncCallable, - WindowOperation, AccumulatorRequest, + WindowOperation, + AccumulatorRequest, ) from pynumaflow.proto.accumulator import accumulator_pb2 from pynumaflow.shared.asynciter import NonBlockingIterator @@ -219,9 +220,7 @@ async def __invoke_accumulator( # Put the exception in the result queue await self.global_result_queue.put(err) - async def process_input_stream( - self, request_iterator: AsyncIterable[AccumulatorRequest] - ): + async def process_input_stream(self, request_iterator: AsyncIterable[AccumulatorRequest]): # Start iterating through the request iterator and create tasks # based on the operation type received. try: