Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions src/trio/_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,22 @@ def __init__(self, max_buffer_size: int | float) -> None: # noqa: PYI041

@attrs.frozen
class MemoryChannelStatistics:
"""Statistics about a memory channel.

Returned by :meth:`MemorySendChannel.statistics` and
:meth:`MemoryReceiveChannel.statistics`.

Attributes:
current_buffer_used: The number of items currently buffered in the channel.
max_buffer_size: The maximum number of items that can be buffered.
open_send_channels: The number of open
:class:`MemorySendChannel` endpoints.
open_receive_channels: The number of open
:class:`MemoryReceiveChannel` endpoints.
tasks_waiting_send: The number of tasks waiting to send.
tasks_waiting_receive: The number of tasks waiting to receive.
"""

current_buffer_used: int
max_buffer_size: int | float
open_send_channels: int
Expand Down Expand Up @@ -152,6 +168,14 @@ def statistics(self) -> MemoryChannelStatistics:
@final
@attrs.define(eq=False, repr=False, slots=False)
class MemorySendChannel(SendChannel[SendType], metaclass=NoPublicConstructor):
"""The send end of a memory channel, created by
:func:`open_memory_channel`.

See :ref:`channel` for details. This implements the
:class:`trio.abc.SendChannel` interface.

"""

_state: MemoryChannelState[SendType]
_closed: bool = False
# This is just the tasks waiting on *this* object. As compared to
Expand Down Expand Up @@ -300,6 +324,14 @@ async def aclose(self) -> None:
@final
@attrs.define(eq=False, repr=False, slots=False)
class MemoryReceiveChannel(ReceiveChannel[ReceiveType], metaclass=NoPublicConstructor):
"""The receive end of a memory channel, created by
:func:`open_memory_channel`.

See :ref:`channel` for details. This implements the
:class:`trio.abc.ReceiveChannel` interface.

"""

_state: MemoryChannelState[ReceiveType]
_closed: bool = False
_tasks: set[trio._core._run.Task] = attrs.Factory(set)
Expand Down
2 changes: 2 additions & 0 deletions src/trio/_core/_parking_lot.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ class ParkingLot:
# items
_parked: OrderedDict[Task, None] = attrs.field(factory=OrderedDict, init=False)
broken_by: list[Task] = attrs.field(factory=list, init=False)
"""List of tasks that have broken this parking lot via
:meth:`break_lot`. An empty list if the lot has not been broken."""

def __len__(self) -> int:
"""Returns the number of parked tasks."""
Expand Down
5 changes: 5 additions & 0 deletions src/trio/_highlevel_socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ def __init__(self, socket: SocketType) -> None:
self.setsockopt(tsocket.IPPROTO_TCP, tsocket.TCP_NOTSENT_LOWAT, 2**14)

async def send_all(self, data: bytes | bytearray | memoryview) -> None:
"""See :meth:`trio.abc.SendStream.send_all`."""
if self.socket.did_shutdown_SHUT_WR:
raise trio.ClosedResourceError("can't send data after sending EOF")
with self._send_conflict_detector:
Expand All @@ -124,13 +125,15 @@ async def send_all(self, data: bytes | bytearray | memoryview) -> None:
total_sent += sent

async def wait_send_all_might_not_block(self) -> None:
"""See :meth:`trio.abc.SendStream.wait_send_all_might_not_block`."""
with self._send_conflict_detector:
if self.socket.fileno() == -1:
raise trio.ClosedResourceError
with _translate_socket_errors_to_stream_errors():
await self.socket.wait_writable()

async def send_eof(self) -> None:
"""See :meth:`trio.abc.HalfCloseableStream.send_eof`."""
with self._send_conflict_detector:
await trio.lowlevel.checkpoint()
# On macOS, calling shutdown a second time raises ENOTCONN, but
Expand All @@ -141,6 +144,7 @@ async def send_eof(self) -> None:
self.socket.shutdown(tsocket.SHUT_WR)

async def receive_some(self, max_bytes: int | None = None) -> bytes:
"""See :meth:`trio.abc.ReceiveStream.receive_some`."""
if max_bytes is None:
max_bytes = DEFAULT_RECEIVE_SIZE
if max_bytes < 1:
Expand All @@ -149,6 +153,7 @@ async def receive_some(self, max_bytes: int | None = None) -> bytes:
return await self.socket.recv(max_bytes)

async def aclose(self) -> None:
"""See :meth:`trio.abc.AsyncResource.aclose`."""
self.socket.close()
await trio.lowlevel.checkpoint()

Expand Down
4 changes: 3 additions & 1 deletion src/trio/_subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ def pidfd_open(fd: int, flags: int) -> int:
class HasFileno(Protocol):
"""Represents any file-like object that has a file descriptor."""

def fileno(self) -> int: ...
def fileno(self) -> int:
"""Return the underlying file descriptor as an integer."""
...


@final
Expand Down
Loading