@@ -660,6 +660,9 @@ def get_first_token_time(span_id: str) -> Optional[float]:
660660# ==============================================================================
661661# HELPER: BATCH PROCESSOR
662662# ==============================================================================
663+ _SHUTDOWN_SENTINEL = object ()
664+
665+
663666class BatchProcessor :
664667 """Handles asynchronous batching and writing of events to BigQuery."""
665668
@@ -809,11 +812,18 @@ async def _batch_writer(self) -> None:
809812 self ._queue .get (), timeout = self .flush_interval
810813 )
811814
815+ if first_item is _SHUTDOWN_SENTINEL :
816+ self ._queue .task_done ()
817+ continue
818+
812819 batch .append (first_item )
813820
814821 while len (batch ) < self .batch_size :
815822 try :
816823 item = self ._queue .get_nowait ()
824+ if item is _SHUTDOWN_SENTINEL :
825+ self ._queue .task_done ()
826+ continue
817827 batch .append (item )
818828 except asyncio .QueueEmpty :
819829 break
@@ -831,6 +841,13 @@ async def _batch_writer(self) -> None:
831841 except asyncio .CancelledError :
832842 logger .info ("Batch writer task cancelled." )
833843 break
844+ except RuntimeError as e :
845+ if "Event loop is closed" in str (e ):
846+ logger .info ("Batch writer loop closed: %s" , e )
847+ break
848+ # Other RuntimeErrors are not re-raised: log them, pause briefly, and keep the loop running
849+ logger .error ("RuntimeError in batch writer loop: %s" , e , exc_info = True )
850+ await asyncio .sleep (1 )
834851 except Exception as e :
835852 logger .error ("Error in batch writer loop: %s" , e , exc_info = True )
836853 await asyncio .sleep (1 )
@@ -939,12 +956,24 @@ async def shutdown(self, timeout: float = 5.0) -> None:
939956 """
940957 self ._shutdown = True
941958 logger .info ("BatchProcessor shutting down, draining queue..." )
959+
960+ # Signal the writer to wake up and check shutdown status
961+ try :
962+ self ._queue .put_nowait (_SHUTDOWN_SENTINEL )
963+ except asyncio .QueueFull :
964+ # If queue is full, the writer is active and will check _shutdown soon
965+ pass
966+
942967 if self ._batch_processor_task :
943968 try :
944969 await asyncio .wait_for (self ._batch_processor_task , timeout = timeout )
945970 except asyncio .TimeoutError :
946971 logger .warning ("BatchProcessor shutdown timed out, cancelling worker." )
947972 self ._batch_processor_task .cancel ()
973+ try :
974+ await self ._batch_processor_task
975+ except asyncio .CancelledError :
976+ pass
948977 except Exception as e :
949978 logger .error ("Error during BatchProcessor shutdown: %s" , e )
950979
@@ -1626,51 +1655,55 @@ def get_credentials():
16261655 def _atexit_cleanup (batch_processor : "BatchProcessor" ) -> None :
16271656 """Clean up batch processor on script exit."""
16281657 # Check if the batch_processor object is still alive
1629- if batch_processor and not batch_processor ._shutdown :
1630- # Emergency Flush: Rescue any logs remaining in the queue
1631- remaining_items = []
1632- try :
1633- while True :
1634- remaining_items .append (batch_processor ._queue .get_nowait ())
1635- except (asyncio .QueueEmpty , AttributeError ):
1636- pass
1637-
1638- if remaining_items :
1639- # We need a new loop and client to flush these
1640- async def rescue_flush ():
1641- try :
1642- # Create a short-lived client just for this flush
1658+ try :
1659+ if batch_processor and not batch_processor ._shutdown :
1660+ # Emergency Flush: Rescue any logs remaining in the queue
1661+ remaining_items = []
1662+ try :
1663+ while True :
1664+ remaining_items .append (batch_processor ._queue .get_nowait ())
1665+ except (asyncio .QueueEmpty , AttributeError ):
1666+ pass
1667+
1668+ if remaining_items :
1669+ # We need a new loop and client to flush these
1670+ async def rescue_flush ():
16431671 try :
1644- # Note: This relies on google.auth.default() working in this context.
1645- # pylint: disable=g-import-not-at-top
1646- from google .cloud .bigquery_storage_v1 .services .big_query_write .async_client import BigQueryWriteAsyncClient
1647-
1648- # pylint: enable=g-import-not-at-top
1649- client = BigQueryWriteAsyncClient ()
1672+ # Create a short-lived client just for this flush
1673+ try :
1674+ # Note: This relies on google.auth.default() working in this context.
1675+ # pylint: disable=g-import-not-at-top
1676+ from google .cloud .bigquery_storage_v1 .services .big_query_write .async_client import BigQueryWriteAsyncClient
1677+
1678+ # pylint: enable=g-import-not-at-top
1679+ client = BigQueryWriteAsyncClient ()
1680+ except Exception as e :
1681+ logger .warning ("Could not create rescue client: %s" , e )
1682+ return
1683+
1684+ # Patch batch_processor.write_client temporarily
1685+ old_client = batch_processor .write_client
1686+ batch_processor .write_client = client
1687+ try :
1688+ # Force a write
1689+ await batch_processor ._write_rows_with_retry (remaining_items )
1690+ logger .info ("Rescued logs flushed successfully." )
1691+ except Exception as e :
1692+ logger .error ("Failed to flush rescued logs: %s" , e )
1693+ finally :
1694+ batch_processor .write_client = old_client
16501695 except Exception as e :
1651- logger .warning ("Could not create rescue client: %s" , e )
1652- return
1696+ logger .error ("Rescue flush failed: %s" , e )
16531697
1654- # Patch batch_processor.write_client temporarily
1655- old_client = batch_processor .write_client
1656- batch_processor .write_client = client
1657- try :
1658- # Force a write
1659- await batch_processor ._write_rows_with_retry (remaining_items )
1660- logger .info ("Rescued logs flushed successfully." )
1661- except Exception as e :
1662- logger .error ("Failed to flush rescued logs: %s" , e )
1663- finally :
1664- batch_processor .write_client = old_client
1698+ try :
1699+ loop = asyncio .new_event_loop ()
1700+ loop .run_until_complete (rescue_flush ())
1701+ loop .close ()
16651702 except Exception as e :
1666- logger .error ("Rescue flush failed: %s" , e )
1667-
1668- try :
1669- loop = asyncio .new_event_loop ()
1670- loop .run_until_complete (rescue_flush ())
1671- loop .close ()
1672- except Exception as e :
1673- logger .error ("Failed to run rescue loop: %s" , e )
1703+ logger .error ("Failed to run rescue loop: %s" , e )
1704+ except ReferenceError :
1705+ # batch_processor already GC'd, nothing to do
1706+ pass
16741707
16751708 def _ensure_schema_exists (self ) -> None :
16761709 """Ensures the BigQuery table exists with the correct schema."""
0 commit comments