diff --git a/src/openai/lib/streaming/_assistants.py b/src/openai/lib/streaming/_assistants.py
index 6efb3ca3f1..8a9fd3a48a 100644
--- a/src/openai/lib/streaming/_assistants.py
+++ b/src/openai/lib/streaming/_assistants.py
@@ -980,8 +980,19 @@ def accumulate_event(
 def accumulate_delta(acc: dict[object, object], delta: dict[object, object]) -> dict[object, object]:
     for key, delta_value in delta.items():
         if key not in acc:
-            acc[key] = delta_value
-            continue
+            # For lists of indexed dicts (e.g. tool_calls), initialize an empty
+            # list and fall through to the merge logic so that entries with
+            # duplicate indexes in the first chunk are correctly merged.
+            if (
+                is_list(delta_value)
+                and delta_value
+                and is_dict(delta_value[0])
+                and "index" in delta_value[0]
+            ):
+                acc[key] = []
+            else:
+                acc[key] = delta_value
+                continue
 
         acc_value = acc[key]
         if acc_value is None:
@@ -1007,7 +1018,9 @@ def accumulate_delta(acc: dict[object, object], delta: dict[object, object]) ->
         elif is_list(acc_value) and is_list(delta_value):
             # for lists of non-dictionary items we'll only ever get new entries
             # in the array, existing entries will never be changed
-            if all(isinstance(x, (str, int, float)) for x in acc_value):
+            # An empty accumulated list says nothing about its element type, so
+            # fall back to inspecting the incoming entries in that case.
+            if all(isinstance(x, (str, int, float)) for x in (acc_value or delta_value)):
                 acc_value.extend(delta_value)
                 continue
 
@@ -1023,15 +1036,21 @@ def accumulate_delta(acc: dict[object, object], delta: dict[object, object]) ->
                 if not isinstance(index, int):
                     raise TypeError(f"Unexpected, list delta entry `index` value is not an integer; {index}")
 
-                try:
-                    acc_entry = acc_value[index]
-                except IndexError:
-                    acc_value.insert(index, delta_entry)
+                # Find existing entry by its index field value rather
+                # than by list position, since list position may diverge
+                # from the index field (e.g. duplicate or out-of-order indexes).
+                existing_entry = None
+                existing_entry_idx = None
+                for i, entry in enumerate(acc_value):
+                    if is_dict(entry) and entry.get("index") == index:
+                        existing_entry = entry
+                        existing_entry_idx = i
+                        break
+
+                if existing_entry is not None:
+                    acc_value[existing_entry_idx] = accumulate_delta(existing_entry, delta_entry)
                 else:
-                    if not is_dict(acc_entry):
-                        raise TypeError("not handled yet")
-
-                    acc_value[index] = accumulate_delta(acc_entry, delta_entry)
+                    acc_value.append(delta_entry)
 
         acc[key] = acc_value
 
diff --git a/src/openai/lib/streaming/_deltas.py b/src/openai/lib/streaming/_deltas.py
index a5e1317612..9096c80eb5 100644
--- a/src/openai/lib/streaming/_deltas.py
+++ b/src/openai/lib/streaming/_deltas.py
@@ -6,8 +6,19 @@
 def accumulate_delta(acc: dict[object, object], delta: dict[object, object]) -> dict[object, object]:
     for key, delta_value in delta.items():
         if key not in acc:
-            acc[key] = delta_value
-            continue
+            # For lists of indexed dicts (e.g. tool_calls), initialize an empty
+            # list and fall through to the merge logic so that entries with
+            # duplicate indexes in the first chunk are correctly merged.
+            if (
+                is_list(delta_value)
+                and delta_value
+                and is_dict(delta_value[0])
+                and "index" in delta_value[0]
+            ):
+                acc[key] = []
+            else:
+                acc[key] = delta_value
+                continue
 
         acc_value = acc[key]
         if acc_value is None:
@@ -33,7 +44,9 @@ def accumulate_delta(acc: dict[object, object], delta: dict[object, object]) ->
         elif is_list(acc_value) and is_list(delta_value):
             # for lists of non-dictionary items we'll only ever get new entries
             # in the array, existing entries will never be changed
-            if all(isinstance(x, (str, int, float)) for x in acc_value):
+            # An empty accumulated list says nothing about its element type, so
+            # fall back to inspecting the incoming entries in that case.
+            if all(isinstance(x, (str, int, float)) for x in (acc_value or delta_value)):
                 acc_value.extend(delta_value)
                 continue
 
@@ -49,15 +62,21 @@ def accumulate_delta(acc: dict[object, object], delta: dict[object, object]) ->
                 if not isinstance(index, int):
                     raise TypeError(f"Unexpected, list delta entry `index` value is not an integer; {index}")
 
-                try:
-                    acc_entry = acc_value[index]
-                except IndexError:
-                    acc_value.insert(index, delta_entry)
-                else:
-                    if not is_dict(acc_entry):
-                        raise TypeError("not handled yet")
+                # Find existing entry by its index field value rather
+                # than by list position, since list position may diverge
+                # from the index field (e.g. duplicate or out-of-order indexes).
+                existing_entry = None
+                existing_entry_idx = None
+                for i, entry in enumerate(acc_value):
+                    if is_dict(entry) and entry.get("index") == index:
+                        existing_entry = entry
+                        existing_entry_idx = i
+                        break
 
-                    acc_value[index] = accumulate_delta(acc_entry, delta_entry)
+                if existing_entry is not None:
+                    acc_value[existing_entry_idx] = accumulate_delta(existing_entry, delta_entry)
+                else:
+                    acc_value.append(delta_entry)
 
         acc[key] = acc_value
 
diff --git a/tests/lib/test_streaming_deltas.py b/tests/lib/test_streaming_deltas.py
new file mode 100644
index 0000000000..2f55cfec11
--- /dev/null
+++ b/tests/lib/test_streaming_deltas.py
@@ -0,0 +1,213 @@
+"""Tests for accumulate_delta from openai.lib.streaming._deltas.
+
+Regression tests for https://github.com/openai/openai-python/issues/3201:
+streaming tool_call deltas with duplicate indexes accumulated incorrectly.
+"""
+from __future__ import annotations
+
+from openai.lib.streaming._deltas import accumulate_delta
+
+
+class TestAccumulateDeltaToolCallsDuplicateIndex:
+    """Issue #3201: when the first streaming chunk contains multiple tool_calls
+    at the same index, they should be merged into a single entry."""
+
+    def test_duplicate_index_in_first_chunk(self) -> None:
+        """Simulate the exact scenario from issue #3201:
+        first chunk has two tool_call entries both with index=0."""
+        acc: dict[object, object] = {}
+
+        # First chunk: two entries both at index 0
+        # (e.g. first provides id+name, second provides initial arguments)
+        chunk1 = {
+            "tool_calls": [
+                {
+                    "index": 0,
+                    "id": "functions.list_files:0",
+                    "function": {"name": "list_files"},
+                    "type": "function",
+                },
+                {
+                    "index": 0,
+                    "function": {"arguments": ' {"'},
+                },
+            ]
+        }
+
+        acc = accumulate_delta(acc, chunk1)
+
+        # Should have a single entry in tool_calls, not two
+        tool_calls = acc["tool_calls"]  # type: ignore[index]
+        assert isinstance(tool_calls, list)
+        assert len(tool_calls) == 1, (
+            f"Expected 1 tool_call entry after merging duplicate indexes, "
+            f"got {len(tool_calls)}: {tool_calls}"
+        )
+
+        tc = tool_calls[0]  # type: ignore[index]
+        assert tc["index"] == 0
+        assert tc["id"] == "functions.list_files:0"
+        assert tc["function"]["name"] == "list_files"
+        assert tc["function"]["arguments"] == ' {"'
+
+    def test_subsequent_chunks_merge_into_existing(self) -> None:
+        """After the first chunk with duplicates is correctly merged,
+        subsequent chunks should continue accumulating into the same entry."""
+        acc: dict[object, object] = {}
+
+        # First chunk with duplicate index 0
+        chunk1 = {
+            "tool_calls": [
+                {
+                    "index": 0,
+                    "id": "functions.list_files:0",
+                    "function": {"name": "list_files"},
+                    "type": "function",
+                },
+                {
+                    "index": 0,
+                    "function": {"arguments": ' {"'},
+                },
+            ]
+        }
+        acc = accumulate_delta(acc, chunk1)
+
+        # Second chunk: more arguments for the same tool_call
+        chunk2 = {
+            "tool_calls": [
+                {
+                    "index": 0,
+                    "function": {"arguments": 'path": "."}'},
+                },
+            ]
+        }
+        acc = accumulate_delta(acc, chunk2)
+
+        tool_calls = acc["tool_calls"]  # type: ignore[index]
+        assert isinstance(tool_calls, list)
+        assert len(tool_calls) == 1
+
+        tc = tool_calls[0]  # type: ignore[index]
+        assert tc["id"] == "functions.list_files:0"
+        assert tc["function"]["name"] == "list_files"
+        assert tc["function"]["arguments"] == ' {"path": "."}'
+
+    def test_multiple_tool_calls_different_indexes(self) -> None:
+        """Normal case: two different tool calls at index 0 and 1."""
+        acc: dict[object, object] = {}
+
+        chunk1 = {
+            "tool_calls": [
+                {
+                    "index": 0,
+                    "id": "call_1",
+                    "function": {"name": "get_weather", "arguments": ""},
+                    "type": "function",
+                },
+                {
+                    "index": 1,
+                    "id": "call_2",
+                    "function": {"name": "get_time", "arguments": ""},
+                    "type": "function",
+                },
+            ]
+        }
+        acc = accumulate_delta(acc, chunk1)
+
+        tool_calls = acc["tool_calls"]  # type: ignore[index]
+        assert isinstance(tool_calls, list)
+        assert len(tool_calls) == 2
+        assert tool_calls[0]["id"] == "call_1"  # type: ignore[index]
+        assert tool_calls[1]["id"] == "call_2"  # type: ignore[index]
+
+        # Later chunks accumulate correctly
+        chunk2 = {
+            "tool_calls": [
+                {"index": 0, "function": {"arguments": '{"city": "SF"}'}},
+                {"index": 1, "function": {"arguments": '{"tz": "PST"}'}},
+            ]
+        }
+        acc = accumulate_delta(acc, chunk2)
+
+        tool_calls = acc["tool_calls"]  # type: ignore[index]
+        assert len(tool_calls) == 2
+        assert tool_calls[0]["function"]["arguments"] == '{"city": "SF"}'  # type: ignore[index]
+        assert tool_calls[1]["function"]["arguments"] == '{"tz": "PST"}'  # type: ignore[index]
+
+    def test_non_indexed_lists_still_work(self) -> None:
+        """Ensure non-indexed lists (e.g. text content arrays) still work
+        by simple extension."""
+        acc: dict[object, object] = {}
+        acc = accumulate_delta(acc, {"tags": ["a", "b"]})
+        acc = accumulate_delta(acc, {"tags": ["c"]})
+        assert acc["tags"] == ["a", "b", "c"]
+
+    def test_scalar_list_extends_empty_accumulated_list(self) -> None:
+        """An already-accumulated empty list must still accept scalar entries
+        instead of being routed through the indexed-dict merge."""
+        acc: dict[object, object] = {}
+        acc = accumulate_delta(acc, {"tags": []})
+        acc = accumulate_delta(acc, {"tags": ["a"]})
+        assert acc["tags"] == ["a"]
+
+    def test_basic_string_accumulation(self) -> None:
+        """Ensure basic string accumulation still works."""
+        acc: dict[object, object] = {}
+        acc = accumulate_delta(acc, {"content": "Hello"})
+        acc = accumulate_delta(acc, {"content": " world"})
+        assert acc["content"] == "Hello world"
+
+    def test_first_chunk_with_single_indexed_entry(self) -> None:
+        """A single tool_call in the first chunk (no duplicates) should work
+        the same as before."""
+        acc: dict[object, object] = {}
+
+        chunk1 = {
+            "tool_calls": [
+                {
+                    "index": 0,
+                    "id": "call_1",
+                    "function": {"name": "foo"},
+                    "type": "function",
+                },
+            ]
+        }
+        acc = accumulate_delta(acc, chunk1)
+
+        tool_calls = acc["tool_calls"]  # type: ignore[index]
+        assert isinstance(tool_calls, list)
+        assert len(tool_calls) == 1
+        assert tool_calls[0]["id"] == "call_1"  # type: ignore[index]
+
+    def test_multiple_duplicate_indexes_in_first_chunk(self) -> None:
+        """Edge case: three entries at the same index in the first chunk."""
+        acc: dict[object, object] = {}
+
+        chunk1 = {
+            "tool_calls": [
+                {
+                    "index": 0,
+                    "id": "call_1",
+                    "type": "function",
+                },
+                {
+                    "index": 0,
+                    "function": {"name": "get_weather"},
+                },
+                {
+                    "index": 0,
+                    "function": {"arguments": '{"'},
+                },
+            ]
+        }
+        acc = accumulate_delta(acc, chunk1)
+
+        tool_calls = acc["tool_calls"]  # type: ignore[index]
+        assert isinstance(tool_calls, list)
+        assert len(tool_calls) == 1
+
+        tc = tool_calls[0]  # type: ignore[index]
+        assert tc["id"] == "call_1"
+        assert tc["type"] == "function"
+        assert tc["function"]["name"] == "get_weather"
+        assert tc["function"]["arguments"] == '{"'