diff --git a/tensorrt_llm/_torch/pyexecutor/kv_cache_manager_v2.py b/tensorrt_llm/_torch/pyexecutor/kv_cache_manager_v2.py index 02e48743465c..92a0578d26f3 100644 --- a/tensorrt_llm/_torch/pyexecutor/kv_cache_manager_v2.py +++ b/tensorrt_llm/_torch/pyexecutor/kv_cache_manager_v2.py @@ -494,6 +494,14 @@ def __init__( ) logger.info(f"[KVCacheManager] execution_stream: {self._stream}") + # Per-call host staging buffers for copy_batch_block_offsets, kept alive + # until their consuming async gather kernel drains (nvbug 6293536). Each + # entry is (cuda_event, host_block_offsets, copy_index); retired once the + # event completes. See copy_batch_block_offsets for the rationale. + self._inflight_block_offset_buffers: List[ + Tuple[torch.cuda.Event, torch.Tensor, torch.Tensor] + ] = [] + # Determine max_attention_window_vec if kv_cache_config.max_attention_window is not None: self.max_attention_window_vec = ( @@ -1930,15 +1938,65 @@ def copy_batch_block_offsets( copy_idx = self.index_mapper.get_copy_index(request_ids, num_contexts, beam_width) assert copy_idx.shape[0] == num_seqs + # nvbug 6293536: copy_batch_block_offsets_to_device launches an + # asynchronous gather kernel that reads its host inputs at *execution* + # time, not enqueue time. Two of those inputs are reused in place across + # iterations: host_kv_cache_block_offsets (the C++ KV cache writes page + # indices straight into a sequence's slot, and a freed slot is rebound to + # a different request on reuse) and copy_idx (a slice of the IndexMapper's + # single persistent copyIndex_ buffer, overwritten by the next + # get_copy_index call). Under the overlap scheduler the CPU runs an + # iteration ahead, so either could be clobbered before this iteration's + # still-pending kernel drains, making the kernel gather another batch's + # blocks. Snapshot the rows this call needs into a fresh pinned buffer + # and feed the kernel an identity index. Unlike the v1 + # KVCacheManager.copy_batch_block_offsets fix, we cannot rely on PyTorch's + # caching host allocator to keep the freed buffer alive: that protection + # is only recorded for cudaMemcpyAsync out of pinned memory, not for a + # custom kernel reading host memory directly. So keep an explicit + # reference to each per-call buffer and retire it only once a CUDA event + # recorded after the kernel completes (see _inflight_block_offset_buffers). + # + # The persistent host_kv_cache_block_offsets attribute is intentionally + # left in place: the C++ KV cache holds raw pointers into it + # (set_base_page_index_buf) and same-iteration CPU readers (DSA sparse + # attention) expect it to hold the current page table. + + # Retire buffers whose consuming kernel has drained. Kernels run in order + # on a single stream, so their events complete FIFO. + inflight = self._inflight_block_offset_buffers + while inflight and inflight[0][0].query(): + inflight.pop(0) + + host_block_offsets = torch.zeros( + self.num_pools, + num_seqs, + 2, # key and value; only the key row is read by the gather kernel + self.max_blocks_per_seq, + dtype=torch.int32, + pin_memory=prefer_pinned(), + device="cpu", + ) + host_block_offsets[:, :, 0, :] = self.host_kv_cache_block_offsets[ + :, copy_idx.to(torch.long), 0, : + ] + identity_idx = torch.arange( + num_seqs, dtype=torch.int32, pin_memory=prefer_pinned(), device="cpu" + ) + copy_batch_block_offsets_to_device( - self.host_kv_cache_block_offsets, + host_block_offsets, dst_tensor, - copy_idx, + identity_idx, self.index_scales, self.kv_offset, self._stream.cuda_stream, ) + done = torch.cuda.Event() + done.record(self._stream) + inflight.append((done, host_block_offsets, identity_idx)) + def _create_kv_cache( self, request_id: int, diff --git a/tensorrt_llm/_torch/pyexecutor/resource_manager.py b/tensorrt_llm/_torch/pyexecutor/resource_manager.py index e4ad48050606..334855cec41e 100644 --- a/tensorrt_llm/_torch/pyexecutor/resource_manager.py +++ b/tensorrt_llm/_torch/pyexecutor/resource_manager.py @@ -2070,6 +2070,13 @@ def pin_blocks(self, request_id: int): def copy_batch_block_offsets(self, dst_tensor: torch.Tensor, request_ids: List[int], beam_width: int, num_context: int, num_seqs: int): + # Fill the persistent host buffer in place, exactly as before. CPU-side + # consumers read self.host_kv_cache_block_offsets directly and depend on + # its persistent, max_batch-sized layout: DSA sparse attention, the + # speculative-decoding draft/target swap, and the KV-cache relocation + # path. Reassigning this attribute to a per-call, num_seqs-sized buffer + # broke those readers (nvbug 6293536 regression), so it must stay the + # buffer they see. if self.kv_cache_type == CacheTypeCpp.CROSS and beam_width > 1: # This branch is reached only via attribute aliasing, never a # direct cross_kv_cache_manager.copy_batch_block_offsets(...) call: @@ -2092,14 +2099,18 @@ def copy_batch_block_offsets(self, dst_tensor: torch.Tensor, # tensor whose rows are decoder-sequence scoped. self.impl.copy_batch_block_offsets(self.host_kv_cache_block_offsets, request_ids, 1, 0) - for pool_idx in range(self.host_kv_cache_block_offsets.shape[0]): + # Snapshot the rows this call reads into a fresh pinned buffer so the + # async H2D below has a private, immutable source (see the non-cross + # path for the full rationale). + num_rows = num_context + num_gen_requests + host_block_offsets = self._stage_block_offsets_for_copy(num_rows) + for pool_idx in range(self.num_pools): if num_context > 0: dst_tensor[pool_idx, :num_context].copy_( - self.host_kv_cache_block_offsets[ - pool_idx, :num_context], + host_block_offsets[pool_idx, :num_context], non_blocking=True) if num_gen_requests > 0: - gen_block_offsets = self.host_kv_cache_block_offsets[ + gen_block_offsets = host_block_offsets[ pool_idx, num_context:num_context + num_gen_requests] dst_tensor[pool_idx, num_context:num_seqs].copy_( gen_block_offsets.repeat_interleave(beam_width, dim=0), @@ -2112,10 +2123,36 @@ def copy_batch_block_offsets(self, dst_tensor: torch.Tensor, request_ids[num_context:], beam_width, num_context) - for pool_idx in range(self.host_kv_cache_block_offsets.shape[0]): - dst_tensor[pool_idx, :num_seqs].copy_( - self.host_kv_cache_block_offsets[pool_idx, :num_seqs], - non_blocking=True) + # The H2D copies below are asynchronous, so their source is read at copy + # execution time, not enqueue time. With the overlap scheduler the CPU + # runs an iteration ahead, so copying straight from the persistent buffer + # let the next iteration's in-place refill clobber the source of this + # iteration's still-pending H2D -> the attention kernel indexed another + # batch's blocks (nvbug 6293536; the window is widened when host + # offloading stalls the execution stream in front of the H2D). Stage the + # async copy through a freshly-allocated pinned buffer instead; the + # caching host allocator holds it until the consuming copy completes, + # matching the already-safe kv_lens / block_ids_per_seq staging. The + # persistent buffer above is untouched by this and stays valid for the + # synchronous CPU readers. + host_block_offsets = self._stage_block_offsets_for_copy(num_seqs) + for pool_idx in range(self.num_pools): + dst_tensor[pool_idx, :num_seqs].copy_(host_block_offsets[pool_idx], + non_blocking=True) + + def _stage_block_offsets_for_copy(self, num_rows: int) -> torch.Tensor: + """Snapshot the first ``num_rows`` rows of the persistent host block + offset buffer into a fresh pinned buffer, to serve as the private source + of an asynchronous H2D copy (nvbug 6293536).""" + host_block_offsets = torch.empty(self.num_pools, + num_rows, + 2, + self.max_blocks_per_seq, + dtype=torch.int32, + pin_memory=prefer_pinned(), + device='cpu') + host_block_offsets.copy_(self.host_kv_cache_block_offsets[:, :num_rows]) + return host_block_offsets def truncate_blocks(self, target_tokens: List[int], num_tokens_to_keep: int): diff --git a/tests/unittest/_torch/executor/test_kv_block_offset_overlap_race.py b/tests/unittest/_torch/executor/test_kv_block_offset_overlap_race.py new file mode 100644 index 000000000000..92aba91ead1b --- /dev/null +++ b/tests/unittest/_torch/executor/test_kv_block_offset_overlap_race.py @@ -0,0 +1,141 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Regression test for nvbug 6293536. + +``KVCacheManager.copy_batch_block_offsets`` ships each iteration's block-offset +page tables to the device with an *asynchronous* H2D. The async copy reads its +source at execution time, not enqueue time, so the host staging buffer must not +be overwritten before the copy drains. Under the overlap scheduler the CPU runs +an iteration ahead, so a single persistently-reused staging buffer let the next +iteration's in-place overwrite clobber the source of the previous iteration's +still-pending H2D -> the attention kernel indexed another batch's blocks +(memory corruption). The fix stages through a freshly-allocated pinned buffer +each call. + +This test drives that exact window deterministically: it stalls the stream with +``torch.cuda._sleep`` (standing in for the production ``syncTransfers`` offload +stall that delays the H2D), enqueues batch A's async H2D behind the stall, then +lets the host immediately issue batch B before A's copy can drain. With the bug +present, batch A's device tensor ends up holding batch B's offsets. +""" + +import pytest +import torch + +from tensorrt_llm._torch.pyexecutor.resource_manager import KVCacheManager +from tensorrt_llm.bindings import DataType, LayerType +from tensorrt_llm.bindings import ModelConfig as ModelConfigCpp +from tensorrt_llm.bindings.internal.batch_manager import CacheType +from tensorrt_llm.llmapi.llm_args import KvCacheConfig +from tensorrt_llm.mapping import Mapping + +# Small geometry so each sequence spans several blocks and the two batches get +# visibly different physical block indices. +_NUM_LAYERS = 2 +_NUM_KV_HEADS = 2 +_HEAD_DIM = 16 +_TOKENS_PER_BLOCK = 16 +_MAX_SEQ_LEN = 256 +_BATCH = 6 +_TOKENS_PER_SEQ = _TOKENS_PER_BLOCK * 5 +# Long GPU stall so the host reliably wins the race and overwrites the staging +# buffer before batch A's H2D drains. Generous to stay deterministic on fast GPUs. +_SLEEP_CYCLES = 2_000_000_000 + + +def _build_manager(): + model_config = ModelConfigCpp( + vocab_size=32000, + num_layers=_NUM_LAYERS, + num_attention_layers=_NUM_LAYERS, + num_rnn_layers=0, + num_heads=_NUM_KV_HEADS, + hidden_size=_NUM_KV_HEADS * _HEAD_DIM, + data_type=DataType.HALF, + ) + model_config.layer_types = [LayerType.ATTENTION] * _NUM_LAYERS + model_config.set_num_kv_heads(_NUM_KV_HEADS) + + kv_cache_config = KvCacheConfig( + max_tokens=4096, free_gpu_memory_fraction=0.2, enable_block_reuse=False + ) + mapping = Mapping(world_size=1, tp_size=1, pp_size=1) + + return KVCacheManager( + kv_cache_config=kv_cache_config, + kv_cache_type=CacheType.SELF, + num_layers=_NUM_LAYERS, + num_kv_heads=_NUM_KV_HEADS, + head_dim=_HEAD_DIM, + tokens_per_block=_TOKENS_PER_BLOCK, + max_seq_len=_MAX_SEQ_LEN, + max_batch_size=2 * _BATCH, + mapping=mapping, + dtype=DataType.HALF, + model_config=model_config, + max_beam_width=1, + ) + + +def _empty_device_offsets(mgr): + return torch.zeros( + mgr.num_pools, 2 * _BATCH, 2, mgr.max_blocks_per_seq, dtype=torch.int32, device="cuda" + ) + + +def _reference_offsets(mgr, ids): + """Non-racy ground truth: copy followed by an immediate synchronize.""" + dst = _empty_device_offsets(mgr) + mgr.copy_batch_block_offsets(dst, ids, 1, len(ids), len(ids)) + torch.cuda.synchronize() + return dst[:, : len(ids)].clone() + + +@pytest.mark.skipif(not torch.cuda.is_available(), reason="requires a CUDA device") +def test_copy_batch_block_offsets_survives_overlap_overwrite(): + mgr = _build_manager() + + # Two disjoint batches -> different physical block indices. + ids_a = list(range(1, 1 + _BATCH)) + ids_b = list(range(101, 101 + _BATCH)) + toks = [_TOKENS_PER_SEQ] * _BATCH + mgr.add_dummy_requests(request_ids=ids_a, token_nums=toks, prepare_resource=True) + mgr.add_dummy_requests(request_ids=ids_b, token_nums=toks, prepare_resource=True) + + ref_a = _reference_offsets(mgr, ids_a) + ref_b = _reference_offsets(mgr, ids_b) + assert not torch.equal(ref_a, ref_b), ( + "test setup invalid: batches A and B produced identical offsets" + ) + + # Drive the overlap window: stall the stream, enqueue batch A's async H2D + # behind the stall, then let the host run ahead and issue batch B (whose + # host-side fill would overwrite a shared staging buffer in place) before + # A's copy can drain. + dst_a = _empty_device_offsets(mgr) + dst_b = _empty_device_offsets(mgr) + torch.cuda.synchronize() + + torch.cuda._sleep(_SLEEP_CYCLES) + mgr.copy_batch_block_offsets(dst_a, ids_a, 1, _BATCH, _BATCH) + mgr.copy_batch_block_offsets(dst_b, ids_b, 1, _BATCH, _BATCH) + torch.cuda.synchronize() + + # Batch A's device offsets must still be batch A's, not clobbered by B. + assert torch.equal(dst_a[:, :_BATCH], ref_a), ( + "nvbug 6293536: batch A's async H2D read another batch's host staging " + "buffer (overlap-scheduler data race on host_kv_cache_block_offsets)" + ) + # And batch B's copy is independently correct. + assert torch.equal(dst_b[:, :_BATCH], ref_b) diff --git a/tests/unittest/_torch/executor/test_kv_block_offset_overlap_race_v2.py b/tests/unittest/_torch/executor/test_kv_block_offset_overlap_race_v2.py new file mode 100644 index 000000000000..6081b0b94d2b --- /dev/null +++ b/tests/unittest/_torch/executor/test_kv_block_offset_overlap_race_v2.py @@ -0,0 +1,152 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Regression test for nvbug 6293536 on KVCacheManagerV2. + +``KVCacheManagerV2.copy_batch_block_offsets`` ships each iteration's block +offsets to the device with an *asynchronous* gather kernel +(``copy_batch_block_offsets_to_device``). The kernel reads its host inputs at +execution time, not enqueue time, so they must not be overwritten before the +copy drains. Two of those inputs are reused in place across iterations: + +* ``host_kv_cache_block_offsets`` -- the C++ KV cache writes page indices + straight into a sequence's slot, and a freed slot is rebound to a different + request on reuse; and +* the ``copy_idx`` returned by ``IndexMapper.get_copy_index`` -- a slice of a + single persistent pinned ``copyIndex_`` buffer overwritten by the next call. + +Under the overlap scheduler the CPU runs an iteration ahead, so either could be +clobbered before the previous iteration's still-pending copy drains -> the +kernel gathers another batch's blocks (memory corruption). The fix snapshots the +rows each call needs into a fresh pinned buffer and feeds the kernel an identity +index. + +This drives that window deterministically with two simultaneously-live batches +(so their page-table slots differ): it stalls the stream with +``torch.cuda._sleep``, enqueues batch A's async gather behind the stall, then +lets the host immediately issue batch B before A's copy can drain. With the bug +present, batch B's ``get_copy_index`` overwrites the shared ``copyIndex_`` buffer +in place, so batch A's still-pending kernel gathers batch B's slots. +""" + +import pytest +import torch + +from tensorrt_llm.bindings import DataType, LayerType +from tensorrt_llm.bindings import ModelConfig as ModelConfigCpp +from tensorrt_llm.bindings.internal.batch_manager import CacheType +from tensorrt_llm.llmapi.llm_args import KvCacheConfig +from tensorrt_llm.mapping import Mapping + +# Small geometry so each sequence spans several blocks and the two batches get +# visibly different physical block indices. +_NUM_LAYERS = 2 +_NUM_KV_HEADS = 2 +_HEAD_DIM = 16 +_TOKENS_PER_BLOCK = 16 +_MAX_SEQ_LEN = 256 +_BATCH = 6 +_TOKENS_PER_SEQ = _TOKENS_PER_BLOCK * 5 +# Long GPU stall so the host reliably wins the race and overwrites the shared +# copy-index buffer before batch A's gather drains. +_SLEEP_CYCLES = 2_000_000_000 + + +def _build_manager(): + from tensorrt_llm._torch.pyexecutor.kv_cache_manager_v2 import KVCacheManagerV2 + + model_config = ModelConfigCpp( + vocab_size=32000, + num_layers=_NUM_LAYERS, + num_attention_layers=_NUM_LAYERS, + num_rnn_layers=0, + num_heads=_NUM_KV_HEADS, + hidden_size=_NUM_KV_HEADS * _HEAD_DIM, + data_type=DataType.HALF, + ) + model_config.layer_types = [LayerType.ATTENTION] * _NUM_LAYERS + model_config.set_num_kv_heads(_NUM_KV_HEADS) + + kv_cache_config = KvCacheConfig( + max_tokens=4096, free_gpu_memory_fraction=0.2, enable_block_reuse=False + ) + mapping = Mapping(world_size=1, tp_size=1, pp_size=1) + + return KVCacheManagerV2( + kv_cache_config=kv_cache_config, + kv_cache_type=CacheType.SELF, + num_layers=_NUM_LAYERS, + num_kv_heads=_NUM_KV_HEADS, + head_dim=_HEAD_DIM, + tokens_per_block=_TOKENS_PER_BLOCK, + max_seq_len=_MAX_SEQ_LEN, + max_batch_size=2 * _BATCH, + mapping=mapping, + dtype=DataType.HALF, + model_config=model_config, + max_beam_width=1, + ) + + +def _empty_device_offsets(mgr): + return torch.zeros( + mgr.num_pools, 2 * _BATCH, 2, mgr.max_blocks_per_seq, dtype=torch.int32, device="cuda" + ) + + +def _reference_offsets(mgr, ids): + """Non-racy ground truth: copy followed by an immediate synchronize.""" + dst = _empty_device_offsets(mgr) + mgr.copy_batch_block_offsets(dst, ids, 1, len(ids), len(ids)) + torch.cuda.synchronize() + return dst[:, : len(ids)].clone() + + +@pytest.mark.skipif(not torch.cuda.is_available(), reason="requires a CUDA device") +def test_copy_batch_block_offsets_v2_survives_overlap_overwrite(): + mgr = _build_manager() + + # Two disjoint batches -> different physical block indices and, because both + # are kept live, different IndexMapper slots. + ids_a = list(range(1, 1 + _BATCH)) + ids_b = list(range(101, 101 + _BATCH)) + toks = [_TOKENS_PER_SEQ] * _BATCH + mgr.add_dummy_requests(request_ids=ids_a, token_nums=toks, prepare_resource=True) + mgr.add_dummy_requests(request_ids=ids_b, token_nums=toks, prepare_resource=True) + + ref_a = _reference_offsets(mgr, ids_a) + ref_b = _reference_offsets(mgr, ids_b) + assert not torch.equal(ref_a, ref_b), ( + "test setup invalid: batches A and B produced identical offsets" + ) + + # Drive the overlap window: stall the stream, enqueue batch A's async gather + # behind the stall, then let the host run ahead and issue batch B (whose + # get_copy_index overwrites the shared copy-index buffer in place) before + # A's copy can drain. + dst_a = _empty_device_offsets(mgr) + dst_b = _empty_device_offsets(mgr) + torch.cuda.synchronize() + + torch.cuda._sleep(_SLEEP_CYCLES) + mgr.copy_batch_block_offsets(dst_a, ids_a, 1, _BATCH, _BATCH) + mgr.copy_batch_block_offsets(dst_b, ids_b, 1, _BATCH, _BATCH) + torch.cuda.synchronize() + + # Batch A's device offsets must still be batch A's, not clobbered by B. + assert torch.equal(dst_a[:, :_BATCH], ref_a), ( + "nvbug 6293536: batch A's async gather read another batch's host inputs " + "(overlap-scheduler data race in KVCacheManagerV2.copy_batch_block_offsets)" + ) + # And batch B's copy is independently correct. + assert torch.equal(dst_b[:, :_BATCH], ref_b)