From 18d59e5879dcfab12e565a0de75b5c1ccafadeba Mon Sep 17 00:00:00 2001 From: Julian Miller Date: Tue, 23 Jun 2026 20:00:20 +0200 Subject: [PATCH 1/6] Use KvikIO for CAGRA-ACE --- .../all_cuda-129_arch-aarch64.yaml | 1 + .../all_cuda-129_arch-x86_64.yaml | 2 + .../all_cuda-133_arch-aarch64.yaml | 2 + .../all_cuda-133_arch-x86_64.yaml | 2 + .../bench_ann_cuda-129_arch-x86_64.yaml | 1 + .../bench_ann_cuda-133_arch-aarch64.yaml | 1 + .../bench_ann_cuda-133_arch-x86_64.yaml | 1 + .../environments/go_cuda-129_arch-x86_64.yaml | 1 + .../go_cuda-133_arch-aarch64.yaml | 1 + .../environments/go_cuda-133_arch-x86_64.yaml | 1 + .../rust_cuda-129_arch-x86_64.yaml | 1 + .../rust_cuda-133_arch-aarch64.yaml | 1 + .../rust_cuda-133_arch-x86_64.yaml | 1 + conda/recipes/libcuvs/recipe.yaml | 22 ++ cpp/CMakeLists.txt | 4 + cpp/cmake/thirdparty/get_kvikio.cmake | 40 +++ cpp/include/cuvs/util/file_io.hpp | 239 ++++++++++++--- .../neighbors/detail/cagra/cagra_build.cuh | 287 ++++++++++-------- cpp/src/neighbors/detail/hnsw.hpp | 87 +++--- cpp/src/util/file_io.cpp | 220 +++++++++++--- cpp/tests/CMakeLists.txt | 7 + cpp/tests/util/file_io_test.cpp | 218 +++++++++++++ dependencies.yaml | 44 +++ python/libcuvs/pyproject.toml | 2 + 24 files changed, 929 insertions(+), 257 deletions(-) create mode 100644 cpp/cmake/thirdparty/get_kvikio.cmake create mode 100644 cpp/tests/util/file_io_test.cpp diff --git a/conda/environments/all_cuda-129_arch-aarch64.yaml b/conda/environments/all_cuda-129_arch-aarch64.yaml index bf34a36008..39e300eade 100644 --- a/conda/environments/all_cuda-129_arch-aarch64.yaml +++ b/conda/environments/all_cuda-129_arch-aarch64.yaml @@ -27,6 +27,7 @@ dependencies: - libcurand-dev - libcusolver-dev - libcusparse-dev +- libkvikio==26.8.*,>=0.0.0a0 - libnvjitlink-dev - libopenblas<=0.3.30 - librmm==26.8.*,>=0.0.0a0 diff --git a/conda/environments/all_cuda-129_arch-x86_64.yaml b/conda/environments/all_cuda-129_arch-x86_64.yaml index 8c83164f58..f372dfdeb7 100644 --- a/conda/environments/all_cuda-129_arch-x86_64.yaml +++ b/conda/environments/all_cuda-129_arch-x86_64.yaml @@ -24,9 +24,11 @@ dependencies: - go - libclang==20.1.8 - libcublas-dev +- libcufile-dev - libcurand-dev - libcusolver-dev - libcusparse-dev +- libkvikio==26.8.*,>=0.0.0a0 - libnvjitlink-dev - librmm==26.8.*,>=0.0.0a0 - make diff --git a/conda/environments/all_cuda-133_arch-aarch64.yaml b/conda/environments/all_cuda-133_arch-aarch64.yaml index 2f10891439..1da8d32820 100644 --- a/conda/environments/all_cuda-133_arch-aarch64.yaml +++ b/conda/environments/all_cuda-133_arch-aarch64.yaml @@ -24,9 +24,11 @@ dependencies: - go - libclang==20.1.8 - libcublas-dev +- libcufile-dev - libcurand-dev - libcusolver-dev - libcusparse-dev +- libkvikio==26.8.*,>=0.0.0a0 - libnvjitlink-dev - libopenblas<=0.3.30 - librmm==26.8.*,>=0.0.0a0 diff --git a/conda/environments/all_cuda-133_arch-x86_64.yaml b/conda/environments/all_cuda-133_arch-x86_64.yaml index 0d98002ba2..77173ae228 100644 --- a/conda/environments/all_cuda-133_arch-x86_64.yaml +++ b/conda/environments/all_cuda-133_arch-x86_64.yaml @@ -24,9 +24,11 @@ dependencies: - go - libclang==20.1.8 - libcublas-dev +- libcufile-dev - libcurand-dev - libcusolver-dev - libcusparse-dev +- libkvikio==26.8.*,>=0.0.0a0 - libnvjitlink-dev - librmm==26.8.*,>=0.0.0a0 - make diff --git a/conda/environments/bench_ann_cuda-129_arch-x86_64.yaml b/conda/environments/bench_ann_cuda-129_arch-x86_64.yaml index d84bf87234..009f31cd5f 100644 --- a/conda/environments/bench_ann_cuda-129_arch-x86_64.yaml +++ b/conda/environments/bench_ann_cuda-129_arch-x86_64.yaml @@ -28,6 +28,7 @@ dependencies: - libboost-devel=1.87 - libclang==20.1.8 - libcublas-dev +- libcufile-dev - libcurand-dev - libcusolver-dev - libcusparse-dev diff --git a/conda/environments/bench_ann_cuda-133_arch-aarch64.yaml b/conda/environments/bench_ann_cuda-133_arch-aarch64.yaml index c874298d7d..31c5e64c21 100644 --- a/conda/environments/bench_ann_cuda-133_arch-aarch64.yaml +++ b/conda/environments/bench_ann_cuda-133_arch-aarch64.yaml @@ -26,6 +26,7 @@ dependencies: - h5py>=3.8.0 - libclang==20.1.8 - libcublas-dev +- libcufile-dev - libcurand-dev - libcusolver-dev - libcusparse-dev diff --git a/conda/environments/bench_ann_cuda-133_arch-x86_64.yaml b/conda/environments/bench_ann_cuda-133_arch-x86_64.yaml index 43b2dbce46..e2a344e6d6 100644 --- a/conda/environments/bench_ann_cuda-133_arch-x86_64.yaml +++ b/conda/environments/bench_ann_cuda-133_arch-x86_64.yaml @@ -28,6 +28,7 @@ dependencies: - libboost-devel=1.87 - libclang==20.1.8 - libcublas-dev +- libcufile-dev - libcurand-dev - libcusolver-dev - libcusparse-dev diff --git a/conda/environments/go_cuda-129_arch-x86_64.yaml b/conda/environments/go_cuda-129_arch-x86_64.yaml index e99615ab2c..eb9fcbf319 100644 --- a/conda/environments/go_cuda-129_arch-x86_64.yaml +++ b/conda/environments/go_cuda-129_arch-x86_64.yaml @@ -22,6 +22,7 @@ dependencies: - go - libclang==20.1.8 - libcublas-dev +- libcufile-dev - libcurand-dev - libcusolver-dev - libcusparse-dev diff --git a/conda/environments/go_cuda-133_arch-aarch64.yaml b/conda/environments/go_cuda-133_arch-aarch64.yaml index eeb5511537..05b00c1537 100644 --- a/conda/environments/go_cuda-133_arch-aarch64.yaml +++ b/conda/environments/go_cuda-133_arch-aarch64.yaml @@ -22,6 +22,7 @@ dependencies: - go - libclang==20.1.8 - libcublas-dev +- libcufile-dev - libcurand-dev - libcusolver-dev - libcusparse-dev diff --git a/conda/environments/go_cuda-133_arch-x86_64.yaml b/conda/environments/go_cuda-133_arch-x86_64.yaml index 5fd2153ffa..6b02c21a87 100644 --- a/conda/environments/go_cuda-133_arch-x86_64.yaml +++ b/conda/environments/go_cuda-133_arch-x86_64.yaml @@ -22,6 +22,7 @@ dependencies: - go - libclang==20.1.8 - libcublas-dev +- libcufile-dev - libcurand-dev - libcusolver-dev - libcusparse-dev diff --git a/conda/environments/rust_cuda-129_arch-x86_64.yaml b/conda/environments/rust_cuda-129_arch-x86_64.yaml index a9e9e2142c..13c2c77147 100644 --- a/conda/environments/rust_cuda-129_arch-x86_64.yaml +++ b/conda/environments/rust_cuda-129_arch-x86_64.yaml @@ -16,6 +16,7 @@ dependencies: - cxx-compiler - gcc_linux-64=14.* - libcublas-dev +- libcufile-dev - libcurand-dev - libcusolver-dev - libcusparse-dev diff --git a/conda/environments/rust_cuda-133_arch-aarch64.yaml b/conda/environments/rust_cuda-133_arch-aarch64.yaml index 3daa10d2be..0223fba1ca 100644 --- a/conda/environments/rust_cuda-133_arch-aarch64.yaml +++ b/conda/environments/rust_cuda-133_arch-aarch64.yaml @@ -16,6 +16,7 @@ dependencies: - cxx-compiler - gcc_linux-aarch64=14.* - libcublas-dev +- libcufile-dev - libcurand-dev - libcusolver-dev - libcusparse-dev diff --git a/conda/environments/rust_cuda-133_arch-x86_64.yaml b/conda/environments/rust_cuda-133_arch-x86_64.yaml index 9e0fe2d529..8aa9341870 100644 --- a/conda/environments/rust_cuda-133_arch-x86_64.yaml +++ b/conda/environments/rust_cuda-133_arch-x86_64.yaml @@ -16,6 +16,7 @@ dependencies: - cxx-compiler - gcc_linux-64=14.* - libcublas-dev +- libcufile-dev - libcurand-dev - libcusolver-dev - libcusparse-dev diff --git a/conda/recipes/libcuvs/recipe.yaml b/conda/recipes/libcuvs/recipe.yaml index aa7a37db44..3d645fa093 100644 --- a/conda/recipes/libcuvs/recipe.yaml +++ b/conda/recipes/libcuvs/recipe.yaml @@ -75,6 +75,12 @@ cache: - libnvjitlink-dev - librmm =${{ minor_version }} - libraft-headers =${{ minor_version }} + - libkvikio =${{ minor_version }} + # libcufile (GPUDirect Storage backend that kvikio compiles against): CUDA 13 on all arches, + # CUDA 12 on x86_64 only. + - if: cuda_major == "13" or linux64 + then: + - libcufile-dev - nccl ${{ nccl_version }} - cuda-version =${{ cuda_version }} - cuda-cudart-dev @@ -174,6 +180,10 @@ outputs: - ${{ pin_subpackage("libcuvs-headers", exact=True) }} - librmm =${{ minor_version }} - libraft-headers =${{ minor_version }} + - libkvikio =${{ minor_version }} + - if: cuda_major == "13" or linux64 + then: + - libcufile-dev - nccl ${{ nccl_version }} - cuda-version =${{ cuda_version }} - cuda-cudart-dev @@ -189,6 +199,10 @@ outputs: - ${{ pin_subpackage("libcuvs-headers", exact=True) }} - libraft-headers =${{ minor_version }} - librmm =${{ minor_version }} + - libkvikio =${{ minor_version }} + - if: cuda_major == "13" or linux64 + then: + - libcufile - cuda-nvrtc - nccl - libcublas @@ -235,6 +249,10 @@ outputs: - ${{ pin_subpackage("libcuvs-headers", exact=True) }} - librmm =${{ minor_version }} - libraft-headers =${{ minor_version }} + - libkvikio =${{ minor_version }} + - if: cuda_major == "13" or linux64 + then: + - libcufile-dev - nccl ${{ nccl_version }} - cuda-version =${{ cuda_version }} - cuda-cudart-dev @@ -250,6 +268,10 @@ outputs: - ${{ pin_subpackage("libcuvs-headers", exact=True) }} - libraft-headers =${{ minor_version }} - librmm =${{ minor_version }} + - libkvikio =${{ minor_version }} + - if: cuda_major == "13" or linux64 + then: + - libcufile - cuda-nvrtc - nccl - libcublas diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 89cbadfcfc..39f4139985 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -189,6 +189,7 @@ if(NOT BUILD_CPU_ONLY) include(${rapids-cmake-dir}/cpm/cccl.cmake) rapids_cpm_cccl() include(cmake/thirdparty/get_raft.cmake) + include(cmake/thirdparty/get_kvikio.cmake) include(cmake/thirdparty/get_cutlass.cmake) include(${rapids-cmake-dir}/cpm/cuco.cmake) rapids_cpm_cuco() @@ -1463,6 +1464,7 @@ if(NOT BUILD_CPU_ONLY) ${CUVS_CTK_MATH_DEPENDENCIES} $ $ + kvikio::kvikio ) target_include_directories( @@ -1541,6 +1543,7 @@ if(NOT BUILD_CPU_ONLY) $<$:CUDA::nvtx3> PRIVATE $ $ $ CUDA::nvJitLink CUDA::nvrtc + $ ) # ensure CUDA symbols aren't relocated to the middle of the debug build binaries @@ -1600,6 +1603,7 @@ SECTIONS $<$:CUDA::nvtx3> $ $ + $ ) endif() diff --git a/cpp/cmake/thirdparty/get_kvikio.cmake b/cpp/cmake/thirdparty/get_kvikio.cmake new file mode 100644 index 0000000000..9fb46a0dcf --- /dev/null +++ b/cpp/cmake/thirdparty/get_kvikio.cmake @@ -0,0 +1,40 @@ +# ============================================================================= +# cmake-format: off +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. +# SPDX-License-Identifier: Apache-2.0 +# cmake-format: on + +# Use RAPIDS_VERSION_MAJOR_MINOR from rapids_config.cmake +set(KVIKIO_VERSION "${RAPIDS_VERSION_MAJOR_MINOR}") +set(KVIKIO_FORK "rapidsai") +set(KVIKIO_PINNED_TAG "${rapids-cmake-checkout-tag}") + +function(find_and_configure_kvikio) + set(oneValueArgs VERSION FORK PINNED_TAG) + cmake_parse_arguments(PKG "${options}" "${oneValueArgs}" "${multiValueArgs}" ${ARGN}) + + # ---------------------------------------------------------------------------- + # KvikIO provides the GPUDirect Storage (cuFile) and POSIX file I/O backends used by the CAGRA + # ACE disk-mode build. It is a private, build-time dependency: it is linked into libcuvs via + # $ and therefore is NOT part of cuvs' exported interface + # (no export set / find_dependency is generated). At runtime the conda 'libkvikio' package + # provides the shared library. + # ---------------------------------------------------------------------------- + rapids_cpm_find( + kvikio ${PKG_VERSION} + GLOBAL_TARGETS kvikio::kvikio + CPM_ARGS + EXCLUDE_FROM_ALL TRUE + GIT_REPOSITORY https://github.com/${PKG_FORK}/kvikio.git + GIT_TAG ${PKG_PINNED_TAG} + GIT_SHALLOW TRUE + SOURCE_SUBDIR cpp + OPTIONS "KvikIO_BUILD_EXAMPLES OFF" "KvikIO_REMOTE_SUPPORT OFF" + ) +endfunction() + +# Change pinned tag here to test a commit in CI. +# To use a different KvikIO locally, set the CMake variable CPM_kvikio_SOURCE=/path/to/local/kvikio +find_and_configure_kvikio( + VERSION ${KVIKIO_VERSION}.00 FORK ${KVIKIO_FORK} PINNED_TAG ${KVIKIO_PINNED_TAG} +) diff --git a/cpp/include/cuvs/util/file_io.hpp b/cpp/include/cuvs/util/file_io.hpp index f02e9c2d7d..fc336a3935 100644 --- a/cpp/include/cuvs/util/file_io.hpp +++ b/cpp/include/cuvs/util/file_io.hpp @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -26,19 +27,41 @@ namespace CUVS_EXPORT cuvs { namespace util { + +/** + * @brief Alignment (in bytes) for the numpy data body in on-disk ACE artifacts. + * + * The numpy header is space-padded so that the array data begins on this boundary. This keeps the + * file numpy-compatible (readers use the stored HEADER_LEN) while letting kvikio's O_DIRECT / + * GPUDirect Storage path transfer the aligned interior directly. 4096 ensures support for most + * storage devices. + */ +inline constexpr size_t kNumpyDataAlignment = 4096; + +/** @brief Round @p x up to the next multiple of @p alignment (must be a power of two). */ +inline constexpr size_t numpy_align_up(size_t x, size_t alignment = kNumpyDataAlignment) noexcept +{ + return (x + alignment - 1) & ~(alignment - 1); +} + /** * @brief Streambuf that reads from a POSIX file descriptor */ class fd_streambuf : public std::streambuf { - int fd_; + int fd_ = -1; std::unique_ptr buffer_; - size_t buffer_size_; + size_t buffer_size_ = 0; protected: int_type underflow() override { - if (gptr() < egptr()) return traits_type::to_int_type(*gptr()); - ssize_t n = ::read(fd_, buffer_.get(), buffer_size_); + if (gptr() < egptr()) { return traits_type::to_int_type(*gptr()); } + if (fd_ == -1 || !buffer_) { return traits_type::eof(); } + + ssize_t n = 0; + do { + n = ::read(fd_, buffer_.get(), buffer_size_); + } while (n < 0 && errno == EINTR); if (n <= 0) return traits_type::eof(); setg(buffer_.get(), buffer_.get(), buffer_.get() + n); return traits_type::to_int_type(*gptr()); @@ -48,18 +71,43 @@ class fd_streambuf : public std::streambuf { explicit fd_streambuf(int fd, size_t buffer_size = 8192) : fd_(fd), buffer_(new char[buffer_size]), buffer_size_(buffer_size) { + RAFT_EXPECTS(buffer_size > 0, "fd_streambuf buffer size must be greater than zero"); setg(buffer_.get(), buffer_.get(), buffer_.get()); } - ~fd_streambuf() + ~fd_streambuf() override { close(); } + + fd_streambuf(const fd_streambuf&) = delete; + fd_streambuf& operator=(const fd_streambuf&) = delete; + + fd_streambuf(fd_streambuf&& other) noexcept + : fd_(std::exchange(other.fd_, -1)), + buffer_(std::move(other.buffer_)), + buffer_size_(std::exchange(other.buffer_size_, 0)) { - if (fd_ != -1) ::close(fd_); + setg(buffer_.get(), buffer_.get(), buffer_.get()); } - fd_streambuf(const fd_streambuf&) = delete; - fd_streambuf& operator=(const fd_streambuf&) = delete; - fd_streambuf(fd_streambuf&&) noexcept = default; - fd_streambuf& operator=(fd_streambuf&&) noexcept = default; + fd_streambuf& operator=(fd_streambuf&& other) noexcept + { + if (this != &other) { + close(); + fd_ = std::exchange(other.fd_, -1); + buffer_ = std::move(other.buffer_); + buffer_size_ = std::exchange(other.buffer_size_, 0); + setg(buffer_.get(), buffer_.get(), buffer_.get()); + } + return *this; + } + + private: + void close() noexcept + { + if (fd_ != -1) { + ::close(fd_); + fd_ = -1; + } + } }; /** @@ -91,8 +139,9 @@ class fd_istream : public std::istream { /** * @brief RAII wrapper for POSIX file descriptors * - * Manages file descriptor lifecycle with automatic cleanup. - * Non-copyable, move-only. + * Manages file descriptor lifecycle with automatic cleanup. Used to own the lifetime of disk-backed + * ACE artifacts and to parse their numpy headers; the bulk data transfers go through kvikio (see + * ::read_large_file / ::write_large_file). Non-copyable, move-only. */ class file_descriptor { public: @@ -173,19 +222,25 @@ class file_descriptor { /** * @brief Create a numpy file with pre-allocated space and write the header. * - * Opens a file, writes a numpy header for the given shape/dtype, and pre-allocates - * space for the data. This is useful for memory-mapped or streaming writes. + * Opens a file, writes a numpy header for the given shape/dtype, and pre-allocates space for the + * data. The numpy header is space-padded so that the data body begins on a ::kNumpyDataAlignment + * boundary, which keeps the file numpy-compatible (readers use the stored HEADER_LEN) while letting + * kvikio transfer the aligned interior via O_DIRECT / GPUDirect Storage. The data region is rounded + * up to a block boundary so that an O_DIRECT write of the trailing block stays within the + * allocated file. The returned descriptor owns the file lifetime; bulk data is written separately + * through kvikio (::write_large_file). * * @tparam T Data type for the numpy array * @param path File path to create * @param shape Shape of the numpy array (e.g., {rows, cols} for 2D) - * @return Pair of (file_descriptor, header_size) + * @return Pair of (file_descriptor, header_size) where header_size is the data offset in bytes */ template std::pair create_numpy_file(const std::string& path, const std::vector& shape) { - // Open file + // Open the file for the header write + preallocation. Bulk data is written via kvikio (which + // opens its own descriptor, including an O_DIRECT one when supported). file_descriptor fd(path, O_CREAT | O_RDWR | O_TRUNC, 0644); // Build header @@ -196,42 +251,86 @@ std::pair create_numpy_file(const std::string& path, std::stringstream ss; raft::detail::numpy_serializer::write_header(ss, header); std::string header_str = ss.str(); - size_t header_size = header_str.size(); + + RAFT_EXPECTS((kNumpyDataAlignment & (kNumpyDataAlignment - 1)) == 0, + "kNumpyDataAlignment must be a power of two"); + + // Re-pad the numpy v1.0 header so the data body starts on a block boundary. + // Layout: [6 bytes magic][2 bytes version][2 bytes HEADER_LEN][dict padded to HEADER_LEN]. + RAFT_EXPECTS(header_str.size() >= 10 && static_cast(header_str[6]) == 0x01, + "Expected a numpy v1.0 header to align the data body"); + std::string dict = header_str.substr(10); + while (!dict.empty() && (dict.back() == '\n' || dict.back() == ' ')) { + dict.pop_back(); + } + const size_t min_total = 10 + dict.size() + 1; // +1 for the terminating newline + const size_t aligned_total = numpy_align_up(min_total); + const size_t header_len = aligned_total - 10; + RAFT_EXPECTS(header_len <= 0xFFFF, "Aligned numpy v1.0 header is too large"); + + std::string padded_dict = dict; + padded_dict.append(header_len - dict.size() - 1, ' '); + padded_dict.push_back('\n'); + + std::string aligned_header = header_str.substr(0, 8); // magic + version + aligned_header.push_back(static_cast(header_len & 0xFF)); + aligned_header.push_back(static_cast((header_len >> 8) & 0xFF)); + aligned_header.append(padded_dict); + header_str = std::move(aligned_header); + + const size_t header_size = header_str.size(); // Calculate data size from shape + auto checked_mul = [](size_t lhs, size_t rhs) { + RAFT_EXPECTS(rhs == 0 || lhs <= std::numeric_limits::max() / rhs, + "Numpy file size calculation overflowed"); + return lhs * rhs; + }; size_t data_bytes = sizeof(T); for (auto dim : shape) { - data_bytes *= dim; - } - - // Pre-allocate file space - if (posix_fallocate(fd.get(), 0, header_size + data_bytes) != 0) { - RAFT_FAIL("Failed to pre-allocate space for file: %s", path.c_str()); + data_bytes = checked_mul(data_bytes, dim); } - // Seek to beginning and write header - if (lseek(fd.get(), 0, SEEK_SET) == -1) { - RAFT_FAIL("Failed to seek to beginning of file: %s", path.c_str()); + // Pre-allocate file space. The data region is rounded up to a block boundary so read-modify-write + // of the trailing block during an O_DIRECT write stays within the allocated file. + const size_t padded_data_bytes = numpy_align_up(data_bytes); + RAFT_EXPECTS(header_size <= std::numeric_limits::max() - padded_data_bytes, + "Numpy file size calculation overflowed"); + const size_t alloc_bytes = header_size + padded_data_bytes; + const int fallocate_status = posix_fallocate(fd.get(), 0, alloc_bytes); + if (fallocate_status != 0) { + RAFT_FAIL( + "Failed to pre-allocate space for file %s: %s", path.c_str(), strerror(fallocate_status)); } - ssize_t written = write(fd.get(), header_str.data(), header_str.size()); - if (written < 0 || static_cast(written) != header_str.size()) { - RAFT_FAIL("Failed to write numpy header to file: %s", path.c_str()); + // Write the small numpy header (one-time, buffered). + const char* hp = header_str.data(); + size_t hremaining = header_size; + off_t hoff = 0; + while (hremaining > 0) { + const size_t chunk = std::min(hremaining, static_cast(SSIZE_MAX)); + const ssize_t w = ::pwrite(fd.get(), hp, chunk, hoff); + if (w < 0 && errno == EINTR) { continue; } + RAFT_EXPECTS(w > 0, "Failed to write numpy header to %s: %s", path.c_str(), strerror(errno)); + hp += w; + hoff += w; + hremaining -= static_cast(w); } return {std::move(fd), header_size}; } /** - * @brief Read large file in chunks using pread + * @brief Read a region of a file into @p dest_ptr through kvikio. * - * Reads a file in chunks to avoid issues with very large reads. - * Uses pread for thread-safe, offset-based reading. + * Routed through kvikio::FileHandle, so it uses GPUDirect Storage (cuFile) when @p dest_ptr is + * device memory on a GDS-capable system, and the POSIX + threadpool backend (with O_DIRECT when + * available) otherwise. @p dest_ptr may be host or device memory. * - * @param fd File descriptor to read from - * @param dest_ptr Destination buffer - * @param total_bytes Total bytes to read - * @param file_offset Starting offset in file + * @param fd File descriptor identifying the file (its path is used to open the handle). + * @param dest_ptr Destination buffer (host or device). + * @param total_bytes Total bytes to read. + * @param file_offset Starting offset in file. */ void read_large_file(const file_descriptor& fd, void* dest_ptr, @@ -239,35 +338,79 @@ void read_large_file(const file_descriptor& fd, const uint64_t file_offset); /** - * @brief Write large file in chunks using pwrite + * @brief Write @p data_ptr to a region of a (pre-created) file through kvikio. * - * Writes data to a file in chunks to avoid issues with very large writes. - * Uses pwrite for thread-safe, offset-based writing. + * Counterpart of ::read_large_file. The file must already exist (e.g. created by + * ::create_numpy_file); the handle is opened in read+write mode so the existing header and + * preallocation are preserved. @p data_ptr may be host or device memory. * - * @param fd File descriptor to write to - * @param data_ptr Source data buffer - * @param total_bytes Total bytes to write - * @param file_offset Starting offset in file + * @param fd File descriptor identifying the file (its path is used to open the handle). + * @param data_ptr Source data buffer (host or device). + * @param total_bytes Total bytes to write. + * @param file_offset Starting offset in file. */ void write_large_file(const file_descriptor& fd, const void* data_ptr, const size_t total_bytes, const uint64_t file_offset); +/** + * @brief Sequential std::ostream backed by kvikio. + * + * A std::ostream whose bytes are staged into a large buffer and written to disk through kvikio, + * which bypasses the page cache via O_DIRECT when supported (and falls back to buffered POSIX + * writes otherwise). Because it is a std::ostream it can be passed anywhere an ostream is expected + * (e.g. the hnswlib serializer), routing that output through kvikio. Non-copyable, non-movable. + */ +class kvikio_ofstream : public std::ostream { + public: + /** + * @brief Open @p path for writing (created/truncated). + * + * @param path Output file path. + * @param buffer_size Staging-buffer capacity in bytes; full buffers are written at aligned + * offsets. + */ + explicit kvikio_ofstream(const std::string& path, size_t buffer_size = (size_t(32) << 20)); + + ~kvikio_ofstream() override; + + kvikio_ofstream(const kvikio_ofstream&) = delete; + kvikio_ofstream& operator=(const kvikio_ofstream&) = delete; + kvikio_ofstream(kvikio_ofstream&&) = delete; + kvikio_ofstream& operator=(kvikio_ofstream&&) = delete; + + /** @brief Flush any remaining staged bytes and close the file. */ + void close(); + + /** @brief Total number of logical bytes written so far. */ + [[nodiscard]] size_t bytes_written() const noexcept; + + private: + class sbuf; + std::unique_ptr buf_; +}; + /** * @brief Buffered output stream wrapper * - * Wraps an std::ostream with a buffer to improve write performance by - * reducing the number of system calls. Automatically flushes on destruction. - * Non-copyable, non-movable. + * Wraps an std::ostream with a buffer to improve write performance by reducing the number of + * system calls. Automatically flushes on destruction. Used by the hnswlib serializer. Non-copyable, + * non-movable. */ class buffered_ofstream { public: - buffered_ofstream(std::ostream* os, size_t buffer_size) : os_(os), buffer_(buffer_size), pos_(0) + buffered_ofstream(std::ostream* os, size_t buffer_size) : buffer_(buffer_size), os_(os), pos_(0) { } - ~buffered_ofstream() noexcept { flush(); } + ~buffered_ofstream() noexcept + { + try { + flush(); + } catch (...) { + } + } buffered_ofstream(const buffered_ofstream& res) = delete; auto operator=(const buffered_ofstream& other) -> buffered_ofstream& = delete; diff --git a/cpp/src/neighbors/detail/cagra/cagra_build.cuh b/cpp/src/neighbors/detail/cagra/cagra_build.cuh index e119d7e6ca..36404811ea 100644 --- a/cpp/src/neighbors/detail/cagra/cagra_build.cuh +++ b/cpp/src/neighbors/detail/cagra/cagra_build.cuh @@ -28,6 +28,8 @@ #include #include +#include + // TODO: This shouldn't be calling spatial/knn APIs #include "../ann_utils.cuh" @@ -35,6 +37,7 @@ #include #include +#include #include #include #include @@ -674,127 +677,109 @@ void ace_reorder_and_store_dataset( // ACE: Load partition dataset and augmented dataset from disk template void ace_load_partition_dataset_from_disk( - raft::resources const& res, - const std::string& build_dir, size_t partition_id, size_t dataset_dim, raft::host_matrix_view partition_histogram, raft::host_vector_view core_partition_offsets, raft::host_vector_view augmented_partition_offsets, - raft::host_matrix_view sub_dataset) + const cuvs::util::file_descriptor& reordered_fd, + const cuvs::util::file_descriptor& augmented_fd, + size_t reordered_header_size, + size_t augmented_header_size, + T* sub_dataset_ptr) { - size_t n_partitions = partition_histogram.extent(0); - RAFT_LOG_DEBUG("ACE: Loading partition %lu dataset from disk", partition_id); - size_t core_size = partition_histogram(partition_id, 0); - size_t augmented_size = partition_histogram(partition_id, 1); - size_t total_partition_size = core_size + augmented_size; + const std::string reordered_path = reordered_fd.get_path(); + const std::string augmented_path = augmented_fd.get_path(); + RAFT_EXPECTS(sub_dataset_ptr != nullptr, "ACE: sub-dataset destination must not be null"); + RAFT_EXPECTS(reordered_fd.is_valid() && !reordered_path.empty(), + "ACE: reordered dataset file descriptor is not valid"); + RAFT_EXPECTS(augmented_fd.is_valid() && !augmented_path.empty(), + "ACE: augmented dataset file descriptor is not valid"); + RAFT_EXPECTS(partition_id < static_cast(partition_histogram.extent(0)), + "ACE: partition id is out of range"); + RAFT_EXPECTS(partition_id < static_cast(core_partition_offsets.extent(0)), + "ACE: core partition offset is out of range"); + RAFT_EXPECTS(partition_id < static_cast(augmented_partition_offsets.extent(0)), + "ACE: augmented partition offset is out of range"); + + size_t core_size = partition_histogram(partition_id, 0); + size_t augmented_size = partition_histogram(partition_id, 1); RAFT_LOG_DEBUG("ACE: Partition %lu: %lu core + %lu augmented = %lu total vectors", partition_id, core_size, augmented_size, - total_partition_size); - - RAFT_EXPECTS(static_cast(sub_dataset.extent(0)) == total_partition_size, - "sub_dataset rows (%lu) must match total partition size (%lu)", - sub_dataset.extent(0), - total_partition_size); - RAFT_EXPECTS(static_cast(sub_dataset.extent(1)) == dataset_dim, - "sub_dataset columns (%lu) must match dataset dimensions (%lu)", - sub_dataset.extent(1), - dataset_dim); + core_size + augmented_size); const size_t vector_size = dataset_dim * sizeof(T); - const std::string reordered_dataset_path = build_dir + "/reordered_dataset.npy"; - const std::string augmented_dataset_path = build_dir + "/augmented_dataset.npy"; - - if (!std::filesystem::exists(reordered_dataset_path)) { - RAFT_FAIL("ACE: Required file does not exist: %s", reordered_dataset_path.c_str()); - } - if (!std::filesystem::exists(augmented_dataset_path)) { - RAFT_FAIL("ACE: Required file does not exist: %s", augmented_dataset_path.c_str()); - } - - size_t core_header_size = 0; - size_t augmented_header_size = 0; - size_t core_file_offset = 0; - size_t augmented_file_offset = 0; - { - std::ifstream is(reordered_dataset_path, std::ios::in | std::ios::binary); - if (!is) { RAFT_FAIL("Cannot open file %s", reordered_dataset_path.c_str()); } - auto start_pos = is.tellg(); - raft::detail::numpy_serializer::read_header(is); - core_header_size = static_cast(is.tellg() - start_pos); - } - { - std::ifstream is(augmented_dataset_path, std::ios::in | std::ios::binary); - if (!is) { RAFT_FAIL("Cannot open file %s", augmented_dataset_path.c_str()); } - auto start_pos = is.tellg(); - raft::detail::numpy_serializer::read_header(is); - augmented_header_size = static_cast(is.tellg() - start_pos); - } - - for (size_t p = 0; p < partition_id; p++) { - core_file_offset += partition_histogram(p, 0); - augmented_file_offset += partition_histogram(p, 1); - } - - core_file_offset *= vector_size; - augmented_file_offset *= vector_size; - - core_file_offset += core_header_size; - augmented_file_offset += augmented_header_size; + const size_t core_file_offset = + reordered_header_size + static_cast(core_partition_offsets(partition_id)) * vector_size; + const size_t augmented_file_offset = + augmented_header_size + + static_cast(augmented_partition_offsets(partition_id)) * vector_size; RAFT_LOG_DEBUG("ACE: Core file offset: %lu bytes, Augmented file offset: %lu bytes", core_file_offset, augmented_file_offset); - // Read core and augmented data in parallel - std::exception_ptr core_exception = nullptr; - std::exception_ptr augmented_exception = nullptr; + // Read core and augmented vectors through kvikio. @p sub_dataset_ptr may be host or device + // memory; kvikio uses GPUDirect Storage (cuFile) for device destinations on a GDS-capable system + // and the POSIX + threadpool backend (with O_DIRECT when available) otherwise. The two file + // regions are independent, so issue both reads before waiting for either one. + const size_t core_bytes = core_size * vector_size; + const size_t augmented_bytes = augmented_size * vector_size; + T* augmented_dest = sub_dataset_ptr + (core_size * dataset_dim); + + auto expect_complete_read = [partition_id](const char* name, size_t expected, size_t actual) { + RAFT_EXPECTS(actual == expected, + "ACE: Short %s read for partition %lu: expected %zu bytes, got %zu", + name, + partition_id, + expected, + actual); + }; -#pragma omp parallel sections - { -#pragma omp section - { - try { - if (core_size > 0) { - RAFT_LOG_DEBUG( - "ACE: Reading %lu core vectors from offset %lu", core_size, core_file_offset); - cuvs::util::file_descriptor reordered_fd(reordered_dataset_path, O_RDONLY); - const size_t core_bytes = core_size * vector_size; - cuvs::util::read_large_file( - reordered_fd, sub_dataset.data_handle(), core_bytes, core_file_offset); - } - } catch (...) { - core_exception = std::current_exception(); - } + if (core_bytes > 0 && augmented_bytes > 0) { + RAFT_LOG_DEBUG("ACE: Reading %lu core vectors from offset %lu", core_size, core_file_offset); + RAFT_LOG_DEBUG( + "ACE: Reading %lu augmented vectors from offset %lu", augmented_size, augmented_file_offset); + kvikio::FileHandle reordered_handle(reordered_path, "r"); + kvikio::FileHandle augmented_handle(augmented_path, "r"); + auto core_future = reordered_handle.pread(sub_dataset_ptr, core_bytes, core_file_offset); + auto augmented_future = + augmented_handle.pread(augmented_dest, augmented_bytes, augmented_file_offset); + std::exception_ptr read_exception = nullptr; + size_t core_read = 0; + size_t augmented_read = 0; + try { + core_read = core_future.get(); + } catch (...) { + read_exception = std::current_exception(); } -#pragma omp section - { - try { - if (augmented_size > 0) { - RAFT_LOG_DEBUG("ACE: Reading %lu augmented vectors from offset %lu", - augmented_size, - augmented_file_offset); - cuvs::util::file_descriptor augmented_fd(augmented_dataset_path, O_RDONLY); - const size_t augmented_bytes = augmented_size * vector_size; - T* augmented_dest = sub_dataset.data_handle() + (core_size * dataset_dim); - cuvs::util::read_large_file( - augmented_fd, augmented_dest, augmented_bytes, augmented_file_offset); - } - } catch (...) { - augmented_exception = std::current_exception(); - } + try { + augmented_read = augmented_future.get(); + } catch (...) { + if (!read_exception) { read_exception = std::current_exception(); } } + if (read_exception) { std::rethrow_exception(read_exception); } + expect_complete_read("core", core_bytes, core_read); + expect_complete_read("augmented", augmented_bytes, augmented_read); + } else if (core_bytes > 0) { + RAFT_LOG_DEBUG("ACE: Reading %lu core vectors from offset %lu", core_size, core_file_offset); + kvikio::FileHandle reordered_handle(reordered_path, "r"); + auto core_future = reordered_handle.pread(sub_dataset_ptr, core_bytes, core_file_offset); + expect_complete_read("core", core_bytes, core_future.get()); + } else if (augmented_bytes > 0) { + RAFT_LOG_DEBUG( + "ACE: Reading %lu augmented vectors from offset %lu", augmented_size, augmented_file_offset); + kvikio::FileHandle augmented_handle(augmented_path, "r"); + auto augmented_future = + augmented_handle.pread(augmented_dest, augmented_bytes, augmented_file_offset); + expect_complete_read("augmented", augmented_bytes, augmented_future.get()); } - - // Check for exceptions from parallel sections - if (core_exception) { std::rethrow_exception(core_exception); } - if (augmented_exception) { std::rethrow_exception(augmented_exception); } } // Memory requirements for ACE operation @@ -1229,7 +1214,9 @@ index build_ace(raft::resources const& res, // Mark for cleanup if we fail after creating the directory cleanup_on_failure = true; - // Create numpy files with pre-allocated space + // Create numpy files with pre-allocated space. All bulk I/O on these artifacts goes through + // kvikio, which bypasses the page cache via O_DIRECT (and uses GPUDirect Storage for device + // transfers) when supported, falling back to buffered POSIX I/O otherwise. std::tie(reordered_fd, reordered_header_size) = cuvs::util::create_numpy_file( build_dir + "/reordered_dataset.npy", {dataset_size, dataset_dim}); @@ -1351,20 +1338,78 @@ index build_ace(raft::resources const& res, core_sub_dataset_size, augmented_sub_dataset_size); - auto sub_dataset = raft::make_host_matrix(sub_dataset_size, dataset_dim); + // Index parameters depend only on partition sizes, not on where the dataset lives. + cuvs::neighbors::cagra::index_params sub_index_params; + sub_index_params = cuvs::neighbors::cagra::index_params::from_hnsw_params( + raft::make_extents(sub_dataset_size, dataset_dim), + graph_degree / 2, + ef_construction, + cuvs::neighbors::cagra::hnsw_heuristic_type::SAME_GRAPH_FOOTPRINT, + params.metric); + sub_index_params.attach_dataset_on_build = false; + sub_index_params.guarantee_connectivity = params.guarantee_connectivity; - if (use_disk_mode) { - // Load partition dataset from disk files - ace_load_partition_dataset_from_disk(res, - build_dir, - partition_id, - dataset_dim, - partition_histogram.view(), - core_partition_offsets.view(), - augmented_partition_offsets.view(), - sub_dataset.view()); - } else { - // Gather partition dataset from memory + // In disk mode, try to read this partition's vectors straight into device memory so CAGRA + // builds from a device view. This enables GPUDirect Storage (NVMe->GPU DMA via kvikio/cuFile) + // on a GDS-capable system and avoids a host staging copy. Only attempt it when the vectors + // comfortably fit free device memory (leaving headroom for the CAGRA build working set); + // otherwise fall back to a host read + CAGRA's internal host->device streaming. + auto read_end = start; + bool used_device_read = false; + const size_t sub_ds_bytes = sub_dataset_size * dataset_dim * sizeof(T); + auto sub_index = [&]() { + if (use_disk_mode) { + const size_t free_gpu_bytes = + ace_params.max_gpu_memory_gb > 0 + ? static_cast(ace_params.max_gpu_memory_gb * (1ULL << 30)) + : rmm::available_device_memory().first; + if (sub_ds_bytes < static_cast(0.4 * free_gpu_bytes)) { + try { + auto sub_dataset_dev = + raft::make_device_matrix(res, sub_dataset_size, dataset_dim); + ace_load_partition_dataset_from_disk(partition_id, + dataset_dim, + partition_histogram.view(), + core_partition_offsets.view(), + augmented_partition_offsets.view(), + reordered_fd, + augmented_fd, + reordered_header_size, + augmented_header_size, + sub_dataset_dev.data_handle()); + used_device_read = true; + read_end = std::chrono::high_resolution_clock::now(); + return cuvs::neighbors::cagra::build( + res, sub_index_params, raft::make_const_mdspan(sub_dataset_dev.view())); + } catch (const std::bad_alloc&) { + RAFT_LOG_WARN( + "ACE: partition %lu did not fit in device memory for a direct (GDS) read; falling " + "back to a host read", + partition_id); + } catch (const raft::logic_error&) { + RAFT_LOG_WARN( + "ACE: device allocation failed for partition %lu direct read; falling back to a " + "host read", + partition_id); + } + } + auto sub_dataset = raft::make_host_matrix(sub_dataset_size, dataset_dim); + ace_load_partition_dataset_from_disk(partition_id, + dataset_dim, + partition_histogram.view(), + core_partition_offsets.view(), + augmented_partition_offsets.view(), + reordered_fd, + augmented_fd, + reordered_header_size, + augmented_header_size, + sub_dataset.data_handle()); + read_end = std::chrono::high_resolution_clock::now(); + return cuvs::neighbors::cagra::build( + res, sub_index_params, raft::make_const_mdspan(sub_dataset.view())); + } + // In-memory mode: gather this partition's vectors from the host dataset. + auto sub_dataset = raft::make_host_matrix(sub_dataset_size, dataset_dim); ace_gather_partition_dataset(core_sub_dataset_size, augmented_sub_dataset_size, dataset_dim, @@ -1375,25 +1420,17 @@ index build_ace(raft::resources const& res, core_partition_offsets.view(), augmented_partition_offsets.view(), sub_dataset.view()); + read_end = std::chrono::high_resolution_clock::now(); + return cuvs::neighbors::cagra::build( + res, sub_index_params, raft::make_const_mdspan(sub_dataset.view())); + }(); + if (used_device_read) { + RAFT_LOG_DEBUG("ACE: partition %lu read directly into device memory (GDS path)", + partition_id); } - auto read_end = std::chrono::high_resolution_clock::now(); auto read_elapsed = std::chrono::duration_cast(read_end - start).count(); - // Create index for this partition - cuvs::neighbors::cagra::index_params sub_index_params; - sub_index_params = cuvs::neighbors::cagra::index_params::from_hnsw_params( - raft::make_extents(sub_dataset_size, dataset_dim), - graph_degree / 2, - ef_construction, - cuvs::neighbors::cagra::hnsw_heuristic_type::SAME_GRAPH_FOOTPRINT, - params.metric); - sub_index_params.attach_dataset_on_build = false; - sub_index_params.guarantee_connectivity = params.guarantee_connectivity; - - auto sub_index = cuvs::neighbors::cagra::build( - res, sub_index_params, raft::make_const_mdspan(sub_dataset.view())); - auto optimize_end = std::chrono::high_resolution_clock::now(); auto optimize_elapsed = std::chrono::duration_cast(optimize_end - read_end).count(); diff --git a/cpp/src/neighbors/detail/hnsw.hpp b/cpp/src/neighbors/detail/hnsw.hpp index d2ceee7bd2..789a1d09a6 100644 --- a/cpp/src/neighbors/detail/hnsw.hpp +++ b/cpp/src/neighbors/detail/hnsw.hpp @@ -14,6 +14,8 @@ #include #include +#include + #include #include #include @@ -722,9 +724,12 @@ void serialize_to_hnswlib_from_disk(raft::resources const& res, RAFT_EXPECTS(!dataset_path.empty(), "Unable to get path from dataset file descriptor"); RAFT_EXPECTS(!mapping_path.empty(), "Unable to get path from mapping file descriptor"); - int graph_fd = graph_fd_opt->get(); - int dataset_fd = dataset_fd_opt->get(); - int label_fd = mapping_fd_opt->get(); + // Open kvikio handles for the disk-backed artifacts. Reads here target host buffers (the hnswlib + // layout is assembled on the CPU), so kvikio uses its POSIX + threadpool backend, with O_DIRECT + // when available; it handles any alignment internally. + kvikio::FileHandle graph_kv(graph_path, "r"); + kvikio::FileHandle dataset_kv(dataset_path, "r"); + kvikio::FileHandle label_kv(mapping_path, "r"); // Read headers from files to get dimensions size_t graph_header_size = 0; @@ -813,44 +818,47 @@ void serialize_to_hnswlib_from_disk(raft::resources const& res, raft::host_matrix_view graph_buf, raft::host_matrix_view dataset_buf, raft::host_vector_view label_buf) { - const size_t graph_bytes = rows_to_read * graph_degree_int * sizeof(IdxT); - const size_t dataset_bytes = rows_to_read * dim * sizeof(T); - const size_t label_bytes = rows_to_read * sizeof(uint32_t); + RAFT_EXPECTS(start_row >= 0 && rows_to_read >= 0, + "Batch start row and row count must be non-negative"); + const size_t row = static_cast(start_row); + const size_t rows = static_cast(rows_to_read); + const size_t graph_degree = static_cast(graph_degree_int); + const size_t dim_size = static_cast(dim); + + const size_t graph_bytes = rows * graph_degree * sizeof(IdxT); + const size_t dataset_bytes = rows * dim_size * sizeof(T); + const size_t label_bytes = rows * sizeof(uint32_t); - const off_t graph_offset = graph_header_size + start_row * graph_degree_int * sizeof(IdxT); - const off_t dataset_offset = dataset_header_size + start_row * dim * sizeof(T); - const off_t label_offset = label_header_size + start_row * sizeof(uint32_t); + const size_t graph_offset = graph_header_size + row * graph_degree * sizeof(IdxT); + const size_t dataset_offset = dataset_header_size + row * dim_size * sizeof(T); + const size_t label_offset = label_header_size + row * sizeof(uint32_t); RAFT_LOG_DEBUG("Reading batch: row=%ld, rows=%ld", start_row, rows_to_read); -#pragma omp parallel sections num_threads(3) - { -#pragma omp section - { - ssize_t bytes_read = pread(graph_fd, graph_buf.data_handle(), graph_bytes, graph_offset); - RAFT_EXPECTS(bytes_read == static_cast(graph_bytes), - "Failed to read graph data: expected %zu, got %zd", - graph_bytes, - bytes_read); - } -#pragma omp section - { - ssize_t bytes_read = - pread(dataset_fd, dataset_buf.data_handle(), dataset_bytes, dataset_offset); - RAFT_EXPECTS(bytes_read == static_cast(dataset_bytes), - "Failed to read dataset data: expected %zu, got %zd", - dataset_bytes, - bytes_read); - } -#pragma omp section - { - ssize_t bytes_read = pread(label_fd, label_buf.data_handle(), label_bytes, label_offset); - RAFT_EXPECTS(bytes_read == static_cast(label_bytes), - "Failed to read label data: expected %zu, got %zd", - label_bytes, - bytes_read); - } - } + // Issue the three reads concurrently through kvikio (its threadpool parallelizes each), then + // wait for all to complete. + auto graph_future = graph_kv.pread(graph_buf.data_handle(), graph_bytes, graph_offset); + auto dataset_future = + dataset_kv.pread(dataset_buf.data_handle(), dataset_bytes, dataset_offset); + auto label_future = label_kv.pread(label_buf.data_handle(), label_bytes, label_offset); + const size_t graph_read = graph_future.get(); + const size_t dataset_read = dataset_future.get(); + const size_t label_read = label_future.get(); + RAFT_EXPECTS(graph_read == graph_bytes, + "Short graph read at row %ld: expected %zu, got %zu", + start_row, + graph_bytes, + graph_read); + RAFT_EXPECTS(dataset_read == dataset_bytes, + "Short dataset read at row %ld: expected %zu, got %zu", + start_row, + dataset_bytes, + dataset_read); + RAFT_EXPECTS(label_read == label_bytes, + "Short label read at row %ld: expected %zu, got %zu", + start_row, + label_bytes, + label_read); }; serialize_to_hnswlib_batched( @@ -1258,7 +1266,10 @@ std::unique_ptr> from_cagra( std::string index_filename = (std::filesystem::path(index_directory) / "hnsw_index.bin").string(); - std::ofstream of(index_filename, std::ios::out | std::ios::binary); + // Route the disk-backed hnswlib output through kvikio (bypassing the page cache via O_DIRECT + // when supported) so all ACE disk I/O goes through kvikio. kvikio_ofstream is a std::ostream + // backed by a kvikio writer, so it plugs into the shared serializer below. + cuvs::util::kvikio_ofstream of(index_filename); RAFT_EXPECTS(of, "Cannot open file %s", index_filename.c_str()); diff --git a/cpp/src/util/file_io.cpp b/cpp/src/util/file_io.cpp index d924527e72..459094e481 100644 --- a/cpp/src/util/file_io.cpp +++ b/cpp/src/util/file_io.cpp @@ -1,16 +1,17 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION. + * SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION. * SPDX-License-Identifier: Apache-2.0 */ #include +#include + #include +#include #include -#include - -#include -#include +#include +#include namespace cuvs::util { @@ -22,28 +23,19 @@ void read_large_file(const file_descriptor& fd, RAFT_EXPECTS(total_bytes > 0, "Total bytes must be greater than 0"); RAFT_EXPECTS(dest_ptr != nullptr, "Destination pointer must not be nullptr"); RAFT_EXPECTS(fd.is_valid(), "File descriptor must be valid"); + const std::string path = fd.get_path(); + RAFT_EXPECTS(!path.empty(), "File descriptor must have an associated path for kvikio I/O"); - const size_t read_chunk_size = std::min(1024 * 1024 * 1024, SSIZE_MAX); - size_t bytes_remaining = total_bytes; - size_t offset = 0; - - while (bytes_remaining > 0) { - const size_t chunk_size = std::min(read_chunk_size, bytes_remaining); - const uint64_t file_pos = file_offset + offset; - const ssize_t bytes_read = - pread(fd.get(), reinterpret_cast(dest_ptr) + offset, chunk_size, file_pos); - - RAFT_EXPECTS( - bytes_read != -1, "Failed to read from file at offset %lu: %s", file_pos, strerror(errno)); - RAFT_EXPECTS(bytes_read == static_cast(chunk_size), - "Incomplete read from file. Expected %zu bytes, got %zd at offset %lu", - chunk_size, - bytes_read, - file_pos); - - bytes_remaining -= chunk_size; - offset += chunk_size; - } + // kvikio selects GPUDirect Storage (cuFile) for device destinations on a GDS-capable system, and + // the POSIX + threadpool backend (with O_DIRECT when available) otherwise. The destination may be + // host or device memory; kvikio detects which. + kvikio::FileHandle handle(path, "r"); + const size_t bytes_read = handle.pread(dest_ptr, total_bytes, file_offset).get(); + RAFT_EXPECTS(bytes_read == total_bytes, + "Incomplete read from %s: expected %zu bytes, got %zu", + path.c_str(), + total_bytes, + bytes_read); } void write_large_file(const file_descriptor& fd, @@ -54,28 +46,164 @@ void write_large_file(const file_descriptor& fd, RAFT_EXPECTS(total_bytes > 0, "Total bytes must be greater than 0"); RAFT_EXPECTS(data_ptr != nullptr, "Data pointer must not be nullptr"); RAFT_EXPECTS(fd.is_valid(), "File descriptor must be valid"); + const std::string path = fd.get_path(); + RAFT_EXPECTS(!path.empty(), "File descriptor must have an associated path for kvikio I/O"); + + // Open in read+write mode ("r+") so the existing numpy header and preallocation are preserved + // (kvikio's "w" mode would truncate). The source may be host or device memory. + kvikio::FileHandle handle(path, "r+"); + const size_t bytes_written = handle.pwrite(data_ptr, total_bytes, file_offset).get(); + RAFT_EXPECTS(bytes_written == total_bytes, + "Incomplete write to %s: expected %zu bytes, wrote %zu", + path.c_str(), + total_bytes, + bytes_written); +} + +// std::streambuf that stages output into a large buffer and writes full buffers to disk through +// kvikio at an increasing file offset. The trailing partial buffer is written on sync()/close(). +class kvikio_ofstream::sbuf : public std::streambuf { + public: + sbuf(const std::string& path, size_t cap) + : handle_(path, "w"), buffer_(std::max(cap, kNumpyDataAlignment)) + { + RAFT_EXPECTS(buffer_.size() <= static_cast(std::numeric_limits::max()), + "kvikio_ofstream buffer size must fit in std::streambuf::pbump"); + setp(buffer_.data(), buffer_.data() + buffer_.size()); + } + + ~sbuf() override + { + try { + close(); + } catch (...) { + // Swallow during destruction. + } + } + + void close() + { + if (closed_) { return; } + flush_buffer(); + handle_.close(); + closed_ = true; + } + + // Total logical bytes accepted = already-written + currently-staged. + [[nodiscard]] size_t bytes_written() const noexcept + { + return offset_ + static_cast(pptr() - pbase()); + } + + protected: + int_type overflow(int_type ch) override + { + RAFT_EXPECTS(!closed_, "kvikio_ofstream: write attempted after close"); + flush_buffer(); + if (!traits_type::eq_int_type(ch, traits_type::eof())) { + *pptr() = traits_type::to_char_type(ch); + pbump(1); + } + return traits_type::not_eof(ch); + } + + std::streamsize xsputn(const char* input, std::streamsize count) override + { + RAFT_EXPECTS(!closed_, "kvikio_ofstream: write attempted after close"); + if (count <= 0) { return 0; } + + const auto requested = count; + auto* current = input; + size_t remaining = static_cast(count); + + while (remaining > 0) { + // If the caller hands us a large contiguous chunk, flush pending staged bytes and pass the + // chunk straight to kvikio. This avoids std::streambuf's byte-at-a-time fallback path. + if (remaining >= buffer_.size()) { + flush_buffer(); + write_direct(current, remaining); + return requested; + } + + size_t available = static_cast(epptr() - pptr()); + if (available == 0) { + flush_buffer(); + available = static_cast(epptr() - pptr()); + } + + const size_t n = std::min(remaining, available); + std::memcpy(pptr(), current, n); + pbump(static_cast(n)); + current += n; + remaining -= n; + } - const size_t write_chunk_size = std::min(1024 * 1024 * 1024, SSIZE_MAX); - size_t bytes_remaining = total_bytes; - size_t offset = 0; - - while (bytes_remaining > 0) { - const size_t chunk_size = std::min(write_chunk_size, bytes_remaining); - const uint64_t file_pos = file_offset + offset; - const ssize_t chunk_written = - pwrite(fd.get(), reinterpret_cast(data_ptr) + offset, chunk_size, file_pos); - - RAFT_EXPECTS( - chunk_written != -1, "Failed to write to file at offset %lu: %s", file_pos, strerror(errno)); - RAFT_EXPECTS(chunk_written == static_cast(chunk_size), - "Incomplete write to file. Expected %zu bytes, wrote %zd at offset %lu", - chunk_size, - chunk_written, - file_pos); - - bytes_remaining -= chunk_size; - offset += chunk_size; + return requested; + } + + int sync() override + { + try { + flush_buffer(); + return 0; + } catch (...) { + return -1; + } + } + + // Support tellp(): report the current output position. + pos_type seekoff(off_type off, std::ios_base::seekdir dir, std::ios_base::openmode which) override + { + if ((which & std::ios_base::out) && dir == std::ios_base::cur && off == 0) { + return pos_type(static_cast(bytes_written())); + } + return pos_type(off_type(-1)); + } + + private: + void flush_buffer() + { + const size_t n = static_cast(pptr() - pbase()); + if (n > 0) { + write_direct(pbase(), n); + setp(buffer_.data(), buffer_.data() + buffer_.size()); + } + } + + void write_direct(const char* data, size_t n) + { + RAFT_EXPECTS(!closed_, "kvikio_ofstream: write attempted after close"); + const size_t w = handle_.pwrite(data, n, offset_).get(); + RAFT_EXPECTS(w == n, "kvikio_ofstream: short write (expected %zu, wrote %zu)", n, w); + offset_ += n; + } + + kvikio::FileHandle handle_; + std::vector buffer_; + size_t offset_ = 0; + bool closed_ = false; +}; + +kvikio_ofstream::kvikio_ofstream(const std::string& path, size_t buffer_size) + : std::ostream(nullptr), buf_(std::make_unique(path, buffer_size)) +{ + rdbuf(buf_.get()); +} + +kvikio_ofstream::~kvikio_ofstream() +{ + try { + if (buf_) { buf_->close(); } + } catch (...) { + // Swallow during destruction. } } +void kvikio_ofstream::close() +{ + if (buf_) { buf_->close(); } +} + +size_t kvikio_ofstream::bytes_written() const noexcept { return buf_ ? buf_->bytes_written() : 0; } + } // namespace cuvs::util diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index 13a07b10b5..ec2dbddf4a 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -393,6 +393,13 @@ ConfigureTest( PERCENT 100 ) +ConfigureTest( + NAME UTIL_TEST + PATH util/file_io_test.cpp + GPUS 1 + PERCENT 100 +) + # ################################################################################################## # Install tests #################################################################################### # ################################################################################################## diff --git a/cpp/tests/util/file_io_test.cpp b/cpp/tests/util/file_io_test.cpp new file mode 100644 index 0000000000..79809e79c5 --- /dev/null +++ b/cpp/tests/util/file_io_test.cpp @@ -0,0 +1,218 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. + * SPDX-License-Identifier: Apache-2.0 + */ + +#include + +#include +#include +#include +#include +#include + +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace cuvs::util { + +namespace { + +// Deterministic byte pattern so reads can be validated independently of the writer. +std::vector make_pattern(size_t n, uint8_t seed) +{ + std::vector v(n); + for (size_t i = 0; i < n; i++) { + v[i] = static_cast((i * 131u + seed * 7u + 17u) & 0xFFu); + } + return v; +} + +// Create a unique writable scratch directory. Prefer the current working directory (typically the +// on-disk build tree, where O_DIRECT is usually supported) and fall back to the system temp dir. +class scratch_dir { + public: + scratch_dir() + { + const std::string name = ".cuvs_file_io_test_" + std::to_string(::getpid()); + std::error_code ec; + std::filesystem::path base = std::filesystem::current_path(ec); + if (ec) { base = std::filesystem::temp_directory_path(); } + path_ = base / name; + std::filesystem::create_directories(path_, ec); + if (ec || !std::filesystem::is_directory(path_)) { + path_ = std::filesystem::temp_directory_path() / name; + std::filesystem::create_directories(path_); + } + } + + ~scratch_dir() + { + std::error_code ec; + std::filesystem::remove_all(path_, ec); + } + + [[nodiscard]] std::string file(const std::string& name) const { return (path_ / name).string(); } + [[nodiscard]] std::string dir() const { return path_.string(); } + + private: + std::filesystem::path path_; +}; + +std::vector read_whole_file(const std::string& path) +{ + std::ifstream is(path, std::ios::in | std::ios::binary); + EXPECT_TRUE(is.good()) << "cannot open " << path; + return std::vector((std::istreambuf_iterator(is)), std::istreambuf_iterator()); +} + +} // namespace + +// create_numpy_file must produce a numpy-compatible header whose data body begins on a block +// boundary (so kvikio's O_DIRECT / GDS interior is aligned), and the file must be readable back +// through the numpy deserializer. +TEST(FileIO, CreateNumpyFileAlignedHeader) +{ + scratch_dir scratch; + const std::string path = scratch.file("aligned.npy"); + const size_t rows = 1234; + const size_t cols = 17; + auto [fd, header_size] = create_numpy_file(path, {rows, cols}); + EXPECT_TRUE(fd.is_valid()); + EXPECT_EQ(header_size % kNumpyDataAlignment, 0u) + << "numpy data body must start on a " << kNumpyDataAlignment << "-byte boundary"; + + // The numpy deserializer must recover the shape from the (re-padded) header. + auto stream = fd.make_istream(); + auto header = raft::detail::numpy_serializer::read_header(stream); + ASSERT_EQ(header.shape.size(), 2u); + EXPECT_EQ(header.shape[0], rows); + EXPECT_EQ(header.shape[1], cols); +} + +// write_large_file followed by read_large_file with host buffers must round-trip the data exactly. +TEST(FileIO, HostReadWriteRoundTrip) +{ + scratch_dir scratch; + const std::string path = scratch.file("host.npy"); + const size_t rows = 4096; + const size_t cols = 33; + const size_t n = rows * cols; + + auto [fd, header_size] = create_numpy_file(path, {rows, cols}); + + std::vector src(n); + for (size_t i = 0; i < n; i++) { + src[i] = static_cast(i % 1000) * 0.5f; + } + write_large_file(fd, src.data(), n * sizeof(float), header_size); + + std::vector dst(n, -1.0f); + read_large_file(fd, dst.data(), n * sizeof(float), header_size); + + EXPECT_EQ(src, dst); +} + +// read_large_file must also fill device memory (kvikio uses GPUDirect Storage when available, and +// stages through a host bounce buffer in compatibility mode). The data must match after copying +// back to the host. +TEST(FileIO, DeviceReadRoundTrip) +{ + raft::resources res; + scratch_dir scratch; + const std::string path = scratch.file("device.npy"); + const size_t rows = 5000; + const size_t cols = 24; + const size_t n = rows * cols; + + auto [fd, header_size] = create_numpy_file(path, {rows, cols}); + + std::vector src(n); + for (size_t i = 0; i < n; i++) { + src[i] = static_cast((i * 7) % 4096); + } + write_large_file(fd, src.data(), n * sizeof(float), header_size); + + auto dev = raft::make_device_vector(res, n); + read_large_file(fd, dev.data_handle(), n * sizeof(float), header_size); + + std::vector dst(n, -1.0f); + raft::copy(dst.data(), dev.data_handle(), n, raft::resource::get_cuda_stream(res)); + raft::resource::sync_stream(res); + + EXPECT_EQ(src, dst); +} + +// kvikio_ofstream must reproduce exactly the bytes handed to write(), across many small writes that +// span multiple internal buffer flushes, and report the correct logical size. +TEST(FileIO, KvikioOfstreamRoundTrip) +{ + scratch_dir scratch; + const std::string path = scratch.file("stream.bin"); + std::vector data = make_pattern(5'000'003, 42); + const size_t cap = size_t(1) << 20; // 1 MiB staging buffer -> several flushes + + { + kvikio_ofstream os(path, cap); + size_t pos = 0; + while (pos < data.size()) { + const size_t chunk = std::min(7777, data.size() - pos); + os.write(data.data() + pos, chunk); + pos += chunk; + } + os.flush(); + EXPECT_EQ(os.bytes_written(), data.size()); + EXPECT_EQ(os.tellp(), std::streampos(static_cast(data.size()))); + os.close(); + } + + const std::vector got = read_whole_file(path); + ASSERT_EQ(got.size(), data.size()); + EXPECT_EQ(got, data); +} + +// A large single write must bypass the small staging buffer correctly and still round-trip. +TEST(FileIO, KvikioOfstreamLargeSingleWrite) +{ + scratch_dir scratch; + const std::string path = scratch.file("stream_large.bin"); + std::vector data = make_pattern((size_t(8) << 20) + 123, 7); + + { + kvikio_ofstream os(path, size_t(1) << 20); + os.write(data.data(), data.size()); + EXPECT_EQ(os.bytes_written(), data.size()); + EXPECT_EQ(os.tellp(), std::streampos(static_cast(data.size()))); + os.close(); + } + + const std::vector got = read_whole_file(path); + ASSERT_EQ(got.size(), data.size()); + EXPECT_EQ(got, data); +} + +// Defensive checks on the bulk helpers. +TEST(FileIO, InvalidArguments) +{ + scratch_dir scratch; + auto [fd, header_size] = create_numpy_file(scratch.file("args.npy"), {16, 4}); + char buf[8] = {0}; + EXPECT_THROW(read_large_file(fd, buf, 0, header_size), raft::exception); + EXPECT_THROW(write_large_file(fd, buf, 0, header_size), raft::exception); + EXPECT_THROW(read_large_file(fd, nullptr, 8, header_size), raft::exception); + + EXPECT_NO_THROW(read_large_file(fd, buf, sizeof(buf), header_size)); + const int dup_fd = ::dup(fd.get()); + ASSERT_NE(dup_fd, -1); + file_descriptor pathless_fd(dup_fd); + EXPECT_THROW(read_large_file(pathless_fd, buf, sizeof(buf), header_size), raft::exception); +} + +} // namespace cuvs::util diff --git a/dependencies.yaml b/dependencies.yaml index 744e4d9227..820813c39c 100644 --- a/dependencies.yaml +++ b/dependencies.yaml @@ -17,6 +17,7 @@ files: - depends_on_cuda_python - depends_on_cupy - depends_on_librmm + - depends_on_libkvikio - depends_on_pylibraft - depends_on_nccl - docs @@ -140,6 +141,7 @@ files: includes: - depends_on_libraft - depends_on_librmm + - depends_on_libkvikio - depends_on_nccl - rapids_build py_run_libcuvs: @@ -151,6 +153,7 @@ files: - cuda_wheels - depends_on_libraft - depends_on_librmm + - depends_on_libkvikio - depends_on_nccl py_build_cuvs: output: pyproject @@ -400,6 +403,24 @@ dependencies: - libcusolver-dev - libcusparse-dev - libnvjitlink-dev + specific: + # libcufile provides the GPUDirect Storage (cuFile) backend that kvikio compiles against. + # It is packaged for CUDA 12 on x86_64 only, and for CUDA 13 on all arches. + - output_types: [conda] + matrices: + - matrix: + cuda: "12.*" + arch: x86_64 + packages: + - libcufile-dev + - matrix: + cuda: "12.*" + arch: aarch64 + packages: + - matrix: + cuda: "13.*" + packages: + - libcufile-dev cuda_wheels: specific: # cuVS needs 'nvJitLink>={whatever-cuvs-was-built-against}' at runtime, and mixing @@ -686,6 +707,29 @@ dependencies: packages: - librmm-cu13==26.8.*,>=0.0.0a0 - {matrix: null, packages: [*librmm_unsuffixed]} + depends_on_libkvikio: + common: + - output_types: conda + packages: + - &libkvikio_unsuffixed libkvikio==26.8.*,>=0.0.0a0 + - output_types: requirements + packages: + # pip recognizes the index as a global option for the requirements.txt file + - --extra-index-url=https://pypi.anaconda.org/rapidsai-wheels-nightly/simple + specific: + - output_types: [requirements, pyproject] + matrices: + - matrix: + cuda: "12.*" + cuda_suffixed: "true" + packages: + - libkvikio-cu12==26.8.*,>=0.0.0a0 + - matrix: + cuda: "13.*" + cuda_suffixed: "true" + packages: + - libkvikio-cu13==26.8.*,>=0.0.0a0 + - {matrix: null, packages: [*libkvikio_unsuffixed]} depends_on_pylibraft: common: - output_types: conda diff --git a/python/libcuvs/pyproject.toml b/python/libcuvs/pyproject.toml index 5025daa66d..d41e39976f 100644 --- a/python/libcuvs/pyproject.toml +++ b/python/libcuvs/pyproject.toml @@ -20,6 +20,7 @@ license = "Apache-2.0" requires-python = ">=3.11" dependencies = [ "cuda-toolkit[cublas,curand,cusolver,cusparse,nvrtc]==13.*", + "libkvikio==26.8.*,>=0.0.0a0", "libraft==26.8.*,>=0.0.0a0", "librmm==26.8.*,>=0.0.0a0", "nvidia-nvjitlink>=13.0,<14", @@ -81,6 +82,7 @@ regex = "(?P.*)" build-backend = "scikit_build_core.build" requires = [ "cmake>=4.0", + "libkvikio==26.8.*,>=0.0.0a0", "libraft==26.8.*,>=0.0.0a0", "librmm==26.8.*,>=0.0.0a0", "ninja", From c6df26012592f58cccb2b76d7c7a36b3326b24f4 Mon Sep 17 00:00:00 2001 From: Julian Miller Date: Wed, 24 Jun 2026 10:30:24 +0200 Subject: [PATCH 2/6] Address review feedback --- ci/release/update-version.sh | 1 + .../all_cuda-129_arch-x86_64.yaml | 1 - .../all_cuda-133_arch-aarch64.yaml | 1 - .../all_cuda-133_arch-x86_64.yaml | 1 - .../bench_ann_cuda-129_arch-x86_64.yaml | 1 - .../bench_ann_cuda-133_arch-aarch64.yaml | 1 - .../bench_ann_cuda-133_arch-x86_64.yaml | 1 - .../environments/go_cuda-129_arch-x86_64.yaml | 1 - .../go_cuda-133_arch-aarch64.yaml | 1 - .../environments/go_cuda-133_arch-x86_64.yaml | 1 - .../rust_cuda-129_arch-x86_64.yaml | 1 - .../rust_cuda-133_arch-aarch64.yaml | 1 - .../rust_cuda-133_arch-x86_64.yaml | 1 - conda/recipes/libcuvs/recipe.yaml | 17 ------- cpp/CMakeLists.txt | 7 ++- cpp/src/neighbors/detail/hnsw.hpp | 21 ++++++-- cpp/src/util/file_io.cpp | 49 +++++++++++++++++++ cpp/tests/util/file_io_test.cpp | 23 +++++++++ dependencies.yaml | 18 ------- python/libcuvs/libcuvs/load.py | 4 +- 20 files changed, 98 insertions(+), 54 deletions(-) diff --git a/ci/release/update-version.sh b/ci/release/update-version.sh index f36c14f771..de1802adb4 100755 --- a/ci/release/update-version.sh +++ b/ci/release/update-version.sh @@ -110,6 +110,7 @@ DEPENDENCIES=( cuvs-bench libcuvs libcuvs-tests + libkvikio libraft librmm pylibraft diff --git a/conda/environments/all_cuda-129_arch-x86_64.yaml b/conda/environments/all_cuda-129_arch-x86_64.yaml index f372dfdeb7..9e8d26acf8 100644 --- a/conda/environments/all_cuda-129_arch-x86_64.yaml +++ b/conda/environments/all_cuda-129_arch-x86_64.yaml @@ -24,7 +24,6 @@ dependencies: - go - libclang==20.1.8 - libcublas-dev -- libcufile-dev - libcurand-dev - libcusolver-dev - libcusparse-dev diff --git a/conda/environments/all_cuda-133_arch-aarch64.yaml b/conda/environments/all_cuda-133_arch-aarch64.yaml index 1da8d32820..3befeac713 100644 --- a/conda/environments/all_cuda-133_arch-aarch64.yaml +++ b/conda/environments/all_cuda-133_arch-aarch64.yaml @@ -24,7 +24,6 @@ dependencies: - go - libclang==20.1.8 - libcublas-dev -- libcufile-dev - libcurand-dev - libcusolver-dev - libcusparse-dev diff --git a/conda/environments/all_cuda-133_arch-x86_64.yaml b/conda/environments/all_cuda-133_arch-x86_64.yaml index 77173ae228..63508294d5 100644 --- a/conda/environments/all_cuda-133_arch-x86_64.yaml +++ b/conda/environments/all_cuda-133_arch-x86_64.yaml @@ -24,7 +24,6 @@ dependencies: - go - libclang==20.1.8 - libcublas-dev -- libcufile-dev - libcurand-dev - libcusolver-dev - libcusparse-dev diff --git a/conda/environments/bench_ann_cuda-129_arch-x86_64.yaml b/conda/environments/bench_ann_cuda-129_arch-x86_64.yaml index 009f31cd5f..d84bf87234 100644 --- a/conda/environments/bench_ann_cuda-129_arch-x86_64.yaml +++ b/conda/environments/bench_ann_cuda-129_arch-x86_64.yaml @@ -28,7 +28,6 @@ dependencies: - libboost-devel=1.87 - libclang==20.1.8 - libcublas-dev -- libcufile-dev - libcurand-dev - libcusolver-dev - libcusparse-dev diff --git a/conda/environments/bench_ann_cuda-133_arch-aarch64.yaml b/conda/environments/bench_ann_cuda-133_arch-aarch64.yaml index 31c5e64c21..c874298d7d 100644 --- a/conda/environments/bench_ann_cuda-133_arch-aarch64.yaml +++ b/conda/environments/bench_ann_cuda-133_arch-aarch64.yaml @@ -26,7 +26,6 @@ dependencies: - h5py>=3.8.0 - libclang==20.1.8 - libcublas-dev -- libcufile-dev - libcurand-dev - libcusolver-dev - libcusparse-dev diff --git a/conda/environments/bench_ann_cuda-133_arch-x86_64.yaml b/conda/environments/bench_ann_cuda-133_arch-x86_64.yaml index e2a344e6d6..43b2dbce46 100644 --- a/conda/environments/bench_ann_cuda-133_arch-x86_64.yaml +++ b/conda/environments/bench_ann_cuda-133_arch-x86_64.yaml @@ -28,7 +28,6 @@ dependencies: - libboost-devel=1.87 - libclang==20.1.8 - libcublas-dev -- libcufile-dev - libcurand-dev - libcusolver-dev - libcusparse-dev diff --git a/conda/environments/go_cuda-129_arch-x86_64.yaml b/conda/environments/go_cuda-129_arch-x86_64.yaml index eb9fcbf319..e99615ab2c 100644 --- a/conda/environments/go_cuda-129_arch-x86_64.yaml +++ b/conda/environments/go_cuda-129_arch-x86_64.yaml @@ -22,7 +22,6 @@ dependencies: - go - libclang==20.1.8 - libcublas-dev -- libcufile-dev - libcurand-dev - libcusolver-dev - libcusparse-dev diff --git a/conda/environments/go_cuda-133_arch-aarch64.yaml b/conda/environments/go_cuda-133_arch-aarch64.yaml index 05b00c1537..eeb5511537 100644 --- a/conda/environments/go_cuda-133_arch-aarch64.yaml +++ b/conda/environments/go_cuda-133_arch-aarch64.yaml @@ -22,7 +22,6 @@ dependencies: - go - libclang==20.1.8 - libcublas-dev -- libcufile-dev - libcurand-dev - libcusolver-dev - libcusparse-dev diff --git a/conda/environments/go_cuda-133_arch-x86_64.yaml b/conda/environments/go_cuda-133_arch-x86_64.yaml index 6b02c21a87..5fd2153ffa 100644 --- a/conda/environments/go_cuda-133_arch-x86_64.yaml +++ b/conda/environments/go_cuda-133_arch-x86_64.yaml @@ -22,7 +22,6 @@ dependencies: - go - libclang==20.1.8 - libcublas-dev -- libcufile-dev - libcurand-dev - libcusolver-dev - libcusparse-dev diff --git a/conda/environments/rust_cuda-129_arch-x86_64.yaml b/conda/environments/rust_cuda-129_arch-x86_64.yaml index 13c2c77147..a9e9e2142c 100644 --- a/conda/environments/rust_cuda-129_arch-x86_64.yaml +++ b/conda/environments/rust_cuda-129_arch-x86_64.yaml @@ -16,7 +16,6 @@ dependencies: - cxx-compiler - gcc_linux-64=14.* - libcublas-dev -- libcufile-dev - libcurand-dev - libcusolver-dev - libcusparse-dev diff --git a/conda/environments/rust_cuda-133_arch-aarch64.yaml b/conda/environments/rust_cuda-133_arch-aarch64.yaml index 0223fba1ca..3daa10d2be 100644 --- a/conda/environments/rust_cuda-133_arch-aarch64.yaml +++ b/conda/environments/rust_cuda-133_arch-aarch64.yaml @@ -16,7 +16,6 @@ dependencies: - cxx-compiler - gcc_linux-aarch64=14.* - libcublas-dev -- libcufile-dev - libcurand-dev - libcusolver-dev - libcusparse-dev diff --git a/conda/environments/rust_cuda-133_arch-x86_64.yaml b/conda/environments/rust_cuda-133_arch-x86_64.yaml index 8aa9341870..9e0fe2d529 100644 --- a/conda/environments/rust_cuda-133_arch-x86_64.yaml +++ b/conda/environments/rust_cuda-133_arch-x86_64.yaml @@ -16,7 +16,6 @@ dependencies: - cxx-compiler - gcc_linux-64=14.* - libcublas-dev -- libcufile-dev - libcurand-dev - libcusolver-dev - libcusparse-dev diff --git a/conda/recipes/libcuvs/recipe.yaml b/conda/recipes/libcuvs/recipe.yaml index 3d645fa093..7d0542984e 100644 --- a/conda/recipes/libcuvs/recipe.yaml +++ b/conda/recipes/libcuvs/recipe.yaml @@ -76,11 +76,6 @@ cache: - librmm =${{ minor_version }} - libraft-headers =${{ minor_version }} - libkvikio =${{ minor_version }} - # libcufile (GPUDirect Storage backend that kvikio compiles against): CUDA 13 on all arches, - # CUDA 12 on x86_64 only. - - if: cuda_major == "13" or linux64 - then: - - libcufile-dev - nccl ${{ nccl_version }} - cuda-version =${{ cuda_version }} - cuda-cudart-dev @@ -181,9 +176,6 @@ outputs: - librmm =${{ minor_version }} - libraft-headers =${{ minor_version }} - libkvikio =${{ minor_version }} - - if: cuda_major == "13" or linux64 - then: - - libcufile-dev - nccl ${{ nccl_version }} - cuda-version =${{ cuda_version }} - cuda-cudart-dev @@ -200,9 +192,6 @@ outputs: - libraft-headers =${{ minor_version }} - librmm =${{ minor_version }} - libkvikio =${{ minor_version }} - - if: cuda_major == "13" or linux64 - then: - - libcufile - cuda-nvrtc - nccl - libcublas @@ -250,9 +239,6 @@ outputs: - librmm =${{ minor_version }} - libraft-headers =${{ minor_version }} - libkvikio =${{ minor_version }} - - if: cuda_major == "13" or linux64 - then: - - libcufile-dev - nccl ${{ nccl_version }} - cuda-version =${{ cuda_version }} - cuda-cudart-dev @@ -269,9 +255,6 @@ outputs: - libraft-headers =${{ minor_version }} - librmm =${{ minor_version }} - libkvikio =${{ minor_version }} - - if: cuda_major == "13" or linux64 - then: - - libcufile - cuda-nvrtc - nccl - libcublas diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 39f4139985..ecbf377fcc 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -1541,8 +1541,11 @@ if(NOT BUILD_CPU_ONLY) $> $> $<$:CUDA::nvtx3> - PRIVATE $ $ - $ CUDA::nvJitLink CUDA::nvrtc + PRIVATE $ + $ + $ + CUDA::nvJitLink + CUDA::nvrtc $ ) diff --git a/cpp/src/neighbors/detail/hnsw.hpp b/cpp/src/neighbors/detail/hnsw.hpp index 789a1d09a6..8e55d56a4d 100644 --- a/cpp/src/neighbors/detail/hnsw.hpp +++ b/cpp/src/neighbors/detail/hnsw.hpp @@ -29,6 +29,7 @@ #include #include +#include #include #include #include @@ -840,10 +841,22 @@ void serialize_to_hnswlib_from_disk(raft::resources const& res, auto graph_future = graph_kv.pread(graph_buf.data_handle(), graph_bytes, graph_offset); auto dataset_future = dataset_kv.pread(dataset_buf.data_handle(), dataset_bytes, dataset_offset); - auto label_future = label_kv.pread(label_buf.data_handle(), label_bytes, label_offset); - const size_t graph_read = graph_future.get(); - const size_t dataset_read = dataset_future.get(); - const size_t label_read = label_future.get(); + auto label_future = label_kv.pread(label_buf.data_handle(), label_bytes, label_offset); + + // Drain all three futures before propagating any failure. + std::exception_ptr read_error; + auto drain = [&read_error](auto& fut) -> size_t { + try { + return fut.get(); + } catch (...) { + if (!read_error) { read_error = std::current_exception(); } + return 0; + } + }; + const size_t graph_read = drain(graph_future); + const size_t dataset_read = drain(dataset_future); + const size_t label_read = drain(label_future); + if (read_error) { std::rethrow_exception(read_error); } RAFT_EXPECTS(graph_read == graph_bytes, "Short graph read at row %ld: expected %zu, got %zu", start_row, diff --git a/cpp/src/util/file_io.cpp b/cpp/src/util/file_io.cpp index 459094e481..55432fd6ed 100644 --- a/cpp/src/util/file_io.cpp +++ b/cpp/src/util/file_io.cpp @@ -11,9 +11,56 @@ #include #include #include +#include #include namespace cuvs::util { +namespace { + +struct file_identity { + dev_t device; + ino_t inode; +}; + +file_identity get_file_identity(int fd, const char* description) +{ + struct stat status{}; + RAFT_EXPECTS(::fstat(fd, &status) == 0, + "Failed to stat %s file descriptor: %s", + description, + std::strerror(errno)); + return {status.st_dev, status.st_ino}; +} + +void expect_matching_file_identity(file_identity expected, + int actual_fd, + const char* actual_description, + const std::string& path) +{ + RAFT_EXPECTS(actual_fd >= 0, "kvikio did not open %s file descriptor", actual_description); + + const auto actual = get_file_identity(actual_fd, actual_description); + RAFT_EXPECTS(actual.device == expected.device && actual.inode == expected.inode, + "File path changed while opening %s for kvikio I/O", + path.c_str()); +} + +void validate_kvikio_handle_matches_fd(const file_descriptor& fd, + const kvikio::FileHandle& handle, + const std::string& path) +{ + const auto expected = get_file_identity(fd.get(), "source"); + + const int buffered_fd = handle.fd(false); + expect_matching_file_identity(expected, buffered_fd, "kvikio buffered", path); + + const int direct_fd = handle.fd(true); + if (direct_fd >= 0 && direct_fd != buffered_fd) { + expect_matching_file_identity(expected, direct_fd, "kvikio direct", path); + } +} + +} // namespace void read_large_file(const file_descriptor& fd, void* dest_ptr, @@ -30,6 +77,7 @@ void read_large_file(const file_descriptor& fd, // the POSIX + threadpool backend (with O_DIRECT when available) otherwise. The destination may be // host or device memory; kvikio detects which. kvikio::FileHandle handle(path, "r"); + validate_kvikio_handle_matches_fd(fd, handle, path); const size_t bytes_read = handle.pread(dest_ptr, total_bytes, file_offset).get(); RAFT_EXPECTS(bytes_read == total_bytes, "Incomplete read from %s: expected %zu bytes, got %zu", @@ -52,6 +100,7 @@ void write_large_file(const file_descriptor& fd, // Open in read+write mode ("r+") so the existing numpy header and preallocation are preserved // (kvikio's "w" mode would truncate). The source may be host or device memory. kvikio::FileHandle handle(path, "r+"); + validate_kvikio_handle_matches_fd(fd, handle, path); const size_t bytes_written = handle.pwrite(data_ptr, total_bytes, file_offset).get(); RAFT_EXPECTS(bytes_written == total_bytes, "Incomplete write to %s: expected %zu bytes, wrote %zu", diff --git a/cpp/tests/util/file_io_test.cpp b/cpp/tests/util/file_io_test.cpp index 79809e79c5..fd84ce9321 100644 --- a/cpp/tests/util/file_io_test.cpp +++ b/cpp/tests/util/file_io_test.cpp @@ -213,6 +213,29 @@ TEST(FileIO, InvalidArguments) ASSERT_NE(dup_fd, -1); file_descriptor pathless_fd(dup_fd); EXPECT_THROW(read_large_file(pathless_fd, buf, sizeof(buf), header_size), raft::exception); + EXPECT_THROW(write_large_file(pathless_fd, buf, sizeof(buf), header_size), raft::exception); +} + +TEST(FileIO, RejectsReplacedPathForBulkIO) +{ + scratch_dir scratch; + const std::string path = scratch.file("replace.npy"); + auto [fd, header_size] = create_numpy_file(path, {16, 4}); + char buf[8] = {0}; + + std::error_code ec; + ASSERT_TRUE(std::filesystem::remove(path, ec)); + ASSERT_FALSE(ec) << ec.message(); + + std::ofstream replacement(path, std::ios::out | std::ios::binary); + ASSERT_TRUE(replacement.good()) << "cannot create replacement " << path; + std::vector replacement_data(header_size + sizeof(buf), 0); + replacement.write(replacement_data.data(), replacement_data.size()); + replacement.close(); + ASSERT_TRUE(replacement.good()) << "cannot write replacement " << path; + + EXPECT_THROW(read_large_file(fd, buf, sizeof(buf), header_size), raft::exception); + EXPECT_THROW(write_large_file(fd, buf, sizeof(buf), header_size), raft::exception); } } // namespace cuvs::util diff --git a/dependencies.yaml b/dependencies.yaml index 820813c39c..a159b0ded2 100644 --- a/dependencies.yaml +++ b/dependencies.yaml @@ -403,24 +403,6 @@ dependencies: - libcusolver-dev - libcusparse-dev - libnvjitlink-dev - specific: - # libcufile provides the GPUDirect Storage (cuFile) backend that kvikio compiles against. - # It is packaged for CUDA 12 on x86_64 only, and for CUDA 13 on all arches. - - output_types: [conda] - matrices: - - matrix: - cuda: "12.*" - arch: x86_64 - packages: - - libcufile-dev - - matrix: - cuda: "12.*" - arch: aarch64 - packages: - - matrix: - cuda: "13.*" - packages: - - libcufile-dev cuda_wheels: specific: # cuVS needs 'nvJitLink>={whatever-cuvs-was-built-against}' at runtime, and mixing diff --git a/python/libcuvs/libcuvs/load.py b/python/libcuvs/libcuvs/load.py index 3e66761a2a..db33c758a6 100644 --- a/python/libcuvs/libcuvs/load.py +++ b/python/libcuvs/libcuvs/load.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION. +# SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION. # SPDX-License-Identifier: Apache-2.0 # @@ -37,9 +37,11 @@ def load_library(): try: # These libraries must be loaded before libcuvs because libcuvs # references their symbols + import libkvikio import libraft import librmm + libkvikio.load_library() librmm.load_library() libraft.load_library() except ModuleNotFoundError: From 7496f4febb6cda2156c1fa76779400b29d546146 Mon Sep 17 00:00:00 2001 From: Julian Miller Date: Wed, 24 Jun 2026 11:57:52 +0200 Subject: [PATCH 3/6] Fix missing C kvikio private linkage --- c/CMakeLists.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/c/CMakeLists.txt b/c/CMakeLists.txt index be4bc7a051..4c9cda6747 100644 --- a/c/CMakeLists.txt +++ b/c/CMakeLists.txt @@ -171,6 +171,7 @@ target_link_libraries( $,cuvs::cuvs_static,cuvs::cuvs> # $ # enforce we shouldn't use raft symbols $ + $> ) # ################################################################################################## From 66f1fe1b09c94a72eeb850a5b9f247aa115cd010 Mon Sep 17 00:00:00 2001 From: Julian Miller Date: Wed, 24 Jun 2026 13:00:24 +0200 Subject: [PATCH 4/6] Fix C kvikio include and wheel build --- c/CMakeLists.txt | 4 ++++ ci/build_wheel.sh | 1 + 2 files changed, 5 insertions(+) diff --git a/c/CMakeLists.txt b/c/CMakeLists.txt index 4c9cda6747..74cab676d2 100644 --- a/c/CMakeLists.txt +++ b/c/CMakeLists.txt @@ -71,6 +71,10 @@ if(BUILD_CAGRA_HNSWLIB) include(../cpp/cmake/thirdparty/get_hnswlib.cmake) endif() +if(CUVSC_STATIC_CUVS_LIBRARY AND NOT TARGET kvikio::kvikio) + include(../cpp/cmake/thirdparty/get_kvikio.cmake) +endif() + if(BUILD_MG_ALGOS AND CUVSC_STATIC_CUVS_LIBRARY) rapids_find_generate_module( NCCL diff --git a/ci/build_wheel.sh b/ci/build_wheel.sh index 6adee1fe34..ca3903fd1f 100755 --- a/ci/build_wheel.sh +++ b/ci/build_wheel.sh @@ -40,6 +40,7 @@ EXCLUDE_ARGS=( --exclude "libcurand.so.*" --exclude "libcusolver.so.*" --exclude "libcusparse.so.*" + --exclude "libkvikio.so*" --exclude "libnccl.so.*" --exclude "libnvJitLink.so.*" --exclude "libnvrtc.so.*" From 46b5cf2ce762ba20537515001c2baa8760b9ecfc Mon Sep 17 00:00:00 2001 From: Julian Miller Date: Wed, 24 Jun 2026 15:28:00 +0200 Subject: [PATCH 5/6] Fix C kvikio include order --- c/CMakeLists.txt | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/c/CMakeLists.txt b/c/CMakeLists.txt index 74cab676d2..45b4c29d28 100644 --- a/c/CMakeLists.txt +++ b/c/CMakeLists.txt @@ -71,10 +71,6 @@ if(BUILD_CAGRA_HNSWLIB) include(../cpp/cmake/thirdparty/get_hnswlib.cmake) endif() -if(CUVSC_STATIC_CUVS_LIBRARY AND NOT TARGET kvikio::kvikio) - include(../cpp/cmake/thirdparty/get_kvikio.cmake) -endif() - if(BUILD_MG_ALGOS AND CUVSC_STATIC_CUVS_LIBRARY) rapids_find_generate_module( NCCL @@ -219,6 +215,10 @@ if(BUILD_TESTS) enable_language(CUDA) find_package(CUDAToolkit REQUIRED) + if(CUVSC_STATIC_CUVS_LIBRARY AND NOT TARGET kvikio::kvikio) + include(../cpp/cmake/thirdparty/get_kvikio.cmake) + endif() + enable_testing() add_subdirectory(tests) endif() From f9ee2cb97cd6b9e87500f5c6be6c743bbf373b81 Mon Sep 17 00:00:00 2001 From: Julian Miller Date: Wed, 24 Jun 2026 18:49:00 +0200 Subject: [PATCH 6/6] Add C kvikio runtime library --- c/CMakeLists.txt | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/c/CMakeLists.txt b/c/CMakeLists.txt index 45b4c29d28..44e3cef0f9 100644 --- a/c/CMakeLists.txt +++ b/c/CMakeLists.txt @@ -191,6 +191,19 @@ if(PROJECT_IS_TOP_LEVEL) COMPONENT c_api EXPORT cuvs-c-exports ) + if(CUVSC_STATIC_CUVS_LIBRARY AND TARGET kvikio::kvikio) + get_target_property(_cuvs_kvikio_aliased_target kvikio::kvikio ALIASED_TARGET) + if(_cuvs_kvikio_aliased_target) + set(_cuvs_kvikio_runtime_target "${_cuvs_kvikio_aliased_target}") + else() + set(_cuvs_kvikio_runtime_target kvikio::kvikio) + endif() + install( + FILES "$" + DESTINATION ${lib_dir} + COMPONENT c_api + ) + endif() install( DIRECTORY include/cuvs COMPONENT c_api