diff --git a/include/svs/core/translation.h b/include/svs/core/translation.h index db65bf21..4b91a031 100644 --- a/include/svs/core/translation.h +++ b/include/svs/core/translation.h @@ -324,7 +324,7 @@ class IDTranslator { "external_to_internal_translation"; static constexpr lib::Version save_version = lib::Version(0, 0, 0); - lib::SaveTable save_table() const { + lib::SaveTable metadata() const { return lib::SaveTable( serialization_schema, save_version, @@ -348,7 +348,7 @@ class IDTranslator { // Save the translations to a file. auto os = lib::open_write(filename); save(os); - auto table = save_table(); + auto table = metadata(); table.insert("filename", lib::save(filename.filename())); return table; } diff --git a/include/svs/index/flat/dynamic_flat.h b/include/svs/index/flat/dynamic_flat.h index 71d72e10..e0bd5421 100644 --- a/include/svs/index/flat/dynamic_flat.h +++ b/include/svs/index/flat/dynamic_flat.h @@ -415,7 +415,7 @@ template class DynamicFlatIndex { save_version, { {"name", name()}, - {"translation", lib::detail::exit_hook(translator_.save_table())}, + {"translation", lib::detail::exit_hook(translator_.metadata())}, } ); lib::save_to_stream(save_table, os); diff --git a/include/svs/index/ivf/dynamic_ivf.h b/include/svs/index/ivf/dynamic_ivf.h index 40ad32c3..7462224f 100644 --- a/include/svs/index/ivf/dynamic_ivf.h +++ b/include/svs/index/ivf/dynamic_ivf.h @@ -761,8 +761,32 @@ class DynamicIVFIndex { lib::save_to_disk(clusters_, clusters_dir); } - void save(std::ostream& SVS_UNUSED(os)) const { - throw ANNEXCEPTION("Placeholder; not implemented!"); + void save(std::ostream& os) { + // Compact before saving to remove empty slots + compact(); + + // Get data type configuration for automatic loader construction during load + auto data_type_config = DataTypeTraits::get_config(); + data_type_config.centroid_type = datatype_v; + + lib::begin_serialization(os); + + auto save_table = lib::SaveTable( + "dynamic_ivf_config", + save_version, + {{"name", lib::save(name())}, + {"translation", lib::detail::exit_hook(translator_.metadata())}, + {"num_clusters", lib::save(clusters_.size())}} + ); + save_table.insert("data_type_config", lib::save(data_type_config)); + lib::save_to_stream(save_table, os); + translator_.save(os); + + // Save centroids + lib::save_to_stream(centroids_, os); + + // Save clusters + lib::save_to_stream(clusters_, os); } private: @@ -1171,4 +1195,54 @@ auto load_dynamic_ivf_index( return index; } +template < + typename CentroidType, + typename DataType, + typename Distance, + typename ThreadpoolProto> +auto load_dynamic_ivf_index( + std::istream& is, + Distance distance, + ThreadpoolProto threadpool_proto, + const size_t intra_query_thread_count = 1, + svs::logging::logger_ptr logger = svs::logging::get() +) { + using centroids_type = data::SimpleData; + using I = uint32_t; + using blocked_data_type = typename DataType::lib_blocked_alloc_data_type; + using cluster_type = DenseClusteredDataset; + + // Read config table and translator + auto table = lib::detail::read_metadata(is); + + auto translation = + table.template cast().at("translation").template cast(); + auto translator = IDTranslator::load(translation, is); + + // Load centroids + auto centroids = lib::load_from_stream(is); + + // Load clusters with small block size allocator for IVF + auto blocking_params = data::BlockingParameters{ + .blocksize_bytes = lib::PowerOfTwo(20) // 2^20 = 1MB + }; + using allocator_type = typename blocked_data_type::allocator_type; + auto blocked_allocator = + allocator_type(blocking_params, typename allocator_type::allocator_type()); + + auto dense_clusters = lib::load_from_stream(is, blocked_allocator); + + auto threadpool = threads::as_threadpool(std::move(threadpool_proto)); + + return DynamicIVFIndex( + std::move(centroids), + std::move(dense_clusters), + std::move(translator), + std::move(distance), + std::move(threadpool), + intra_query_thread_count, + logger + ); +} + } // namespace svs::index::ivf diff --git a/include/svs/index/vamana/dynamic_index.h b/include/svs/index/vamana/dynamic_index.h index a5b88b4d..5f0ce7c1 100644 --- a/include/svs/index/vamana/dynamic_index.h +++ b/include/svs/index/vamana/dynamic_index.h @@ -1047,7 +1047,7 @@ class MutableVamanaIndex { { {"name", lib::save(name())}, {"parameters", lib::save(parameters())}, - {"translation", lib::detail::exit_hook(translator_.save_table())}, + {"translation", lib::detail::exit_hook(translator_.metadata())}, } ); lib::save_to_stream(save_table, os); diff --git a/include/svs/orchestrators/dynamic_ivf.h b/include/svs/orchestrators/dynamic_ivf.h index 0b3592ab..f571807e 100644 --- a/include/svs/orchestrators/dynamic_ivf.h +++ b/include/svs/orchestrators/dynamic_ivf.h @@ -123,13 +123,11 @@ class DynamicIVFImpl : public IVFImpl { } void save(std::ostream& stream) override { - lib::UniqueTempDirectory tempdir{"svs_dynamic_ivf_save"}; - const auto config_dir = tempdir.get() / "config"; - const auto data_dir = tempdir.get() / "data"; - std::filesystem::create_directories(config_dir); - std::filesystem::create_directories(data_dir); - save(config_dir, data_dir); - lib::DirectoryArchiver::pack(tempdir, stream); + if constexpr (Impl::supports_saving) { + impl().save(stream); + } else { + throw ANNEXCEPTION("The current DynamicIVF backend doesn't support saving!"); + } } }; @@ -431,28 +429,58 @@ class DynamicIVF : public manager::IndexManager { ThreadPoolProto threadpool_proto, size_t intra_query_threads = 1 ) { - namespace fs = std::filesystem; - lib::UniqueTempDirectory tempdir{"svs_dynamic_ivf_load"}; - lib::DirectoryArchiver::unpack(stream, tempdir); + auto deserializer = svs::lib::detail::Deserializer::build(stream); + if (deserializer.is_native()) { + if constexpr (std::is_same_v, DistanceType>) { + auto dispatcher = DistanceDispatcher(distance); + return dispatcher([&](auto distance_function) { + return DynamicIVF( + AssembleTag(), + manager::as_typelist{}, + index::ivf::load_dynamic_ivf_index( + stream, + std::move(distance_function), + std::move(threadpool_proto), + intra_query_threads + ) + ); + }); + } else { + return DynamicIVF( + AssembleTag(), + manager::as_typelist{}, + index::ivf::load_dynamic_ivf_index( + stream, distance, std::move(threadpool_proto), intra_query_threads + ) + ); + } + } else { + namespace fs = std::filesystem; + lib::UniqueTempDirectory tempdir{"svs_dynamic_ivf_load"}; + lib::DirectoryArchiver::unpack(stream, tempdir, deserializer.magic()); + + const auto config_path = tempdir.get() / "config"; + if (!fs::is_directory(config_path)) { + throw ANNEXCEPTION( + "Invalid DynamicIVF index archive: missing config directory!" + ); + } - const auto config_path = tempdir.get() / "config"; - if (!fs::is_directory(config_path)) { - throw ANNEXCEPTION("Invalid DynamicIVF index archive: missing config directory!" - ); - } + const auto data_path = tempdir.get() / "data"; + if (!fs::is_directory(data_path)) { + throw ANNEXCEPTION( + "Invalid DynamicIVF index archive: missing data directory!" + ); + } - const auto data_path = tempdir.get() / "data"; - if (!fs::is_directory(data_path)) { - throw ANNEXCEPTION("Invalid DynamicIVF index archive: missing data directory!"); + return assemble( + config_path, + data_path, + distance, + std::move(threadpool_proto), + intra_query_threads + ); } - - return assemble( - config_path, - data_path, - distance, - std::move(threadpool_proto), - intra_query_threads - ); } }; diff --git a/tests/svs/index/ivf/dynamic_ivf.cpp b/tests/svs/index/ivf/dynamic_ivf.cpp index 9ae9cfd6..6547a911 100644 --- a/tests/svs/index/ivf/dynamic_ivf.cpp +++ b/tests/svs/index/ivf/dynamic_ivf.cpp @@ -25,6 +25,7 @@ #include "svs/lib/threads.h" #include "svs/lib/timing.h" #include "svs/misc/dynamic_helper.h" +#include "svs/orchestrators/dynamic_ivf.h" // tests #include "tests/utils/test_dataset.h" @@ -1193,95 +1194,193 @@ CATCH_TEST_CASE("Dynamic IVF Save and Load", "[dynamic_ivf][saveload]") { index.set_search_parameters(search_params); // Perform initial search to get baseline results + auto batch_queries = svs::data::ConstSimpleDataView{ + queries.data(), queries.size(), queries.dimensions()}; auto original_results = svs::QueryResult(queries.size(), NUM_NEIGHBORS); - index.search( - original_results.view(), - svs::data::ConstSimpleDataView{ - queries.data(), queries.size(), queries.dimensions()}, - search_params - ); + index.search(original_results.view(), batch_queries, search_params); + + CATCH_SECTION("Save and load from disk") { + // Create temporary directories for saving + auto temp_dir = svs_test::temp_directory(); + svs_test::prepare_temp_directory(); + auto config_dir = temp_dir / "config"; + auto data_dir = temp_dir / "data"; + + // Save the index + index.save(config_dir, data_dir); + + // Verify saved files exist + CATCH_REQUIRE(std::filesystem::exists(config_dir)); + CATCH_REQUIRE(std::filesystem::exists(data_dir / "centroids")); + // Verify format files exist in clusters/ subdirectory + // DenseClusteredDataset saves: clusters_archive.bin, ids.bin, cluster_sizes.bin, + // ids_offsets.bin + CATCH_REQUIRE( + std::filesystem::exists(data_dir / "clusters" / "clusters_archive.bin") + ); + CATCH_REQUIRE(std::filesystem::exists(data_dir / "clusters" / "ids.bin")); + CATCH_REQUIRE(std::filesystem::exists(data_dir / "clusters" / "cluster_sizes.bin")); + CATCH_REQUIRE(std::filesystem::exists(data_dir / "clusters" / "ids_offsets.bin")); + + // Load the index back using the load function + auto loaded_index = svs::index::ivf::load_dynamic_ivf_index( + config_dir, + data_dir, + Distance(), + svs::threads::as_threadpool(num_threads), + 1 // intra_query_threads + ); - // Create temporary directories for saving - auto temp_dir = svs_test::temp_directory(); - svs_test::prepare_temp_directory(); - auto config_dir = temp_dir / "config"; - auto data_dir = temp_dir / "data"; - - // Save the index - index.save(config_dir, data_dir); - - // Verify saved files exist - CATCH_REQUIRE(std::filesystem::exists(config_dir)); - CATCH_REQUIRE(std::filesystem::exists(data_dir / "centroids")); - // Verify format files exist in clusters/ subdirectory - // DenseClusteredDataset saves: clusters_archive.bin, ids.bin, cluster_sizes.bin, - // ids_offsets.bin - CATCH_REQUIRE(std::filesystem::exists(data_dir / "clusters" / "clusters_archive.bin")); - CATCH_REQUIRE(std::filesystem::exists(data_dir / "clusters" / "ids.bin")); - CATCH_REQUIRE(std::filesystem::exists(data_dir / "clusters" / "cluster_sizes.bin")); - CATCH_REQUIRE(std::filesystem::exists(data_dir / "clusters" / "ids_offsets.bin")); - - // Load the index back using the load function - auto loaded_index = svs::index::ivf::load_dynamic_ivf_index( - config_dir, - data_dir, - Distance(), - svs::threads::as_threadpool(num_threads), - 1 // intra_query_threads - ); + // Set search parameters on loaded index + loaded_index.set_search_parameters(search_params); - // Set search parameters on loaded index - loaded_index.set_search_parameters(search_params); + auto loaded_results = svs::QueryResult(queries.size(), NUM_NEIGHBORS); + loaded_index.search(loaded_results.view(), batch_queries, search_params); - // Perform search on loaded index - auto loaded_results = svs::QueryResult(queries.size(), NUM_NEIGHBORS); - loaded_index.search( - loaded_results.view(), - svs::data::ConstSimpleDataView{ - queries.data(), queries.size(), queries.dimensions()}, - search_params - ); + // Verify results match + for (size_t q = 0; q < queries.size(); ++q) { + for (size_t k = 0; k < NUM_NEIGHBORS; ++k) { + CATCH_REQUIRE(original_results.index(q, k) == loaded_results.index(q, k)); + CATCH_REQUIRE( + original_results.distance(q, k) == loaded_results.distance(q, k) + ); + } + } + + // Verify index properties are preserved + CATCH_REQUIRE(loaded_index.size() == index.size()); + CATCH_REQUIRE(loaded_index.num_clusters() == index.num_clusters()); + CATCH_REQUIRE(loaded_index.dimensions() == index.dimensions()); + + // Test that dynamic operations still work after loading + // Delete some points + std::vector ids_to_delete; + for (size_t i = 0; i < 10 && i < initial_indices.size(); ++i) { + ids_to_delete.push_back(initial_indices[i]); + } + size_t deleted = loaded_index.delete_entries(ids_to_delete); + CATCH_REQUIRE(deleted == ids_to_delete.size()); + CATCH_REQUIRE(loaded_index.size() == index.size() - deleted); + + // Compact and verify + loaded_index.compact(1000); + + // Search should still work after modifications + loaded_index.search( + loaded_results.view(), + svs::data::ConstSimpleDataView{ + queries.data(), queries.size(), queries.dimensions()}, + search_params + ); - // Verify results match - for (size_t q = 0; q < queries.size(); ++q) { - for (size_t k = 0; k < NUM_NEIGHBORS; ++k) { - CATCH_REQUIRE(original_results.index(q, k) == loaded_results.index(q, k)); - CATCH_REQUIRE(original_results.distance(q, k) == loaded_results.distance(q, k)); + // Verify we still get valid results + size_t valid_results = 0; + for (size_t i = 0; i < loaded_results.n_queries(); ++i) { + if (loaded_results.index(i, 0) != std::numeric_limits::max()) { + valid_results++; + } } + CATCH_REQUIRE(valid_results > 0); + svs_test::cleanup_temp_directory(); } - // Verify index properties are preserved - CATCH_REQUIRE(loaded_index.size() == index.size()); - CATCH_REQUIRE(loaded_index.num_clusters() == index.num_clusters()); - CATCH_REQUIRE(loaded_index.dimensions() == index.dimensions()); + CATCH_SECTION("Load DynamicIVF serialized natively to stream") { + std::stringstream stream; + index.save(stream); + { + using LoadDataType = svs::data::SimpleData; + + auto loaded_index = svs::DynamicIVF::assemble( + stream, Distance(), svs::threads::as_threadpool(num_threads), 1 + ); + + CATCH_REQUIRE(loaded_index.size() == index.size()); + CATCH_REQUIRE(loaded_index.dimensions() == index.dimensions()); + + auto loaded_results = svs::QueryResult(queries.size(), NUM_NEIGHBORS); + loaded_index.search(loaded_results.view(), batch_queries, search_params); + + for (size_t q = 0; q < queries.size(); ++q) { + for (size_t k = 0; k < NUM_NEIGHBORS; ++k) { + CATCH_REQUIRE( + original_results.index(q, k) == loaded_results.index(q, k) + ); + CATCH_REQUIRE( + original_results.distance(q, k) == loaded_results.distance(q, k) + ); + } + } + + // Verify index properties are preserved + CATCH_REQUIRE(loaded_index.size() == index.size()); + CATCH_REQUIRE(loaded_index.dimensions() == index.dimensions()); + + // Test that dynamic operations still work after loading + // Delete some points + std::vector ids_to_delete; + for (size_t i = 0; i < 10 && i < initial_indices.size(); ++i) { + ids_to_delete.push_back(initial_indices[i]); + } + size_t deleted = loaded_index.delete_points(ids_to_delete); + CATCH_REQUIRE(deleted == ids_to_delete.size()); + CATCH_REQUIRE(loaded_index.size() == index.size() - deleted); - // Test that dynamic operations still work after loading - // Delete some points - std::vector ids_to_delete; - for (size_t i = 0; i < 10 && i < initial_indices.size(); ++i) { - ids_to_delete.push_back(initial_indices[i]); + // Compact and verify + loaded_index.compact(1000); + + // Search should still work after modifications + loaded_index.search( + loaded_results.view(), + svs::data::ConstSimpleDataView{ + queries.data(), queries.size(), queries.dimensions()}, + search_params + ); + + // Verify we still get valid results + size_t valid_results = 0; + for (size_t i = 0; i < loaded_results.n_queries(); ++i) { + if (loaded_results.index(i, 0) != std::numeric_limits::max()) { + valid_results++; + } + } + CATCH_REQUIRE(valid_results > 0); + } } - size_t deleted = loaded_index.delete_entries(ids_to_delete); - CATCH_REQUIRE(deleted == ids_to_delete.size()); - CATCH_REQUIRE(loaded_index.size() == index.size() - deleted); - // Compact and verify - loaded_index.compact(1000); + CATCH_SECTION("Load DynamicIVF serialized with intermediate files") { + std::stringstream stream; + { + svs::lib::UniqueTempDirectory tempdir{"svs_dynamic_ivf_save"}; + const auto config_dir = tempdir.get() / "config"; + const auto data_dir = tempdir.get() / "data"; + std::filesystem::create_directories(config_dir); + std::filesystem::create_directories(data_dir); + index.save(config_dir, data_dir); + svs::lib::DirectoryArchiver::pack(tempdir, stream); + } + { + using LoadDataType = svs::data::SimpleData; + + auto loaded_ivf = svs::DynamicIVF::assemble( + stream, Distance(), svs::threads::as_threadpool(num_threads), 1 + ); - // Search should still work after modifications - loaded_index.search( - loaded_results.view(), - svs::data::ConstSimpleDataView{ - queries.data(), queries.size(), queries.dimensions()}, - search_params - ); + CATCH_REQUIRE(loaded_ivf.size() == index.size()); + CATCH_REQUIRE(loaded_ivf.dimensions() == index.dimensions()); - // Verify we still get valid results - size_t valid_results = 0; - for (size_t i = 0; i < loaded_results.n_queries(); ++i) { - if (loaded_results.index(i, 0) != std::numeric_limits::max()) { - valid_results++; + auto loaded_results = svs::QueryResult(queries.size(), NUM_NEIGHBORS); + loaded_ivf.search(loaded_results.view(), batch_queries, search_params); + + for (size_t q = 0; q < queries.size(); ++q) { + for (size_t k = 0; k < NUM_NEIGHBORS; ++k) { + CATCH_REQUIRE( + original_results.index(q, k) == loaded_results.index(q, k) + ); + CATCH_REQUIRE( + original_results.distance(q, k) == loaded_results.distance(q, k) + ); + } + } } } - CATCH_REQUIRE(valid_results > 0); }