diff --git a/sdk/python/tests/app_server_harness.py b/sdk/python/tests/app_server_harness.py new file mode 100644 index 000000000000..6fb0f608be87 --- /dev/null +++ b/sdk/python/tests/app_server_harness.py @@ -0,0 +1,449 @@ +from __future__ import annotations + +import json +import queue +import shutil +import threading +import time +from dataclasses import dataclass +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer +from pathlib import Path +from typing import Any + +from openai_codex import AppServerConfig + + +Json = dict[str, Any] + + +@dataclass(frozen=True) +class CapturedResponsesRequest: + """Recorded request sent by app-server to the mock Responses API.""" + + method: str + path: str + headers: dict[str, str] + body: bytes + + def body_json(self) -> Json: + """Decode the request body as JSON.""" + return json.loads(self.body.decode("utf-8")) + + def input(self) -> list[Json]: + """Return the Responses API input array from the request.""" + value = self.body_json().get("input") + if not isinstance(value, list): + raise AssertionError(f"expected input list, got {value!r}") + return value + + def message_input_texts(self, role: str) -> list[str]: + """Return all input_text strings for message inputs matching one role.""" + texts: list[str] = [] + for item in self.input(): + if item.get("type") != "message" or item.get("role") != role: + continue + content = item.get("content") + if isinstance(content, str): + texts.append(content) + continue + if not isinstance(content, list): + continue + for span in content: + if isinstance(span, dict) and span.get("type") == "input_text": + text = span.get("text") + if isinstance(text, str): + texts.append(text) + return texts + + def message_content_items(self, role: str) -> list[Json]: + """Return structured content items for message inputs matching one role.""" + items: list[Json] = [] + for item in self.input(): + if item.get("type") != "message" or item.get("role") != role: + continue + content = item.get("content") + if not isinstance(content, list): + continue + items.extend(part for part in content if isinstance(part, dict)) + return items + + def message_image_urls(self, role: str) -> list[str]: + """Return all input_image URLs for message inputs matching one role.""" + urls: list[str] = [] + for item in self.message_content_items(role): + if item.get("type") != "input_image": + continue + image_url = item.get("image_url") + if isinstance(image_url, str): + urls.append(image_url) + return urls + + def header(self, name: str) -> str | None: + """Return a captured request header by case-insensitive name.""" + return self.headers.get(name.lower()) + + +@dataclass(frozen=True) +class MockSseResponse: + """One queued SSE response served by the mock Responses API.""" + + body: str + delay_between_events_s: float = 0.0 + + def chunks(self) -> list[bytes]: + """Split an SSE body into event chunks while preserving framing.""" + chunks: list[bytes] = [] + for part in self.body.split("\n\n"): + if not part: + continue + chunks.append(f"{part}\n\n".encode("utf-8")) + return chunks + + +class MockResponsesServer: + """Local HTTP server that records `/v1/responses` requests and returns SSE.""" + + def __init__(self) -> None: + self._responses: queue.Queue[MockSseResponse] = queue.Queue() + self._requests: list[CapturedResponsesRequest] = [] + self._requests_lock = threading.Lock() + self._server = _ResponsesHttpServer(("127.0.0.1", 0), _ResponsesHandler, self) + self._thread = threading.Thread( + target=self._server.serve_forever, + name="mock-responses-api", + daemon=True, + ) + + def __enter__(self) -> MockResponsesServer: + self._thread.start() + return self + + def __exit__(self, _exc_type: object, _exc: object, _tb: object) -> None: + self.close() + + @property + def url(self) -> str: + """Return the base URL for app-server config.""" + host, port = self._server.server_address + return f"http://{host}:{port}" + + def close(self) -> None: + """Stop the background HTTP server thread.""" + self._server.shutdown() + self._server.server_close() + self._thread.join(timeout=2) + + def enqueue_sse( + self, + body: str, + *, + delay_between_events_s: float = 0.0, + ) -> None: + """Queue one SSE body for the next `/v1/responses` request.""" + self._responses.put( + MockSseResponse( + body=body, + delay_between_events_s=delay_between_events_s, + ) + ) + + def enqueue_assistant_message(self, text: str, *, response_id: str = "resp-1") -> None: + """Queue a completed assistant-message model response.""" + self.enqueue_sse( + sse( + [ + ev_response_created(response_id), + ev_assistant_message(f"msg-{response_id}", text), + ev_completed(response_id), + ] + ) + ) + + def requests(self) -> list[CapturedResponsesRequest]: + """Return all recorded Responses API requests.""" + with self._requests_lock: + return list(self._requests) + + def single_request(self) -> CapturedResponsesRequest: + """Return the only recorded request, failing if the count differs.""" + requests = self.requests() + if len(requests) != 1: + raise AssertionError(f"expected 1 request, got {len(requests)}") + return requests[0] + + def wait_for_requests( + self, + count: int, + *, + timeout_s: float = 5.0, + ) -> list[CapturedResponsesRequest]: + """Wait until at least `count` requests have been recorded.""" + deadline = time.monotonic() + timeout_s + while time.monotonic() < deadline: + requests = self.requests() + if len(requests) >= count: + return requests + time.sleep(0.01) + requests = self.requests() + raise AssertionError(f"expected {count} requests, got {len(requests)}") + + def _record_request(self, handler: BaseHTTPRequestHandler, body: bytes) -> None: + """Record one inbound HTTP request from app-server.""" + headers = {key.lower(): value for key, value in handler.headers.items()} + request = CapturedResponsesRequest( + method=handler.command, + path=handler.path, + headers=headers, + body=body, + ) + with self._requests_lock: + self._requests.append(request) + + def _next_response(self) -> MockSseResponse: + """Return the next queued SSE response or fail the HTTP request.""" + return self._responses.get_nowait() + + +class AppServerHarness: + """Test fixture that points a pinned runtime app-server at MockResponsesServer.""" + + def __init__(self, tmp_path: Path) -> None: + self.tmp_path = tmp_path + self.codex_home = tmp_path / "codex-home" + self.workspace = tmp_path / "workspace" + self.responses = MockResponsesServer() + + def __enter__(self) -> AppServerHarness: + self.codex_home.mkdir() + self.workspace.mkdir() + self.responses.__enter__() + self._write_config() + return self + + def __exit__(self, _exc_type: object, _exc: object, _tb: object) -> None: + self.responses.__exit__(_exc_type, _exc, _tb) + shutil.rmtree(self.codex_home, ignore_errors=True) + shutil.rmtree(self.workspace, ignore_errors=True) + + def app_server_config(self) -> AppServerConfig: + """Build SDK config for an isolated pinned-runtime app-server process.""" + return AppServerConfig( + cwd=str(self.workspace), + env={ + "CODEX_HOME": str(self.codex_home), + "CODEX_APP_SERVER_DISABLE_MANAGED_CONFIG": "1", + "RUST_LOG": "warn", + }, + ) + + def _write_config(self) -> None: + """Write config.toml that routes model calls to the mock server.""" + config_toml = self.codex_home / "config.toml" + config_toml.write_text( + f""" +model = "mock-model" +approval_policy = "never" +sandbox_mode = "read-only" + +model_provider = "mock_provider" + +[model_providers.mock_provider] +name = "Mock provider for Python SDK tests" +base_url = "{self.responses.url}/v1" +wire_api = "responses" +request_max_retries = 0 +stream_max_retries = 0 +""".lstrip() + ) + + +class _ResponsesHttpServer(ThreadingHTTPServer): + """ThreadingHTTPServer carrying a reference to the owning mock.""" + + def __init__( + self, + server_address: tuple[str, int], + handler_class: type[BaseHTTPRequestHandler], + mock: MockResponsesServer, + ) -> None: + super().__init__(server_address, handler_class) + self.mock = mock + + +class _ResponsesHandler(BaseHTTPRequestHandler): + """HTTP handler for the subset of the Responses API used by SDK tests.""" + + server: _ResponsesHttpServer + + def log_message(self, _format: str, *_args: object) -> None: + """Silence default stderr logging; pytest failures print captured requests.""" + return None + + def do_GET(self) -> None: + """Serve a minimal `/v1/models` response if app-server asks for models.""" + if self.path.endswith("/v1/models") or self.path.endswith("/models"): + self._send_json( + { + "object": "list", + "data": [ + { + "id": "mock-model", + "object": "model", + "created": 0, + "owned_by": "openai", + } + ], + } + ) + return + self.send_error(404, f"unexpected GET {self.path}") + + def do_POST(self) -> None: + """Serve queued SSE responses for `/v1/responses` requests.""" + length = int(self.headers.get("content-length", "0")) + body = self.rfile.read(length) + self.server.mock._record_request(self, body) + + if not (self.path.endswith("/v1/responses") or self.path.endswith("/responses")): + self.send_error(404, f"unexpected POST {self.path}") + return + + try: + response = self.server.mock._next_response() + except queue.Empty: + self.send_error(500, "no queued SSE response") + return + + self.send_response(200) + self.send_header("content-type", "text/event-stream") + self.end_headers() + for chunk in response.chunks(): + self.wfile.write(chunk) + self.wfile.flush() + if response.delay_between_events_s: + time.sleep(response.delay_between_events_s) + + def _send_json(self, payload: Json) -> None: + """Write one JSON response.""" + body = json.dumps(payload).encode("utf-8") + self.send_response(200) + self.send_header("content-type", "application/json") + self.send_header("content-length", str(len(body))) + self.end_headers() + self.wfile.write(body) + + +def sse(events: list[Json]) -> str: + """Build an SSE body from Responses API event JSON objects.""" + chunks: list[str] = [] + for event in events: + event_type = event["type"] + chunks.append(f"event: {event_type}\ndata: {json.dumps(event)}\n") + return "\n".join(chunks) + "\n" + + +def ev_response_created(response_id: str) -> Json: + """Return a minimal `response.created` event.""" + return {"type": "response.created", "response": {"id": response_id}} + + +def ev_completed(response_id: str) -> Json: + """Return a minimal `response.completed` event with usage.""" + return { + "type": "response.completed", + "response": { + "id": response_id, + "usage": { + "input_tokens": 1, + "input_tokens_details": None, + "output_tokens": 1, + "output_tokens_details": None, + "total_tokens": 2, + }, + }, + } + + +def ev_completed_with_usage( + response_id: str, + *, + input_tokens: int, + cached_input_tokens: int, + output_tokens: int, + reasoning_output_tokens: int, + total_tokens: int, +) -> Json: + """Return `response.completed` with explicit token accounting.""" + return { + "type": "response.completed", + "response": { + "id": response_id, + "usage": { + "input_tokens": input_tokens, + "input_tokens_details": {"cached_tokens": cached_input_tokens}, + "output_tokens": output_tokens, + "output_tokens_details": { + "reasoning_tokens": reasoning_output_tokens, + }, + "total_tokens": total_tokens, + }, + }, + } + + +def ev_assistant_message(item_id: str, text: str) -> Json: + """Return a completed assistant message output item.""" + return { + "type": "response.output_item.done", + "item": { + "type": "message", + "role": "assistant", + "id": item_id, + "content": [{"type": "output_text", "text": text}], + }, + } + + +def ev_message_item_added(item_id: str, text: str = "") -> Json: + """Return an assistant message added event before streaming deltas.""" + return { + "type": "response.output_item.added", + "item": { + "type": "message", + "role": "assistant", + "id": item_id, + "content": [{"type": "output_text", "text": text}], + }, + } + + +def ev_output_text_delta(delta: str) -> Json: + """Return an output-text delta event.""" + return { + "type": "response.output_text.delta", + "delta": delta, + } + + +def ev_function_call(call_id: str, name: str, arguments: str) -> Json: + """Return a completed function-call output item.""" + return { + "type": "response.output_item.done", + "item": { + "type": "function_call", + "call_id": call_id, + "name": name, + "arguments": arguments, + }, + } + + +def ev_failed(response_id: str, message: str) -> Json: + """Return a failed model response event.""" + return { + "type": "response.failed", + "response": { + "id": response_id, + "error": {"code": "server_error", "message": message}, + }, + } diff --git a/sdk/python/tests/app_server_helpers.py b/sdk/python/tests/app_server_helpers.py new file mode 100644 index 000000000000..e11390d527d1 --- /dev/null +++ b/sdk/python/tests/app_server_helpers.py @@ -0,0 +1,168 @@ +from __future__ import annotations + +from collections.abc import AsyncIterator, Iterable, Iterator +from typing import Any + +from app_server_harness import ( + ev_assistant_message, + ev_completed, + ev_message_item_added, + ev_output_text_delta, + ev_response_created, + sse, +) +from openai_codex.generated.v2_all import ( + AgentMessageDeltaNotification, + ItemCompletedNotification, + MessagePhase, +) +from openai_codex.models import Notification + +TINY_PNG_BYTES = bytes( + [ + 137, + 80, + 78, + 71, + 13, + 10, + 26, + 10, + 0, + 0, + 0, + 13, + 73, + 72, + 68, + 82, + 0, + 0, + 0, + 1, + 0, + 0, + 0, + 1, + 8, + 6, + 0, + 0, + 0, + 31, + 21, + 196, + 137, + 0, + 0, + 0, + 11, + 73, + 68, + 65, + 84, + 120, + 156, + 99, + 96, + 0, + 2, + 0, + 0, + 5, + 0, + 1, + 122, + 94, + 171, + 63, + 0, + 0, + 0, + 0, + 73, + 69, + 78, + 68, + 174, + 66, + 96, + 130, + ] +) + + +def response_approval_policy(response: Any) -> str: + """Return serialized approvalPolicy from a generated thread response.""" + return response.model_dump(by_alias=True, mode="json")["approvalPolicy"] + + +def agent_message_texts(events: list[Notification]) -> list[str]: + """Extract completed agent-message text from SDK notifications.""" + texts: list[str] = [] + for event in events: + if not isinstance(event.payload, ItemCompletedNotification): + continue + item = event.payload.item.root + if item.type == "agentMessage": + texts.append(item.text) + return texts + + +def agent_message_texts_from_items(items: Iterable[Any]) -> list[str]: + """Extract agent-message text from completed run result items.""" + texts: list[str] = [] + for item in items: + root = item.root + if root.type == "agentMessage": + texts.append(root.text) + return texts + + +def next_sync_delta(stream: Iterator[Notification]) -> str: + """Advance a sync turn stream until the next agent-message text delta.""" + for event in stream: + if isinstance(event.payload, AgentMessageDeltaNotification): + return event.payload.delta + raise AssertionError("stream completed before an agent-message delta") + + +async def next_async_delta(stream: AsyncIterator[Notification]) -> str: + """Advance an async turn stream until the next agent-message text delta.""" + async for event in stream: + if isinstance(event.payload, AgentMessageDeltaNotification): + return event.payload.delta + raise AssertionError("stream completed before an agent-message delta") + + +def streaming_response(response_id: str, item_id: str, parts: list[str]) -> str: + """Build an SSE stream with text deltas and a final assistant message.""" + return sse( + [ + ev_response_created(response_id), + ev_message_item_added(item_id), + *[ev_output_text_delta(part) for part in parts], + ev_assistant_message(item_id, "".join(parts)), + ev_completed(response_id), + ] + ) + + +def assistant_message_with_phase( + item_id: str, + text: str, + phase: MessagePhase, +) -> dict[str, Any]: + """Build an assistant message event carrying app-server phase metadata.""" + event = ev_assistant_message(item_id, text) + event["item"] = {**event["item"], "phase": phase.value} + return event + + +def request_kind(request_path: str) -> str: + """Classify captured mock-server request paths for compact assertions.""" + if request_path.endswith("/responses/compact"): + return "compact" + if request_path.endswith("/responses"): + return "responses" + return request_path diff --git a/sdk/python/tests/test_app_server_approvals.py b/sdk/python/tests/test_app_server_approvals.py new file mode 100644 index 000000000000..61e5703dd75a --- /dev/null +++ b/sdk/python/tests/test_app_server_approvals.py @@ -0,0 +1,207 @@ +from __future__ import annotations + +import asyncio + +from app_server_harness import AppServerHarness +from openai_codex import ApprovalMode, AsyncCodex, Codex +from openai_codex.generated.v2_all import AskForApprovalValue, ThreadResumeParams +from app_server_helpers import response_approval_policy + + +def test_thread_resume_inherits_deny_all_approval_mode(tmp_path) -> None: + """Resuming a thread should preserve its stored approval mode.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_assistant_message("source seeded", response_id="resume-mode") + + with Codex(config=harness.app_server_config()) as codex: + source = codex.thread_start(approval_mode=ApprovalMode.deny_all) + result = source.run("seed the source rollout") + resumed = codex.thread_resume(source.id) + resumed_state = codex._client.thread_resume( # noqa: SLF001 + resumed.id, + ThreadResumeParams(thread_id=resumed.id), + ) + + assert { + "final_response": result.final_response, + "resumed_policy": response_approval_policy(resumed_state), + } == { + "final_response": "source seeded", + "resumed_policy": AskForApprovalValue.never.value, + } + + +def test_thread_fork_inherits_deny_all_approval_mode(tmp_path) -> None: + """Forking without an override should preserve the source approval mode.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_assistant_message("source seeded", response_id="fork-mode") + + with Codex(config=harness.app_server_config()) as codex: + source = codex.thread_start(approval_mode=ApprovalMode.deny_all) + result = source.run("seed the source rollout") + forked = codex.thread_fork(source.id) + forked_state = codex._client.thread_resume( # noqa: SLF001 + forked.id, + ThreadResumeParams(thread_id=forked.id), + ) + + assert { + "final_response": result.final_response, + "forked_is_distinct": forked.id != source.id, + "forked_policy": response_approval_policy(forked_state), + } == { + "final_response": "source seeded", + "forked_is_distinct": True, + "forked_policy": AskForApprovalValue.never.value, + } + + +def test_thread_fork_can_override_approval_mode(tmp_path) -> None: + """Forking with an explicit approval mode should send an override.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_assistant_message( + "source seeded", + response_id="fork-override-mode", + ) + + with Codex(config=harness.app_server_config()) as codex: + source = codex.thread_start(approval_mode=ApprovalMode.deny_all) + result = source.run("seed the source rollout") + forked = codex.thread_fork( + source.id, + approval_mode=ApprovalMode.auto_review, + ) + forked_state = codex._client.thread_resume( # noqa: SLF001 + forked.id, + ThreadResumeParams(thread_id=forked.id), + ) + + assert { + "final_response": result.final_response, + "forked_policy": response_approval_policy(forked_state), + } == { + "final_response": "source seeded", + "forked_policy": AskForApprovalValue.on_request.value, + } + + +def test_turn_approval_mode_persists_until_next_turn(tmp_path) -> None: + """A turn-level approval override should apply to later omitted-arg turns.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_assistant_message("turn override", response_id="turn-mode-1") + harness.responses.enqueue_assistant_message("turn inherited", response_id="turn-mode-2") + + with Codex(config=harness.app_server_config()) as codex: + thread = codex.thread_start() + first_result = thread.run( + "deny this and later turns", + approval_mode=ApprovalMode.deny_all, + ) + after_turn_override = codex._client.thread_resume( # noqa: SLF001 + thread.id, + ThreadResumeParams(thread_id=thread.id), + ) + second_result = thread.run("inherit previous approval mode") + after_omitted_turn = codex._client.thread_resume( # noqa: SLF001 + thread.id, + ThreadResumeParams(thread_id=thread.id), + ) + + assert { + "after_turn_override": response_approval_policy(after_turn_override), + "after_omitted_turn": response_approval_policy(after_omitted_turn), + "final_responses": [ + first_result.final_response, + second_result.final_response, + ], + } == { + "after_turn_override": AskForApprovalValue.never.value, + "after_omitted_turn": AskForApprovalValue.never.value, + "final_responses": ["turn override", "turn inherited"], + } + + +def test_thread_run_approval_mode_persists_until_explicit_override(tmp_path) -> None: + """Omitted run approval mode should not rewrite the thread's stored setting.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_assistant_message("locked down", response_id="approval-1") + harness.responses.enqueue_assistant_message("reviewable", response_id="approval-2") + + with Codex(config=harness.app_server_config()) as codex: + thread = codex.thread_start(approval_mode=ApprovalMode.deny_all) + + first_result = thread.run("keep approvals denied") + after_default_run = codex._client.thread_resume( # noqa: SLF001 + thread.id, + ThreadResumeParams(thread_id=thread.id), + ) + second_result = thread.run( + "allow auto review now", + approval_mode=ApprovalMode.auto_review, + ) + after_override_run = codex._client.thread_resume( # noqa: SLF001 + thread.id, + ThreadResumeParams(thread_id=thread.id), + ) + + assert { + "after_default_policy": response_approval_policy(after_default_run), + "after_override_policy": response_approval_policy(after_override_run), + "final_responses": [ + first_result.final_response, + second_result.final_response, + ], + } == { + "after_default_policy": AskForApprovalValue.never.value, + "after_override_policy": AskForApprovalValue.on_request.value, + "final_responses": ["locked down", "reviewable"], + } + + +def test_async_thread_run_approval_mode_persists_until_explicit_override( + tmp_path, +) -> None: + """Async omitted run approval mode should leave stored settings alone.""" + + async def scenario() -> None: + """Use the async client to verify persisted app-server approval state.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_assistant_message( + "async locked down", + response_id="async-approval-1", + ) + harness.responses.enqueue_assistant_message( + "async reviewable", + response_id="async-approval-2", + ) + + async with AsyncCodex(config=harness.app_server_config()) as codex: + thread = await codex.thread_start(approval_mode=ApprovalMode.deny_all) + first_result = await thread.run("keep async approvals denied") + after_default_run = await codex._client.thread_resume( # noqa: SLF001 + thread.id, + ThreadResumeParams(thread_id=thread.id), + ) + second_result = await thread.run( + "allow async auto review now", + approval_mode=ApprovalMode.auto_review, + ) + after_override_run = await codex._client.thread_resume( # noqa: SLF001 + thread.id, + ThreadResumeParams(thread_id=thread.id), + ) + + assert { + "after_default_policy": response_approval_policy(after_default_run), + "after_override_policy": response_approval_policy(after_override_run), + "final_responses": [ + first_result.final_response, + second_result.final_response, + ], + } == { + "after_default_policy": AskForApprovalValue.never.value, + "after_override_policy": AskForApprovalValue.on_request.value, + "final_responses": ["async locked down", "async reviewable"], + } + + asyncio.run(scenario()) diff --git a/sdk/python/tests/test_app_server_inputs.py b/sdk/python/tests/test_app_server_inputs.py new file mode 100644 index 000000000000..5e560062e57b --- /dev/null +++ b/sdk/python/tests/test_app_server_inputs.py @@ -0,0 +1,126 @@ +from __future__ import annotations + +from app_server_harness import AppServerHarness +from openai_codex import Codex, ImageInput, LocalImageInput, SkillInput, TextInput +from app_server_helpers import TINY_PNG_BYTES + + +def test_remote_image_input_reaches_responses_api( + tmp_path, +) -> None: + """Remote image inputs should survive the SDK and app-server boundary.""" + remote_image_url = "https://example.com/codex.png" + + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_assistant_message( + "remote image received", + response_id="remote-image", + ) + + with Codex(config=harness.app_server_config()) as codex: + result = codex.thread_start().run( + [ + TextInput("Describe the remote image."), + ImageInput(remote_image_url), + ] + ) + request = harness.responses.single_request() + + assert { + "final_response": result.final_response, + "contains_user_prompt": "Describe the remote image." + in request.message_input_texts("user"), + "image_urls": request.message_image_urls("user"), + } == { + "final_response": "remote image received", + "contains_user_prompt": True, + "image_urls": [remote_image_url], + } + + +def test_local_image_input_reaches_responses_api( + tmp_path, +) -> None: + """Local image inputs should become data URLs after crossing the app-server.""" + local_image = tmp_path / "local.png" + local_image.write_bytes(TINY_PNG_BYTES) + + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_assistant_message( + "local image received", + response_id="local-image", + ) + + with Codex(config=harness.app_server_config()) as codex: + result = codex.thread_start().run( + [ + TextInput("Describe the local image."), + LocalImageInput(str(local_image)), + ] + ) + request = harness.responses.single_request() + + assert { + "final_response": result.final_response, + "contains_user_prompt": "Describe the local image." + in request.message_input_texts("user"), + "image_url_is_png_data_url": request.message_image_urls("user")[-1].startswith( + "data:image/png;base64," + ), + } == { + "final_response": "local image received", + "contains_user_prompt": True, + "image_url_is_png_data_url": True, + } + + +def test_skill_input_injects_loaded_skill_body(tmp_path) -> None: + """SkillInput should inject the selected loaded skill into model input.""" + skill_body = "Use the word cobalt." + + with AppServerHarness(tmp_path) as harness: + skill_file = harness.workspace / ".agents" / "skills" / "demo" / "SKILL.md" + skill_file.parent.mkdir(parents=True) + skill_file.write_text( + f"---\nname: demo\ndescription: demo skill\n---\n\n{skill_body}\n" + ) + skill_path = skill_file.resolve() + harness.responses.enqueue_assistant_message( + "skill received", + response_id="skill-input", + ) + + with Codex(config=harness.app_server_config()) as codex: + result = codex.thread_start().run( + [ + TextInput("Use the selected skill."), + SkillInput("demo", str(skill_path)), + ] + ) + request = harness.responses.single_request() + + skill_blocks = [ + text + for text in request.message_input_texts("user") + if text.startswith("") + ] + assert { + "final_response": result.final_response, + "skill_blocks": [ + { + "has_name": "demo" in text, + "has_path": f"{skill_path}" in text, + "has_body": skill_body in text, + } + for text in skill_blocks + ], + } == { + "final_response": "skill received", + "skill_blocks": [ + { + "has_name": True, + "has_path": True, + "has_body": True, + } + ], + } diff --git a/sdk/python/tests/test_app_server_lifecycle.py b/sdk/python/tests/test_app_server_lifecycle.py new file mode 100644 index 000000000000..644b97585a1e --- /dev/null +++ b/sdk/python/tests/test_app_server_lifecycle.py @@ -0,0 +1,217 @@ +from __future__ import annotations + +import asyncio + +from app_server_harness import AppServerHarness +from openai_codex import AsyncCodex, Codex +from app_server_helpers import request_kind + + +def _thread_message_summary(read_response) -> list[tuple[str, str]]: + """Return persisted user/agent messages from a thread read response.""" + messages: list[tuple[str, str]] = [] + for turn in read_response.thread.turns: + for item in turn.items: + root = item.root + if root.type == "userMessage": + text = "\n".join( + input_item.root.text + for input_item in root.content + if input_item.root.type == "text" + ) + messages.append(("user", text)) + if root.type == "agentMessage": + messages.append(("agent", root.text)) + return messages + + +def test_thread_set_name_and_read(tmp_path) -> None: + """Thread naming should round-trip through app-server JSON-RPC.""" + with AppServerHarness(tmp_path) as harness: + with Codex(config=harness.app_server_config()) as codex: + thread = codex.thread_start() + thread.set_name("sdk integration thread") + named = thread.read(include_turns=True) + + assert {"thread_name": named.thread.name} == { + "thread_name": "sdk integration thread", + } + + +def test_thread_list_filters_archived_threads(tmp_path) -> None: + """Thread listing should reflect archive state through app-server.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_assistant_message("active", response_id="list-active") + harness.responses.enqueue_assistant_message( + "archived", + response_id="list-archived", + ) + + with Codex(config=harness.app_server_config()) as codex: + active_thread = codex.thread_start() + archived_thread = codex.thread_start() + active_thread.run("keep this listed") + archived_thread.run("archive this") + codex.thread_archive(archived_thread.id) + active_list = codex.thread_list(archived=False) + archived_list = codex.thread_list(archived=True) + + expected_ids = {active_thread.id, archived_thread.id} + assert { + "active_ids": sorted( + thread.id for thread in active_list.data if thread.id in expected_ids + ), + "archived_ids": sorted( + thread.id for thread in archived_list.data if thread.id in expected_ids + ), + } == { + "active_ids": [active_thread.id], + "archived_ids": [archived_thread.id], + } + + +def test_read_include_turns_returns_persisted_history(tmp_path) -> None: + """Thread.read(include_turns=True) should load real persisted turn items.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_assistant_message("first answer", response_id="read-1") + harness.responses.enqueue_assistant_message("second answer", response_id="read-2") + + with Codex(config=harness.app_server_config()) as codex: + thread = codex.thread_start() + thread.run("first question") + thread.run("second question") + read = thread.read(include_turns=True) + + assert _thread_message_summary(read) == [ + ("user", "first question"), + ("agent", "first answer"), + ("user", "second question"), + ("agent", "second answer"), + ] + + +def test_async_lifecycle_methods_round_trip(tmp_path) -> None: + """Async lifecycle helpers should preserve the same app-server thread state.""" + + async def scenario() -> None: + """Exercise async wrappers over one materialized thread.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_assistant_message( + "async materialized", + response_id="async-lifecycle", + ) + + async with AsyncCodex(config=harness.app_server_config()) as codex: + thread = await codex.thread_start() + run_result = await thread.run("materialize async thread") + await thread.set_name("async lifecycle") + named = await thread.read() + resumed = await codex.thread_resume(thread.id) + forked = await codex.thread_fork(thread.id) + archive_response = await codex.thread_archive(thread.id) + unarchived = await codex.thread_unarchive(thread.id) + + assert { + "run_final_response": run_result.final_response, + "named_thread": named.thread.name, + "resumed_id": resumed.id, + "forked_is_distinct": forked.id != thread.id, + "archive_response": archive_response.model_dump(by_alias=True, mode="json"), + "unarchived_id": unarchived.id, + } == { + "run_final_response": "async materialized", + "named_thread": "async lifecycle", + "resumed_id": thread.id, + "forked_is_distinct": True, + "archive_response": {}, + "unarchived_id": thread.id, + } + + asyncio.run(scenario()) + + +def test_thread_fork_returns_distinct_thread(tmp_path) -> None: + """Thread fork should return a distinct thread for a persisted rollout.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_assistant_message("materialized", response_id="fork-seed") + + with Codex(config=harness.app_server_config()) as codex: + thread = codex.thread_start() + seeded = thread.run("materialize this thread before fork") + forked = codex.thread_fork(thread.id) + + assert { + "seeded_response": seeded.final_response, + "forked_is_distinct": forked.id != thread.id, + } == { + "seeded_response": "materialized", + "forked_is_distinct": True, + } + + +def test_archive_unarchive_round_trip_uses_materialized_rollout(tmp_path) -> None: + """Archive helpers should work once the app-server has persisted a rollout.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_assistant_message("materialized", response_id="archive-seed") + + with Codex(config=harness.app_server_config()) as codex: + thread = codex.thread_start() + seeded = thread.run("materialize this thread before archive") + archived = codex.thread_archive(thread.id) + unarchived = codex.thread_unarchive(thread.id) + read = unarchived.read() + + assert { + "seeded_response": seeded.final_response, + "archive_response": archived.model_dump(by_alias=True, mode="json"), + "unarchived_id": unarchived.id, + "read_id": read.thread.id, + } == { + "seeded_response": "materialized", + "archive_response": {}, + "unarchived_id": thread.id, + "read_id": thread.id, + } + + +def test_models_rpc(tmp_path) -> None: + """Model listing should go through the pinned app-server method.""" + with AppServerHarness(tmp_path) as harness: + with Codex(config=harness.app_server_config()) as codex: + models = codex.models(include_hidden=True) + + assert { + "models_payload_has_data": isinstance( + models.model_dump(by_alias=True, mode="json").get("data"), + list, + ), + } == {"models_payload_has_data": True} + + +def test_compact_rpc_hits_mock_responses(tmp_path) -> None: + """Compaction should run through app-server and hit the mock Responses boundary.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_assistant_message("history", response_id="compact-history") + harness.responses.enqueue_assistant_message( + "compact summary", + response_id="compact-summary", + ) + + with Codex(config=harness.app_server_config()) as codex: + thread = codex.thread_start() + run_result = thread.run("create history") + compact_response = thread.compact() + requests = harness.responses.wait_for_requests(2) + + assert { + "run_final_response": run_result.final_response, + "compact_response": compact_response.model_dump( + by_alias=True, + mode="json", + ), + "request_kinds": [request_kind(request.path) for request in requests], + } == { + "run_final_response": "history", + "compact_response": {}, + "request_kinds": ["responses", "responses"], + } diff --git a/sdk/python/tests/test_app_server_run.py b/sdk/python/tests/test_app_server_run.py new file mode 100644 index 000000000000..8a45edf5efe6 --- /dev/null +++ b/sdk/python/tests/test_app_server_run.py @@ -0,0 +1,369 @@ +from __future__ import annotations + +import asyncio + +import pytest + +from app_server_harness import ( + AppServerHarness, + ev_assistant_message, + ev_completed, + ev_completed_with_usage, + ev_failed, + ev_response_created, + sse, +) +from openai_codex import AsyncCodex, Codex +from openai_codex.generated.v2_all import MessagePhase +from app_server_helpers import ( + agent_message_texts_from_items, + assistant_message_with_phase, +) + + +def test_sync_thread_run_uses_mock_responses( + tmp_path, +) -> None: + """Drive Thread.run through the pinned app-server and inspect the HTTP request.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_assistant_message("Hello from the mock.", response_id="run-1") + + with Codex(config=harness.app_server_config()) as codex: + thread = codex.thread_start() + result = thread.run("hello") + + request = harness.responses.single_request() + + body = request.body_json() + assert { + "final_response": result.final_response, + "agent_messages": agent_message_texts_from_items(result.items), + "has_usage": result.usage is not None, + "request_model": body["model"], + "request_stream": body["stream"], + "request_user_texts": request.message_input_texts("user")[-1:], + } == { + "final_response": "Hello from the mock.", + "agent_messages": ["Hello from the mock."], + "has_usage": True, + "request_model": "mock-model", + "request_stream": True, + "request_user_texts": ["hello"], + } + + +def test_run_params_and_usage_cross_app_server_boundary(tmp_path) -> None: + """Thread.run should pass overrides and collect app-server token usage.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_sse( + sse( + [ + ev_response_created("run-overrides"), + ev_assistant_message("msg-run-overrides", "overrides applied"), + ev_completed_with_usage( + "run-overrides", + input_tokens=11, + cached_input_tokens=3, + output_tokens=7, + reasoning_output_tokens=5, + total_tokens=18, + ), + ] + ) + ) + + with Codex(config=harness.app_server_config()) as codex: + thread = codex.thread_start() + result = thread.run( + "use overrides", + model="mock-model-override", + ) + request = harness.responses.single_request() + + usage_payload = None + if result.usage is not None: + dumped_usage = result.usage.model_dump(by_alias=True, mode="json") + usage_payload = { + "last": dumped_usage["last"], + "total": dumped_usage["total"], + } + assert { + "final_response": result.final_response, + "request_model": request.body_json()["model"], + "usage": usage_payload, + } == { + "final_response": "overrides applied", + "request_model": "mock-model-override", + "usage": { + "last": { + "cachedInputTokens": 3, + "inputTokens": 11, + "outputTokens": 7, + "reasoningOutputTokens": 5, + "totalTokens": 18, + }, + "total": { + "cachedInputTokens": 3, + "inputTokens": 11, + "outputTokens": 7, + "reasoningOutputTokens": 5, + "totalTokens": 18, + }, + }, + } + + +def test_async_thread_run_uses_mock_responses( + tmp_path, +) -> None: + """Async Thread.run should exercise the same app-server boundary.""" + + async def scenario() -> None: + """Run the async client against a real app-server process.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_assistant_message( + "Hello async.", + response_id="async-run-1", + ) + + async with AsyncCodex(config=harness.app_server_config()) as codex: + thread = await codex.thread_start() + result = await thread.run("async hello") + + request = harness.responses.single_request() + + assert { + "final_response": result.final_response, + "agent_messages": agent_message_texts_from_items(result.items), + "request_user_texts": request.message_input_texts("user")[-1:], + } == { + "final_response": "Hello async.", + "agent_messages": ["Hello async."], + "request_user_texts": ["async hello"], + } + + asyncio.run(scenario()) + + +def test_sync_run_result_uses_last_unknown_phase_message(tmp_path) -> None: + """RunResult should use the last unknown-phase agent message as final text.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_sse( + sse( + [ + ev_response_created("items-last"), + ev_assistant_message("msg-items-first", "First message"), + ev_assistant_message("msg-items-second", "Second message"), + ev_completed("items-last"), + ] + ) + ) + + with Codex(config=harness.app_server_config()) as codex: + result = codex.thread_start().run("case: last unknown phase wins") + + assert { + "final_response": result.final_response, + "agent_messages": agent_message_texts_from_items(result.items), + } == { + "final_response": "Second message", + "agent_messages": ["First message", "Second message"], + } + + +def test_sync_run_result_preserves_empty_last_message(tmp_path) -> None: + """RunResult should preserve an empty final agent message instead of skipping it.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_sse( + sse( + [ + ev_response_created("items-empty"), + ev_assistant_message("msg-items-nonempty", "First message"), + ev_assistant_message("msg-items-empty", ""), + ev_completed("items-empty"), + ] + ) + ) + + with Codex(config=harness.app_server_config()) as codex: + result = codex.thread_start().run("case: empty last message") + + assert { + "final_response": result.final_response, + "agent_messages": agent_message_texts_from_items(result.items), + } == { + "final_response": "", + "agent_messages": ["First message", ""], + } + + +def test_sync_run_result_does_not_promote_commentary_only_to_final(tmp_path) -> None: + """RunResult final_response should stay unset when app-server marks only commentary.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_sse( + sse( + [ + ev_response_created("items-commentary"), + assistant_message_with_phase( + "msg-items-commentary", + "Commentary", + MessagePhase.commentary, + ), + ev_completed("items-commentary"), + ] + ) + ) + + with Codex(config=harness.app_server_config()) as codex: + result = codex.thread_start().run("case: commentary only") + + assert { + "final_response": result.final_response, + "agent_messages": agent_message_texts_from_items(result.items), + } == { + "final_response": None, + "agent_messages": ["Commentary"], + } + + +def test_async_run_result_uses_last_unknown_phase_message(tmp_path) -> None: + """Async RunResult should use the last unknown-phase agent message.""" + + async def scenario() -> None: + """Run one async result-mapping case against a pinned app-server.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_sse( + sse( + [ + ev_response_created("async-items-last"), + ev_assistant_message( + "msg-async-items-first", + "First async message", + ), + ev_assistant_message( + "msg-async-items-second", + "Second async message", + ), + ev_completed("async-items-last"), + ] + ) + ) + + async with AsyncCodex(config=harness.app_server_config()) as codex: + result = await (await codex.thread_start()).run( + "case: async last unknown phase" + ) + + assert { + "final_response": result.final_response, + "agent_messages": agent_message_texts_from_items(result.items), + } == { + "final_response": "Second async message", + "agent_messages": ["First async message", "Second async message"], + } + + asyncio.run(scenario()) + + +def test_async_run_result_does_not_promote_commentary_only_to_final( + tmp_path, +) -> None: + """Async RunResult final_response should stay unset for commentary-only output.""" + + async def scenario() -> None: + """Run one async commentary mapping case against a pinned app-server.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_sse( + sse( + [ + ev_response_created("async-items-commentary"), + assistant_message_with_phase( + "msg-async-items-commentary", + "Async commentary", + MessagePhase.commentary, + ), + ev_completed("async-items-commentary"), + ] + ) + ) + + async with AsyncCodex(config=harness.app_server_config()) as codex: + result = await (await codex.thread_start()).run( + "case: async commentary only" + ) + + assert { + "final_response": result.final_response, + "agent_messages": agent_message_texts_from_items(result.items), + } == { + "final_response": None, + "agent_messages": ["Async commentary"], + } + + asyncio.run(scenario()) + + +def test_thread_run_raises_when_real_app_server_reports_failed_turn(tmp_path) -> None: + """Thread.run should surface the failed turn error emitted by app-server.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_sse( + sse( + [ + ev_response_created("failed-run"), + ev_failed("failed-run", "boom from mock model"), + ] + ) + ) + + with Codex(config=harness.app_server_config()) as codex: + thread = codex.thread_start() + with pytest.raises(RuntimeError, match="boom from mock model"): + thread.run("trigger failure") + + +def test_final_answer_phase_survives_real_app_server_mapping(tmp_path) -> None: + """RunResult should use the final-answer item emitted by app-server.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_sse( + sse( + [ + ev_response_created("phase-1"), + { + **ev_assistant_message("msg-commentary", "Commentary"), + "item": { + **ev_assistant_message("msg-commentary", "Commentary")["item"], + "phase": MessagePhase.commentary.value, + }, + }, + { + **ev_assistant_message("msg-final", "Final answer"), + "item": { + **ev_assistant_message("msg-final", "Final answer")["item"], + "phase": MessagePhase.final_answer.value, + }, + }, + ev_completed("phase-1"), + ] + ) + ) + + with Codex(config=harness.app_server_config()) as codex: + result = codex.thread_start().run("choose final answer") + + assert { + "final_response": result.final_response, + "items": [ + { + "text": item.root.text, + "phase": None if item.root.phase is None else item.root.phase.value, + } + for item in result.items + if item.root.type == "agentMessage" + ], + } == { + "final_response": "Final answer", + "items": [ + {"text": "Commentary", "phase": MessagePhase.commentary.value}, + {"text": "Final answer", "phase": MessagePhase.final_answer.value}, + ], + } diff --git a/sdk/python/tests/test_app_server_streaming.py b/sdk/python/tests/test_app_server_streaming.py new file mode 100644 index 000000000000..81e2b44d3033 --- /dev/null +++ b/sdk/python/tests/test_app_server_streaming.py @@ -0,0 +1,266 @@ +from __future__ import annotations + +import asyncio + +from app_server_harness import AppServerHarness +from openai_codex import AsyncCodex, Codex, TextInput +from openai_codex.generated.v2_all import ( + AgentMessageDeltaNotification, + TurnCompletedNotification, + TurnStatus, +) +from app_server_helpers import ( + agent_message_texts, + next_async_delta, + next_sync_delta, + streaming_response, +) + + +def test_sync_stream_routes_text_deltas_and_completion(tmp_path) -> None: + """A sync turn stream should expose deltas, completed items, and completion.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_sse( + streaming_response("stream-1", "msg-stream-1", ["hel", "lo"]) + ) + + with Codex(config=harness.app_server_config()) as codex: + thread = codex.thread_start() + stream = thread.turn(TextInput("stream please")).stream() + events = list(stream) + + assert { + "deltas": [ + event.payload.delta + for event in events + if isinstance(event.payload, AgentMessageDeltaNotification) + ], + "agent_messages": agent_message_texts(events), + "completed_statuses": [ + event.payload.turn.status + for event in events + if isinstance(event.payload, TurnCompletedNotification) + ], + } == { + "deltas": ["hel", "lo"], + "agent_messages": ["hello"], + "completed_statuses": [TurnStatus.completed], + } + + +def test_turn_run_returns_completed_turn(tmp_path) -> None: + """TurnHandle.run should wait for the app-server completion notification.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_assistant_message("turn complete", response_id="turn-run-1") + + with Codex(config=harness.app_server_config()) as codex: + thread = codex.thread_start() + turn = thread.turn(TextInput("complete this turn")) + completed = turn.run() + + assert { + "turn_id": completed.id, + "status": completed.status, + "items": completed.items, + } == { + "turn_id": turn.id, + "status": TurnStatus.completed, + "items": [], + } + + +def test_async_stream_routes_text_deltas_and_completion(tmp_path) -> None: + """An async turn stream should expose the same notification sequence.""" + + async def scenario() -> None: + """Stream one async turn against the real pinned app-server.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_sse( + streaming_response("async-stream-1", "msg-async-stream-1", ["as", "ync"]) + ) + + async with AsyncCodex(config=harness.app_server_config()) as codex: + thread = await codex.thread_start() + turn = await thread.turn(TextInput("async stream please")) + events = [event async for event in turn.stream()] + + assert { + "deltas": [ + event.payload.delta + for event in events + if isinstance(event.payload, AgentMessageDeltaNotification) + ], + "agent_messages": agent_message_texts(events), + "completed_statuses": [ + event.payload.turn.status + for event in events + if isinstance(event.payload, TurnCompletedNotification) + ], + } == { + "deltas": ["as", "ync"], + "agent_messages": ["async"], + "completed_statuses": [TurnStatus.completed], + } + + asyncio.run(scenario()) + + +def test_low_level_sync_stream_text_uses_real_turn_routing(tmp_path) -> None: + """AppServerClient.stream_text should stream through a real app-server turn.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_sse( + streaming_response("low-sync-stream", "msg-low-sync-stream", ["fir", "st"]) + ) + + with Codex(config=harness.app_server_config()) as codex: + thread = codex.thread_start() + chunks = list(codex._client.stream_text(thread.id, "low-level sync")) # noqa: SLF001 + + assert [chunk.delta for chunk in chunks] == ["fir", "st"] + + +def test_low_level_async_stream_text_allows_parallel_model_list(tmp_path) -> None: + """Async stream_text should yield without blocking another app-server request.""" + + async def scenario() -> None: + """Leave a stream open while another async request completes.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_sse( + streaming_response( + "low-async-stream", + "msg-low-async-stream", + ["one", "two", "three"], + ), + delay_between_events_s=0.03, + ) + + async with AsyncCodex(config=harness.app_server_config()) as codex: + thread = await codex.thread_start() + stream = codex._client.stream_text( # noqa: SLF001 + thread.id, + "low-level async", + ) + first = await anext(stream) + models_task = asyncio.create_task(codex.models()) + models = await asyncio.wait_for(models_task, timeout=1.0) + remaining = [chunk.delta async for chunk in stream] + + assert { + "first": first.delta, + "remaining": remaining, + "models_payload_has_data": isinstance( + models.model_dump(by_alias=True, mode="json").get("data"), + list, + ), + } == { + "first": "one", + "remaining": ["two", "three"], + "models_payload_has_data": True, + } + + asyncio.run(scenario()) + + +def test_interleaved_sync_turn_streams_route_by_turn_id(tmp_path) -> None: + """Two sync streams on one client should consume only their own notifications.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_sse( + streaming_response("first-stream", "msg-first", ["one-", "done"]), + delay_between_events_s=0.01, + ) + harness.responses.enqueue_sse( + streaming_response("second-stream", "msg-second", ["two-", "done"]), + delay_between_events_s=0.01, + ) + + with Codex(config=harness.app_server_config()) as codex: + first_thread = codex.thread_start() + second_thread = codex.thread_start() + first_turn = first_thread.turn(TextInput("first")) + second_turn = second_thread.turn(TextInput("second")) + + first_stream = first_turn.stream() + second_stream = second_turn.stream() + first_first_delta = next_sync_delta(first_stream) + second_first_delta = next_sync_delta(second_stream) + first_second_delta = next_sync_delta(first_stream) + second_second_delta = next_sync_delta(second_stream) + first_tail = list(first_stream) + second_tail = list(second_stream) + + assert { + "streams": sorted( + [ + ( + first_first_delta, + first_second_delta, + agent_message_texts(first_tail), + ), + ( + second_first_delta, + second_second_delta, + agent_message_texts(second_tail), + ), + ] + ), + } == { + "streams": [ + ("one-", "done", ["one-done"]), + ("two-", "done", ["two-done"]), + ], + } + + +def test_interleaved_async_turn_streams_route_by_turn_id(tmp_path) -> None: + """Two async streams on one client should consume only their own notifications.""" + + async def scenario() -> None: + """Interleave async stream consumers against one app-server process.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_sse( + streaming_response("async-first", "msg-async-first", ["a1", "-done"]), + delay_between_events_s=0.01, + ) + harness.responses.enqueue_sse( + streaming_response("async-second", "msg-async-second", ["a2", "-done"]), + delay_between_events_s=0.01, + ) + + async with AsyncCodex(config=harness.app_server_config()) as codex: + first_thread = await codex.thread_start() + second_thread = await codex.thread_start() + first_turn = await first_thread.turn(TextInput("async first")) + second_turn = await second_thread.turn(TextInput("async second")) + + first_stream = first_turn.stream() + second_stream = second_turn.stream() + first_first_delta = await next_async_delta(first_stream) + second_first_delta = await next_async_delta(second_stream) + first_second_delta = await next_async_delta(first_stream) + second_second_delta = await next_async_delta(second_stream) + first_tail = [event async for event in first_stream] + second_tail = [event async for event in second_stream] + + assert { + "streams": sorted( + [ + ( + first_first_delta, + first_second_delta, + agent_message_texts(first_tail), + ), + ( + second_first_delta, + second_second_delta, + agent_message_texts(second_tail), + ), + ] + ), + } == { + "streams": [ + ("a1", "-done", ["a1-done"]), + ("a2", "-done", ["a2-done"]), + ], + } + + asyncio.run(scenario()) diff --git a/sdk/python/tests/test_app_server_turn_controls.py b/sdk/python/tests/test_app_server_turn_controls.py new file mode 100644 index 000000000000..40ada74ea35d --- /dev/null +++ b/sdk/python/tests/test_app_server_turn_controls.py @@ -0,0 +1,82 @@ +from __future__ import annotations + +from app_server_harness import AppServerHarness +from openai_codex import Codex, TextInput +from openai_codex.generated.v2_all import TurnStatus +from app_server_helpers import agent_message_texts, streaming_response + + +def test_turn_steer_adds_follow_up_input(tmp_path) -> None: + """Steering an active turn should create a follow-up Responses request.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_sse( + streaming_response("steer-first", "msg-steer-first", ["before steer"]), + delay_between_events_s=0.2, + ) + harness.responses.enqueue_assistant_message( + "after steer", + response_id="steer-second", + ) + + with Codex(config=harness.app_server_config()) as codex: + thread = codex.thread_start() + turn = thread.turn(TextInput("Start a steerable turn.")) + harness.responses.wait_for_requests(1) + steer = turn.steer(TextInput("Use this steering input.")) + events = list(turn.stream()) + requests = harness.responses.wait_for_requests(2) + + assert { + "steered_turn_id": steer.turn_id, + "turn_id": turn.id, + "agent_messages": agent_message_texts(events), + "last_user_texts": [ + request.message_input_texts("user")[-1] for request in requests + ], + } == { + "steered_turn_id": turn.id, + "turn_id": turn.id, + "agent_messages": ["before steer", "after steer"], + "last_user_texts": [ + "Start a steerable turn.", + "Use this steering input.", + ], + } + + +def test_turn_interrupt_stops_active_turn_and_follow_up_runs(tmp_path) -> None: + """Interrupting an active turn should complete it and leave the thread usable.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_sse( + streaming_response( + "interrupt-first", + "msg-interrupt-first", + ["still ", "running"], + ), + delay_between_events_s=0.2, + ) + harness.responses.enqueue_assistant_message( + "after interrupt", + response_id="interrupt-follow-up", + ) + + with Codex(config=harness.app_server_config()) as codex: + thread = codex.thread_start() + interrupted_turn = thread.turn(TextInput("Start a long turn.")) + harness.responses.wait_for_requests(1) + interrupt_response = interrupted_turn.interrupt() + completed = interrupted_turn.run() + follow_up = thread.run("Continue after the interrupt.") + + assert { + "interrupt_response": interrupt_response.model_dump( + by_alias=True, + mode="json", + ), + "interrupted_status": completed.status, + "follow_up": follow_up.final_response, + } == { + "interrupt_response": {}, + "interrupted_status": TurnStatus.interrupted, + "follow_up": "after interrupt", + } diff --git a/sdk/python/tests/test_async_client_behavior.py b/sdk/python/tests/test_async_client_behavior.py index 97e2a1d77906..4e48fbbfeeb6 100644 --- a/sdk/python/tests/test_async_client_behavior.py +++ b/sdk/python/tests/test_async_client_behavior.py @@ -2,11 +2,9 @@ import asyncio import time -from types import SimpleNamespace from openai_codex.async_client import AsyncAppServerClient from openai_codex.generated.v2_all import ( - AgentMessageDeltaNotification, TurnCompletedNotification, ) from openai_codex.models import Notification, UnknownNotification @@ -36,46 +34,6 @@ def fake_model_list(include_hidden: bool = False) -> bool: assert asyncio.run(scenario()) == 2 -def test_async_stream_text_is_incremental_without_blocking_parallel_calls() -> None: - """Async text streaming should yield incrementally without blocking other calls.""" - async def scenario() -> tuple[str, list[str], bool]: - """Start a stream, then prove another async client call can finish.""" - client = AsyncAppServerClient() - - def fake_stream_text(thread_id: str, text: str, params=None): # type: ignore[no-untyped-def] - """Yield one item before sleeping so the async wrapper can interleave.""" - yield "first" - time.sleep(0.03) - yield "second" - yield "third" - - def fake_model_list(include_hidden: bool = False) -> str: - """Return immediately to prove the event loop was not monopolized.""" - return "done" - - client._sync.stream_text = fake_stream_text # type: ignore[method-assign] - client._sync.model_list = fake_model_list # type: ignore[method-assign] - - stream = client.stream_text("thread-1", "hello") - first = await anext(stream) - - competing_call = asyncio.create_task(client.model_list()) - await asyncio.sleep(0.01) - competing_call_done_before_stream_done = competing_call.done() - - remaining: list[str] = [] - async for item in stream: - remaining.append(item) - - await competing_call - return first, remaining, competing_call_done_before_stream_done - - first, remaining, was_unblocked = asyncio.run(scenario()) - assert first == "first" - assert remaining == ["second", "third"] - assert was_unblocked - - def test_async_client_turn_notification_methods_delegate_to_sync_client() -> None: """Async turn routing methods should preserve sync-client registration semantics.""" async def scenario() -> tuple[list[tuple[str, str]], Notification, str]: @@ -142,84 +100,3 @@ def fake_wait(turn_id: str) -> TurnCompletedNotification: ), "turn-1", ) - - -def test_async_stream_text_uses_sync_turn_routing() -> None: - """Async text streaming should consume the same per-turn routing path as sync.""" - async def scenario() -> tuple[list[tuple[str, str]], list[str]]: - """Record routing calls while streaming two deltas and one completion.""" - client = AsyncAppServerClient() - notifications = [ - Notification( - method="item/agentMessage/delta", - payload=AgentMessageDeltaNotification.model_validate( - { - "delta": "first", - "itemId": "item-1", - "threadId": "thread-1", - "turnId": "turn-1", - } - ), - ), - Notification( - method="item/agentMessage/delta", - payload=AgentMessageDeltaNotification.model_validate( - { - "delta": "second", - "itemId": "item-2", - "threadId": "thread-1", - "turnId": "turn-1", - } - ), - ), - Notification( - method="turn/completed", - payload=TurnCompletedNotification.model_validate( - { - "threadId": "thread-1", - "turn": {"id": "turn-1", "items": [], "status": "completed"}, - } - ), - ), - ] - calls: list[tuple[str, str]] = [] - - def fake_turn_start(thread_id: str, text: str, *, params=None): # type: ignore[no-untyped-def] - """Return a started turn id while recording the request thread.""" - calls.append(("turn_start", thread_id)) - return SimpleNamespace(turn=SimpleNamespace(id="turn-1")) - - def fake_register(turn_id: str) -> None: - """Record stream registration for the started turn.""" - calls.append(("register", turn_id)) - - def fake_next(turn_id: str) -> Notification: - """Return the next queued turn notification.""" - calls.append(("next", turn_id)) - return notifications.pop(0) - - def fake_unregister(turn_id: str) -> None: - """Record stream cleanup for the started turn.""" - calls.append(("unregister", turn_id)) - - client._sync.turn_start = fake_turn_start # type: ignore[method-assign] - client._sync.register_turn_notifications = fake_register # type: ignore[method-assign] - client._sync.next_turn_notification = fake_next # type: ignore[method-assign] - client._sync.unregister_turn_notifications = fake_unregister # type: ignore[method-assign] - - chunks = [chunk async for chunk in client.stream_text("thread-1", "hello")] - return calls, [chunk.delta for chunk in chunks] - - calls, deltas = asyncio.run(scenario()) - - assert (calls, deltas) == ( - [ - ("turn_start", "thread-1"), - ("register", "turn-1"), - ("next", "turn-1"), - ("next", "turn-1"), - ("next", "turn-1"), - ("unregister", "turn-1"), - ], - ["first", "second"], - ) diff --git a/sdk/python/tests/test_client_rpc_methods.py b/sdk/python/tests/test_client_rpc_methods.py index 95f9f606cebf..f2c5020d55a4 100644 --- a/sdk/python/tests/test_client_rpc_methods.py +++ b/sdk/python/tests/test_client_rpc_methods.py @@ -1,7 +1,6 @@ from __future__ import annotations from pathlib import Path -from typing import Any from openai_codex.client import AppServerClient, _params_dict from openai_codex.generated.notification_registry import notification_turn_id @@ -19,23 +18,6 @@ ROOT = Path(__file__).resolve().parents[1] -def test_thread_set_name_and_compact_use_current_rpc_methods() -> None: - client = AppServerClient() - calls: list[tuple[str, dict[str, Any] | None]] = [] - - def fake_request(method: str, params, *, response_model): # type: ignore[no-untyped-def] - calls.append((method, params)) - return response_model.model_validate({}) - - client.request = fake_request # type: ignore[method-assign] - - client.thread_set_name("thread-1", "sdk-name") - client.thread_compact("thread-1") - - assert calls[0][0] == "thread/name/set" - assert calls[1][0] == "thread/compact/start" - - def test_generated_params_models_are_snake_case_and_dump_by_alias() -> None: params = ThreadListParams(search_term="needle", limit=5) diff --git a/sdk/python/tests/test_public_api_runtime_behavior.py b/sdk/python/tests/test_public_api_runtime_behavior.py index 1b0350145411..fb52da59b3d0 100644 --- a/sdk/python/tests/test_public_api_runtime_behavior.py +++ b/sdk/python/tests/test_public_api_runtime_behavior.py @@ -1,34 +1,18 @@ from __future__ import annotations import asyncio -from collections import deque from pathlib import Path -from types import SimpleNamespace from typing import Any import pytest import openai_codex.api as public_api_module -from openai_codex.client import AppServerClient -from openai_codex.generated.v2_all import ( - AgentMessageDeltaNotification, - ItemCompletedNotification, - MessagePhase, - ThreadTokenUsageUpdatedNotification, - TurnCompletedNotification, - TurnStartParams, - TurnStatus, -) -from openai_codex.models import InitializeResponse, Notification +from openai_codex.generated.v2_all import TurnStartParams +from openai_codex.models import InitializeResponse from openai_codex.api import ( ApprovalMode, AsyncCodex, - AsyncThread, - AsyncTurnHandle, Codex, - RunResult, - Thread, - TurnHandle, ) ROOT = Path(__file__).resolve().parents[1] @@ -50,111 +34,6 @@ def _approval_settings(params: list[Any]) -> list[dict[str, object]]: ] -def _delta_notification( - *, - thread_id: str = "thread-1", - turn_id: str = "turn-1", - text: str = "delta-text", -) -> Notification: - return Notification( - method="item/agentMessage/delta", - payload=AgentMessageDeltaNotification.model_validate( - { - "delta": text, - "itemId": "item-1", - "threadId": thread_id, - "turnId": turn_id, - } - ), - ) - - -def _completed_notification( - *, - thread_id: str = "thread-1", - turn_id: str = "turn-1", - status: str = "completed", - error_message: str | None = None, -) -> Notification: - turn: dict[str, object] = { - "id": turn_id, - "items": [], - "status": status, - } - if error_message is not None: - turn["error"] = {"message": error_message} - return Notification( - method="turn/completed", - payload=TurnCompletedNotification.model_validate( - { - "threadId": thread_id, - "turn": turn, - } - ), - ) - - -def _item_completed_notification( - *, - thread_id: str = "thread-1", - turn_id: str = "turn-1", - text: str = "final text", - phase: MessagePhase | None = None, -) -> Notification: - """Build a realistic completed-item notification accepted by generated models.""" - item: dict[str, object] = { - "id": "item-1", - "text": text, - "type": "agentMessage", - } - if phase is not None: - item["phase"] = phase.value - return Notification( - method="item/completed", - payload=ItemCompletedNotification.model_validate( - { - # The pinned runtime schema requires completion timestamps. - "completedAtMs": 1, - "item": item, - "threadId": thread_id, - "turnId": turn_id, - } - ), - ) - - -def _token_usage_notification( - *, - thread_id: str = "thread-1", - turn_id: str = "turn-1", -) -> Notification: - return Notification( - method="thread/tokenUsage/updated", - payload=ThreadTokenUsageUpdatedNotification.model_validate( - { - "threadId": thread_id, - "turnId": turn_id, - "tokenUsage": { - "last": { - "cachedInputTokens": 1, - "inputTokens": 2, - "outputTokens": 3, - "reasoningOutputTokens": 4, - "totalTokens": 9, - }, - "total": { - "cachedInputTokens": 5, - "inputTokens": 6, - "outputTokens": 7, - "reasoningOutputTokens": 8, - "totalTokens": 26, - }, - }, - } - ), - ) - - def test_codex_init_failure_closes_client(monkeypatch: pytest.MonkeyPatch) -> None: closed: list[bool] = [] @@ -261,64 +140,6 @@ def _approval_mode_turn_params(approval_mode: ApprovalMode) -> TurnStartParams: ) -class CapturingApprovalClient: - """Collect wrapper params at the app-server client boundary.""" - - def __init__(self) -> None: - self.params: list[Any] = [] - - def thread_start(self, params: Any) -> SimpleNamespace: - self.params.append(params) - return SimpleNamespace(thread=SimpleNamespace(id="thread-1")) - - def thread_resume(self, thread_id: str, params: Any) -> SimpleNamespace: - self.params.append(params) - return SimpleNamespace(thread=SimpleNamespace(id=thread_id)) - - def thread_fork(self, thread_id: str, params: Any) -> SimpleNamespace: - self.params.append(params) - return SimpleNamespace(thread=SimpleNamespace(id=f"{thread_id}-fork")) - - def turn_start( - self, - thread_id: str, - input: object, # noqa: A002 - *, - params: Any, - ) -> SimpleNamespace: - self.params.append(params) - return SimpleNamespace(turn=SimpleNamespace(id=f"{thread_id}-turn")) - - -class CapturingAsyncApprovalClient: - """Async mirror of CapturingApprovalClient for public async wrappers.""" - - def __init__(self) -> None: - self.params: list[Any] = [] - - async def thread_start(self, params: Any) -> SimpleNamespace: - self.params.append(params) - return SimpleNamespace(thread=SimpleNamespace(id="thread-1")) - - async def thread_resume(self, thread_id: str, params: Any) -> SimpleNamespace: - self.params.append(params) - return SimpleNamespace(thread=SimpleNamespace(id=thread_id)) - - async def thread_fork(self, thread_id: str, params: Any) -> SimpleNamespace: - self.params.append(params) - return SimpleNamespace(thread=SimpleNamespace(id=f"{thread_id}-fork")) - - async def turn_start( - self, - thread_id: str, - input: object, # noqa: A002 - *, - params: Any, - ) -> SimpleNamespace: - self.params.append(params) - return SimpleNamespace(turn=SimpleNamespace(id=f"{thread_id}-turn")) - - def test_approval_modes_serialize_to_expected_start_params() -> None: """ApprovalMode should map to the app-server params sent for new work.""" assert { @@ -339,511 +160,6 @@ def test_unknown_approval_mode_is_rejected() -> None: public_api_module._approval_mode_settings("allow_all") # type: ignore[arg-type] -def test_approval_defaults_preserve_existing_sync_thread_settings() -> None: - """Only thread creation should write approval defaults unless callers override.""" - client = CapturingApprovalClient() - codex = Codex.__new__(Codex) - codex._client = client - - started = codex.thread_start(approval_mode=ApprovalMode.deny_all) - started.turn([]) - codex.thread_resume("existing-thread") - codex.thread_fork("existing-thread") - started.turn([], approval_mode=ApprovalMode.auto_review) - - assert _approval_settings(client.params) == [ - {"approvalPolicy": "never"}, - {}, - {}, - {}, - { - "approvalPolicy": "on-request", - "approvalsReviewer": "auto_review", - }, - ] - - -def test_approval_defaults_preserve_existing_async_thread_settings() -> None: - """Async wrappers should follow the same approval override semantics.""" - - async def scenario() -> None: - client = CapturingAsyncApprovalClient() - codex = AsyncCodex() - codex._client = client # type: ignore[assignment] - codex._initialized = True - - started = await codex.thread_start(approval_mode=ApprovalMode.deny_all) - await started.turn([]) - await codex.thread_resume("existing-thread") - await codex.thread_fork("existing-thread") - await started.turn([], approval_mode=ApprovalMode.auto_review) - - assert _approval_settings(client.params) == [ - {"approvalPolicy": "never"}, - {}, - {}, - {}, - { - "approvalPolicy": "on-request", - "approvalsReviewer": "auto_review", - }, - ] - - asyncio.run(scenario()) - - -def test_turn_streams_can_consume_multiple_turns_on_one_client() -> None: - """Two sync TurnHandle streams should advance independently on one client.""" - client = AppServerClient() - notifications: dict[str, deque[Notification]] = { - "turn-1": deque( - [ - _delta_notification(turn_id="turn-1", text="one"), - _completed_notification(turn_id="turn-1"), - ] - ), - "turn-2": deque( - [ - _delta_notification(turn_id="turn-2", text="two"), - _completed_notification(turn_id="turn-2"), - ] - ), - } - client.next_turn_notification = lambda turn_id: notifications[turn_id].popleft() # type: ignore[method-assign] - - first_stream = TurnHandle(client, "thread-1", "turn-1").stream() - assert next(first_stream).method == "item/agentMessage/delta" - - second_stream = TurnHandle(client, "thread-1", "turn-2").stream() - assert next(second_stream).method == "item/agentMessage/delta" - assert next(first_stream).method == "turn/completed" - assert next(second_stream).method == "turn/completed" - - first_stream.close() - second_stream.close() - - -def test_async_turn_streams_can_consume_multiple_turns_on_one_client() -> None: - """Two async TurnHandle streams should advance independently on one client.""" - - async def scenario() -> None: - """Interleave two async streams backed by separate per-turn queues.""" - codex = AsyncCodex() - - async def fake_ensure_initialized() -> None: - """Avoid starting a real app-server process for this stream test.""" - return None - - notifications: dict[str, deque[Notification]] = { - "turn-1": deque( - [ - _delta_notification(turn_id="turn-1", text="one"), - _completed_notification(turn_id="turn-1"), - ] - ), - "turn-2": deque( - [ - _delta_notification(turn_id="turn-2", text="two"), - _completed_notification(turn_id="turn-2"), - ] - ), - } - - async def fake_next_notification(turn_id: str) -> Notification: - """Return the next notification from the requested per-turn queue.""" - return notifications[turn_id].popleft() - - codex._ensure_initialized = fake_ensure_initialized # type: ignore[method-assign] - codex._client.next_turn_notification = fake_next_notification # type: ignore[method-assign] - - first_stream = AsyncTurnHandle(codex, "thread-1", "turn-1").stream() - assert (await anext(first_stream)).method == "item/agentMessage/delta" - - second_stream = AsyncTurnHandle(codex, "thread-1", "turn-2").stream() - assert (await anext(second_stream)).method == "item/agentMessage/delta" - assert (await anext(first_stream)).method == "turn/completed" - assert (await anext(second_stream)).method == "turn/completed" - - await first_stream.aclose() - await second_stream.aclose() - - asyncio.run(scenario()) - - -def test_turn_run_returns_completed_turn_payload() -> None: - client = AppServerClient() - notifications: deque[Notification] = deque( - [ - _completed_notification(), - ] - ) - client.next_turn_notification = lambda _turn_id: notifications.popleft() # type: ignore[method-assign] - - result = TurnHandle(client, "thread-1", "turn-1").run() - - assert result.id == "turn-1" - assert result.status == TurnStatus.completed - assert result.items == [] - - -def test_thread_run_accepts_string_input_and_returns_run_result() -> None: - """Sync Thread.run should preserve approval settings unless explicitly overridden.""" - client = AppServerClient() - item_notification = _item_completed_notification(text="Hello.") - usage_notification = _token_usage_notification() - notifications: deque[Notification] = deque( - [ - item_notification, - usage_notification, - _completed_notification(), - ] - ) - client.next_turn_notification = lambda _turn_id: notifications.popleft() # type: ignore[method-assign] - seen: dict[str, object] = {} - - def fake_turn_start(thread_id: str, wire_input: object, *, params=None): # noqa: ANN001,ANN202 - seen["thread_id"] = thread_id - seen["wire_input"] = wire_input - seen["params"] = params - return SimpleNamespace(turn=SimpleNamespace(id="turn-1")) - - client.turn_start = fake_turn_start # type: ignore[method-assign] - - result = Thread(client, "thread-1").run("hello") - - assert ( - seen["thread_id"], - seen["wire_input"], - _approval_settings([seen["params"]]), - result, - ) == ( - "thread-1", - [{"type": "text", "text": "hello"}], - [{}], - RunResult( - final_response="Hello.", - items=[item_notification.payload.item], - usage=usage_notification.payload.token_usage, - ), - ) - - -def test_thread_run_uses_last_completed_assistant_message_as_final_response() -> None: - client = AppServerClient() - first_item_notification = _item_completed_notification(text="First message") - second_item_notification = _item_completed_notification(text="Second message") - notifications: deque[Notification] = deque( - [ - first_item_notification, - second_item_notification, - _completed_notification(), - ] - ) - client.next_turn_notification = lambda _turn_id: notifications.popleft() # type: ignore[method-assign] - client.turn_start = lambda thread_id, wire_input, *, params=None: SimpleNamespace( # noqa: ARG005,E731 - turn=SimpleNamespace(id="turn-1") - ) - - result = Thread(client, "thread-1").run("hello") - - assert result.final_response == "Second message" - assert result.items == [ - first_item_notification.payload.item, - second_item_notification.payload.item, - ] - - -def test_thread_run_preserves_empty_last_assistant_message() -> None: - client = AppServerClient() - first_item_notification = _item_completed_notification(text="First message") - second_item_notification = _item_completed_notification(text="") - notifications: deque[Notification] = deque( - [ - first_item_notification, - second_item_notification, - _completed_notification(), - ] - ) - client.next_turn_notification = lambda _turn_id: notifications.popleft() # type: ignore[method-assign] - client.turn_start = lambda thread_id, wire_input, *, params=None: SimpleNamespace( # noqa: ARG005,E731 - turn=SimpleNamespace(id="turn-1") - ) - - result = Thread(client, "thread-1").run("hello") - - assert result.final_response == "" - assert result.items == [ - first_item_notification.payload.item, - second_item_notification.payload.item, - ] - - -def test_thread_run_prefers_explicit_final_answer_over_later_commentary() -> None: - client = AppServerClient() - final_answer_notification = _item_completed_notification( - text="Final answer", - phase=MessagePhase.final_answer, - ) - commentary_notification = _item_completed_notification( - text="Commentary", - phase=MessagePhase.commentary, - ) - notifications: deque[Notification] = deque( - [ - final_answer_notification, - commentary_notification, - _completed_notification(), - ] - ) - client.next_turn_notification = lambda _turn_id: notifications.popleft() # type: ignore[method-assign] - client.turn_start = lambda thread_id, wire_input, *, params=None: SimpleNamespace( # noqa: ARG005,E731 - turn=SimpleNamespace(id="turn-1") - ) - - result = Thread(client, "thread-1").run("hello") - - assert result.final_response == "Final answer" - assert result.items == [ - final_answer_notification.payload.item, - commentary_notification.payload.item, - ] - - -def test_thread_run_returns_none_when_only_commentary_messages_complete() -> None: - client = AppServerClient() - commentary_notification = _item_completed_notification( - text="Commentary", - phase=MessagePhase.commentary, - ) - notifications: deque[Notification] = deque( - [ - commentary_notification, - _completed_notification(), - ] - ) - client.next_turn_notification = lambda _turn_id: notifications.popleft() # type: ignore[method-assign] - client.turn_start = lambda thread_id, wire_input, *, params=None: SimpleNamespace( # noqa: ARG005,E731 - turn=SimpleNamespace(id="turn-1") - ) - - result = Thread(client, "thread-1").run("hello") - - assert result.final_response is None - assert result.items == [commentary_notification.payload.item] - - -def test_thread_run_raises_on_failed_turn() -> None: - client = AppServerClient() - notifications: deque[Notification] = deque( - [ - _completed_notification(status="failed", error_message="boom"), - ] - ) - client.next_turn_notification = lambda _turn_id: notifications.popleft() # type: ignore[method-assign] - client.turn_start = lambda thread_id, wire_input, *, params=None: SimpleNamespace( # noqa: ARG005,E731 - turn=SimpleNamespace(id="turn-1") - ) - - with pytest.raises(RuntimeError, match="boom"): - Thread(client, "thread-1").run("hello") - - -def test_stream_text_registers_and_consumes_turn_notifications() -> None: - """stream_text should register, consume, and unregister one turn queue.""" - client = AppServerClient() - notifications: deque[Notification] = deque( - [ - _delta_notification(text="first"), - _delta_notification(text="second"), - _completed_notification(), - ] - ) - calls: list[tuple[str, str]] = [] - client.turn_start = lambda thread_id, input_items, *, params=None: SimpleNamespace( # noqa: ARG005,E731 - turn=SimpleNamespace(id="turn-1") - ) - - def fake_register(turn_id: str) -> None: - """Record registration for the turn created by stream_text.""" - calls.append(("register", turn_id)) - - def fake_next(turn_id: str) -> Notification: - """Return the next queued notification for stream_text.""" - calls.append(("next", turn_id)) - return notifications.popleft() - - def fake_unregister(turn_id: str) -> None: - """Record cleanup for the turn created by stream_text.""" - calls.append(("unregister", turn_id)) - - client.register_turn_notifications = fake_register # type: ignore[method-assign] - client.next_turn_notification = fake_next # type: ignore[method-assign] - client.unregister_turn_notifications = fake_unregister # type: ignore[method-assign] - - chunks = list(client.stream_text("thread-1", "hello")) - - assert ([chunk.delta for chunk in chunks], calls) == ( - ["first", "second"], - [ - ("register", "turn-1"), - ("next", "turn-1"), - ("next", "turn-1"), - ("next", "turn-1"), - ("unregister", "turn-1"), - ], - ) - - -def test_async_thread_run_accepts_string_input_and_returns_run_result() -> None: - """Async Thread.run should preserve approvals while collecting routed results.""" - - async def scenario() -> None: - """Feed item, usage, and completion events through the async turn stream.""" - codex = AsyncCodex() - - async def fake_ensure_initialized() -> None: - """Avoid starting a real app-server process for this run test.""" - return None - - item_notification = _item_completed_notification(text="Hello async.") - usage_notification = _token_usage_notification() - notifications: deque[Notification] = deque( - [ - item_notification, - usage_notification, - _completed_notification(), - ] - ) - seen: dict[str, object] = {} - - async def fake_turn_start(thread_id: str, wire_input: object, *, params=None): # noqa: ANN001,ANN202 - """Capture normalized input and return a synthetic turn id.""" - seen["thread_id"] = thread_id - seen["wire_input"] = wire_input - seen["params"] = params - return SimpleNamespace(turn=SimpleNamespace(id="turn-1")) - - async def fake_next_notification(_turn_id: str) -> Notification: - """Return the next queued notification for the synthetic turn.""" - return notifications.popleft() - - codex._ensure_initialized = fake_ensure_initialized # type: ignore[method-assign] - codex._client.turn_start = fake_turn_start # type: ignore[method-assign] - codex._client.next_turn_notification = fake_next_notification # type: ignore[method-assign] - - result = await AsyncThread(codex, "thread-1").run("hello") - - assert ( - seen["thread_id"], - seen["wire_input"], - _approval_settings([seen["params"]]), - result, - ) == ( - "thread-1", - [{"type": "text", "text": "hello"}], - [{}], - RunResult( - final_response="Hello async.", - items=[item_notification.payload.item], - usage=usage_notification.payload.token_usage, - ), - ) - - asyncio.run(scenario()) - - -def test_async_thread_run_uses_last_completed_assistant_message_as_final_response() -> ( - None -): - """Async run should use the last final assistant message as the response text.""" - - async def scenario() -> None: - """Feed two completed agent messages through the async per-turn stream.""" - codex = AsyncCodex() - - async def fake_ensure_initialized() -> None: - """Avoid starting a real app-server process for this run test.""" - return None - - first_item_notification = _item_completed_notification( - text="First async message" - ) - second_item_notification = _item_completed_notification( - text="Second async message" - ) - notifications: deque[Notification] = deque( - [ - first_item_notification, - second_item_notification, - _completed_notification(), - ] - ) - - async def fake_turn_start(thread_id: str, wire_input: object, *, params=None): # noqa: ANN001,ANN202,ARG001 - """Return a synthetic turn id after AsyncThread.run builds input.""" - return SimpleNamespace(turn=SimpleNamespace(id="turn-1")) - - async def fake_next_notification(_turn_id: str) -> Notification: - """Return the next queued notification for that synthetic turn.""" - return notifications.popleft() - - codex._ensure_initialized = fake_ensure_initialized # type: ignore[method-assign] - codex._client.turn_start = fake_turn_start # type: ignore[method-assign] - codex._client.next_turn_notification = fake_next_notification # type: ignore[method-assign] - - result = await AsyncThread(codex, "thread-1").run("hello") - - assert result.final_response == "Second async message" - assert result.items == [ - first_item_notification.payload.item, - second_item_notification.payload.item, - ] - - asyncio.run(scenario()) - - -def test_async_thread_run_returns_none_when_only_commentary_messages_complete() -> None: - """Async Thread.run should ignore commentary-only messages for final text.""" - - async def scenario() -> None: - """Feed a commentary item and completion through the async turn stream.""" - codex = AsyncCodex() - - async def fake_ensure_initialized() -> None: - """Avoid starting a real app-server process for this run test.""" - return None - - commentary_notification = _item_completed_notification( - text="Commentary", - phase=MessagePhase.commentary, - ) - notifications: deque[Notification] = deque( - [ - commentary_notification, - _completed_notification(), - ] - ) - - async def fake_turn_start(thread_id: str, wire_input: object, *, params=None): # noqa: ANN001,ANN202,ARG001 - """Return a synthetic turn id for commentary-only output.""" - return SimpleNamespace(turn=SimpleNamespace(id="turn-1")) - - async def fake_next_notification(_turn_id: str) -> Notification: - """Return the next queued commentary/completion notification.""" - return notifications.popleft() - - codex._ensure_initialized = fake_ensure_initialized # type: ignore[method-assign] - codex._client.turn_start = fake_turn_start # type: ignore[method-assign] - codex._client.next_turn_notification = fake_next_notification # type: ignore[method-assign] - - result = await AsyncThread(codex, "thread-1").run("hello") - - assert result.final_response is None - assert result.items == [commentary_notification.payload.item] - - asyncio.run(scenario()) - - def test_retry_examples_compare_status_with_enum() -> None: for path in ( ROOT / "examples" / "10_error_handling_and_retry" / "sync.py",