From b77dd8cfd6290d45462a1fec9db78952d12f5c32 Mon Sep 17 00:00:00 2001 From: giulio-leone Date: Mon, 2 Mar 2026 12:53:18 +0100 Subject: [PATCH 1/6] feat(responses): add cancel() support for streaming responses Adds response_id property and cancel() method to ResponseStream and AsyncResponseStream, allowing users to cancel a response mid-stream. Uses a callback pattern to avoid coupling stream classes to the Responses resource. Refs: #2643 --- .../lib/streaming/responses/_responses.py | 64 ++++- src/openai/resources/responses/responses.py | 5 +- .../responses/test_response_stream_cancel.py | 223 ++++++++++++++++++ 3 files changed, 290 insertions(+), 2 deletions(-) create mode 100644 tests/lib/responses/test_response_stream_cancel.py diff --git a/src/openai/lib/streaming/responses/_responses.py b/src/openai/lib/streaming/responses/_responses.py index 6975a9260d..f6984fb74d 100644 --- a/src/openai/lib/streaming/responses/_responses.py +++ b/src/openai/lib/streaming/responses/_responses.py @@ -17,7 +17,7 @@ from ...._utils import is_given, consume_sync_iterator, consume_async_iterator from ...._models import build, construct_type_unchecked from ...._streaming import Stream, AsyncStream -from ....types.responses import ParsedResponse, ResponseStreamEvent as RawResponseStreamEvent +from ....types.responses import Response, ParsedResponse, ResponseStreamEvent as RawResponseStreamEvent from ..._parsing._responses import TextFormatT, parse_text, parse_response from ....types.responses.tool_param import ToolParam from ....types.responses.parsed_response import ( @@ -35,12 +35,14 @@ def __init__( text_format: type[TextFormatT] | Omit, input_tools: Iterable[ToolParam] | Omit, starting_after: int | None, + cancel_response: Callable[[str], Response] | None = None, ) -> None: self._raw_stream = raw_stream self._response = raw_stream.response self._iterator = self.__stream__() self._state = ResponseStreamState(text_format=text_format, input_tools=input_tools) self._starting_after 
= starting_after + self._cancel_response = cancel_response def __next__(self) -> ResponseStreamEvent[TextFormatT]: return self._iterator.__next__() @@ -91,6 +93,30 @@ def until_done(self) -> Self: consume_sync_iterator(self) return self + @property + def response_id(self) -> str | None: + """The ID of the response being streamed, available after the first event.""" + snapshot = self._state.current_snapshot + if snapshot is not None: + return snapshot.id + return None + + def cancel(self) -> Response: + """Cancel the response being streamed. + + Calls the API to cancel the response and closes the stream. + + Returns the cancelled Response object. + """ + response_id = self.response_id + if response_id is None: + raise ValueError("Cannot cancel: response ID not yet available. Wait for the first event.") + if self._cancel_response is None: + raise ValueError("Cancel not available for this stream.") + result = self._cancel_response(response_id) + self.close() + return result + class ResponseStreamManager(Generic[TextFormatT]): def __init__( @@ -100,12 +126,14 @@ def __init__( text_format: type[TextFormatT] | Omit, input_tools: Iterable[ToolParam] | Omit, starting_after: int | None, + cancel_response: Callable[[str], Response] | None = None, ) -> None: self.__stream: ResponseStream[TextFormatT] | None = None self.__api_request = api_request self.__text_format = text_format self.__input_tools = input_tools self.__starting_after = starting_after + self.__cancel_response = cancel_response def __enter__(self) -> ResponseStream[TextFormatT]: raw_stream = self.__api_request() @@ -115,6 +143,7 @@ def __enter__(self) -> ResponseStream[TextFormatT]: text_format=self.__text_format, input_tools=self.__input_tools, starting_after=self.__starting_after, + cancel_response=self.__cancel_response, ) return self.__stream @@ -137,12 +166,14 @@ def __init__( text_format: type[TextFormatT] | Omit, input_tools: Iterable[ToolParam] | Omit, starting_after: int | None, + cancel_response: 
Callable[[str], Awaitable[Response]] | None = None, ) -> None: self._raw_stream = raw_stream self._response = raw_stream.response self._iterator = self.__stream__() self._state = ResponseStreamState(text_format=text_format, input_tools=input_tools) self._starting_after = starting_after + self._cancel_response = cancel_response async def __anext__(self) -> ResponseStreamEvent[TextFormatT]: return await self._iterator.__anext__() @@ -193,6 +224,30 @@ async def until_done(self) -> Self: await consume_async_iterator(self) return self + @property + def response_id(self) -> str | None: + """The ID of the response being streamed, available after the first event.""" + snapshot = self._state.current_snapshot + if snapshot is not None: + return snapshot.id + return None + + async def cancel(self) -> Response: + """Cancel the response being streamed. + + Calls the API to cancel the response and closes the stream. + + Returns the cancelled Response object. + """ + response_id = self.response_id + if response_id is None: + raise ValueError("Cannot cancel: response ID not yet available. 
Wait for the first event.") + if self._cancel_response is None: + raise ValueError("Cancel not available for this stream.") + result = await self._cancel_response(response_id) + await self.close() + return result + class AsyncResponseStreamManager(Generic[TextFormatT]): def __init__( @@ -202,12 +257,14 @@ def __init__( text_format: type[TextFormatT] | Omit, input_tools: Iterable[ToolParam] | Omit, starting_after: int | None, + cancel_response: Callable[[str], Awaitable[Response]] | None = None, ) -> None: self.__stream: AsyncResponseStream[TextFormatT] | None = None self.__api_request = api_request self.__text_format = text_format self.__input_tools = input_tools self.__starting_after = starting_after + self.__cancel_response = cancel_response async def __aenter__(self) -> AsyncResponseStream[TextFormatT]: raw_stream = await self.__api_request @@ -217,6 +274,7 @@ async def __aenter__(self) -> AsyncResponseStream[TextFormatT]: text_format=self.__text_format, input_tools=self.__input_tools, starting_after=self.__starting_after, + cancel_response=self.__cancel_response, ) return self.__stream @@ -244,6 +302,10 @@ def __init__( self._text_format = text_format self._rich_text_format: type | Omit = text_format if inspect.isclass(text_format) else omit + @property + def current_snapshot(self) -> ParsedResponseSnapshot | None: + return self.__current_snapshot + def handle_event(self, event: RawResponseStreamEvent) -> List[ResponseStreamEvent[TextFormatT]]: self.__current_snapshot = snapshot = self.accumulate_event(event) diff --git a/src/openai/resources/responses/responses.py b/src/openai/resources/responses/responses.py index c85a94495d..ea73e63917 100644 --- a/src/openai/resources/responses/responses.py +++ b/src/openai/resources/responses/responses.py @@ -1129,7 +1129,7 @@ def stream( timeout=timeout, ) - return ResponseStreamManager(api_request, text_format=text_format, input_tools=tools, starting_after=None) + return ResponseStreamManager(api_request, 
text_format=text_format, input_tools=tools, starting_after=None, cancel_response=self.cancel) else: if not is_given(response_id): raise ValueError("id must be provided when streaming an existing response") @@ -1148,6 +1148,7 @@ def stream( text_format=text_format, input_tools=tools, starting_after=starting_after if is_given(starting_after) else None, + cancel_response=self.cancel, ) def parse( @@ -2794,6 +2795,7 @@ def stream( text_format=text_format, input_tools=tools, starting_after=None, + cancel_response=self.cancel, ) else: if isinstance(response_id, Omit): @@ -2813,6 +2815,7 @@ def stream( text_format=text_format, input_tools=tools, starting_after=starting_after if is_given(starting_after) else None, + cancel_response=self.cancel, ) async def parse( diff --git a/tests/lib/responses/test_response_stream_cancel.py b/tests/lib/responses/test_response_stream_cancel.py new file mode 100644 index 0000000000..907c1d1105 --- /dev/null +++ b/tests/lib/responses/test_response_stream_cancel.py @@ -0,0 +1,223 @@ +from __future__ import annotations + +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from openai._types import Omit +from openai._streaming import Stream, AsyncStream +from openai.types.responses import Response +from openai.lib.streaming.responses._responses import ( + ResponseStream, + AsyncResponseStream, + ResponseStreamState, +) + + +def _make_state_with_snapshot(response_id: str = "resp_123") -> ResponseStreamState[object]: + """Create a ResponseStreamState that has a snapshot with a given id.""" + from openai.types.responses.response import Response as RawResponse + from openai.types.responses.response_created_event import ResponseCreatedEvent + + state = ResponseStreamState(text_format=Omit(), input_tools=Omit()) + + raw_response = RawResponse.construct( + id=response_id, + object="response", + created_at=0, + status="in_progress", + output=[], + model="gpt-4o", + parallel_tool_calls=True, + tool_choice="auto", + tools=[], + 
temperature=1.0, + top_p=1.0, + max_output_tokens=None, + max_tool_calls=None, + previous_response_id=None, + reasoning=None, + truncation="disabled", + error=None, + incomplete_details=None, + instructions=None, + metadata={}, + text={"format": {"type": "text"}}, + usage=None, + user=None, + background=False, + store=True, + ) + + event = ResponseCreatedEvent.construct( + type="response.created", + response=raw_response, + sequence_number=0, + ) + + state.handle_event(event) + return state + + +class TestResponseStreamCancel: + def test_response_id_none_initially(self) -> None: + raw_stream = MagicMock(spec=Stream) + raw_stream.response = MagicMock() + + stream = ResponseStream( + raw_stream=raw_stream, + text_format=Omit(), + input_tools=Omit(), + starting_after=None, + ) + + assert stream.response_id is None + + def test_response_id_available_after_event(self) -> None: + raw_stream = MagicMock(spec=Stream) + raw_stream.response = MagicMock() + + stream = ResponseStream( + raw_stream=raw_stream, + text_format=Omit(), + input_tools=Omit(), + starting_after=None, + ) + + stream._state = _make_state_with_snapshot("resp_abc") + assert stream.response_id == "resp_abc" + + def test_cancel_raises_when_no_response_id(self) -> None: + raw_stream = MagicMock(spec=Stream) + raw_stream.response = MagicMock() + + stream = ResponseStream( + raw_stream=raw_stream, + text_format=Omit(), + input_tools=Omit(), + starting_after=None, + cancel_response=MagicMock(), + ) + + with pytest.raises(ValueError, match="response ID not yet available"): + stream.cancel() + + def test_cancel_raises_when_no_callback(self) -> None: + raw_stream = MagicMock(spec=Stream) + raw_stream.response = MagicMock() + + stream = ResponseStream( + raw_stream=raw_stream, + text_format=Omit(), + input_tools=Omit(), + starting_after=None, + ) + + stream._state = _make_state_with_snapshot("resp_abc") + + with pytest.raises(ValueError, match="Cancel not available"): + stream.cancel() + + def 
test_cancel_calls_callback_and_closes(self) -> None: + raw_stream = MagicMock(spec=Stream) + raw_stream.response = MagicMock() + + mock_response = MagicMock(spec=Response) + cancel_fn = MagicMock(return_value=mock_response) + + stream = ResponseStream( + raw_stream=raw_stream, + text_format=Omit(), + input_tools=Omit(), + starting_after=None, + cancel_response=cancel_fn, + ) + + stream._state = _make_state_with_snapshot("resp_xyz") + + result = stream.cancel() + + cancel_fn.assert_called_once_with("resp_xyz") + raw_stream.response.close.assert_called_once() + assert result is mock_response + + +class TestAsyncResponseStreamCancel: + def test_response_id_none_initially(self) -> None: + raw_stream = MagicMock(spec=AsyncStream) + raw_stream.response = MagicMock() + + stream = AsyncResponseStream( + raw_stream=raw_stream, + text_format=Omit(), + input_tools=Omit(), + starting_after=None, + ) + + assert stream.response_id is None + + def test_response_id_available_after_event(self) -> None: + raw_stream = MagicMock(spec=AsyncStream) + raw_stream.response = MagicMock() + + stream = AsyncResponseStream( + raw_stream=raw_stream, + text_format=Omit(), + input_tools=Omit(), + starting_after=None, + ) + + stream._state = _make_state_with_snapshot("resp_abc") + assert stream.response_id == "resp_abc" + + @pytest.mark.asyncio + async def test_cancel_raises_when_no_response_id(self) -> None: + raw_stream = MagicMock(spec=AsyncStream) + raw_stream.response = MagicMock() + + stream = AsyncResponseStream( + raw_stream=raw_stream, + text_format=Omit(), + input_tools=Omit(), + starting_after=None, + cancel_response=AsyncMock(), + ) + + with pytest.raises(ValueError, match="response ID not yet available"): + await stream.cancel() + + @pytest.mark.asyncio + async def test_cancel_calls_callback_and_closes(self) -> None: + raw_stream = MagicMock(spec=AsyncStream) + raw_stream.response = MagicMock() + raw_stream.response.aclose = AsyncMock() + + mock_response = MagicMock(spec=Response) + 
cancel_fn = AsyncMock(return_value=mock_response) + + stream = AsyncResponseStream( + raw_stream=raw_stream, + text_format=Omit(), + input_tools=Omit(), + starting_after=None, + cancel_response=cancel_fn, + ) + + stream._state = _make_state_with_snapshot("resp_xyz") + + result = await stream.cancel() + + cancel_fn.assert_called_once_with("resp_xyz") + raw_stream.response.aclose.assert_called_once() + assert result is mock_response + + +class TestResponseStreamStateSnapshot: + def test_current_snapshot_none_initially(self) -> None: + state = ResponseStreamState(text_format=Omit(), input_tools=Omit()) + assert state.current_snapshot is None + + def test_current_snapshot_available_after_event(self) -> None: + state = _make_state_with_snapshot("resp_test") + assert state.current_snapshot is not None + assert state.current_snapshot.id == "resp_test" From aad7295c5c5e3e35fcccb303a15926bf075f4bc1 Mon Sep 17 00:00:00 2001 From: giulio-leone Date: Mon, 2 Mar 2026 17:25:43 +0100 Subject: [PATCH 2/6] fix(review): close stream in finally to guarantee cleanup on cancel Use try/finally so that stream.close() always runs even if the cancel API call raises an exception. Applies to both sync and async paths. Refs: #2916 --- .../lib/streaming/responses/_responses.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/src/openai/lib/streaming/responses/_responses.py b/src/openai/lib/streaming/responses/_responses.py index f6984fb74d..42e2a34e10 100644 --- a/src/openai/lib/streaming/responses/_responses.py +++ b/src/openai/lib/streaming/responses/_responses.py @@ -104,7 +104,7 @@ def response_id(self) -> str | None: def cancel(self) -> Response: """Cancel the response being streamed. - Calls the API to cancel the response and closes the stream. + Closes the stream first, then calls the API to cancel the response. Returns the cancelled Response object. 
""" @@ -113,9 +113,10 @@ def cancel(self) -> Response: raise ValueError("Cannot cancel: response ID not yet available. Wait for the first event.") if self._cancel_response is None: raise ValueError("Cancel not available for this stream.") - result = self._cancel_response(response_id) - self.close() - return result + try: + return self._cancel_response(response_id) + finally: + self.close() class ResponseStreamManager(Generic[TextFormatT]): @@ -235,7 +236,7 @@ def response_id(self) -> str | None: async def cancel(self) -> Response: """Cancel the response being streamed. - Calls the API to cancel the response and closes the stream. + Closes the stream first, then calls the API to cancel the response. Returns the cancelled Response object. """ @@ -244,9 +245,10 @@ async def cancel(self) -> Response: raise ValueError("Cannot cancel: response ID not yet available. Wait for the first event.") if self._cancel_response is None: raise ValueError("Cancel not available for this stream.") - result = await self._cancel_response(response_id) - await self.close() - return result + try: + return await self._cancel_response(response_id) + finally: + await self.close() class AsyncResponseStreamManager(Generic[TextFormatT]): From fb6a7d96febc37f2aedaffbd29a9823aac34b716 Mon Sep 17 00:00:00 2001 From: g97iulio1609 Date: Mon, 2 Mar 2026 18:28:37 +0100 Subject: [PATCH 3/6] fix: apply code review suggestions Refs: #2916 --- src/openai/resources/responses/responses.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/openai/resources/responses/responses.py b/src/openai/resources/responses/responses.py index ea73e63917..ad22fe1a79 100644 --- a/src/openai/resources/responses/responses.py +++ b/src/openai/resources/responses/responses.py @@ -1129,7 +1129,13 @@ def stream( timeout=timeout, ) - return ResponseStreamManager(api_request, text_format=text_format, input_tools=tools, starting_after=None, cancel_response=self.cancel) + return ResponseStreamManager( + 
api_request, + text_format=text_format, + input_tools=tools, + starting_after=None, + cancel_response=self.cancel, + ) else: if not is_given(response_id): raise ValueError("id must be provided when streaming an existing response") From 6983e31246ad8a34186484b7afa0b2c2a6d27beb Mon Sep 17 00:00:00 2001 From: g97iulio1609 Date: Mon, 2 Mar 2026 19:40:20 +0100 Subject: [PATCH 4/6] fix(responses): propagate stream options to cancel callback and close stream before cancel --- .../lib/streaming/responses/_responses.py | 12 +++---- src/openai/resources/responses/responses.py | 32 ++++++++++++++++--- 2 files changed, 32 insertions(+), 12 deletions(-) diff --git a/src/openai/lib/streaming/responses/_responses.py b/src/openai/lib/streaming/responses/_responses.py index 42e2a34e10..a306bf663b 100644 --- a/src/openai/lib/streaming/responses/_responses.py +++ b/src/openai/lib/streaming/responses/_responses.py @@ -113,10 +113,8 @@ def cancel(self) -> Response: raise ValueError("Cannot cancel: response ID not yet available. Wait for the first event.") if self._cancel_response is None: raise ValueError("Cancel not available for this stream.") - try: - return self._cancel_response(response_id) - finally: - self.close() + self.close() + return self._cancel_response(response_id) class ResponseStreamManager(Generic[TextFormatT]): @@ -245,10 +243,8 @@ async def cancel(self) -> Response: raise ValueError("Cannot cancel: response ID not yet available. 
Wait for the first event.") if self._cancel_response is None: raise ValueError("Cancel not available for this stream.") - try: - return await self._cancel_response(response_id) - finally: - await self.close() + await self.close() + return await self._cancel_response(response_id) class AsyncResponseStreamManager(Generic[TextFormatT]): diff --git a/src/openai/resources/responses/responses.py b/src/openai/resources/responses/responses.py index ad22fe1a79..11d7029bed 100644 --- a/src/openai/resources/responses/responses.py +++ b/src/openai/resources/responses/responses.py @@ -1134,7 +1134,13 @@ def stream( text_format=text_format, input_tools=tools, starting_after=None, - cancel_response=self.cancel, + cancel_response=lambda response_id: self.cancel( + response_id, + extra_headers=extra_headers, + extra_query=extra_query, + extra_body=extra_body, + timeout=timeout, + ), ) else: if not is_given(response_id): @@ -1154,7 +1160,13 @@ def stream( text_format=text_format, input_tools=tools, starting_after=starting_after if is_given(starting_after) else None, - cancel_response=self.cancel, + cancel_response=lambda response_id: self.cancel( + response_id, + extra_headers=extra_headers, + extra_query=extra_query, + extra_body=extra_body, + timeout=timeout, + ), ) def parse( @@ -2801,7 +2813,13 @@ def stream( text_format=text_format, input_tools=tools, starting_after=None, - cancel_response=self.cancel, + cancel_response=lambda response_id: self.cancel( + response_id, + extra_headers=extra_headers, + extra_query=extra_query, + extra_body=extra_body, + timeout=timeout, + ), ) else: if isinstance(response_id, Omit): @@ -2821,7 +2839,13 @@ def stream( text_format=text_format, input_tools=tools, starting_after=starting_after if is_given(starting_after) else None, - cancel_response=self.cancel, + cancel_response=lambda response_id: self.cancel( + response_id, + extra_headers=extra_headers, + extra_query=extra_query, + extra_body=extra_body, + timeout=timeout, + ), ) async def parse( 
From 63de4f29da9c3fa92436062087869e0753be8622 Mon Sep 17 00:00:00 2001
From: giulio-leone
Date: Mon, 2 Mar 2026 20:32:20 +0100
Subject: [PATCH 5/6] fix: apply code review suggestions

---
 src/openai/lib/streaming/responses/_responses.py | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/src/openai/lib/streaming/responses/_responses.py b/src/openai/lib/streaming/responses/_responses.py
index a306bf663b..24bee7e38f 100644
--- a/src/openai/lib/streaming/responses/_responses.py
+++ b/src/openai/lib/streaming/responses/_responses.py
@@ -113,8 +113,11 @@ def cancel(self) -> Response:
             raise ValueError("Cannot cancel: response ID not yet available. Wait for the first event.")
         if self._cancel_response is None:
             raise ValueError("Cancel not available for this stream.")
-        self.close()
-        return self._cancel_response(response_id)
+        try:
+            result = self._cancel_response(response_id)
+        finally:
+            self.close()
+        return result
 
 
 class ResponseStreamManager(Generic[TextFormatT]):
@@ -244,7 +247,10 @@ async def cancel(self) -> Response:
         if self._cancel_response is None:
             raise ValueError("Cancel not available for this stream.")
         await self.close()
-        return await self._cancel_response(response_id)
+        try:
+            result = await self._cancel_response(response_id)
+        finally:
+            await self.close()
 
 
 class AsyncResponseStreamManager(Generic[TextFormatT]):

From 3f6599a66a2026e367af55a8a1d35a3024cb0758 Mon Sep 17 00:00:00 2001
From: g97iulio1609
Date: Mon, 2 Mar 2026 21:02:05 +0100
Subject: [PATCH 6/6] fix: use try/finally in async cancel() to ensure stream
 closes on error

Remove the pre-try await self.close() that would close the stream before
the cancel API call, and keep only the finally block to ensure close()
is always called regardless of cancel API success/failure.
---
 src/openai/lib/streaming/responses/_responses.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/openai/lib/streaming/responses/_responses.py b/src/openai/lib/streaming/responses/_responses.py
index 24bee7e38f..85ffede217 100644
--- a/src/openai/lib/streaming/responses/_responses.py
+++ b/src/openai/lib/streaming/responses/_responses.py
@@ -237,7 +237,7 @@ def response_id(self) -> str | None:
     async def cancel(self) -> Response:
         """Cancel the response being streamed.
 
-        Closes the stream first, then calls the API to cancel the response.
+        Calls the API to cancel the response and closes the stream.
 
        Returns the cancelled Response object.
         """
@@ -246,8 +246,8 @@ async def cancel(self) -> Response:
             raise ValueError("Cannot cancel: response ID not yet available. Wait for the first event.")
         if self._cancel_response is None:
             raise ValueError("Cancel not available for this stream.")
-        await self.close()
         try:
             result = await self._cancel_response(response_id)
         finally:
             await self.close()
+        return result