diff --git a/bindings/cpp/include/svs/runtime/dynamic_vamana_index.h b/bindings/cpp/include/svs/runtime/dynamic_vamana_index.h
index 1d487baf..d5b9e8c2 100644
--- a/bindings/cpp/include/svs/runtime/dynamic_vamana_index.h
+++ b/bindings/cpp/include/svs/runtime/dynamic_vamana_index.h
@@ -75,6 +75,15 @@ struct SVS_RUNTIME_API DynamicVamanaIndex : public VamanaIndex {
 
     virtual size_t blocksize_bytes() const noexcept = 0;
 
+    /// @brief Storage kind currently backing the index.
+    ///
+    /// When deferred compression is enabled and the threshold has not yet been crossed,
+    /// this returns the *initial* (uncompressed) storage kind (FP32 / FP16). After the
+    /// swap, this returns the same value as ``get_storage_kind()`` (the trained target
+    /// kind). When deferred compression is disabled, this is always equal to
+    /// ``get_storage_kind()``.
+    virtual StorageKind get_current_storage_kind() const noexcept = 0;
+
     // Override for VamanaIndex interface
     Status add(size_t, const float*) noexcept override {
         return Status(
diff --git a/bindings/cpp/include/svs/runtime/vamana_index.h b/bindings/cpp/include/svs/runtime/vamana_index.h
index 98831952..6df23742 100644
--- a/bindings/cpp/include/svs/runtime/vamana_index.h
+++ b/bindings/cpp/include/svs/runtime/vamana_index.h
@@ -52,6 +52,29 @@ struct SVS_RUNTIME_API VamanaIndex {
 
     struct DynamicIndexParams {
         size_t blocksize_exp = 30;
+
+        /// @brief Threshold (in number of valid vectors) at which to train and switch
+        /// from ``initial_storage_kind`` to the dynamic index's target storage kind.
+        ///
+        /// When ``0`` (the default), deferred compression is **disabled** and the index
+        /// is built directly with the requested compressed storage kind (the current
+        /// eager behavior, fully backward compatible).
+        ///
+        /// When ``> 0`` and the requested target storage kind is a *trained* compressed
+        /// type (LVQ / LeanVec / SQ), the index is initially built using
+        /// ``initial_storage_kind`` (uncompressed). Once the live valid-vector count
+        /// reaches this threshold, statistics are trained from the accumulated data,
+        /// the dataset is replaced with the trained compressed form, and the existing
+        /// graph + ID translation are reused (no graph rebuild).
+        size_t deferred_compression_threshold = 0;
+
+        /// @brief Storage kind used while accumulating vectors below the deferred
+        /// compression threshold. Must be an *untrained* type (FP32 or FP16).
+        ///
+        /// Defaults to FP32. Ignored when
+        /// ``deferred_compression_threshold == 0`` or the target storage kind itself is
+        /// already untrained.
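+        ///
+        /// A minimal configuration sketch (illustrative values only; the build entry
+        /// point is whatever the caller already uses):
+        ///
+        /// ```
+        /// VamanaIndex::DynamicIndexParams params{};
+        /// params.deferred_compression_threshold = 10'000; // train + swap at 10k
+        /// params.initial_storage_kind = StorageKind::FP16; // accumulate in FP16
+        /// // Built with a trained target kind (e.g. StorageKind::LVQ4x8), inserts
+        /// // below 10k vectors stay FP16; the add() that reaches 10k trains the
+        /// // compression and swaps the dataset while keeping the graph.
+        /// ```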
+        StorageKind initial_storage_kind = StorageKind::FP32;
     };
 
     virtual Status add(size_t n, const float* x) noexcept = 0;
diff --git a/bindings/cpp/src/dynamic_vamana_index.cpp b/bindings/cpp/src/dynamic_vamana_index.cpp
index 0c1a6a89..54e455e7 100644
--- a/bindings/cpp/src/dynamic_vamana_index.cpp
+++ b/bindings/cpp/src/dynamic_vamana_index.cpp
@@ -65,6 +65,10 @@ struct DynamicVamanaIndexManagerBase : public DynamicVamanaIndex {
 
     size_t blocksize_bytes() const noexcept { return impl_->blocksize_bytes(); }
 
+    StorageKind get_current_storage_kind() const noexcept override {
+        return impl_->get_current_storage_kind();
+    }
+
     Status remove_selected(size_t* num_removed, const IDFilter& selector) noexcept override {
         return runtime_error_wrapper([&] {
diff --git a/bindings/cpp/src/dynamic_vamana_index_impl.h b/bindings/cpp/src/dynamic_vamana_index_impl.h
index e80f83c1..aaf6a0e9 100644
--- a/bindings/cpp/src/dynamic_vamana_index_impl.h
+++ b/bindings/cpp/src/dynamic_vamana_index_impl.h
@@ -31,6 +31,7 @@
 #include
 #include
+#include
 #include
 #include
 #include
@@ -63,6 +64,23 @@ class DynamicVamanaIndexImpl {
                 "The specified storage kind is not compatible with the "
                 "DynamicVamanaIndex"};
         }
+        // Validate the deferred-compression configuration up front so misconfiguration
+        // surfaces at construction time rather than only on the first add.
+        if (dynamic_index_params_.deferred_compression_threshold > 0) {
+            const auto initial = dynamic_index_params_.initial_storage_kind;
+            if (initial != StorageKind::FP32 && initial != StorageKind::FP16) {
+                throw StatusException{
+                    ErrorCode::INVALID_ARGUMENT,
+                    "Deferred compression initial_storage_kind must be FP32 or FP16"};
+            }
+            if (!storage::is_supported_storage_kind(initial)) {
+                throw StatusException{
+                    ErrorCode::INVALID_ARGUMENT,
+                    "Deferred compression initial_storage_kind is not supported on "
+                    "this CPU"};
+            }
+        }
+        current_storage_kind_ = effective_initial_storage_kind();
     }
 
     size_t size() const { return impl_ ? impl_->size() : 0; }
@@ -75,13 +93,32 @@ class DynamicVamanaIndexImpl {
 
     StorageKind get_storage_kind() const { return storage_kind_; }
 
+    /// @brief Storage kind currently backing the index.
+    ///
+    /// Equal to `get_storage_kind()` unless deferred compression is enabled and the
+    /// threshold has not yet been crossed, in which case this returns the *initial*
+    /// (uncompressed) storage kind. Public so that tests and Python bindings can
+    /// observe the deferred-compression transition.
+    StorageKind get_current_storage_kind() const { return current_storage_kind_; }
+
+    /// @brief Whether deferred compression is configured (threshold > 0 and target
+    /// requires training).
+    bool deferred_compression_enabled() const {
+        return dynamic_index_params_.deferred_compression_threshold > 0 &&
+               storage_kind_ != effective_initial_storage_kind();
+    }
+
     void add(data::ConstSimpleDataView<float> data, std::span<const size_t> labels) {
         if (!impl_) {
             auto blocksize_bytes = lib::PowerOfTwo(dynamic_index_params_.blocksize_exp);
-            return init_impl(data, labels, blocksize_bytes);
+            init_impl(data, labels, blocksize_bytes);
+        } else {
+            impl_->add_points(data, labels);
         }
-
-        impl_->add_points(data, labels);
+        // Deferred compression: once the live count reaches the threshold and we are
+        // still on the initial (uncompressed) storage, train the target compression
+        // and swap the dataset in-place while reusing the existing graph.
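+        // (Editorial note) This check runs after every successful add(), including
+        // the one that performs the initial build, so a batch that straddles the
+        // threshold triggers the swap before the call returns. Barring a failed
+        // training attempt (retried on the next add), callers never observe a state
+        // where the threshold is crossed but the storage is still uncompressed.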
+        maybe_swap_to_target_storage();
     }
 
     void search(
@@ -415,8 +452,22 @@ class DynamicVamanaIndexImpl {
         std::span<const size_t> labels,
         lib::PowerOfTwo blocksize_bytes
     ) {
+        // Deferred compression fast-path: if the very first add already meets the
+        // threshold there is no benefit to building an uncompressed graph and
+        // immediately retraining it. Skip the deferred path entirely and build the
+        // target compressed backend directly.
+        if (deferred_compression_enabled() &&
+            data.size() >= dynamic_index_params_.deferred_compression_threshold) {
+            current_storage_kind_ = storage_kind_;
+        }
+        // When deferred compression is enabled (and the fast-path above didn't fire),
+        // build the initial backend with the uncompressed `initial_storage_kind`.
+        // Otherwise (eager path) build directly with the target kind. The swap
+        // closure (if any) is installed by `setup_deferred_compression_swap` after
+        // the build.
+        const auto build_kind = current_storage_kind_;
         impl_.reset(storage::dispatch_storage_kind(
-            get_storage_kind(),
+            build_kind,
             [this](
                 auto&& tag,
                 data::ConstSimpleDataView<float> data,
@@ -425,7 +476,7 @@ class DynamicVamanaIndexImpl {
             ) {
                 using Tag = std::decay_t<decltype(tag)>;
                 return build_impl(
-                    std::forward<decltype(tag)>(tag),
+                    Tag{},
                     this->metric_type_,
                     this->vamana_build_parameters(),
                     data,
@@ -437,6 +488,29 @@ class DynamicVamanaIndexImpl {
             labels,
             blocksize_bytes
         ));
+        if (deferred_compression_enabled() &&
+            build_kind != storage_kind_) {
+            setup_deferred_compression_swap(build_kind, blocksize_bytes);
+        }
+    }
+
+    /// @brief Install the closure that performs the eventual deferred-compression
+    /// swap. Called after `init_impl` builds the initial uncompressed backend.
+    ///
+    /// The base implementation handles SQ / LVQ / LeanVec (with PCA-trained matrices
+    /// and default leanvec_dims). Subclasses (e.g. `DynamicVamanaIndexLeanVecImpl`)
+    /// override this to inject pre-trained matrices or a user-specified
+    /// ``leanvec_dims``.
+    virtual void setup_deferred_compression_swap(
+        StorageKind initial_kind, lib::PowerOfTwo blocksize_bytes
+    ) {
+        storage::dispatch_storage_kind(
+            initial_kind,
+            [&](auto&& tag) {
+                using Tag = std::decay_t<decltype(tag)>;
+                this->template install_swap_closure<Tag>(blocksize_bytes);
+            }
+        );
+    }
 
     // Constructor used during loading
@@ -452,6 +526,7 @@ class DynamicVamanaIndexImpl {
             buffer_config.get_search_window_size(), buffer_config.get_total_capacity()};
         metric_type_ = metric;
         storage_kind_ = storage_kind;
+        current_storage_kind_ = storage_kind;
         build_params_ = VamanaIndex::BuildParams{
             impl_->get_graph_max_degree(),
             impl_->get_prune_to(),
@@ -516,6 +591,283 @@ class DynamicVamanaIndexImpl {
     VamanaIndex::DynamicIndexParams dynamic_index_params_;
     std::unique_ptr<svs::DynamicVamana> impl_;
     size_t ntotal_soft_deleted{0};
+
+    /// Storage kind currently backing `impl_`. Differs from `storage_kind_` only when
+    /// deferred compression is enabled and the threshold has not yet been crossed.
+    StorageKind current_storage_kind_{StorageKind::FP32};
+
+    /// Closure that, when invoked, trains the target compressed dataset from the
+    /// current uncompressed `impl_` and reseats `impl_` with a new
+    /// `MutableVamanaIndex<...>` whose `Data` template is the trained type. Reuses the
+    /// existing graph + ID translation. Set during `init_impl` only when deferred
+    /// compression is enabled. Reset to empty after a successful swap.
+    std::function<void()> swap_to_target_fn_;
+
+    /// @brief Choose the storage kind used while accumulating below the threshold.
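+    ///
+    /// Decision summary (editorial; mirrors the branches below):
+    ///
+    ///   threshold == 0         -> storage_kind_          (eager build, nothing deferred)
+    ///   target is FP32 / FP16  -> storage_kind_          (untrained target, nothing to train)
+    ///   otherwise              -> initial_storage_kind   (accumulate uncompressed)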
+ StorageKind effective_initial_storage_kind() const { + if (dynamic_index_params_.deferred_compression_threshold == 0) { + // Eager path: build directly with the requested storage kind. + return storage_kind_; + } + // If the user requested an untrained target there's nothing to delay. + if (storage_kind_ == StorageKind::FP32 || + storage_kind_ == StorageKind::FP16) { + return storage_kind_; + } + return dynamic_index_params_.initial_storage_kind; + } + + /// @brief If deferred compression is configured and the threshold is reached, run + /// the swap. Called after every successful `add()`. + void maybe_swap_to_target_storage() { + if (!swap_to_target_fn_ || !impl_) { + return; + } + if (impl_->size() < dynamic_index_params_.deferred_compression_threshold) { + return; + } + try { + swap_to_target_fn_(); + // Successful swap: clear the closure so we don't try again. + swap_to_target_fn_ = nullptr; + current_storage_kind_ = storage_kind_; + } catch (const std::exception&) { + // Leave the existing uncompressed index intact; retry on next add(). + // (See plan section "Rollback on swap failure".) + } + } + + /// @brief Set up the swap closure. Captures the current source storage tag (so we + /// know the concrete `MutableVamanaIndex<...>` type to downcast to inside the + /// type-erased orchestrator) and the target storage kind. + /// + /// Instantiated for every storage tag from `init_impl`'s dispatch but only + /// effective for FP32/FP16 sources — the constructor validates + /// `initial_storage_kind` is one of those. + template + void install_swap_closure(lib::PowerOfTwo blocksize_bytes) { + install_swap_closure_with_trainer( + blocksize_bytes, DefaultTrainer{} + ); + } + + /// @brief Install a swap closure that uses a caller-supplied trainer callable + /// instead of the default one. + /// + /// The trainer is invoked as ``trainer(target_tag, source_data, threadpool, + /// allocator)`` and must return the trained compressed dataset of type + /// ``typename decltype(target_tag)::type``. Used by `DynamicVamanaIndexLeanVecImpl` + /// to inject pre-trained LeanVec matrices and a user-specified ``leanvec_dims``. + template + void install_swap_closure_with_trainer( + lib::PowerOfTwo blocksize_bytes, Trainer trainer + ) { + if constexpr (is_uncompressed_source_tag_v) { + const auto target_kind = storage_kind_; + swap_to_target_fn_ = + [this, target_kind, blocksize_bytes, trainer = std::move(trainer)]() { + this->do_swap_to_target_storage( + target_kind, blocksize_bytes, trainer + ); + }; + } else { + // Compressed source kinds are rejected at construction time. Avoid + // instantiating the swap path for them. + (void)blocksize_bytes; + (void)trainer; + } + } + + /// @brief Trait: is the source storage tag one of the uncompressed (trainable + /// from) backends FP32 / FP16? + template struct is_uncompressed_source_tag : std::false_type {}; + template + struct is_uncompressed_source_tag> + : std::true_type {}; + template + struct is_uncompressed_source_tag> + : std::true_type {}; + template + static constexpr bool is_uncompressed_source_tag_v = + is_uncompressed_source_tag::value; + + /// @brief Trait: is the target storage tag a *trainable* compressed type that the + /// deferred-compression swap can produce from an uncompressed source? + /// + /// Used to avoid instantiating the swap body for FP32/FP16 target tags (which + /// would require returning a default-constructed dataset, which not all dataset + /// types support). 
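+    ///
+    /// Intended shape of the guard (sketch; the real use is in
+    /// `do_swap_to_target_storage` below):
+    ///
+    ///     if constexpr (is_trainable_target_tag_v<TargetTag>) {
+    ///         // safe to instantiate the train-and-transplant body
+    ///     } else {
+    ///         // FP32 / FP16 target: configuration error, reported at runtime
+    ///     }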
+ template struct is_trainable_target_tag : std::false_type {}; + template + struct is_trainable_target_tag> + : std::true_type {}; +#ifdef SVS_RUNTIME_HAVE_LVQ_LEANVEC + template + struct is_trainable_target_tag> + : std::true_type {}; + template + struct is_trainable_target_tag> + : std::true_type {}; + template + struct is_trainable_target_tag> + : std::true_type {}; + template + struct is_trainable_target_tag> + : std::true_type {}; + template + struct is_trainable_target_tag> + : std::true_type {}; + template + struct is_trainable_target_tag> + : std::true_type {}; + template + struct is_trainable_target_tag> + : std::true_type {}; +#endif + template + static constexpr bool is_trainable_target_tag_v = is_trainable_target_tag::value; + + /// @brief Perform the actual swap: train the compressed dataset from the + /// accumulated source dataset, then transplant onto a fresh `MutableVamanaIndex` + /// with the trained `Data` type, reusing graph + translator + status + entry-point. + template + void do_swap_to_target_storage( + StorageKind target_kind, + lib::PowerOfTwo blocksize_bytes, + const Trainer& trainer + ) { + using SourceData = typename SourceTag::type; + using Graph = svs::graphs::SimpleBlockedGraph; + + // The DynamicVamanaIndexImpl always builds with `lib::Types` as the + // query type set (see build_impl below). + using QueryTypes = svs::lib::Types; + + svs::DistanceDispatcher distance_dispatcher(to_svs_distance(metric_type_)); + distance_dispatcher([&](auto distance_function) { + using Dist = std::decay_t; + using SourceIndex = + svs::index::vamana::MutableVamanaIndex; + + auto* concrete = + impl_->template get_typed_impl(); + if (concrete == nullptr) { + throw StatusException{ + ErrorCode::RUNTIME_ERROR, + "Deferred compression swap: unable to recover concrete index type"}; + } + + // Train the new compressed dataset from the source. Uses a freshly owned + // threadpool because `concrete`'s pool is about to be released. + auto train_pool = default_threadpool(); + + // Dispatch to the target storage tag so we have the static target type. + // Then build the compressed dataset and transplant onto a new + // MutableVamanaIndex via the friendly transplant ctor (which moves out + // of `*concrete` internally). + storage::dispatch_storage_kind( + target_kind, + [&](auto&& target_tag) { + using TargetTag = std::decay_t; + if constexpr (!is_trainable_target_tag_v || + !Trainer::template supports) { + // Either the target storage isn't a trainable kind (FP32 / + // FP16) or the active trainer doesn't support this target + // (e.g. the LeanVec subclass's trainer doesn't handle SQ / + // LVQ targets). Both are configuration errors caught at + // construction time, so this branch is unreachable in + // practice; we simply avoid instantiating the body. + throw StatusException{ + ErrorCode::INVALID_ARGUMENT, + "Deferred compression: trainer does not support the " + "configured target storage kind"}; + } else { + using TargetData = typename TargetTag::type; + using TargetAlloc = typename TargetTag::allocator_type; + + auto allocator = + storage::make_allocator(blocksize_bytes); + + // Train the compressed dataset directly from the source + // dataset using the caller-supplied trainer. The default + // trainer dispatches to each backend's native factory; the + // LeanVec subclass uses a trainer that injects pre-trained + // matrices and ``leanvec_dims``. 
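+                        //
+                        // The trainer contract, spelled out (editorial; matches the
+                        // documentation on `install_swap_closure_with_trainer`):
+                        //
+                        //   trainer(TargetTag{},  // selects the trained target type
+                        //           source_data,  // the accumulated FP32/FP16 dataset
+                        //           threadpool,   // training parallelism
+                        //           allocator)    // blocked allocator for the result
+                        //       -> typename TargetTag::type (the trained dataset)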
+ TargetData new_data = trainer( + TargetTag{}, + concrete->view_data(), + train_pool, + allocator + ); + + // Construct the new MutableVamanaIndex via the transplant + // constructor. The ctor moves out of `*concrete` internally + // (graph / status / entry-point / translator / distance / + // build & search params), so this single call replaces the + // explicit release_*() shuffle. + using TargetIndex = svs::index::vamana:: + MutableVamanaIndex; + auto new_index = TargetIndex{ + typename TargetIndex::TransplantTag{}, + std::move(*concrete), + std::move(new_data), + default_threadpool() + }; + + // Reseat `impl_` with a freshly type-erased DynamicVamana + // around the new compressed-backend index. Destroying the + // old `impl_` also destroys the (now moved-from) source + // MutableVamanaIndex. + impl_ = std::make_unique( + svs::DynamicVamana::AssembleTag{}, + QueryTypes{}, + std::move(new_index) + ); + } // end of `if constexpr (is_trainable_target_tag_v)` + } + ); + }); + } + + /// @brief Default trainer used by the base class. Dispatches to each backend's + /// native training factory (SQ::compress / LVQ::compress / LeanDataset::reduce + /// with auto-PCA matrices and default leanvec_dims). + struct DefaultTrainer { + // Supports every trainable target tag. + template + static constexpr bool supports = is_trainable_target_tag_v; + + template + auto operator()( + TargetTag, const Source& source, Pool& pool, const Alloc& allocator + ) const { + using TargetData = typename TargetTag::type; + if constexpr (svs::quantization::scalar::IsSQData) { + return TargetData::compress(source, pool, allocator); + } +#ifdef SVS_RUNTIME_HAVE_LVQ_LEANVEC + else if constexpr (svs::quantization::lvq::IsLVQDataset) { + return TargetData::compress(source, pool, 0, allocator); + } else if constexpr (svs::leanvec::IsLeanDataset) { + const size_t leanvec_d = (source.dimensions() + 1) / 2; + return TargetData::reduce( + source, + std::nullopt, + pool, + 0, + svs::lib::MaybeStatic{leanvec_d}, + allocator + ); + } +#endif + else { + static_assert( + !sizeof(TargetData*), + "DefaultTrainer instantiated for an unsupported target type" + ); + } + } + }; }; } // namespace runtime diff --git a/bindings/cpp/src/dynamic_vamana_index_leanvec_impl.h b/bindings/cpp/src/dynamic_vamana_index_leanvec_impl.h index 4d59281d..aa45f685 100644 --- a/bindings/cpp/src/dynamic_vamana_index_leanvec_impl.h +++ b/bindings/cpp/src/dynamic_vamana_index_leanvec_impl.h @@ -107,6 +107,21 @@ struct DynamicVamanaIndexLeanVecImpl : public DynamicVamanaIndexImpl { lib::PowerOfTwo blocksize_bytes ) override { assert(storage::is_leanvec_storage(this->storage_kind_)); + if (this->deferred_compression_enabled() && + data.size() < + this->dynamic_index_params_.deferred_compression_threshold) { + // Delegate the build to the base class (which builds with the + // uncompressed `initial_storage_kind`) and let our overridden + // `setup_deferred_compression_swap` install a LeanVec-aware swap closure. + DynamicVamanaIndexImpl::init_impl(data, labels, blocksize_bytes); + return; + } + // Eager path (also taken when the very first add already meets the deferred + // threshold): build the LeanVec backend directly with the configured + // training data (matrices / leanvec_dims). 
+        if (this->deferred_compression_enabled()) {
+            this->current_storage_kind_ = this->storage_kind_;
+        }
         impl_.reset(dispatch_leanvec_storage_kind(
             this->storage_kind_,
             [this](
                 auto&&
@@ -133,10 +148,67 @@ struct DynamicVamanaIndexLeanVecImpl : public DynamicVamanaIndexImpl {
         ));
     }
 
+    void setup_deferred_compression_swap(
+        StorageKind initial_kind, lib::PowerOfTwo blocksize_bytes
+    ) override {
+        // Capture LeanVec training data by value so the closure does not depend on
+        // the lifetime of `*this` for those parameters.
+        LeanVecTrainer trainer{leanvec_dims_, leanvec_matrices_};
+        storage::dispatch_storage_kind(
+            initial_kind,
+            [&](auto&& tag) {
+                using Tag = std::decay_t<decltype(tag)>;
+                this->template install_swap_closure_with_trainer<Tag>(
+                    blocksize_bytes, trainer
+                );
+            }
+        );
+    }
+
 protected:
     size_t leanvec_dims_;
     std::optional leanvec_matrices_;
 
+    /// @brief Trainer used by the deferred-compression swap when the target is a
+    /// LeanVec storage kind. Reuses pre-trained matrices when supplied; otherwise
+    /// trains PCA matrices from the accumulated source dataset (the same path the
+    /// eager builder uses when `leanvec_matrices_ == std::nullopt`).
+    struct LeanVecTrainer {
+        size_t leanvec_dims;
+        std::optional leanvec_matrices;
+
+        // Only LeanVec target storage kinds are supported.
+        template <typename TargetTag>
+        static constexpr bool supports =
+            svs::leanvec::IsLeanDataset<typename TargetTag::type>;
+
+        template <typename TargetTag, typename Source, typename Pool, typename Alloc>
+        auto operator()(
+            TargetTag, const Source& source, Pool& pool, const Alloc& allocator
+        ) const {
+            using TargetData = typename TargetTag::type;
+            if constexpr (svs::leanvec::IsLeanDataset<TargetData>) {
+                size_t d = leanvec_dims;
+                if (d == 0) {
+                    d = (source.dimensions() + 1) / 2;
+                }
+                return TargetData::reduce(
+                    source,
+                    leanvec_matrices,
+                    pool,
+                    0,
+                    svs::lib::MaybeStatic{d},
+                    allocator
+                );
+            } else {
+                static_assert(
+                    !sizeof(TargetData*),
+                    "LeanVecTrainer instantiated for a non-LeanVec target type"
+                );
+            }
+        }
+    };
+
     StorageKind check_storage_kind(StorageKind kind) {
         if (!storage::is_leanvec_storage(kind)) {
             throw StatusException(
diff --git a/bindings/cpp/tests/runtime_test.cpp b/bindings/cpp/tests/runtime_test.cpp
index 201375d3..e4d9a27a 100644
--- a/bindings/cpp/tests/runtime_test.cpp
+++ b/bindings/cpp/tests/runtime_test.cpp
@@ -881,3 +881,282 @@ CATCH_TEST_CASE("RangeSearchFunctionalStatic", "[runtime][static_vamana]") {
 
     svs::runtime::v0::VamanaIndex::destroy(index);
 }
+
+// =====================================================================
+// Deferred-compression tests
+//
+// These exercise the runtime orchestrator that watches the live insert count
+// and triggers the swap from the initial uncompressed storage backend
+// (FP32 / FP16) to the configured trained target (LVQ / LeanVec / SQ) once
+// the threshold is reached, while reusing the existing graph.
+// =====================================================================
+namespace {
+
+// Add `flat_data` in two chunks straddling `threshold`. Returns the storage
+// kind reported between the two `add()` calls and after the second.
+struct DeferredSwapObservation {
+    svs::runtime::v0::StorageKind kind_before_swap;
+    svs::runtime::v0::StorageKind kind_after_swap;
+};
+
+DeferredSwapObservation add_in_two_halves(
+    svs::runtime::v0::DynamicVamanaIndex& index,
+    const std::vector<float>& flat_data,
+    size_t n_total,
+    size_t d,
+    size_t first_chunk
+) {
+    CATCH_REQUIRE(first_chunk < n_total);
+    std::vector<size_t> ids(n_total);
+    std::iota(ids.begin(), ids.end(), 0);
+
+    auto status = index.add(first_chunk, ids.data(), flat_data.data());
+    CATCH_REQUIRE(status.ok());
+    auto kind_before = index.get_current_storage_kind();
+
+    status = index.add(
+        n_total - first_chunk,
+        ids.data() + first_chunk,
+        flat_data.data() + first_chunk * d
+    );
+    CATCH_REQUIRE(status.ok());
+    auto kind_after = index.get_current_storage_kind();
+
+    return {kind_before, kind_after};
+}
+
+} // namespace
+
+CATCH_TEST_CASE(
+    "Deferred compression FP32 -> LVQ4x8 via runtime",
+    "[runtime][deferred_compression]"
+) {
+    const auto& test_data = get_test_data();
+    const size_t threshold = test_n / 2;
+    const size_t first_chunk = threshold / 2; // stays under threshold
+
+    svs::runtime::v0::DynamicVamanaIndex* index = nullptr;
+    svs::runtime::v0::VamanaIndex::BuildParams build_params{/*graph_max_degree=*/64};
+    svs::runtime::v0::VamanaIndex::DynamicIndexParams dyn_params{};
+    dyn_params.deferred_compression_threshold = threshold;
+    dyn_params.initial_storage_kind = svs::runtime::v0::StorageKind::FP32;
+
+    auto status = svs::runtime::v0::DynamicVamanaIndex::build(
+        &index,
+        test_d,
+        svs::runtime::v0::MetricType::L2,
+        svs::runtime::v0::StorageKind::LVQ4x8,
+        build_params,
+        {},
+        dyn_params
+    );
+    if (!svs::runtime::v0::DynamicVamanaIndex::check_storage_kind(
+             svs::runtime::v0::StorageKind::LVQ4x8)
+             .ok()) {
+        CATCH_REQUIRE(!status.ok());
+        return;
+    }
+    CATCH_REQUIRE(status.ok());
+    CATCH_REQUIRE(index != nullptr);
+
+    // Before any add: nothing built yet, current kind is the initial kind.
+    CATCH_REQUIRE(
+        index->get_current_storage_kind() == svs::runtime::v0::StorageKind::FP32
+    );
+
+    auto obs = add_in_two_halves(*index, test_data, test_n, test_d, first_chunk);
+    CATCH_REQUIRE(obs.kind_before_swap == svs::runtime::v0::StorageKind::FP32);
+    CATCH_REQUIRE(obs.kind_after_swap == svs::runtime::v0::StorageKind::LVQ4x8);
+
+    // Sanity-check search after the swap (the graph should survive).
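+    // Only Status::ok() is asserted below: the swap's observable contract here is
+    // the storage-kind transition checked above plus a functioning search over the
+    // transplanted graph (no recall assertion is attempted).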
+ const int nq = 5; + const int k = 5; + std::vector distances(nq * k); + std::vector labels(nq * k); + status = + index->search(nq, test_data.data(), k, distances.data(), labels.data()); + CATCH_REQUIRE(status.ok()); + + svs::runtime::v0::DynamicVamanaIndex::destroy(index); +} + +CATCH_TEST_CASE( + "Deferred compression FP32 -> LeanVec4x8 via runtime", + "[runtime][deferred_compression]" +) { + const auto& test_data = get_test_data(); + const size_t threshold = test_n / 2; + const size_t first_chunk = threshold / 2; + + svs::runtime::v0::DynamicVamanaIndex* index = nullptr; + svs::runtime::v0::VamanaIndex::BuildParams build_params{/*graph_max_degree=*/64}; + svs::runtime::v0::VamanaIndex::DynamicIndexParams dyn_params{}; + dyn_params.deferred_compression_threshold = threshold; + dyn_params.initial_storage_kind = svs::runtime::v0::StorageKind::FP32; + + auto status = svs::runtime::v0::DynamicVamanaIndexLeanVec::build( + &index, + test_d, + svs::runtime::v0::MetricType::L2, + svs::runtime::v0::StorageKind::LeanVec4x8, + /*leanvec_dims=*/test_d / 2, + build_params, + {}, + dyn_params + ); + if (!svs::runtime::v0::DynamicVamanaIndex::check_storage_kind( + svs::runtime::v0::StorageKind::LeanVec4x8) + .ok()) { + CATCH_REQUIRE(!status.ok()); + return; + } + CATCH_REQUIRE(status.ok()); + CATCH_REQUIRE(index != nullptr); + + CATCH_REQUIRE( + index->get_current_storage_kind() == svs::runtime::v0::StorageKind::FP32 + ); + + auto obs = add_in_two_halves(*index, test_data, test_n, test_d, first_chunk); + CATCH_REQUIRE(obs.kind_before_swap == svs::runtime::v0::StorageKind::FP32); + CATCH_REQUIRE(obs.kind_after_swap == svs::runtime::v0::StorageKind::LeanVec4x8); + + const int nq = 5; + const int k = 5; + std::vector distances(nq * k); + std::vector labels(nq * k); + status = + index->search(nq, test_data.data(), k, distances.data(), labels.data()); + CATCH_REQUIRE(status.ok()); + + svs::runtime::v0::DynamicVamanaIndex::destroy(index); +} + +CATCH_TEST_CASE( + "Deferred compression disabled by default keeps eager behavior via runtime", + "[runtime][deferred_compression]" +) { + const auto& test_data = get_test_data(); + + svs::runtime::v0::DynamicVamanaIndex* index = nullptr; + svs::runtime::v0::VamanaIndex::BuildParams build_params{/*graph_max_degree=*/64}; + // Default ctor: threshold == 0 -> eager. + svs::runtime::v0::VamanaIndex::DynamicIndexParams dyn_params{}; + + auto status = svs::runtime::v0::DynamicVamanaIndex::build( + &index, + test_d, + svs::runtime::v0::MetricType::L2, + svs::runtime::v0::StorageKind::LVQ4x8, + build_params, + {}, + dyn_params + ); + if (!svs::runtime::v0::DynamicVamanaIndex::check_storage_kind( + svs::runtime::v0::StorageKind::LVQ4x8) + .ok()) { + CATCH_REQUIRE(!status.ok()); + return; + } + CATCH_REQUIRE(status.ok()); + + // Eager mode: target kind is in effect from the start, before and after add. 
+ CATCH_REQUIRE( + index->get_current_storage_kind() == svs::runtime::v0::StorageKind::LVQ4x8 + ); + + std::vector ids(test_n); + std::iota(ids.begin(), ids.end(), 0); + status = index->add(test_n, ids.data(), test_data.data()); + CATCH_REQUIRE(status.ok()); + CATCH_REQUIRE( + index->get_current_storage_kind() == svs::runtime::v0::StorageKind::LVQ4x8 + ); + + svs::runtime::v0::DynamicVamanaIndex::destroy(index); +} + +CATCH_TEST_CASE( + "Deferred compression rejects non-FP initial storage kinds via runtime", + "[runtime][deferred_compression]" +) { + svs::runtime::v0::DynamicVamanaIndex* index = nullptr; + svs::runtime::v0::VamanaIndex::BuildParams build_params{/*graph_max_degree=*/64}; + svs::runtime::v0::VamanaIndex::DynamicIndexParams dyn_params{}; + dyn_params.deferred_compression_threshold = 1024; + dyn_params.initial_storage_kind = svs::runtime::v0::StorageKind::LVQ4x4; + + auto status = svs::runtime::v0::DynamicVamanaIndex::build( + &index, + test_d, + svs::runtime::v0::MetricType::L2, + svs::runtime::v0::StorageKind::LVQ4x8, + build_params, + {}, + dyn_params + ); + CATCH_REQUIRE(!status.ok()); + CATCH_REQUIRE(index == nullptr); +} + +CATCH_TEST_CASE( + "Deferred compression first-add at-threshold builds target directly via runtime", + "[runtime][deferred_compression]" +) { + // When the very first add already meets the deferred-compression threshold, + // the runtime should skip the uncompressed staging build and construct the + // target compressed backend directly. Externally this is observable as the + // `current_storage_kind` jumping straight from the initial kind (pre-build) + // to the target kind after the first add, without ever materializing the + // uncompressed backend. + const auto& test_data = get_test_data(); + const size_t threshold = test_n / 2; // first add will be `test_n` >= threshold + + svs::runtime::v0::DynamicVamanaIndex* index = nullptr; + svs::runtime::v0::VamanaIndex::BuildParams build_params{/*graph_max_degree=*/64}; + svs::runtime::v0::VamanaIndex::DynamicIndexParams dyn_params{}; + dyn_params.deferred_compression_threshold = threshold; + dyn_params.initial_storage_kind = svs::runtime::v0::StorageKind::FP32; + + auto status = svs::runtime::v0::DynamicVamanaIndex::build( + &index, + test_d, + svs::runtime::v0::MetricType::L2, + svs::runtime::v0::StorageKind::LVQ4x8, + build_params, + {}, + dyn_params + ); + if (!svs::runtime::v0::DynamicVamanaIndex::check_storage_kind( + svs::runtime::v0::StorageKind::LVQ4x8) + .ok()) { + CATCH_REQUIRE(!status.ok()); + return; + } + CATCH_REQUIRE(status.ok()); + + // Pre-add: index reports initial kind. + CATCH_REQUIRE( + index->get_current_storage_kind() == svs::runtime::v0::StorageKind::FP32 + ); + + std::vector ids(test_n); + std::iota(ids.begin(), ids.end(), 0); + status = index->add(test_n, ids.data(), test_data.data()); + CATCH_REQUIRE(status.ok()); + + // Post-first-add: target kind is already in effect. 
+ CATCH_REQUIRE( + index->get_current_storage_kind() == svs::runtime::v0::StorageKind::LVQ4x8 + ); + + const int nq = 5; + const int k = 5; + std::vector distances(nq * k); + std::vector labels(nq * k); + status = + index->search(nq, test_data.data(), k, distances.data(), labels.data()); + CATCH_REQUIRE(status.ok()); + + svs::runtime::v0::DynamicVamanaIndex::destroy(index); +} diff --git a/include/svs/index/vamana/dynamic_index.h b/include/svs/index/vamana/dynamic_index.h index 5f0ce7c1..f72c7bec 100644 --- a/include/svs/index/vamana/dynamic_index.h +++ b/include/svs/index/vamana/dynamic_index.h @@ -295,6 +295,137 @@ class MutableVamanaIndex { , use_full_search_history_{config.build_parameters.use_full_search_history} , logger_{std::move(logger)} {} + /// @brief Tag type used to disambiguate the "transplant" constructor. + /// + /// See `MutableVamanaIndex(TransplantTag, ...)` for the rationale. + struct TransplantTag {}; + + private: + /// @brief Internal aggregate of the per-instance state moved out of the source + /// index during a transplant. Constructed by `_take_transplant_inputs` and + /// consumed by the private delegating constructor below. Not part of the + /// public API. + struct TransplantInputs { + Data data; + Graph graph; + std::vector status; + entry_point_type entry_point; + IDTranslator translator; + Dist distance; + VamanaBuildParameters build_parameters; + VamanaSearchParameters search_parameters; + }; + + /// @brief Snapshot value-typed members of `source` and then move-out the + /// owning members in a well-defined sequence (the brace-init list of the + /// returned aggregate enforces left-to-right evaluation in C++17). + template + static TransplantInputs _take_transplant_inputs( + MutableVamanaIndex&& source, Data new_data + ) { + // Snapshot value-typed members BEFORE any release_*() invalidates other + // owning members. + VamanaBuildParameters bp = source.view_build_parameters(); + VamanaSearchParameters sp = source.get_search_parameters(); + return TransplantInputs{ + std::move(new_data), + std::move(source).release_graph(), + std::move(source).release_status(), + std::move(source).release_entry_point(), + std::move(source).release_translator(), + std::move(source).release_distance(), + std::move(bp), + std::move(sp), + }; + } + + /// @brief Private delegating constructor that consumes a `TransplantInputs`. + /// Used by the public `(TransplantTag, source&&, new_data, pool, logger)` + /// constructor below. + MutableVamanaIndex( + TransplantTag SVS_UNUSED(tag), + TransplantInputs args, + threads::ThreadPoolHandle threadpool, + svs::logging::logger_ptr logger + ) + : graph_{std::move(args.graph)} + , data_{std::move(args.data)} + , entry_point_{std::move(args.entry_point)} + , status_{std::move(args.status)} + , first_empty_{0} + , translator_{std::move(args.translator)} + , distance_{std::move(args.distance)} + , threadpool_{std::move(threadpool)} + , search_parameters_{args.search_parameters} + , construction_window_size_{args.build_parameters.window_size} + , max_candidates_{args.build_parameters.max_candidate_pool_size} + , prune_to_{args.build_parameters.prune_to} + , alpha_{args.build_parameters.alpha} + , use_full_search_history_{args.build_parameters.use_full_search_history} + , build_parameters_{args.build_parameters} + , logger_{std::move(logger)} { + // Validate size invariants up front so a bad transplant fails loudly rather + // than silently corrupting later operations. 
+ if (data_.size() != graph_.n_nodes() || data_.size() != status_.size()) { + throw ANNEXCEPTION( + "Transplant size mismatch: data={}, graph={}, status={}", + data_.size(), + graph_.n_nodes(), + status_.size() + ); + } + // Recompute ``first_empty_`` from ``status_`` so subsequent ``add_points`` + // calls correctly find slots to reuse. + first_empty_ = status_.size(); + for (size_t i = 0, imax = status_.size(); i < imax; ++i) { + if (status_[i] == SlotMetadata::Empty) { + first_empty_ = i; + break; + } + } + } + + public: + /// @brief Transplant constructor: build a `MutableVamanaIndex` over a newly + /// supplied `Data` while reusing the graph / translator / status / entry-point + /// / distance / build & search parameters from a prior instance. + /// + /// This is the building block for "deferred compression": once enough vectors + /// have been accumulated in an uncompressed dynamic index, the caller trains a + /// compressed dataset from the accumulated vectors and then transplants the + /// existing graph (and bookkeeping structures) onto a new index whose `Data` + /// template parameter is the compressed type, avoiding a graph rebuild. + /// + /// Callers do not need to know the internal layout of `MutableVamanaIndex`: + /// they hand in the source index by rvalue reference, the new `Data`, and a + /// thread pool. The constructor moves out of `source` internally. + /// + /// Preconditions + /// * ``new_data.size() == source.size()`` (i.e. the trained dataset must + /// contain exactly the same number of slots as the source). + /// * The graph type (template parameter `Graph`) and distance type + /// (template parameter `Dist`) match between `source` and the new index; + /// only `Data` differs. + /// + /// The new instance recomputes ``first_empty_`` from the transplanted status. + template + MutableVamanaIndex( + TransplantTag tag, + MutableVamanaIndex&& source, + Data new_data, + Pool threadpool, + svs::logging::logger_ptr logger = svs::logging::get() + ) + : MutableVamanaIndex( + tag, + _take_transplant_inputs( + std::move(source), std::move(new_data) + ), + threads::ThreadPoolHandle{ + threads::as_threadpool(std::move(threadpool))}, + std::move(logger) + ) {} + ///// Scratchspace scratchspace_type scratchspace(const search_parameters_type& sp) const { return scratchspace_type{ @@ -1179,6 +1310,36 @@ class MutableVamanaIndex { const Data& view_data() const { return data_; } const Graph& view_graph() const { return graph_; } + ///// + ///// Release / move-out accessors (for the "transplant" / deferred-compression path). + ///// + /// + /// These are rvalue-qualified to make the "this object is being consumed" contract + /// explicit. The matching `MutableVamanaIndex(TransplantTag, ...)` constructor + /// accepts each piece by value, so callers can hand the released members in cheaply + /// without copying multi-gigabyte graphs/translators. + /// + /// Typical use: + /// ``` + /// auto data = std::move(old).release_data(); // training input + /// auto graph = std::move(old).release_graph(); + /// auto status = std::move(old).release_status(); + /// auto entry = std::move(old).release_entry_point(); + /// auto translator = std::move(old).release_translator(); + /// // ... train new compressed dataset from `data` ... + /// // ... old is destroyed; build new index via TransplantTag ... 
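    /// //
    /// // ...or, keeping the source intact (training from `view_data()` instead of
    /// // releasing it), let the transplant constructor do the whole sequence in
    /// // one call (sketch; `CompressedData`, `old`, `new_data`, `pool` are
    /// // placeholders):
    /// //
    /// // using NewIndex = MutableVamanaIndex<Graph, CompressedData, Dist>;
    /// // NewIndex idx{NewIndex::TransplantTag{}, std::move(old),
    /// //              std::move(new_data), std::move(pool)};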
+ /// ``` + Graph release_graph() && { return std::move(graph_); } + Data release_data() && { return std::move(data_); } + entry_point_type release_entry_point() && { return std::move(entry_point_); } + std::vector release_status() && { return std::move(status_); } + IDTranslator release_translator() && { return std::move(translator_); } + Dist release_distance() && { return std::move(distance_); } + threads::ThreadPoolHandle release_threadpool() && { return std::move(threadpool_); } + + /// @brief View the build parameters captured at construction. + const VamanaBuildParameters& view_build_parameters() const { return build_parameters_; } + /// /// @brief Verify the invariants of this data structure. /// diff --git a/include/svs/orchestrators/dynamic_vamana.h b/include/svs/orchestrators/dynamic_vamana.h index 04507738..acc11f14 100644 --- a/include/svs/orchestrators/dynamic_vamana.h +++ b/include/svs/orchestrators/dynamic_vamana.h @@ -68,6 +68,15 @@ class DynamicVamanaImpl : public VamanaImpl(args)...} {} + /// @brief Public accessor for the contained concrete index. + /// + /// Re-exposes `base_type::impl` (which is `protected` in `ManagerImpl`) so that + /// `svs::DynamicVamana::get_typed_impl()` can return a usable + /// pointer to the underlying `MutableVamanaIndex<...>` for the deferred-compression + /// swap path. + Impl& impl() { return base_type::impl(); } + const Impl& impl() const { return base_type::impl(); } + // Implement the interface. void add_points( const float* data, @@ -139,6 +148,36 @@ class DynamicVamana : public manager::IndexManager { : base_type{ std::make_unique>(std::move(impl))} {} + /// @brief Try to access the contained concrete index. + /// + /// Returns a non-owning pointer to the contained `Impl` (a `MutableVamanaIndex<...>`) + /// if the underlying type-erased storage was constructed with the matching + /// ``QueryTypes`` and ``Impl``, otherwise `nullptr`. + /// + /// This is intended for advanced runtime-API code (e.g. the deferred-compression + /// path in `DynamicVamanaIndexImpl`) that wishes to reach through the type-erasure + /// barrier in a controlled way. Callers are responsible for not invalidating the + /// returned pointer (e.g. by destroying the `DynamicVamana` while the pointer is + /// in use). + template Impl* get_typed_impl() { + auto* concrete = + dynamic_cast*>(impl_.get()); + if (concrete == nullptr) { + return nullptr; + } + return &concrete->impl(); + } + + template + const Impl* get_typed_impl() const { + const auto* concrete = + dynamic_cast*>(impl_.get()); + if (concrete == nullptr) { + return nullptr; + } + return &concrete->impl(); + } + ///// Vamana Interface void experimental_reset_performance_parameters() { impl_->reset_performance_parameters();