@@ -297,6 +297,7 @@ def __init__(
297297 concurrency_options : Optional [ConcurrencyOptions ] = None ,
298298 channel_options : Optional [Sequence [tuple [str , Any ]]] = None ,
299299 stop_timeout : float = 30.0 ,
300+ keepalive_interval : float = 30.0 ,
300301 ):
301302 self ._registry = _Registry ()
302303 self ._host_address = host_address if host_address else shared .get_default_host_address ()
@@ -306,6 +307,7 @@ def __init__(
306307 self ._secure_channel = secure_channel
307308 self ._channel_options = channel_options
308309 self ._stop_timeout = stop_timeout
310+ self ._keepalive_interval = keepalive_interval
309311 self ._current_channel : Optional [grpc .Channel ] = None # Store channel reference for cleanup
310312 self ._stream_ready = threading .Event ()
311313 # Use provided concurrency options or create default ones
@@ -368,6 +370,18 @@ def run_loop():
368370 raise RuntimeError ("Failed to establish work item stream connection within 10 seconds" )
369371 self ._is_running = True
370372
373+ async def _keepalive_loop (self , stub ):
374+ """Background keepalive loop to keep the gRPC connection alive through L7 load balancers."""
375+ loop = asyncio .get_running_loop ()
376+ while not self ._shutdown .is_set ():
377+ await asyncio .sleep (self ._keepalive_interval )
378+ if self ._shutdown .is_set ():
379+ return
380+ try :
381+ await loop .run_in_executor (None , lambda : stub .Hello (empty_pb2 .Empty ()))
382+ except grpc .RpcError as e :
383+ self ._logger .debug (f"keepalive failed: { e } " )
384+
371385 # TODO: refactor this to be more readable and maintainable.
372386 async def _async_run_loop (self ):
373387 """
@@ -472,6 +486,7 @@ def should_invalidate_connection(rpc_error):
472486 if self ._shutdown .wait (delay ):
473487 break
474488 continue
489+ keepalive_task = None
475490 try :
476491 assert current_stub is not None
477492 stub = current_stub
@@ -584,6 +599,7 @@ def stream_reader():
584599 raise
585600
586601 loop = asyncio .get_running_loop ()
602+ keepalive_task = asyncio .ensure_future (self ._keepalive_loop (stub ))
587603
588604 # NOTE: This is a blocking call that will wait for a work item to become available or the shutdown sentinel
589605 while not self ._shutdown .is_set ():
@@ -633,6 +649,9 @@ def stream_reader():
633649 invalidate_connection ()
634650 raise e
635651 current_reader_thread .join (timeout = 1 )
652+ keepalive_task .cancel ()
653+ with contextlib .suppress (asyncio .CancelledError ):
654+ await keepalive_task
636655
637656 if self ._shutdown .is_set ():
638657 self ._logger .info (f"Disconnected from { self ._host_address } " )
@@ -646,6 +665,10 @@ def stream_reader():
646665 # Fall through to the top of the outer loop, which will
647666 # create a fresh connection (with retry/backoff if needed)
648667 except grpc .RpcError as rpc_error :
668+ if keepalive_task is not None :
669+ keepalive_task .cancel ()
670+ with contextlib .suppress (asyncio .CancelledError ):
671+ await keepalive_task
649672 # Check shutdown first - if shutting down, exit immediately
650673 if self ._shutdown .is_set ():
651674 self ._logger .debug ("Shutdown detected during RPC error handling, exiting" )
@@ -681,6 +704,10 @@ def stream_reader():
681704 f"Application-level gRPC error ({ error_code } ): { rpc_error } "
682705 )
683706 except RuntimeError as ex :
707+ if keepalive_task is not None :
708+ keepalive_task .cancel ()
709+ with contextlib .suppress (asyncio .CancelledError ):
710+ await keepalive_task
684711 # RuntimeError often indicates asyncio loop issues (e.g., "cannot schedule new futures after shutdown")
685712 # Check shutdown state first
686713 if self ._shutdown .is_set ():
@@ -704,6 +731,10 @@ def stream_reader():
704731 # it's likely shutdown-related. Break to prevent infinite retries.
705732 break
706733 except Exception as ex :
734+ if keepalive_task is not None :
735+ keepalive_task .cancel ()
736+ with contextlib .suppress (asyncio .CancelledError ):
737+ await keepalive_task
707738 if self ._shutdown .is_set ():
708739 self ._logger .debug (
709740 f"Shutdown detected during exception handling, exiting: { ex } "
0 commit comments