-
Notifications
You must be signed in to change notification settings - Fork 4.8k
Expand file tree
/
Copy pathtest_streaming_errors.py
More file actions
52 lines (37 loc) · 1.59 KB
/
test_streaming_errors.py
File metadata and controls
52 lines (37 loc) · 1.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
from __future__ import annotations
from typing import Iterator, AsyncIterator
import httpx
import pytest
from openai import OpenAI, AsyncOpenAI
from openai._exceptions import APIError
from openai._streaming import Stream, AsyncStream
@pytest.mark.asyncio
@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"])
async def test_thread_event_error_raises(sync: bool, client: OpenAI, async_client: AsyncOpenAI) -> None:
    """Reading a stream that carries a ``thread.error`` event must raise ``APIError``.

    The test is parametrized over ``sync`` so the same server-sent-event payload
    is fed through both the synchronous and the asynchronous stream wrapper.

    Args:
        sync: ``True`` to exercise the synchronous stream, ``False`` for the
            asynchronous one.
        client: Synchronous client handed to the stream wrapper (presumably a
            pytest fixture defined outside this file).
        async_client: Asynchronous client handed to the stream wrapper
            (presumably a pytest fixture defined outside this file).
    """

    def body() -> Iterator[bytes]:
        # One event in SSE wire format: event name, JSON data line, then the
        # blank line that terminates the event.
        yield b"event: thread.error\n"
        yield b'data: {"error": {"message": "boom"}}\n'
        yield b"\n"

    iterator = make_stream_iterator(content=body(), sync=sync, client=client, async_client=async_client)

    # The error is expected on the very first item pulled from the stream, and
    # its text must contain the message embedded in the payload.
    with pytest.raises(APIError, match="boom"):
        await iter_next(iterator)
async def to_aiter(iter: Iterator[bytes]) -> AsyncIterator[bytes]:
    """Re-expose a synchronous iterator of byte chunks as an async iterator.

    Args:
        iter: Source of byte chunks. (The name shadows the builtin ``iter``
            inside this function; it is kept so existing callers keep working.)

    Yields:
        Each chunk from ``iter``, in the original order.
    """
    # No awaiting happens between chunks; this is purely an interface adapter.
    for chunk in iter:
        yield chunk
async def iter_next(iter: Iterator[object] | AsyncIterator[object]) -> object:
if isinstance(iter, AsyncIterator):
return await iter.__anext__()
return next(iter)
def make_stream_iterator(
    content: Iterator[bytes],
    *,
    sync: bool,
    client: OpenAI,
    async_client: AsyncOpenAI,
) -> Iterator[object] | AsyncIterator[object]:
    """Wrap raw response bytes in a ``Stream`` or ``AsyncStream`` and return its iterator.

    An ``httpx.Response`` with status 200 is built around a placeholder GET
    request to ``http://test`` with ``content`` as its body, so no real HTTP
    client call is made here.

    Args:
        content: Byte chunks that make up the response body.
        sync: ``True`` to build a synchronous ``Stream``; ``False`` to build an
            ``AsyncStream``.
        client: Client passed to ``Stream`` (used only when ``sync`` is true).
        async_client: Client passed to ``AsyncStream`` (used only when ``sync``
            is false).

    Returns:
        ``iter(Stream(...))`` when ``sync`` is true, otherwise
        ``AsyncStream(...).__aiter__()``.
    """
    request = httpx.Request("GET", "http://test")

    if sync:
        response = httpx.Response(200, request=request, content=content)
        return iter(Stream(cast_to=object, client=client, response=response))

    # The async response needs an async byte source, so adapt the sync iterator.
    response = httpx.Response(200, request=request, content=to_aiter(content))
    return AsyncStream(cast_to=object, client=async_client, response=response).__aiter__()