Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 24 additions & 5 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@
dask.config.get("distributed.scheduler.default-data-size")
)
STIMULUS_ID_UNSET = "<stimulus_id unset>"
#: Placeholder stored as ``ClientState.address`` when no remote address is
#: supplied, i.e. for scheduler-owned synthetic clients.
INTERNAL_CLIENT_ADDRESS = "<internal>"

DEFAULT_EXTENSIONS = {
"multi_locks": MultiLockExtension,
Expand Down Expand Up @@ -225,14 +226,25 @@ class ClientState:
#: Output of :func:`distributed.versions.get_versions` on the client
versions: dict[str, Any]

#: Remote address of the client connection as seen by the scheduler.
#: Scheduler-owned synthetic clients use ``<internal>`` instead.
address: str

__slots__ = tuple(__annotations__)

def __init__(self, client: str, *, versions: dict[str, Any] | None = None):
def __init__(
    self,
    client: str,
    *,
    versions: dict[str, Any] | None = None,
    address: str = INTERNAL_CLIENT_ADDRESS,
):
    """Initialise the scheduler-side record for one client.

    Parameters
    ----------
    client
        Key identifying the client; stored as ``client_key`` and hashed
        once so ``__hash__`` can reuse the result.
    versions
        Version information reported by the client. ``None`` (or an empty
        mapping) is stored as a fresh empty dict.
    address
        Remote address of the client connection as seen by the scheduler.
        Defaults to ``INTERNAL_CLIENT_ADDRESS`` for scheduler-owned
        synthetic clients.
    """
    # Identity
    self.client_key = client
    self._hash = hash(client)

    # Connection metadata
    self.address = address
    self.versions = {} if not versions else versions

    # Mutable bookkeeping, starting empty / "seen just now"
    self.last_seen = time()
    self.wants_what = set()

def __hash__(self) -> int:
    """Return the hash of the client key, precomputed in ``__init__``."""
    return self._hash
Expand Down Expand Up @@ -5910,9 +5922,12 @@ async def add_client(
"""
assert client is not None
comm.name = "Scheduler->Client"
logger.info("Receive client connection: %s", client)
client_address = comm.peer_address
logger.info("Receive client connection: %s at %s", client, client_address)
self.log_event(["all", client], {"action": "add-client", "client": client})
self.clients[client] = ClientState(client, versions=versions)
self.clients[client] = ClientState(
client, versions=versions, address=client_address
)
self._client_connections_added_total += 1

for plugin in list(self.plugins.values()):
Expand Down Expand Up @@ -5947,15 +5962,19 @@ async def add_client(
await self.client_comms[client].close()
del self.client_comms[client]
if self.status == Status.running:
logger.info("Close client connection: %s", client)
logger.info(
"Close client connection: %s at %s",
client,
client_address,
)
except TypeError: # comm becomes None during GC
pass

def remove_client(self, client: str, stimulus_id: str | None = None) -> None:
"""Remove client from network"""
stimulus_id = stimulus_id or f"remove-client-{time()}"
if self.status == Status.running:
    # The lookup of ``self.clients[client]`` just below is wrapped in ``try``,
    # which suggests this method can be called for a client that is no longer
    # tracked. Indexing here would then raise KeyError from a log statement,
    # so fetch the state with ``.get`` and fall back to the address-less
    # message when it is missing.
    known_client = self.clients.get(client)
    if known_client is None:
        logger.info("Remove client %s", client)
    else:
        logger.info("Remove client %s at %s", client, known_client.address)
self.log_event(["all", client], {"action": "remove-client", "client": client})
try:
cs: ClientState = self.clients[client]
Expand Down
27 changes: 27 additions & 0 deletions distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -878,6 +878,33 @@ async def test_clear_events_client_removal(c, s, a, b):
assert time() < start + 2


@gen_cluster(nthreads=[])
async def test_client_connection_logs_include_address(s):
    """Receive/remove/close log lines for a real client include its address."""
    import asyncio  # local import so this block does not rely on file-level imports

    with captured_logger("distributed.scheduler", level=logging.INFO) as caplog:
        async with Client(s.address, asynchronous=True) as c:
            client_id = c.id
            client_address = s.clients[client_id].address

        # The scheduler tears the connection down in its own coroutine, so the
        # "Remove"/"Close" lines may not exist yet when the client context
        # exits. Wait, while the logger is still captured, until the scheduler
        # drops the client's comm entry; the "Close client connection" line is
        # logged immediately after that deletion.
        start = time()
        while client_id in s.client_comms:
            await asyncio.sleep(0.01)
            assert time() < start + 5

    logs = caplog.getvalue()
    assert f"Receive client connection: {client_id} at {client_address}" in logs
    assert f"Remove client {client_id} at {client_address}" in logs
    assert f"Close client connection: {client_id} at {client_address}" in logs


@gen_cluster(client=True)
async def test_synthetic_client_logs_internal_address(c, s, a, b):
    """Removing a scheduler-created client logs the ``<internal>`` address."""
    fut = c.submit(inc, 1)
    await fut

    # Register interest in the key under a client name that never connected;
    # the test expects the scheduler to track it with the internal address.
    synthetic_id = "queue-x"
    s.client_desires_keys(keys=[fut.key], client=synthetic_id)
    assert s.clients[synthetic_id].address == "<internal>"

    with captured_logger("distributed.scheduler", level=logging.INFO) as captured:
        s.remove_client(synthetic_id)

    expected_line = "Remove client queue-x at <internal>"
    assert expected_line in captured.getvalue()


@gen_cluster(client=True, nthreads=[])
async def test_add_worker(c, s):
x = c.submit(inc, 1, key="x")
Expand Down
Loading