-
Notifications
You must be signed in to change notification settings - Fork 29
Expand file tree
/
Copy pathtest_stdio_limits.py
More file actions
101 lines (81 loc) · 3.54 KB
/
test_stdio_limits.py
File metadata and controls
101 lines (81 loc) · 3.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
import os
import subprocess
import sys
import tempfile
import textwrap
import pytest
from acp.transports import spawn_stdio_transport
LARGE_LINE_SIZE = 70 * 1024
def _large_line_script(size: int = LARGE_LINE_SIZE) -> str:
return textwrap.dedent(
f"""
import sys
sys.stdout.write("X" * {size})
sys.stdout.write("\\n")
sys.stdout.flush()
"""
).strip()
@pytest.mark.asyncio
async def test_spawn_stdio_transport_hits_default_limit() -> None:
script = _large_line_script()
async with spawn_stdio_transport(sys.executable, "-c", script) as (reader, _writer, _process):
# readline() re-raises LimitOverrunError as ValueError on CPython 3.12+.
with pytest.raises(ValueError):
await reader.readline()
@pytest.mark.asyncio
async def test_spawn_stdio_transport_custom_limit_handles_large_line() -> None:
script = _large_line_script()
async with spawn_stdio_transport(
sys.executable,
"-c",
script,
limit=LARGE_LINE_SIZE * 2,
) as (reader, _writer, _process):
line = await reader.readline()
assert len(line) == LARGE_LINE_SIZE + 1
@pytest.mark.asyncio
async def test_run_agent_stdio_buffer_limit() -> None:
"""Test that run_agent with different buffer limits can handle appropriately sized messages."""
with tempfile.TemporaryDirectory() as tmpdir:
# Test 1: Small buffer (1KB) fails with large message (70KB)
small_agent = os.path.join(tmpdir, "small_agent.py")
with open(small_agent, "w") as f:
f.write("""
import asyncio
from acp.core import run_agent
from acp.interfaces import Agent
class TestAgent(Agent):
async def list_capabilities(self):
return {"capabilities": {}}
asyncio.run(run_agent(TestAgent(), stdio_buffer_limit_bytes=1024))
""")
# Send a 70KB message - should fail with 1KB buffer
large_msg = '{"jsonrpc":"2.0","method":"test","params":{"data":"' + "X" * LARGE_LINE_SIZE + '"}}\n'
result = subprocess.run( # noqa: S603
[sys.executable, small_agent], input=large_msg, capture_output=True, text=True, timeout=2
)
# The oversized message should be skipped with a warning (graceful recovery).
# Prior to the LimitOverrunError fix, this would crash; now it logs and continues.
assert (
"oversized" in result.stderr.lower() or "buffer limit" in result.stderr.lower() or result.returncode != 0
), f"Expected warning about oversized message or error, got: {result.stderr}"
# Test 2: Large buffer (200KB) succeeds with large message (70KB)
large_agent = os.path.join(tmpdir, "large_agent.py")
with open(large_agent, "w") as f:
f.write(f"""
import asyncio
from acp.core import run_agent
from acp.interfaces import Agent
class TestAgent(Agent):
async def list_capabilities(self):
return {{"capabilities": {{}}}}
asyncio.run(run_agent(TestAgent(), stdio_buffer_limit_bytes={LARGE_LINE_SIZE * 3}))
""")
# Same message, but with a buffer 3x the size - should handle it
result = subprocess.run( # noqa: S603
[sys.executable, large_agent], input=large_msg, capture_output=True, text=True, timeout=2
)
# With a large enough buffer, the agent should at least start successfully
# (it may have other errors from invalid JSON-RPC, but not buffer overrun)
if "LimitOverrunError" in result.stderr or "buffer" in result.stderr.lower():
pytest.fail(f"Large buffer still hit limit error: {result.stderr}")