Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 28 additions & 11 deletions src/openai/lib/streaming/_assistants.py
Original file line number Diff line number Diff line change
Expand Up @@ -980,8 +980,19 @@ def accumulate_event(
def accumulate_delta(acc: dict[object, object], delta: dict[object, object]) -> dict[object, object]:
for key, delta_value in delta.items():
if key not in acc:
acc[key] = delta_value
continue
# For lists of indexed dicts (e.g. tool_calls), initialize an empty
# list and fall through to the merge logic so that entries with
# duplicate indexes in the first chunk are correctly merged.
if (
is_list(delta_value)
and delta_value
and is_dict(delta_value[0])
and "index" in delta_value[0]
):
acc[key] = []
Comment thread
EvanYao826 marked this conversation as resolved.
else:
acc[key] = delta_value
continue

acc_value = acc[key]
if acc_value is None:
Expand All @@ -1007,7 +1018,7 @@ def accumulate_delta(acc: dict[object, object], delta: dict[object, object]) ->
elif is_list(acc_value) and is_list(delta_value):
# for lists of non-dictionary items we'll only ever get new entries
# in the array, existing entries will never be changed
if all(isinstance(x, (str, int, float)) for x in acc_value):
if acc_value and all(isinstance(x, (str, int, float)) for x in acc_value):
acc_value.extend(delta_value)
continue

Expand All @@ -1023,15 +1034,21 @@ def accumulate_delta(acc: dict[object, object], delta: dict[object, object]) ->
if not isinstance(index, int):
raise TypeError(f"Unexpected, list delta entry `index` value is not an integer; {index}")

try:
acc_entry = acc_value[index]
except IndexError:
acc_value.insert(index, delta_entry)
# Find existing entry by its index field value rather
# than by list position, since list position may diverge
# from the index field (e.g. duplicate or out-of-order indexes).
existing_entry = None
existing_entry_idx = None
for i, entry in enumerate(acc_value):
if is_dict(entry) and entry.get("index") == index:
existing_entry = entry
existing_entry_idx = i
break

if existing_entry is not None:
acc_value[existing_entry_idx] = accumulate_delta(existing_entry, delta_entry)
else:
if not is_dict(acc_entry):
raise TypeError("not handled yet")

acc_value[index] = accumulate_delta(acc_entry, delta_entry)
acc_value.append(delta_entry)

acc[key] = acc_value

Expand Down
39 changes: 28 additions & 11 deletions src/openai/lib/streaming/_deltas.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,19 @@
def accumulate_delta(acc: dict[object, object], delta: dict[object, object]) -> dict[object, object]:
for key, delta_value in delta.items():
if key not in acc:
acc[key] = delta_value
continue
# For lists of indexed dicts (e.g. tool_calls), initialize an empty
# list and fall through to the merge logic so that entries with
# duplicate indexes in the first chunk are correctly merged.
if (
is_list(delta_value)
and delta_value
and is_dict(delta_value[0])
and "index" in delta_value[0]
):
acc[key] = []
Comment thread
EvanYao826 marked this conversation as resolved.
else:
acc[key] = delta_value
continue

acc_value = acc[key]
if acc_value is None:
Expand All @@ -33,7 +44,7 @@ def accumulate_delta(acc: dict[object, object], delta: dict[object, object]) ->
elif is_list(acc_value) and is_list(delta_value):
# for lists of non-dictionary items we'll only ever get new entries
# in the array, existing entries will never be changed
if all(isinstance(x, (str, int, float)) for x in acc_value):
if acc_value and all(isinstance(x, (str, int, float)) for x in acc_value):
acc_value.extend(delta_value)
continue

Expand All @@ -49,15 +60,21 @@ def accumulate_delta(acc: dict[object, object], delta: dict[object, object]) ->
if not isinstance(index, int):
raise TypeError(f"Unexpected, list delta entry `index` value is not an integer; {index}")

try:
acc_entry = acc_value[index]
except IndexError:
acc_value.insert(index, delta_entry)
else:
if not is_dict(acc_entry):
raise TypeError("not handled yet")
# Find existing entry by its index field value rather
# than by list position, since list position may diverge
# from the index field (e.g. duplicate or out-of-order indexes).
existing_entry = None
existing_entry_idx = None
for i, entry in enumerate(acc_value):
if is_dict(entry) and entry.get("index") == index:
existing_entry = entry
existing_entry_idx = i
break

acc_value[index] = accumulate_delta(acc_entry, delta_entry)
if existing_entry is not None:
acc_value[existing_entry_idx] = accumulate_delta(existing_entry, delta_entry)
else:
acc_value.append(delta_entry)

acc[key] = acc_value

Expand Down
205 changes: 205 additions & 0 deletions tests/lib/test_streaming_deltas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
"""Tests for accumulate_delta from openai.lib.streaming._deltas.

Regression tests for https://github.com/openai/openai-python/issues/3201:
streaming tool_call deltas with duplicate indexes accumulated incorrectly.
"""
from __future__ import annotations

from openai.lib.streaming._deltas import accumulate_delta


class TestAccumulateDeltaToolCallsDuplicateIndex:
"""Issue #3201: when the first streaming chunk contains multiple tool_calls
at the same index, they should be merged into a single entry."""

def test_duplicate_index_in_first_chunk(self) -> None:
"""Simulate the exact scenario from issue #3201:
first chunk has two tool_call entries both with index=0."""
acc: dict[object, object] = {}

# First chunk: two entries both at index 0
# (e.g. first provides id+name, second provides initial arguments)
chunk1 = {
"tool_calls": [
{
"index": 0,
"id": "functions.list_files:0",
"function": {"name": "list_files"},
"type": "function",
},
{
"index": 0,
"function": {"arguments": ' {"'},
},
]
}

acc = accumulate_delta(acc, chunk1)

# Should have a single entry in tool_calls, not two
tool_calls = acc["tool_calls"] # type: ignore[index]
assert isinstance(tool_calls, list)
assert len(tool_calls) == 1, (
f"Expected 1 tool_call entry after merging duplicate indexes, "
f"got {len(tool_calls)}: {tool_calls}"
)

tc = tool_calls[0] # type: ignore[index]
assert tc["index"] == 0
assert tc["id"] == "functions.list_files:0"
assert tc["function"]["name"] == "list_files"
assert tc["function"]["arguments"] == ' {"'

def test_subsequent_chunks_merge_into_existing(self) -> None:
"""After the first chunk with duplicates is correctly merged,
subsequent chunks should continue accumulating into the same entry."""
acc: dict[object, object] = {}

# First chunk with duplicate index 0
chunk1 = {
"tool_calls": [
{
"index": 0,
"id": "functions.list_files:0",
"function": {"name": "list_files"},
"type": "function",
},
{
"index": 0,
"function": {"arguments": ' {"'},
},
]
}
acc = accumulate_delta(acc, chunk1)

# Second chunk: more arguments for the same tool_call
chunk2 = {
"tool_calls": [
{
"index": 0,
"function": {"arguments": 'path": "."}'},
},
]
}
acc = accumulate_delta(acc, chunk2)

tool_calls = acc["tool_calls"] # type: ignore[index]
assert isinstance(tool_calls, list)
assert len(tool_calls) == 1

tc = tool_calls[0] # type: ignore[index]
assert tc["id"] == "functions.list_files:0"
assert tc["function"]["name"] == "list_files"
assert tc["function"]["arguments"] == ' {"path": "."}'

def test_multiple_tool_calls_different_indexes(self) -> None:
"""Normal case: two different tool calls at index 0 and 1."""
acc: dict[object, object] = {}

chunk1 = {
"tool_calls": [
{
"index": 0,
"id": "call_1",
"function": {"name": "get_weather", "arguments": ""},
"type": "function",
},
{
"index": 1,
"id": "call_2",
"function": {"name": "get_time", "arguments": ""},
"type": "function",
},
]
}
acc = accumulate_delta(acc, chunk1)

tool_calls = acc["tool_calls"] # type: ignore[index]
assert isinstance(tool_calls, list)
assert len(tool_calls) == 2
assert tool_calls[0]["id"] == "call_1" # type: ignore[index]
assert tool_calls[1]["id"] == "call_2" # type: ignore[index]

# Later chunks accumulate correctly
chunk2 = {
"tool_calls": [
{"index": 0, "function": {"arguments": '{"city": "SF"}'}},
{"index": 1, "function": {"arguments": '{"tz": "PST"}'}},
]
}
acc = accumulate_delta(acc, chunk2)

tool_calls = acc["tool_calls"] # type: ignore[index]
assert len(tool_calls) == 2
assert tool_calls[0]["function"]["arguments"] == '{"city": "SF"}' # type: ignore[index]
assert tool_calls[1]["function"]["arguments"] == '{"tz": "PST"}' # type: ignore[index]

def test_non_indexed_lists_still_work(self) -> None:
"""Ensure non-indexed lists (e.g. text content arrays) still work
by simple extension."""
acc: dict[object, object] = {}
acc = accumulate_delta(acc, {"tags": ["a", "b"]})
acc = accumulate_delta(acc, {"tags": ["c"]})
assert acc["tags"] == ["a", "b", "c"]

def test_basic_string_accumulation(self) -> None:
"""Ensure basic string accumulation still works."""
acc: dict[object, object] = {}
acc = accumulate_delta(acc, {"content": "Hello"})
acc = accumulate_delta(acc, {"content": " world"})
assert acc["content"] == "Hello world"

def test_first_chunk_with_single_indexed_entry(self) -> None:
"""A single tool_call in the first chunk (no duplicates) should work
the same as before."""
acc: dict[object, object] = {}

chunk1 = {
"tool_calls": [
{
"index": 0,
"id": "call_1",
"function": {"name": "foo"},
"type": "function",
},
]
}
acc = accumulate_delta(acc, chunk1)

tool_calls = acc["tool_calls"] # type: ignore[index]
assert isinstance(tool_calls, list)
assert len(tool_calls) == 1
assert tool_calls[0]["id"] == "call_1" # type: ignore[index]

def test_multiple_duplicate_indexes_in_first_chunk(self) -> None:
"""Edge case: three entries at the same index in the first chunk."""
acc: dict[object, object] = {}

chunk1 = {
"tool_calls": [
{
"index": 0,
"id": "call_1",
"type": "function",
},
{
"index": 0,
"function": {"name": "get_weather"},
},
{
"index": 0,
"function": {"arguments": '{"'},
},
]
}
acc = accumulate_delta(acc, chunk1)

tool_calls = acc["tool_calls"] # type: ignore[index]
assert isinstance(tool_calls, list)
assert len(tool_calls) == 1

tc = tool_calls[0] # type: ignore[index]
assert tc["id"] == "call_1"
assert tc["type"] == "function"
assert tc["function"]["name"] == "get_weather"
assert tc["function"]["arguments"] == '{"'