Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 44 additions & 8 deletions packages/pynumaflow/pynumaflow/reducestreamer/async_server.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import asyncio
import contextlib
import inspect
import sys

import aiorun
import grpc

from pynumaflow.info.server import write as info_server_write
from pynumaflow.info.types import ServerInfo, ContainerType, MINIMUM_NUMAFLOW_VERSION
from pynumaflow.proto.reducer import reduce_pb2_grpc

Expand All @@ -15,6 +19,7 @@
REDUCE_STREAM_SOCK_PATH,
REDUCE_STREAM_SERVER_INFO_FILE_PATH,
MAX_NUM_THREADS,
NUMAFLOW_GRPC_SHUTDOWN_GRACE_PERIOD_SECONDS,
)

from pynumaflow.reducestreamer._dtypes import (
Expand All @@ -23,7 +28,7 @@
ReduceStreamer,
)

from pynumaflow.shared.server import NumaflowServer, check_instance, start_async_server
from pynumaflow.shared.server import NumaflowServer, check_instance


def get_handler(
Expand Down Expand Up @@ -156,6 +161,7 @@ def __init__(
]
# Get the servicer instance for the async server
self.servicer = AsyncReduceStreamServicer(self.reduce_stream_handler)
self._error: BaseException | None = None

def start(self):
"""
Expand All @@ -166,6 +172,9 @@ def start(self):
"Starting Async Reduce Stream Server",
)
aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdown_callback)
if self._error:
_LOGGER.critical("Server exiting due to UDF error: %s", self._error)
sys.exit(1)

async def aexec(self):
"""
Expand All @@ -178,15 +187,42 @@ async def aexec(self):
# Create a new async server instance and add the servicer to it
server = grpc.aio.server(options=self._server_options)
server.add_insecure_port(self.sock_path)

# The asyncio.Event must be created here (inside aexec) rather than in __init__,
# because it must be bound to the running event loop that aiorun creates.
shutdown_event = asyncio.Event()
self.servicer.set_shutdown_event(shutdown_event)

reduce_pb2_grpc.add_ReduceServicer_to_server(self.servicer, server)

serv_info = ServerInfo.get_default_server_info()
serv_info.minimum_numaflow_version = MINIMUM_NUMAFLOW_VERSION[ContainerType.Reducestreamer]
await start_async_server(
server_async=server,
sock_path=self.sock_path,
max_threads=self.max_threads,
cleanup_coroutines=list(),
server_info_file=self.server_info_file,
server_info=serv_info,

await server.start()
info_server_write(server_info=serv_info, info_file=self.server_info_file)

_LOGGER.info(
"Async GRPC Reduce Stream Server listening on: %s with max threads: %s",
self.sock_path,
self.max_threads,
)

async def _watch_for_shutdown():
"""Wait for the shutdown event and stop the server with a grace period."""
await shutdown_event.wait()
_LOGGER.info("Shutdown signal received, stopping server gracefully...")
await server.stop(NUMAFLOW_GRPC_SHUTDOWN_GRACE_PERIOD_SECONDS)

shutdown_task = asyncio.create_task(_watch_for_shutdown())
await server.wait_for_termination()

# Propagate error so start() can exit with a non-zero code
self._error = self.servicer._error

shutdown_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await shutdown_task

_LOGGER.info("Stopping event loop...")
asyncio.get_event_loop().stop()
_LOGGER.info("Event loop stopped")
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from google.protobuf import empty_pb2 as _empty_pb2

from pynumaflow._constants import ERR_UDF_EXCEPTION_STRING
from pynumaflow._constants import ERR_UDF_EXCEPTION_STRING, _LOGGER
from pynumaflow.proto.reducer import reduce_pb2, reduce_pb2_grpc
from pynumaflow.reducestreamer._dtypes import (
Datum,
Expand All @@ -12,7 +12,7 @@
ReduceRequest,
)
from pynumaflow.reducestreamer.servicer.task_manager import TaskManager
from pynumaflow.shared.server import handle_async_error
from pynumaflow.shared.server import update_context_err
from pynumaflow.types import NumaflowServicerContext


Expand Down Expand Up @@ -47,6 +47,12 @@ def __init__(
):
# The Reduce handler can be a function or a builder class instance.
self.__reduce_handler: ReduceStreamAsyncCallable | _ReduceStreamBuilderClass = handler
self._shutdown_event: asyncio.Event | None = None
self._error: BaseException | None = None

def set_shutdown_event(self, event: asyncio.Event):
"""Wire up the shutdown event created by the server's aexec() coroutine."""
self._shutdown_event = event

async def ReduceFn(
self,
Expand Down Expand Up @@ -94,20 +100,50 @@ async def ReduceFn(
async for msg in consumer:
# If the message is an exception, we raise the exception
if isinstance(msg, BaseException):
await handle_async_error(context, msg, ERR_UDF_EXCEPTION_STRING)
err_msg = f"ReduceStreamError, {ERR_UDF_EXCEPTION_STRING}: {repr(msg)}"
_LOGGER.critical(err_msg, exc_info=True)
update_context_err(context, msg, err_msg)
self._error = msg
if self._shutdown_event is not None:
self._shutdown_event.set()
return
# Send window EOF response or Window result response
# back to the client
else:
yield msg
except GeneratorExit:
# ReduceFn is an async generator (it yields messages). When Numaflow closes a
# window, gRPC calls .aclose() on this generator, throwing GeneratorExit at
# the yield point. This is normal stream lifecycle — return cleanly.
return
except asyncio.CancelledError:
# SIGTERM: aiorun cancelled all tasks. Signal the server to stop so
# Server.__del__ doesn't try to schedule on a closed event loop.
if self._shutdown_event is not None:
self._shutdown_event.set()
return
except BaseException as e:
await handle_async_error(context, e, ERR_UDF_EXCEPTION_STRING)
err_msg = f"ReduceStreamError, {ERR_UDF_EXCEPTION_STRING}: {repr(e)}"
_LOGGER.critical(err_msg, exc_info=True)
update_context_err(context, e, err_msg)
self._error = e
if self._shutdown_event is not None:
self._shutdown_event.set()
return
# Wait for the process_input_stream task to finish for a clean exit
try:
await producer
except asyncio.CancelledError:
if self._shutdown_event is not None:
self._shutdown_event.set()
return
except BaseException as e:
await handle_async_error(context, e, ERR_UDF_EXCEPTION_STRING)
err_msg = f"ReduceStreamError, {ERR_UDF_EXCEPTION_STRING}: {repr(e)}"
_LOGGER.critical(err_msg, exc_info=True)
update_context_err(context, e, err_msg)
self._error = e
if self._shutdown_event is not None:
self._shutdown_event.set()
return

async def IsReady(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,9 @@ async def __invoke_reduce(
new_instance = self.__reduce_handler.create()
try:
_ = await new_instance(keys, request_iterator, output, md)
except asyncio.CancelledError:
_LOGGER.info("ReduceStream __invoke_reduce cancelled, returning cleanly")
return
# If there is an error in the reduce operation, log and
# then send the error to the result queue
except BaseException as err:
Expand All @@ -217,6 +220,9 @@ async def process_input_stream(self, request_iterator: AsyncIterable[reduce_pb2.
# append the task data to the existing task
# if the task does not exist, create a new task
await self.send_datum_to_task(request)
except asyncio.CancelledError:
_LOGGER.info("ReduceStream process_input_stream cancelled, returning cleanly")
return
# If there is an error in the reduce operation, log and
# then send the error to the result queue
except BaseException as e:
Expand Down Expand Up @@ -261,6 +267,9 @@ async def process_input_stream(self, request_iterator: AsyncIterable[reduce_pb2.

        # Once all tasks are completed, send EOF to the global result queue
await self.global_result_queue.put(STREAM_EOF)
except asyncio.CancelledError:
_LOGGER.info("ReduceStream post-processing cancelled, returning cleanly")
return
except BaseException as e:
err_msg = f"Reduce Streaming Error: {repr(e)}"
_LOGGER.critical(err_msg, exc_info=True)
Expand Down
Loading