Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ build-backend = "hatchling.build"
[project]
name = "livepeer-gateway"
version = "0.1.0"
requires-python = ">=3.10"
requires-python = ">=3.11"
dependencies = [
"grpcio>=1.65.0",
"protobuf>=4.25.0",
Expand Down
9 changes: 4 additions & 5 deletions src/livepeer_gateway/media_publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -942,10 +942,8 @@ async def _stream_pipe_to_trickle(self, read_file: BinaryIO) -> None:
asyncio.to_thread(read_file.read, _READ_CHUNK)
)
try:
chunk = await asyncio.wait_for(
asyncio.shield(pending_read),
timeout=idle_timeout_s,
)
async with asyncio.timeout(idle_timeout_s):
chunk = await asyncio.shield(pending_read)
except asyncio.TimeoutError:
# NB: This intentionally keeps trickle rolling
# accommodate long stalls / inactivity from the
Expand Down Expand Up @@ -1046,7 +1044,8 @@ async def _drain() -> None:
# Best-effort cleanup: absorb TimeoutError and CancelledError
# so drain never blocks shutdown
try:
await asyncio.wait_for(_drain(), timeout=_DRAIN_TIMEOUT_S)
async with asyncio.timeout(_DRAIN_TIMEOUT_S):
await _drain()
except BaseException:
pass

Expand Down
11 changes: 8 additions & 3 deletions src/livepeer_gateway/trickle_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,8 @@ async def write(self, data: bytes) -> None:
try:
# This bounds local backpressure while feeding the request body; it does
# not bound the total lifetime of the HTTP POST once the body is drained.
await asyncio.wait_for(self.queue.put(data), timeout=_SEGMENT_QUEUE_PUT_TIMEOUT_S)
async with asyncio.timeout(_SEGMENT_QUEUE_PUT_TIMEOUT_S):
await self.queue.put(data)
if self._on_write_bytes is not None:
self._on_write_bytes(len(data))
except asyncio.TimeoutError as e:
Expand All @@ -602,8 +603,12 @@ async def close(self) -> None:
if self._seg_state.error is not None:
return
try:
await asyncio.wait_for(self.queue.put(None), timeout=_SEGMENT_QUEUE_PUT_TIMEOUT_S)
# BaseException to also capture cancellation errors, timeout errors, etc
async with asyncio.timeout(_SEGMENT_QUEUE_PUT_TIMEOUT_S):
await self.queue.put(None)
except asyncio.CancelledError:
# Cancellation isn't a close failure.
raise
# BaseException to also capture timeout errors, etc
except BaseException:
_LOG.warning("Trickle segment close suppressed seq=%s", self._seq, exc_info=True)

Expand Down
Loading