Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 60 additions & 2 deletions tensorrt_llm/_torch/pyexecutor/kv_cache_manager_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,14 @@ def __init__(
)
logger.info(f"[KVCacheManager] execution_stream: {self._stream}")

# Per-call host staging buffers for copy_batch_block_offsets, kept alive
# until their consuming async gather kernel drains (nvbug 6293536). Each
# entry is (cuda_event, host_block_offsets, copy_index); retired once the
# event completes. See copy_batch_block_offsets for the rationale.
self._inflight_block_offset_buffers: List[
Tuple[torch.cuda.Event, torch.Tensor, torch.Tensor]
] = []

# Determine max_attention_window_vec
if kv_cache_config.max_attention_window is not None:
self.max_attention_window_vec = (
Expand Down Expand Up @@ -1930,15 +1938,65 @@ def copy_batch_block_offsets(
copy_idx = self.index_mapper.get_copy_index(request_ids, num_contexts, beam_width)
assert copy_idx.shape[0] == num_seqs

# nvbug 6293536: copy_batch_block_offsets_to_device launches an
# asynchronous gather kernel that reads its host inputs at *execution*
# time, not enqueue time. Two of those inputs are reused in place across
# iterations: host_kv_cache_block_offsets (the C++ KV cache writes page
# indices straight into a sequence's slot, and a freed slot is rebound to
# a different request on reuse) and copy_idx (a slice of the IndexMapper's
# single persistent copyIndex_ buffer, overwritten by the next
# get_copy_index call). Under the overlap scheduler the CPU runs an
# iteration ahead, so either could be clobbered before this iteration's
# still-pending kernel drains, making the kernel gather another batch's
# blocks. Snapshot the rows this call needs into a fresh pinned buffer
# and feed the kernel an identity index. Unlike the v1
# KVCacheManager.copy_batch_block_offsets fix, we cannot rely on PyTorch's
# caching host allocator to keep the freed buffer alive: that protection
# is only recorded for cudaMemcpyAsync out of pinned memory, not for a
# custom kernel reading host memory directly. So keep an explicit
# reference to each per-call buffer and retire it only once a CUDA event
# recorded after the kernel completes (see _inflight_block_offset_buffers).
#
# The persistent host_kv_cache_block_offsets attribute is intentionally
# left in place: the C++ KV cache holds raw pointers into it
# (set_base_page_index_buf) and same-iteration CPU readers (DSA sparse
# attention) expect it to hold the current page table.

# Retire buffers whose consuming kernel has drained. Kernels run in order
# on a single stream, so their events complete FIFO.
inflight = self._inflight_block_offset_buffers
while inflight and inflight[0][0].query():
inflight.pop(0)

host_block_offsets = torch.zeros(
self.num_pools,
num_seqs,
2, # key and value; only the key row is read by the gather kernel
self.max_blocks_per_seq,
dtype=torch.int32,
pin_memory=prefer_pinned(),
device="cpu",
)
host_block_offsets[:, :, 0, :] = self.host_kv_cache_block_offsets[
:, copy_idx.to(torch.long), 0, :
]
identity_idx = torch.arange(
num_seqs, dtype=torch.int32, pin_memory=prefer_pinned(), device="cpu"
)

copy_batch_block_offsets_to_device(
self.host_kv_cache_block_offsets,
host_block_offsets,
dst_tensor,
copy_idx,
identity_idx,
self.index_scales,
self.kv_offset,
self._stream.cuda_stream,
)

done = torch.cuda.Event()
done.record(self._stream)
inflight.append((done, host_block_offsets, identity_idx))

def _create_kv_cache(
self,
request_id: int,
Expand Down
53 changes: 45 additions & 8 deletions tensorrt_llm/_torch/pyexecutor/resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2070,6 +2070,13 @@ def pin_blocks(self, request_id: int):
def copy_batch_block_offsets(self, dst_tensor: torch.Tensor,
request_ids: List[int], beam_width: int,
num_context: int, num_seqs: int):
# Fill the persistent host buffer in place, exactly as before. CPU-side
# consumers read self.host_kv_cache_block_offsets directly and depend on
# its persistent, max_batch-sized layout: DSA sparse attention, the
# speculative-decoding draft/target swap, and the KV-cache relocation
# path. Reassigning this attribute to a per-call, num_seqs-sized buffer
# broke those readers (nvbug 6293536 regression), so it must stay the
# buffer they see.
if self.kv_cache_type == CacheTypeCpp.CROSS and beam_width > 1:
# This branch is reached only via attribute aliasing, never a
# direct cross_kv_cache_manager.copy_batch_block_offsets(...) call:
Expand All @@ -2092,14 +2099,18 @@ def copy_batch_block_offsets(self, dst_tensor: torch.Tensor,
# tensor whose rows are decoder-sequence scoped.
self.impl.copy_batch_block_offsets(self.host_kv_cache_block_offsets,
request_ids, 1, 0)
for pool_idx in range(self.host_kv_cache_block_offsets.shape[0]):
# Snapshot the rows this call reads into a fresh pinned buffer so the
# async H2D below has a private, immutable source (see the non-cross
# path for the full rationale).
num_rows = num_context + num_gen_requests
host_block_offsets = self._stage_block_offsets_for_copy(num_rows)
for pool_idx in range(self.num_pools):
if num_context > 0:
dst_tensor[pool_idx, :num_context].copy_(
self.host_kv_cache_block_offsets[
pool_idx, :num_context],
host_block_offsets[pool_idx, :num_context],
non_blocking=True)
if num_gen_requests > 0:
gen_block_offsets = self.host_kv_cache_block_offsets[
gen_block_offsets = host_block_offsets[
pool_idx, num_context:num_context + num_gen_requests]
dst_tensor[pool_idx, num_context:num_seqs].copy_(
gen_block_offsets.repeat_interleave(beam_width, dim=0),
Expand All @@ -2112,10 +2123,36 @@ def copy_batch_block_offsets(self, dst_tensor: torch.Tensor,
request_ids[num_context:],
beam_width, num_context)

for pool_idx in range(self.host_kv_cache_block_offsets.shape[0]):
dst_tensor[pool_idx, :num_seqs].copy_(
self.host_kv_cache_block_offsets[pool_idx, :num_seqs],
non_blocking=True)
# The H2D copies below are asynchronous, so their source is read at copy
# execution time, not enqueue time. With the overlap scheduler the CPU
# runs an iteration ahead, so copying straight from the persistent buffer
# let the next iteration's in-place refill clobber the source of this
# iteration's still-pending H2D -> the attention kernel indexed another
# batch's blocks (nvbug 6293536; the window is widened when host
# offloading stalls the execution stream in front of the H2D). Stage the
# async copy through a freshly-allocated pinned buffer instead; the
# caching host allocator holds it until the consuming copy completes,
# matching the already-safe kv_lens / block_ids_per_seq staging. The
# persistent buffer above is untouched by this and stays valid for the
# synchronous CPU readers.
host_block_offsets = self._stage_block_offsets_for_copy(num_seqs)
for pool_idx in range(self.num_pools):
dst_tensor[pool_idx, :num_seqs].copy_(host_block_offsets[pool_idx],
non_blocking=True)

def _stage_block_offsets_for_copy(self, num_rows: int) -> torch.Tensor:
    """Snapshot the leading rows of the persistent host block-offset buffer.

    A new int32 CPU tensor is allocated on every call (``pin_memory`` is
    taken from ``prefer_pinned()``) and filled from
    ``self.host_kv_cache_block_offsets[:, :num_rows]``. The snapshot is
    intended to be the private source of an asynchronous H2D copy, so that
    later in-place refills of the persistent buffer cannot change what the
    pending copy reads (nvbug 6293536).

    Args:
        num_rows: Number of leading rows (second dimension) to snapshot.

    Returns:
        Tensor of shape ``(num_pools, num_rows, 2, max_blocks_per_seq)``
        holding a copy of those rows.
    """
    snapshot_shape = (self.num_pools, num_rows, 2, self.max_blocks_per_seq)
    staged = torch.empty(
        snapshot_shape,
        dtype=torch.int32,
        pin_memory=prefer_pinned(),
        device='cpu',
    )
    # The persistent buffer itself is only read here, never rebound or
    # modified.
    persistent_rows = self.host_kv_cache_block_offsets[:, :num_rows]
    staged.copy_(persistent_rows)
    return staged

def truncate_blocks(self, target_tokens: List[int],
num_tokens_to_keep: int):
Expand Down
141 changes: 141 additions & 0 deletions tests/unittest/_torch/executor/test_kv_block_offset_overlap_race.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Regression test for nvbug 6293536.

``KVCacheManager.copy_batch_block_offsets`` ships each iteration's block-offset
page tables to the device with an *asynchronous* H2D. The async copy reads its
source at execution time, not enqueue time, so the host staging buffer must not
be overwritten before the copy drains. Under the overlap scheduler the CPU runs
an iteration ahead, so a single persistently-reused staging buffer let the next
iteration's in-place overwrite clobber the source of the previous iteration's
still-pending H2D -> the attention kernel indexed another batch's blocks
(memory corruption). The fix stages through a freshly-allocated pinned buffer
each call.

This test drives that exact window deterministically: it stalls the stream with
``torch.cuda._sleep`` (standing in for the production ``syncTransfers`` offload
stall that delays the H2D), enqueues batch A's async H2D behind the stall, then
lets the host immediately issue batch B before A's copy can drain. With the bug
present, batch A's device tensor ends up holding batch B's offsets.
"""

import pytest
import torch

from tensorrt_llm._torch.pyexecutor.resource_manager import KVCacheManager
from tensorrt_llm.bindings import DataType, LayerType
from tensorrt_llm.bindings import ModelConfig as ModelConfigCpp
from tensorrt_llm.bindings.internal.batch_manager import CacheType
from tensorrt_llm.llmapi.llm_args import KvCacheConfig
from tensorrt_llm.mapping import Mapping

# Small geometry so each sequence spans several blocks and the two batches get
# visibly different physical block indices.
_NUM_LAYERS = 2
_NUM_KV_HEADS = 2
_HEAD_DIM = 16
_TOKENS_PER_BLOCK = 16
_MAX_SEQ_LEN = 256
# Sequences per batch. The test issues two batches, so the manager's
# max_batch_size and the device offset tensors are sized as 2 * _BATCH.
_BATCH = 6
# Five blocks' worth of tokens for every sequence.
_TOKENS_PER_SEQ = _TOKENS_PER_BLOCK * 5
# Long GPU stall so the host reliably wins the race and overwrites the staging
# buffer before batch A's H2D drains. Generous to stay deterministic on fast GPUs.
# Passed as the cycle count to torch.cuda._sleep.
_SLEEP_CYCLES = 2_000_000_000


def _build_manager():
    """Create a small self-attention ``KVCacheManager`` for this test.

    The geometry comes from the module-level constants; the manager is sized
    for ``2 * _BATCH`` sequences so both test batches can be resident at the
    same time. Block reuse is disabled in the KV cache config.
    """
    hidden_size = _NUM_KV_HEADS * _HEAD_DIM
    cpp_model_config = ModelConfigCpp(
        vocab_size=32000,
        num_layers=_NUM_LAYERS,
        num_attention_layers=_NUM_LAYERS,
        num_rnn_layers=0,
        num_heads=_NUM_KV_HEADS,
        hidden_size=hidden_size,
        data_type=DataType.HALF,
    )
    # Every layer is an attention layer.
    cpp_model_config.layer_types = [
        LayerType.ATTENTION for _ in range(_NUM_LAYERS)
    ]
    cpp_model_config.set_num_kv_heads(_NUM_KV_HEADS)

    cache_config = KvCacheConfig(
        max_tokens=4096, free_gpu_memory_fraction=0.2, enable_block_reuse=False
    )
    single_rank_mapping = Mapping(world_size=1, tp_size=1, pp_size=1)

    manager_kwargs = dict(
        kv_cache_config=cache_config,
        kv_cache_type=CacheType.SELF,
        num_layers=_NUM_LAYERS,
        num_kv_heads=_NUM_KV_HEADS,
        head_dim=_HEAD_DIM,
        tokens_per_block=_TOKENS_PER_BLOCK,
        max_seq_len=_MAX_SEQ_LEN,
        max_batch_size=2 * _BATCH,
        mapping=single_rank_mapping,
        dtype=DataType.HALF,
        model_config=cpp_model_config,
        max_beam_width=1,
    )
    return KVCacheManager(**manager_kwargs)


def _empty_device_offsets(mgr):
    """Return a zero-filled int32 CUDA tensor to receive block offsets.

    The shape is ``(mgr.num_pools, 2 * _BATCH, 2, mgr.max_blocks_per_seq)``.
    """
    offsets_shape = (mgr.num_pools, 2 * _BATCH, 2, mgr.max_blocks_per_seq)
    return torch.zeros(offsets_shape, dtype=torch.int32, device="cuda")


def _reference_offsets(mgr, ids):
    """Compute race-free ground-truth offsets for the requests in ``ids``.

    The copy is issued and then followed at once by a full device
    synchronize, so nothing can touch the host source while the copy is
    pending. Only the first ``len(ids)`` rows are returned, as a clone.
    """
    device_offsets = _empty_device_offsets(mgr)
    count = len(ids)
    # beam_width is 1; ``count`` is passed as both num_context and num_seqs.
    mgr.copy_batch_block_offsets(device_offsets, ids, 1, count, count)
    torch.cuda.synchronize()
    filled_rows = device_offsets[:, :count]
    return filled_rows.clone()


@pytest.mark.skipif(not torch.cuda.is_available(), reason="requires a CUDA device")
def test_copy_batch_block_offsets_survives_overlap_overwrite():
    """Batch A's device offsets must survive batch B being issued behind it.

    The stream is stalled, batch A's copy is enqueued behind the stall, and
    batch B is issued before A's copy can drain. Both device tensors must
    still match their synchronously computed references.
    """
    manager = _build_manager()

    # Two disjoint batches -> different physical block indices.
    batch_a_ids = list(range(1, 1 + _BATCH))
    batch_b_ids = list(range(101, 101 + _BATCH))
    token_counts = [_TOKENS_PER_SEQ] * _BATCH
    for batch_ids in (batch_a_ids, batch_b_ids):
        manager.add_dummy_requests(
            request_ids=batch_ids, token_nums=token_counts, prepare_resource=True
        )

    expected_a = _reference_offsets(manager, batch_a_ids)
    expected_b = _reference_offsets(manager, batch_b_ids)
    references_match = torch.equal(expected_a, expected_b)
    assert not references_match, (
        "test setup invalid: batches A and B produced identical offsets"
    )

    # Open the overlap window: stall the stream, queue batch A's async H2D
    # behind the stall, then let the host run ahead and issue batch B (whose
    # host-side fill would overwrite a shared staging buffer in place) before
    # A's copy can drain.
    device_a = _empty_device_offsets(manager)
    device_b = _empty_device_offsets(manager)
    torch.cuda.synchronize()

    torch.cuda._sleep(_SLEEP_CYCLES)
    manager.copy_batch_block_offsets(device_a, batch_a_ids, 1, _BATCH, _BATCH)
    manager.copy_batch_block_offsets(device_b, batch_b_ids, 1, _BATCH, _BATCH)
    torch.cuda.synchronize()

    # Batch A's device offsets must still be batch A's, not clobbered by B.
    batch_a_intact = torch.equal(device_a[:, :_BATCH], expected_a)
    assert batch_a_intact, (
        "nvbug 6293536: batch A's async H2D read another batch's host staging "
        "buffer (overlap-scheduler data race on host_kv_cache_block_offsets)"
    )
    # And batch B's copy is independently correct.
    batch_b_intact = torch.equal(device_b[:, :_BATCH], expected_b)
    assert batch_b_intact
Loading
Loading