Skip to content

Tcp End#99

Open
SHshenhao wants to merge 10 commits into
mainfrom
tcp-v4
Open

Tcp End#99
SHshenhao wants to merge 10 commits into
mainfrom
tcp-v4

Conversation

@SHshenhao
Copy link
Copy Markdown

add tcp end

root and others added 10 commits May 14, 2026 13:24
Introduce a standalone-asio-based TcpEndpoint in dlslime/csrc/engine/tcp/
with four async communication primitives, all supporting timeout (default 30s).

Architecture highlights:
- 17-byte SessionHeader (Mooncake-aligned): {size, addr, opcode} with 3 opcodes
  (OP_SEND, OP_READ, OP_WRITE) supporting 4 primitives (recv matched passively)
- TcpContext: shared io_context + connection pool + background thread,
  multiple endpoints can share one context to reduce thread count
- TcpConnectionPool: (host, port)-keyed connection reuse, 60s idle timeout
- ServerSession: async_read callback chain (readHeader->dispatch->readBody loop)
  with 64KB chunked reads for large payloads
- Symmetric connection rendezvous (is_initiator by host:port comparison)

Async primitives:
- async_send(chunk, timeout_ms=30000): post to io_ctx, async_write, signal future
- async_recv(chunk, timeout_ms=30000): FIFO registration, ServerSession matches
  incoming OP_SEND, memcpy to user buffer, signal future
- async_read(assign, timeout_ms=30000): post OP_READ header, async_read response
  data, connection reserved until response arrives
- async_write(assign, timeout_ms=30000): post OP_WRITE header+payload via
  async_write, signal future

Timeout: SO_SNDTIMEO on socket for send/write, future.wait_for(ms) timed
busy-spin (machnet_pause) for recv/read.  All return TcpFuture with wait()
and wait_for(seconds) -> int|None.

Files: 16 new (10 in tcp/), 5 modified (CMakeLists chain + bind.cpp)
Tests: 5 Python cases (send/recv, write/read, recv timeout, send timeout,
default timeout) all pass.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
- Remove void* stream from all 4 async_* methods (RDMA leftover, never used)
- Remove timeout_ms from async_recv (recv timeout via future.wait_for())
- Remove ineffective SO_SNDTIMEO calls (no effect on asio::async_write)
- Update pybind11 bindings and tests to match
- Add tcp/plan.md with v3 architecture documentation

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
- TcpEndpoint(ip, port): bind to specific NIC instead of hardcoded 0.0.0.0
- TcpEndpoint(TcpContext&): =delete until multi-endpoint semantics resolved
- TcpMemoryPool: remove offset field — caller passes final address directly
- TcpConnectionPool: call cleanupIdleConnections in getConnection hot path
  with lock parameter; remove dead connections during iteration; fix returnConnection
  value-type bug; kIdleTimeout 60→300s
- connect: accept repeated calls (remove connected_ guard)
- register_memory_region: simplified signature without offset
- Python: constructor keyword args, offset removed from bindings

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
ServerSession:
- remove readBody chunking (kDefaultChunkSize / transferred_ / chunk_buf_)
  — asio::async_read already loops internally, application-level chunking
  adds nothing but callback overhead
- add writeBody(src, len) symmetric to readBody(dst, len)
- readBody/writeBody share the same pattern: async I/O → readHeader on done
- OP_SEND stays inline (needs signal between read and readHeader)

TcpMemoryPool:
- register_memory_region: name now mandatory (const std::string&, no optional)
- reject empty name and duplicate name with SLIME_LOG_WARN + return -1
- remove handle_to_name_ vector (no longer needed)

TcpConnectionPool:
- cleanupIdleConnections(bool lock = true) — caller can skip internal lock
- getConnection calls cleanupIdleConnections(false) on the hot path

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Introduce ClientSession as the outbound counterpart to ServerSession,
driving one I/O operation per instance.  Endpoint primitives now create
ClientSession instead of ad-hoc lambdas.

ClientSession:
- start_write(hdr, payload): gather async_write header + body
- start_read(hdr, dst): write OP_READ header → async_read response to dst
- DoneCallback reports asio::error_code; Primitive signals OpState on done
- Self-destructs via shared_ptr when async chain completes

Endpoint cleanup:
- async_send / async_write / async_read: ad-hoc asio::post + nested lambda
  replaced with ClientSession creation + start_xxx
- Removed pending_reads_ / read_mu_ / next_req_id_ (no longer needed —
  ClientSession's start_read callback directly delivers the result)
- Removed write_message helper (no longer used)
- shutdown: removed pending_reads_ cleanup block

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
- CUDA: char* + new[]/delete[] staging pattern (Mooncake-aligned)
  async_send/async_write: D2H before ClientSession; async_recv/async_read:
  H2D via RecvSlot::post_read callback in ServerSession
- Remove is_initiator: both sides use conn_pool on-demand
- connect() verifies reachability via getConnection before setting peer state
- send/recv: treat chunk_tuple_t as raw pointers (no MR lookup needed
  for bilateral ops); read/write continue using MemoryPool for remote
  address resolution
- PendingRecv: add staging_buf (unique_ptr<char[]>) and cuda_dst for CUDA
- RecvSlot: add post_read callback for post-recv CUDA H2D before signal

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
@SHshenhao SHshenhao requested a review from JimyMa May 19, 2026 11:16
@SHshenhao SHshenhao requested a deployment to self-hosted-rdma May 19, 2026 11:16 — with GitHub Actions Waiting
@CLAassistant
Copy link
Copy Markdown

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.


root seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you have already a GitHub account, please add the email address used for this commit to your account.
You have signed the CLA already but the status is still pending? Let us recheck it.

Copy link
Copy Markdown

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a TCP transport layer to the dlslime library, implementing asynchronous communication primitives (send, recv, read, write) and a wire protocol compatible with Mooncake. The implementation features a connection pool, an asio-based event loop, and memory region management. Review feedback highlights several areas for improvement, including the need for message size validation to prevent OOM attacks, transitioning synchronous connection logic to asynchronous calls to avoid blocking, and using smart pointers for safer memory management. Additionally, the reviewer recommended optimizing busy-wait loops in futures and avoiding synchronous CUDA memory copies within the event loop to maintain performance.

Comment thread dlslime/csrc/engine/tcp/tcp_session.cpp
Comment on lines +31 to +33
auto endpoints = resolver.resolve(host, std::to_string(port));
tcp::socket sock(io_ctx_);
asio::error_code ec;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The DNS resolution and TCP connection establishment are performed synchronously here. Since getConnection is called by async_send, async_read, and async_write on the caller's thread, this can cause significant blocking, especially if the peer is slow or unreachable. Consider using asio::ip::tcp::resolver::async_resolve and asio::async_connect to maintain the asynchronous nature of the API.

uintptr_t src = std::get<0>(chunk) + std::get<1>(chunk);
size_t len = std::get<2>(chunk);

auto conn = ctx_->conn_pool().getConnection(peer_host_, peer_port_);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The peer_host_ variable might be empty if connect() was not called or failed. Calling getConnection with an empty host will trigger a DNS resolution for an empty string, which is inefficient and may lead to unexpected behavior. It is better to check is_connected() or validate peer_host_ before attempting to get a connection.

bool is_cuda = false;
#ifdef USE_CUDA
if (is_cuda_memory(send_ptr)) {
auto* buf = new char[len];
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Manual memory management with new char[] is used here for CUDA staging. If an exception occurs or if the async_write callback is not triggered correctly, this memory will leak. Consider using std::unique_ptr<char[]> or std::vector<char> to ensure exception safety and automatic cleanup.

Comment thread dlslime/csrc/engine/tcp/tcp_future.h

// Always drain the full send payload from the wire. If recv buffer
// is smaller, read into a temp buffer then copy what fits.
size_t n_read = static_cast<size_t>(header_.size);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Casting uint64_t to size_t can lead to truncation on 32-bit platforms. While this might not be the primary target, it's safer to use std::min with a defined maximum message size limit to prevent overflow and potential DoS attacks.

Comment on lines +177 to +180
auto cu_err = cudaMemcpy(reinterpret_cast<void*>(real_addr), ptr,
len, cudaMemcpyHostToDevice);
if (cu_err != cudaSuccess)
SLIME_LOG_ERROR("readBody cudaMemcpy H2D: ", cudaGetErrorString(cu_err));
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Performing a synchronous cudaMemcpy inside an Asio callback blocks the io_context event loop thread. This prevents the thread from processing other concurrent I/O operations, significantly impacting performance. Consider using cudaMemcpyAsync with a dedicated stream or offloading the copy to a worker thread.

@HaoLiuuu
Copy link
Copy Markdown
Collaborator

HaoLiuuu commented May 20, 2026

Dear Shen Hao, thanks a lot for this work. I learned a lot from reading through the TCP backend (on thetcp-v4 branch at ab6e815), and the overall layering across endpoint / session / connection pool / memory pool / future is very clear.

I think there are two design points worth further discussion:

  1. The current send/recv path seems to rely on recv being posted before send, which makes the TCP send/recv semantics look like a fairly strict posted-recv model.
  2. The read/write path strongly depends on remote MR metadata being exchanged in advance, and remote_pool_ currently looks more like a static snapshot imported at connect time, without a dynamic synchronization path for newly registered remote regions.

Overall, I think the direction is very good and the foundation is already strong. These points are mostly about clarifying a few core semantics early, and I’m happy to keep discussing them together😁.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants