Skip to content

Commit ce7521a

Browse files
caohy1988claude
andcommitted
fix: use loop.is_closed() for stale loop state validation in BigQuery plugin
The previous approach in PR google#4457 used `_queue._loop` to validate loop identity, but `asyncio.Queue._loop` is `None` on Python 3.10+, causing the check to always treat states as stale and break the plugin entirely. This fix uses `loop.is_closed()` — a public, reliable API — to detect and clean up stale loop states in `_batch_processor_prop`, `_get_loop_state`, and `flush`. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent fc1f1db commit ce7521a

2 files changed

Lines changed: 121 additions & 0 deletions

File tree

src/google/adk/plugins/bigquery_agent_analytics_plugin.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1643,11 +1643,23 @@ def __getattribute__(self, name: str) -> Any:
16431643
return self._write_stream_prop
16441644
return super().__getattribute__(name)
16451645

1646+
def _cleanup_stale_loop_states(self) -> None:
1647+
"""Removes entries for event loops that have been closed."""
1648+
stale = [loop for loop in self._loop_state_by_loop if loop.is_closed()]
1649+
for loop in stale:
1650+
logger.warning(
1651+
"Cleaning up stale loop state for closed loop %s (id=%s).",
1652+
loop,
1653+
id(loop),
1654+
)
1655+
del self._loop_state_by_loop[loop]
1656+
16461657
@property
16471658
def _batch_processor_prop(self) -> Optional["BatchProcessor"]:
16481659
"""The batch processor for the current loop (backward compatibility)."""
16491660
try:
16501661
loop = asyncio.get_running_loop()
1662+
self._cleanup_stale_loop_states()
16511663
if loop in self._loop_state_by_loop:
16521664
return self._loop_state_by_loop[loop].batch_processor
16531665
except RuntimeError:
@@ -1700,6 +1712,7 @@ async def _get_loop_state(self) -> _LoopState:
17001712
The loop-specific state object containing clients and processors.
17011713
"""
17021714
loop = asyncio.get_running_loop()
1715+
self._cleanup_stale_loop_states()
17031716
if loop in self._loop_state_by_loop:
17041717
return self._loop_state_by_loop[loop]
17051718

@@ -1771,6 +1784,7 @@ async def flush(self) -> None:
17711784
"""
17721785
try:
17731786
loop = asyncio.get_running_loop()
1787+
self._cleanup_stale_loop_states()
17741788
if loop in self._loop_state_by_loop:
17751789
await self._loop_state_by_loop[loop].batch_processor.flush()
17761790
except RuntimeError:

tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2246,3 +2246,110 @@ async def test_generation_config_logging(
22462246

22472247
if "labels" in gen_config_kwargs:
22482248
assert attributes.get("labels") == gen_config_kwargs["labels"]
2249+
2250+
2251+
class TestLoopStateValidation:
2252+
"""Tests for loop state validation and stale loop cleanup."""
2253+
2254+
def _make_plugin(self):
2255+
"""Creates a plugin instance without starting it."""
2256+
return bigquery_agent_analytics_plugin.BigQueryAgentAnalyticsPlugin(
2257+
project_id=PROJECT_ID,
2258+
dataset_id=DATASET_ID,
2259+
table_id=TABLE_ID,
2260+
)
2261+
2262+
def _make_loop_state(self):
2263+
"""Creates a mock _LoopState with batch_processor and write_client."""
2264+
state = mock.MagicMock()
2265+
state.batch_processor = mock.MagicMock(
2266+
spec=bigquery_agent_analytics_plugin.BatchProcessor
2267+
)
2268+
state.batch_processor.flush = mock.AsyncMock()
2269+
state.write_client = mock.MagicMock()
2270+
return state
2271+
2272+
def test_cleanup_stale_loop_states_removes_closed_loops(self):
2273+
"""Closed loops should be removed from _loop_state_by_loop."""
2274+
plugin = self._make_plugin()
2275+
2276+
closed_loop = mock.MagicMock(spec=asyncio.AbstractEventLoop)
2277+
closed_loop.is_closed.return_value = True
2278+
2279+
plugin._loop_state_by_loop[closed_loop] = self._make_loop_state()
2280+
2281+
plugin._cleanup_stale_loop_states()
2282+
2283+
assert closed_loop not in plugin._loop_state_by_loop
2284+
2285+
def test_cleanup_stale_loop_states_keeps_open_loops(self):
2286+
"""Open loops should not be removed from _loop_state_by_loop."""
2287+
plugin = self._make_plugin()
2288+
2289+
open_loop = mock.MagicMock(spec=asyncio.AbstractEventLoop)
2290+
open_loop.is_closed.return_value = False
2291+
2292+
plugin._loop_state_by_loop[open_loop] = self._make_loop_state()
2293+
2294+
plugin._cleanup_stale_loop_states()
2295+
2296+
assert open_loop in plugin._loop_state_by_loop
2297+
2298+
def test_cleanup_removes_only_closed_loops(self):
2299+
"""Only closed loops should be removed; open ones stay."""
2300+
plugin = self._make_plugin()
2301+
2302+
open_loop = mock.MagicMock(spec=asyncio.AbstractEventLoop)
2303+
open_loop.is_closed.return_value = False
2304+
closed_loop = mock.MagicMock(spec=asyncio.AbstractEventLoop)
2305+
closed_loop.is_closed.return_value = True
2306+
2307+
plugin._loop_state_by_loop[open_loop] = self._make_loop_state()
2308+
plugin._loop_state_by_loop[closed_loop] = self._make_loop_state()
2309+
2310+
plugin._cleanup_stale_loop_states()
2311+
2312+
assert open_loop in plugin._loop_state_by_loop
2313+
assert closed_loop not in plugin._loop_state_by_loop
2314+
2315+
@pytest.mark.asyncio
2316+
async def test_batch_processor_prop_returns_processor_for_open_loop(
2317+
self,
2318+
):
2319+
"""_batch_processor_prop returns processor for the current loop."""
2320+
plugin = self._make_plugin()
2321+
2322+
loop = asyncio.get_running_loop()
2323+
state = self._make_loop_state()
2324+
plugin._loop_state_by_loop[loop] = state
2325+
2326+
assert plugin._batch_processor_prop is state.batch_processor
2327+
2328+
# Clean up
2329+
del plugin._loop_state_by_loop[loop]
2330+
2331+
@pytest.mark.asyncio
2332+
async def test_batch_processor_prop_cleans_closed_loop_entry(self):
2333+
"""Accessing _batch_processor_prop cleans up closed loop entries."""
2334+
plugin = self._make_plugin()
2335+
2336+
closed_loop = mock.MagicMock(spec=asyncio.AbstractEventLoop)
2337+
closed_loop.is_closed.return_value = True
2338+
plugin._loop_state_by_loop[closed_loop] = self._make_loop_state()
2339+
2340+
# Accessing the prop should clean up the closed loop entry
2341+
_ = plugin._batch_processor_prop
2342+
assert closed_loop not in plugin._loop_state_by_loop
2343+
2344+
@pytest.mark.asyncio
2345+
async def test_flush_cleans_stale_states(self):
2346+
"""flush() should clean up stale loop states before flushing."""
2347+
plugin = self._make_plugin()
2348+
2349+
closed_loop = mock.MagicMock(spec=asyncio.AbstractEventLoop)
2350+
closed_loop.is_closed.return_value = True
2351+
plugin._loop_state_by_loop[closed_loop] = self._make_loop_state()
2352+
2353+
await plugin.flush()
2354+
2355+
assert closed_loop not in plugin._loop_state_by_loop

0 commit comments

Comments
 (0)