From 8a006bb32c326efb2ea3eb5a325955e69e5985e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=86=AF=E5=9F=BA=E9=AD=81?= <1412414664@qq.com> Date: Tue, 23 Jun 2026 11:36:24 +0800 Subject: [PATCH 1/2] fix: merge indexed streaming deltas on first chunk --- src/openai/lib/streaming/_assistants.py | 24 ++++++++--- src/openai/lib/streaming/_deltas.py | 24 ++++++++--- tests/lib/test_streaming_deltas.py | 56 +++++++++++++++++++++++++ 3 files changed, 94 insertions(+), 10 deletions(-) create mode 100644 tests/lib/test_streaming_deltas.py diff --git a/src/openai/lib/streaming/_assistants.py b/src/openai/lib/streaming/_assistants.py index 6efb3ca3f1..bb32989141 100644 --- a/src/openai/lib/streaming/_assistants.py +++ b/src/openai/lib/streaming/_assistants.py @@ -977,16 +977,28 @@ def accumulate_event( return current_message_snapshot, new_content +def _has_indexed_entries(value: object) -> bool: + return is_list(value) and any(is_dict(entry) and "index" in entry for entry in value) + + def accumulate_delta(acc: dict[object, object], delta: dict[object, object]) -> dict[object, object]: for key, delta_value in delta.items(): if key not in acc: - acc[key] = delta_value - continue + if _has_indexed_entries(delta_value): + acc[key] = [] + else: + acc[key] = delta_value + continue acc_value = acc[key] if acc_value is None: - acc[key] = delta_value - continue + if _has_indexed_entries(delta_value): + acc_value = [] + else: + acc[key] = delta_value + continue + else: + acc_value = acc[key] # the `index` property is used in arrays of objects so it should # not be accumulated like other values e.g. @@ -1007,7 +1019,9 @@ def accumulate_delta(acc: dict[object, object], delta: dict[object, object]) -> elif is_list(acc_value) and is_list(delta_value): # for lists of non-dictionary items we'll only ever get new entries # in the array, existing entries will never be changed - if all(isinstance(x, (str, int, float)) for x in acc_value): + if all(isinstance(x, (str, int, float)) for x in acc_value) and all( + isinstance(x, (str, int, float)) for x in delta_value + ): acc_value.extend(delta_value) continue diff --git a/src/openai/lib/streaming/_deltas.py b/src/openai/lib/streaming/_deltas.py index a5e1317612..b291b0016d 100644 --- a/src/openai/lib/streaming/_deltas.py +++ b/src/openai/lib/streaming/_deltas.py @@ -3,16 +3,28 @@ from ..._utils import is_dict, is_list +def _has_indexed_entries(value: object) -> bool: + return is_list(value) and any(is_dict(entry) and "index" in entry for entry in value) + + def accumulate_delta(acc: dict[object, object], delta: dict[object, object]) -> dict[object, object]: for key, delta_value in delta.items(): if key not in acc: - acc[key] = delta_value - continue + if _has_indexed_entries(delta_value): + acc[key] = [] + else: + acc[key] = delta_value + continue acc_value = acc[key] if acc_value is None: - acc[key] = delta_value - continue + if _has_indexed_entries(delta_value): + acc_value = [] + else: + acc[key] = delta_value + continue + else: + acc_value = acc[key] # the `index` property is used in arrays of objects so it should # not be accumulated like other values e.g. @@ -33,7 +45,9 @@ def accumulate_delta(acc: dict[object, object], delta: dict[object, object]) -> elif is_list(acc_value) and is_list(delta_value): # for lists of non-dictionary items we'll only ever get new entries # in the array, existing entries will never be changed - if all(isinstance(x, (str, int, float)) for x in acc_value): + if all(isinstance(x, (str, int, float)) for x in acc_value) and all( + isinstance(x, (str, int, float)) for x in delta_value + ): acc_value.extend(delta_value) continue diff --git a/tests/lib/test_streaming_deltas.py b/tests/lib/test_streaming_deltas.py new file mode 100644 index 0000000000..547820a84e --- /dev/null +++ b/tests/lib/test_streaming_deltas.py @@ -0,0 +1,56 @@ +from __future__ import annotations + +from collections.abc import Callable + +import pytest + +from openai.lib.streaming._deltas import accumulate_delta +from openai.lib.streaming._assistants import accumulate_delta as accumulate_assistant_delta + + +@pytest.mark.parametrize("accumulator", [accumulate_delta, accumulate_assistant_delta]) +def test_accumulate_delta_merges_duplicate_index_entries_in_initial_list( + accumulator: Callable[[dict[object, object], dict[object, object]], dict[object, object]], +) -> None: + acc: dict[object, object] = {} + + accumulator( + acc, + { + "tool_calls": [ + { + "index": 0, + "id": "call_abc", + "function": {"name": "list_files"}, + "type": "function", + }, + { + "index": 0, + "function": {"arguments": '{"path"'}, + }, + ] + }, + ) + accumulator( + acc, + { + "tool_calls": [ + { + "index": 0, + "function": {"arguments": ': "."}'}, + } + ] + }, + ) + + assert acc["tool_calls"] == [ + { + "index": 0, + "id": "call_abc", + "function": { + "name": "list_files", + "arguments": '{"path": "."}', + }, + "type": "function", + } + ] From f1525081a937bbfb05b1862c354136d094dca2e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=86=AF=E5=9F=BA=E9=AD=81?= <1412414664@qq.com> Date: Tue, 23 Jun 2026 17:14:33 +0800 Subject: [PATCH 2/2] fix initial duplicate tool call deltas --- src/openai/lib/streaming/chat/_completions.py | 12 ++++--- tests/lib/chat/test_completions_streaming.py | 32 +++++++++++++++++++ 2 files changed, 40 insertions(+), 4 deletions(-) diff --git a/src/openai/lib/streaming/chat/_completions.py b/src/openai/lib/streaming/chat/_completions.py index 5f072cafbd..ceca2b6f10 100644 --- a/src/openai/lib/streaming/chat/_completions.py +++ b/src/openai/lib/streaming/chat/_completions.py @@ -39,7 +39,7 @@ from ....types.chat import ChatCompletionChunk, ParsedChatCompletion, ChatCompletionToolUnionParam from ...._exceptions import LengthFinishReasonError, ContentFilterFinishReasonError from ....types.chat.chat_completion import ChoiceLogprobs -from ....types.chat.chat_completion_chunk import Choice as ChoiceChunk +from ....types.chat.chat_completion_chunk import Choice as ChoiceChunk, ChoiceDelta from ....types.chat.completion_create_params import ResponseFormat as ResponseFormatParam @@ -393,7 +393,7 @@ def _accumulate_chunk(self, chunk: ChatCompletionChunk) -> ParsedChatCompletionS ), ), ), - cast("dict[object, object]", choice.delta.to_dict()), + _convert_delta_to_message(choice.delta), ), ), ) @@ -415,7 +415,7 @@ def _accumulate_chunk(self, chunk: ChatCompletionChunk) -> ParsedChatCompletionS type_=ParsedChoiceSnapshot, value={ **choice.model_dump(exclude_unset=True, exclude={"delta"}), - "message": choice.delta.to_dict(), + "message": _convert_delta_to_message(choice.delta), }, ), ) @@ -744,7 +744,7 @@ def _convert_initial_chunk_into_snapshot(chunk: ChatCompletionChunk) -> ParsedCh for choice in chunk.choices: choices[choice.index] = { **choice.model_dump(exclude_unset=True, exclude={"delta"}), - "message": choice.delta.to_dict(), + "message": _convert_delta_to_message(choice.delta), } return cast( @@ -760,6 +760,10 @@ def _convert_initial_chunk_into_snapshot(chunk: ChatCompletionChunk) -> ParsedCh ) +def _convert_delta_to_message(delta: ChoiceDelta) -> dict[object, object]: + return accumulate_delta({}, cast("dict[object, object]", delta.to_dict())) + + def _is_valid_chat_completion_chunk_weak(sse_event: ChatCompletionChunk) -> bool: # Although the _raw_stream is always supposed to contain only objects adhering to ChatCompletionChunk schema, # this is broken by the Azure OpenAI in case of Asynchronous Filter enabled. diff --git a/tests/lib/chat/test_completions_streaming.py b/tests/lib/chat/test_completions_streaming.py index 598a41ee2b..d9a460901d 100644 --- a/tests/lib/chat/test_completions_streaming.py +++ b/tests/lib/chat/test_completions_streaming.py @@ -725,6 +725,38 @@ class GetWeatherArgs(BaseModel): ) +@pytest.mark.respx(base_url=base_url) +def test_duplicate_tool_call_index_in_initial_chunk(client: OpenAI, respx_mock: MockRouter) -> None: + listener = _make_stream_snapshot_request( + lambda c: c.chat.completions.stream( + model="gpt-4o-2024-08-06", + messages=[{"role": "user", "content": "List files"}], + tools=[ + { + "type": "function", + "function": { + "name": "list_files", + "parameters": {"type": "object", "properties": {"path": {"type": "string"}}}, + }, + } + ], + ), + content_snapshot=snapshot( + b'data: {"id":"chatcmpl-test","object":"chat.completion.chunk","created":1727346161,"model":"gpt-4o-2024-08-06","choices":[{"index":0,"delta":{"role":"assistant","tool_calls":[{"index":0,"id":"call_abc","type":"function","function":{"name":"list_files"}},{"index":0,"function":{"arguments":"{\\"path\\""}}]}}]}\n\n' + b'data: {"id":"chatcmpl-test","object":"chat.completion.chunk","created":1727346161,"model":"gpt-4o-2024-08-06","choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"function":{"arguments":": \\".\\"}"}}]}}]}\n\n' + b'data: {"id":"chatcmpl-test","object":"chat.completion.chunk","created":1727346161,"model":"gpt-4o-2024-08-06","choices":[{"index":0,"delta":{},"finish_reason":"tool_calls"}]}\n\n' + b"data: [DONE]\n\n" + ), + mock_client=client, + respx_mock=respx_mock, + ) + + final_choice = listener.stream.get_final_completion().choices[0] + assert final_choice.message.tool_calls is not None + assert len(final_choice.message.tool_calls) == 1 + assert final_choice.message.tool_calls[0].function.arguments == '{"path": "."}' + + @pytest.mark.respx(base_url=base_url) def test_parse_multiple_pydantic_tools(client: OpenAI, respx_mock: MockRouter, monkeypatch: pytest.MonkeyPatch) -> None: class GetWeatherArgs(BaseModel):