Skip to content

fix: Handle window closes correctly in reducestreamer#331

Merged
BulkBeing merged 5 commits intomainfrom
reducestreamer-watermark
Mar 10, 2026
Merged

fix: Handle window closes correctly in reducestreamer#331
BulkBeing merged 5 commits intomainfrom
reducestreamer-watermark

Conversation

@BulkBeing
Copy link
Contributor

@BulkBeing BulkBeing commented Mar 9, 2026

Fixes: #328

When I tried to reproduce the issue, the reducestream UDF was crashing after running for sometime.

Reduce UDF:

class Counter(ReduceStreamer):
    async def handler(
        self,
        keys: list[str],
        datums: AsyncIterable[ReduceDatum],
        output: NonBlockingIterator,
        md: Metadata,
    ):
        logger.info(
            "Window opened: start=%s end=%s keys=%s",
            md.interval_window.start,
            md.interval_window.end,
            keys,
        )
        count = 0
        async for datum in datums:
            count += 1
            logger.info(
                "datum #%d: event_time=%s, watermark=%s",
                count, datum.event_time, datum.watermark,
            )
        logger.info("Window closed, total count=%d", count)
        await output.put(ReduceMessage(str(count).encode(), keys=keys))

When the platform (numa) closes sender end of the bidirectional gRPC stream between sliding window intervals, the in-flight await queue.get() in the consumer async iterator gets cancelled, raising asyncio.CancelledError. This is normal gRPC lifecycle — it's how the framework signals "this stream is done."

But ReduceFn had a blanket except BaseException that caught it. CancelledError is a subclass of BaseException (not Exception), so it was treated as a UDF error.

The caught exception was passed to handle_async_errorexit_on_errorpsutil.Process.kill(), which killed the entire Python process. Since a reduce streamer has many concurrent windows/streams active at once, killing the process took down all of them.

After the fix:
Screenshot 2026-03-09 at 6 03 29 PM

Screenshot 2026-03-09 at 6 03 22 PM

Also verified UDF exception are reaching UI
Screenshot 2026-03-09 at 5 57 17 PM

Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
@codecov
Copy link

codecov bot commented Mar 9, 2026

Codecov Report

❌ Patch coverage is 75.00000% with 13 lines in your changes missing coverage. Please review.
✅ Project coverage is 94.10%. Comparing base (4d33841) to head (698bda3).
⚠️ Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
...numaflow/reducestreamer/servicer/async_servicer.py 79.41% 2 Missing and 5 partials ⚠️
...numaflow/pynumaflow/reducestreamer/async_server.py 66.66% 3 Missing ⚠️
...pynumaflow/reducestreamer/servicer/task_manager.py 66.66% 3 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #331      +/-   ##
==========================================
- Coverage   94.20%   94.10%   -0.11%     
==========================================
  Files          66       66              
  Lines        3091     3137      +46     
  Branches      162      168       +6     
==========================================
+ Hits         2912     2952      +40     
- Misses        149      151       +2     
- Partials       30       34       +4     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
Copy link
Member

@vigith vigith left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self.sock_path,
self.max_threads,
)
_LOGGER.info("Debug build") # FIXME: remove this
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this?

Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
@BulkBeing BulkBeing merged commit b37f25f into main Mar 10, 2026
11 of 12 checks passed
@BulkBeing BulkBeing deleted the reducestreamer-watermark branch March 10, 2026 03:57
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

UDFs do not seem to propagate Watermarks

2 participants