Conversation
Introduce a standalone-asio-based TcpEndpoint in dlslime/csrc/engine/tcp/
with four async communication primitives, all supporting timeout (default 30s).
Architecture highlights:
- 17-byte SessionHeader (Mooncake-aligned): {size, addr, opcode} with 3 opcodes
(OP_SEND, OP_READ, OP_WRITE) supporting 4 primitives (recv matched passively)
- TcpContext: shared io_context + connection pool + background thread,
multiple endpoints can share one context to reduce thread count
- TcpConnectionPool: (host, port)-keyed connection reuse, 60s idle timeout
- ServerSession: async_read callback chain (readHeader->dispatch->readBody loop)
with 64KB chunked reads for large payloads
- Symmetric connection rendezvous (is_initiator by host:port comparison)
Async primitives:
- async_send(chunk, timeout_ms=30000): post to io_ctx, async_write, signal future
- async_recv(chunk, timeout_ms=30000): FIFO registration, ServerSession matches
incoming OP_SEND, memcpy to user buffer, signal future
- async_read(assign, timeout_ms=30000): post OP_READ header, async_read response
data, connection reserved until response arrives
- async_write(assign, timeout_ms=30000): post OP_WRITE header+payload via
async_write, signal future
Timeout: SO_SNDTIMEO on socket for send/write, future.wait_for(ms) timed
busy-spin (machnet_pause) for recv/read. All return TcpFuture with wait()
and wait_for(seconds) -> int|None.
Files: 16 new (10 in tcp/), 5 modified (CMakeLists chain + bind.cpp)
Tests: 5 Python cases (send/recv, write/read, recv timeout, send timeout,
default timeout) all pass.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
- Remove void* stream from all 4 async_* methods (RDMA leftover, never used) - Remove timeout_ms from async_recv (recv timeout via future.wait_for()) - Remove ineffective SO_SNDTIMEO calls (no effect on asio::async_write) - Update pybind11 bindings and tests to match - Add tcp/plan.md with v3 architecture documentation Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
- TcpEndpoint(ip, port): bind to specific NIC instead of hardcoded 0.0.0.0 - TcpEndpoint(TcpContext&): =delete until multi-endpoint semantics resolved - TcpMemoryPool: remove offset field — caller passes final address directly - TcpConnectionPool: call cleanupIdleConnections in getConnection hot path with lock parameter; remove dead connections during iteration; fix returnConnection value-type bug; kIdleTimeout 60→300s - connect: accept repeated calls (remove connected_ guard) - register_memory_region: simplified signature without offset - Python: constructor keyword args, offset removed from bindings Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
ServerSession: - remove readBody chunking (kDefaultChunkSize / transferred_ / chunk_buf_) — asio::async_read already loops internally, application-level chunking adds nothing but callback overhead - add writeBody(src, len) symmetric to readBody(dst, len) - readBody/writeBody share the same pattern: async I/O → readHeader on done - OP_SEND stays inline (needs signal between read and readHeader) TcpMemoryPool: - register_memory_region: name now mandatory (const std::string&, no optional) - reject empty name and duplicate name with SLIME_LOG_WARN + return -1 - remove handle_to_name_ vector (no longer needed) TcpConnectionPool: - cleanupIdleConnections(bool lock = true) — caller can skip internal lock - getConnection calls cleanupIdleConnections(false) on the hot path Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Introduce ClientSession as the outbound counterpart to ServerSession, driving one I/O operation per instance. Endpoint primitives now create ClientSession instead of ad-hoc lambdas. ClientSession: - start_write(hdr, payload): gather async_write header + body - start_read(hdr, dst): write OP_READ header → async_read response to dst - DoneCallback reports asio::error_code; Primitive signals OpState on done - Self-destructs via shared_ptr when async chain completes Endpoint cleanup: - async_send / async_write / async_read: ad-hoc asio::post + nested lambda replaced with ClientSession creation + start_xxx - Removed pending_reads_ / read_mu_ / next_req_id_ (no longer needed — ClientSession's start_read callback directly delivers the result) - Removed write_message helper (no longer used) - shutdown: removed pending_reads_ cleanup block Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
- CUDA: char* + new[]/delete[] staging pattern (Mooncake-aligned) async_send/async_write: D2H before ClientSession; async_recv/async_read: H2D via RecvSlot::post_read callback in ServerSession - Remove is_initiator: both sides use conn_pool on-demand - connect() verifies reachability via getConnection before setting peer state - send/recv: treat chunk_tuple_t as raw pointers (no MR lookup needed for bilateral ops); read/write continue using MemoryPool for remote address resolution - PendingRecv: add staging_buf (unique_ptr<char[]>) and cuda_dst for CUDA - RecvSlot: add post_read callback for post-recv CUDA H2D before signal Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
|
root seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you have already a GitHub account, please add the email address used for this commit to your account. You have signed the CLA already but the status is still pending? Let us recheck it. |
There was a problem hiding this comment.
Code Review
This pull request introduces a TCP transport layer to the dlslime library, implementing asynchronous communication primitives (send, recv, read, write) and a wire protocol compatible with Mooncake. The implementation features a connection pool, an asio-based event loop, and memory region management. Review feedback highlights several areas for improvement, including the need for message size validation to prevent OOM attacks, transitioning synchronous connection logic to asynchronous calls to avoid blocking, and using smart pointers for safer memory management. Additionally, the reviewer recommended optimizing busy-wait loops in futures and avoiding synchronous CUDA memory copies within the event loop to maintain performance.
| auto endpoints = resolver.resolve(host, std::to_string(port)); | ||
| tcp::socket sock(io_ctx_); | ||
| asio::error_code ec; |
There was a problem hiding this comment.
The DNS resolution and TCP connection establishment are performed synchronously here. Since getConnection is called by async_send, async_read, and async_write on the caller's thread, this can cause significant blocking, especially if the peer is slow or unreachable. Consider using asio::ip::tcp::resolver::async_resolve and asio::async_connect to maintain the asynchronous nature of the API.
| uintptr_t src = std::get<0>(chunk) + std::get<1>(chunk); | ||
| size_t len = std::get<2>(chunk); | ||
|
|
||
| auto conn = ctx_->conn_pool().getConnection(peer_host_, peer_port_); |
There was a problem hiding this comment.
The peer_host_ variable might be empty if connect() was not called or failed. Calling getConnection with an empty host will trigger a DNS resolution for an empty string, which is inefficient and may lead to unexpected behavior. It is better to check is_connected() or validate peer_host_ before attempting to get a connection.
| bool is_cuda = false; | ||
| #ifdef USE_CUDA | ||
| if (is_cuda_memory(send_ptr)) { | ||
| auto* buf = new char[len]; |
There was a problem hiding this comment.
|
|
||
| // Always drain the full send payload from the wire. If recv buffer | ||
| // is smaller, read into a temp buffer then copy what fits. | ||
| size_t n_read = static_cast<size_t>(header_.size); |
| auto cu_err = cudaMemcpy(reinterpret_cast<void*>(real_addr), ptr, | ||
| len, cudaMemcpyHostToDevice); | ||
| if (cu_err != cudaSuccess) | ||
| SLIME_LOG_ERROR("readBody cudaMemcpy H2D: ", cudaGetErrorString(cu_err)); |
There was a problem hiding this comment.
Performing a synchronous cudaMemcpy inside an Asio callback blocks the io_context event loop thread. This prevents the thread from processing other concurrent I/O operations, significantly impacting performance. Consider using cudaMemcpyAsync with a dedicated stream or offloading the copy to a worker thread.
|
Dear Shen Hao, thanks a lot for this work. I learned a lot from reading through the TCP backend (on the I think there are two design points worth further discussion:
Overall, I think the direction is very good and the foundation is already strong. These points are mostly about clarifying a few core semantics early, and I’m happy to keep discussing them together😁. |
add tcp end