Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions include/svs/core/translation.h
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ class IDTranslator {
"external_to_internal_translation";
static constexpr lib::Version save_version = lib::Version(0, 0, 0);

lib::SaveTable save_table() const {
lib::SaveTable metadata() const {
return lib::SaveTable(
serialization_schema,
save_version,
Expand All @@ -348,7 +348,7 @@ class IDTranslator {
// Save the translations to a file.
auto os = lib::open_write(filename);
save(os);
auto table = save_table();
auto table = metadata();
table.insert("filename", lib::save(filename.filename()));
return table;
}
Expand Down
2 changes: 1 addition & 1 deletion include/svs/index/flat/dynamic_flat.h
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ template <typename Data, typename Dist> class DynamicFlatIndex {
save_version,
{
{"name", name()},
{"translation", lib::detail::exit_hook(translator_.save_table())},
{"translation", lib::detail::exit_hook(translator_.metadata())},
}
);
lib::save_to_stream(save_table, os);
Expand Down
78 changes: 76 additions & 2 deletions include/svs/index/ivf/dynamic_ivf.h
Original file line number Diff line number Diff line change
Expand Up @@ -761,8 +761,32 @@ class DynamicIVFIndex {
lib::save_to_disk(clusters_, clusters_dir);
}

void save(std::ostream& SVS_UNUSED(os)) const {
throw ANNEXCEPTION("Placeholder; not implemented!");
void save(std::ostream& os) {
// Compact before saving to remove empty slots
compact();

// Get data type configuration for automatic loader construction during load
auto data_type_config = DataTypeTraits<Data>::get_config();
data_type_config.centroid_type = datatype_v<typename centroids_type::element_type>;

lib::begin_serialization(os);

auto save_table = lib::SaveTable(
"dynamic_ivf_config",
save_version,
{{"name", lib::save(name())},
{"translation", lib::detail::exit_hook(translator_.metadata())},
{"num_clusters", lib::save(clusters_.size())}}
);
save_table.insert("data_type_config", lib::save(data_type_config));
lib::save_to_stream(save_table, os);
translator_.save(os);

// Save centroids
lib::save_to_stream(centroids_, os);

// Save clusters
lib::save_to_stream(clusters_, os);
}

private:
Expand Down Expand Up @@ -1171,4 +1195,54 @@ auto load_dynamic_ivf_index(
return index;
}

template <
typename CentroidType,
typename DataType,
typename Distance,
typename ThreadpoolProto>
auto load_dynamic_ivf_index(
std::istream& is,
Distance distance,
ThreadpoolProto threadpool_proto,
const size_t intra_query_thread_count = 1,
svs::logging::logger_ptr logger = svs::logging::get()
) {
using centroids_type = data::SimpleData<CentroidType>;
using I = uint32_t;
using blocked_data_type = typename DataType::lib_blocked_alloc_data_type;
using cluster_type = DenseClusteredDataset<centroids_type, I, blocked_data_type>;

// Read config table and translator
auto table = lib::detail::read_metadata(is);

auto translation =
table.template cast<toml::table>().at("translation").template cast<toml::table>();
auto translator = IDTranslator::load(translation, is);

// Load centroids
auto centroids = lib::load_from_stream<centroids_type>(is);

// Load clusters with small block size allocator for IVF
auto blocking_params = data::BlockingParameters{
.blocksize_bytes = lib::PowerOfTwo(20) // 2^20 = 1MB
};
using allocator_type = typename blocked_data_type::allocator_type;
auto blocked_allocator =
allocator_type(blocking_params, typename allocator_type::allocator_type());

auto dense_clusters = lib::load_from_stream<cluster_type>(is, blocked_allocator);

auto threadpool = threads::as_threadpool(std::move(threadpool_proto));

return DynamicIVFIndex<centroids_type, cluster_type, Distance, decltype(threadpool)>(
std::move(centroids),
std::move(dense_clusters),
std::move(translator),
std::move(distance),
std::move(threadpool),
intra_query_thread_count,
logger
);
}

} // namespace svs::index::ivf
2 changes: 1 addition & 1 deletion include/svs/index/vamana/dynamic_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -1047,7 +1047,7 @@ class MutableVamanaIndex {
{
{"name", lib::save(name())},
{"parameters", lib::save(parameters())},
{"translation", lib::detail::exit_hook(translator_.save_table())},
{"translation", lib::detail::exit_hook(translator_.metadata())},
}
);
lib::save_to_stream(save_table, os);
Expand Down
80 changes: 54 additions & 26 deletions include/svs/orchestrators/dynamic_ivf.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,11 @@ class DynamicIVFImpl : public IVFImpl<QueryTypes, Impl, DynamicIVFInterface> {
}

void save(std::ostream& stream) override {
lib::UniqueTempDirectory tempdir{"svs_dynamic_ivf_save"};
const auto config_dir = tempdir.get() / "config";
const auto data_dir = tempdir.get() / "data";
std::filesystem::create_directories(config_dir);
std::filesystem::create_directories(data_dir);
save(config_dir, data_dir);
lib::DirectoryArchiver::pack(tempdir, stream);
if constexpr (Impl::supports_saving) {
impl().save(stream);
} else {
throw ANNEXCEPTION("The current DynamicIVF backend doesn't support saving!");
}
}
};

Expand Down Expand Up @@ -431,28 +429,58 @@ class DynamicIVF : public manager::IndexManager<DynamicIVFInterface> {
ThreadPoolProto threadpool_proto,
size_t intra_query_threads = 1
) {
namespace fs = std::filesystem;
lib::UniqueTempDirectory tempdir{"svs_dynamic_ivf_load"};
lib::DirectoryArchiver::unpack(stream, tempdir);
auto deserializer = svs::lib::detail::Deserializer::build(stream);
if (deserializer.is_native()) {
if constexpr (std::is_same_v<std::decay_t<Distance>, DistanceType>) {
auto dispatcher = DistanceDispatcher(distance);
return dispatcher([&](auto distance_function) {
return DynamicIVF(
AssembleTag(),
manager::as_typelist<QueryTypes>{},
index::ivf::load_dynamic_ivf_index<CentroidType, DataType>(
stream,
std::move(distance_function),
std::move(threadpool_proto),
intra_query_threads
)
);
});
} else {
return DynamicIVF(
AssembleTag(),
manager::as_typelist<QueryTypes>{},
index::ivf::load_dynamic_ivf_index<CentroidType, DataType>(
stream, distance, std::move(threadpool_proto), intra_query_threads
)
);
}
} else {
namespace fs = std::filesystem;
lib::UniqueTempDirectory tempdir{"svs_dynamic_ivf_load"};
lib::DirectoryArchiver::unpack(stream, tempdir, deserializer.magic());

const auto config_path = tempdir.get() / "config";
if (!fs::is_directory(config_path)) {
throw ANNEXCEPTION(
"Invalid DynamicIVF index archive: missing config directory!"
);
}

const auto config_path = tempdir.get() / "config";
if (!fs::is_directory(config_path)) {
throw ANNEXCEPTION("Invalid DynamicIVF index archive: missing config directory!"
);
}
const auto data_path = tempdir.get() / "data";
if (!fs::is_directory(data_path)) {
throw ANNEXCEPTION(
"Invalid DynamicIVF index archive: missing data directory!"
);
}

const auto data_path = tempdir.get() / "data";
if (!fs::is_directory(data_path)) {
throw ANNEXCEPTION("Invalid DynamicIVF index archive: missing data directory!");
return assemble<QueryTypes, CentroidType, DataType>(
config_path,
data_path,
distance,
std::move(threadpool_proto),
intra_query_threads
);
}

return assemble<QueryTypes, CentroidType, DataType>(
config_path,
data_path,
distance,
std::move(threadpool_proto),
intra_query_threads
);
}
};

Expand Down
Loading
Loading