Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 18 additions & 18 deletions src/VecSim/algorithms/svs/svs.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ struct SVSIndexBase
virtual int deleteVectors(const labelType *labels, size_t n) = 0;
virtual bool isLabelExists(labelType label) const = 0;
virtual size_t indexStorageSize() const = 0;
virtual size_t getNumThreads() const = 0;
virtual void setNumThreads(size_t numThreads) = 0;
virtual size_t getThreadPoolCapacity() const = 0;
virtual size_t getParallelism() const = 0;
virtual void setParallelism(size_t parallelism) = 0;
virtual size_t getPoolSize() const = 0;
virtual bool isCompressed() const = 0;
size_t getNumMarkedDeleted() const { return num_marked_deleted; }

Expand All @@ -66,9 +66,9 @@ struct SVSIndexBase
};

/** Thread Management Strategy:
* - addVector(): Requires numThreads == 1
* - addVectors(): Allows any numThreads value, but prohibits n=1 with numThreads>1
* - Callers are responsible for setting appropriate thread counts
* - addVector(): Requires parallelism == 1
* - addVectors(): Allows any parallelism value, but prohibits n=1 with parallelism>1
* - Callers are responsible for setting appropriate parallelism
**/
template <typename MetricType, typename DataType, bool isMulti, size_t QuantBits,
size_t ResidualBits, bool IsLeanVec>
Expand Down Expand Up @@ -251,12 +251,12 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl
this->impl_ = std::move(svs_handler->impl);
}

// Assuming numThreads was updated to reflect the number of available threads before this
// Assuming parallelism was updated to reflect the number of available threads before this
// function was called.
// This function assumes that the caller has already set numThreads to the appropriate value
// This function assumes that the caller has already set parallelism to the appropriate value
// for the operation.
// Important NOTE: For single vector operations (n=1), numThreads should be 1.
// For bulk operations (n>1), numThreads should reflect the number of available threads.
// Important NOTE: For single vector operations (n=1), parallelism should be 1.
// For bulk operations (n>1), parallelism should reflect the number of available threads.
int addVectorsImpl(const void *vectors_data, const labelType *labels, size_t n) {
if (n == 0) {
return 0;
Expand Down Expand Up @@ -412,8 +412,8 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl
.maxCandidatePoolSize = this->buildParams.max_candidate_pool_size,
.pruneTo = this->buildParams.prune_to,
.useSearchHistory = this->buildParams.use_full_search_history,
.numThreads = this->getThreadPoolCapacity(),
.lastReservedThreads = this->getNumThreads(),
.numThreads = this->getPoolSize(),
.lastReservedThreads = this->getParallelism(),
.numberOfMarkedDeletedNodes = this->num_marked_deleted,
.searchWindowSize = this->search_window_size,
.searchBufferCapacity = this->search_buffer_capacity,
Expand Down Expand Up @@ -517,16 +517,16 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl

int addVector(const void *vector_data, labelType label) override {
// Enforce single-threaded execution for single vector operations to ensure optimal
// performance and consistent behavior. Callers must set numThreads=1 before calling this
// performance and consistent behavior. Callers must set parallelism=1 before calling this
// method.
assert(getNumThreads() == 1 && "Can't use more than one thread to insert a single vector");
assert(getParallelism() == 1 && "Can't use more than one thread to insert a single vector");
return addVectorsImpl(vector_data, &label, 1);
}

int addVectors(const void *vectors_data, const labelType *labels, size_t n) override {
// Prevent misuse: single vector operations should use addVector(), not addVectors() with
// n=1 This ensures proper thread management and API contract enforcement.
assert(!(n == 1 && getNumThreads() > 1) &&
assert(!(n == 1 && getParallelism() > 1) &&
"Can't use more than one thread to insert a single vector");
return addVectorsImpl(vectors_data, labels, n);
}
Expand All @@ -541,10 +541,10 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl
return impl_ ? impl_->has_id(label) : false;
}

size_t getNumThreads() const override { return threadpool_.size(); }
void setNumThreads(size_t numThreads) override { threadpool_.resize(numThreads); }
size_t getParallelism() const override { return threadpool_.size(); }
void setParallelism(size_t parallelism) override { threadpool_.resize(parallelism); }

size_t getThreadPoolCapacity() const override { return threadpool_.capacity(); }
size_t getPoolSize() const override { return threadpool_.capacity(); }

bool isCompressed() const override { return storage_traits_t::is_compressed(); }

Expand Down
18 changes: 9 additions & 9 deletions src/VecSim/algorithms/svs/svs_tiered.h
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,7 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> {
// No need to run GC on an empty index.
return;
}
svs_index->setNumThreads(std::min(availableThreads, index->backendIndex->indexSize()));
svs_index->setParallelism(std::min(availableThreads, index->backendIndex->indexSize()));
// VecSimIndexAbstract::runGC() is protected
static_cast<VecSimIndexInterface *>(index->backendIndex)->runGC();
}
Expand All @@ -601,7 +601,7 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> {
return;
}

auto total_threads = this->GetSVSIndex()->getThreadPoolCapacity();
auto total_threads = this->GetSVSIndex()->getPoolSize();
auto jobs = SVSMultiThreadJob::createJobs(
this->allocator, SVS_BATCH_UPDATE_JOB, updateSVSIndexWrapper, this, total_threads,
std::chrono::microseconds(updateJobWaitTime), &uncompletedJobs);
Expand All @@ -614,7 +614,7 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> {
return;
}

auto total_threads = this->GetSVSIndex()->getThreadPoolCapacity();
auto total_threads = this->GetSVSIndex()->getPoolSize();
auto jobs = SVSMultiThreadJob::createJobs(
this->allocator, SVS_GC_JOB, SVSIndexGCWrapper, this, total_threads,
std::chrono::microseconds(updateJobWaitTime), &uncompletedJobs);
Expand Down Expand Up @@ -683,7 +683,7 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> {
assert(labels_to_move.size() == vectors_to_move.size() / this->frontendIndex->getDim());
if (this->backendIndex->indexSize() == 0) {
// If backend index is empty, we need to initialize it first.
svs_index->setNumThreads(std::min(availableThreads, labels_to_move.size()));
svs_index->setParallelism(std::min(availableThreads, labels_to_move.size()));
auto impl = svs_index->createImpl(vectors_to_move.data(), labels_to_move.data(),
labels_to_move.size());

Expand All @@ -696,7 +696,7 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> {
main_shared_lock.unlock();
std::lock_guard lock(this->mainIndexGuard);
// Upgrade to unique lock to add vectors
svs_index->setNumThreads(std::min(availableThreads, labels_to_move.size()));
svs_index->setParallelism(std::min(availableThreads, labels_to_move.size()));
svs_index->addVectors(vectors_to_move.data(), labels_to_move.data(),
labels_to_move.size());
}
Expand Down Expand Up @@ -821,9 +821,9 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> {
std::scoped_lock lock(this->updateJobMutex, this->mainIndexGuard);
// Set available thread count to 1 for single vector write-in-place operation.
// This maintains the contract that single vector operations use exactly one thread.
// TODO: Replace this setNumThreads(1) call with an assertion once we establish
// a contract that write-in-place mode guarantees numThreads == 1.
svs_index->setNumThreads(1);
// TODO: Replace this setParallelism(1) call with an assertion once we establish
// a contract that write-in-place mode guarantees parallelism == 1.
svs_index->setParallelism(1);
return this->backendIndex->addVector(storage_blob.get(), label);
}
}
Expand Down Expand Up @@ -1077,7 +1077,7 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> {
return;
}
// Force single thread for write-in-place mode.
this->GetSVSIndex()->setNumThreads(1);
this->GetSVSIndex()->setParallelism(1);
// VecSimIndexAbstract::runGC() is protected
static_cast<VecSimIndexInterface *>(this->backendIndex)->runGC();
return;
Expand Down
4 changes: 2 additions & 2 deletions tests/benchmark/bm_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ CreateTieredSVSParams(VecSimParams &svs_params, tieredIndexMock &mock_thread_poo
template <typename data_t>
static void verifyNumThreads(TieredSVSIndex<data_t> *tiered_index, size_t expected_num_threads,
size_t expected_capcity, std::string msg = "") {
ASSERT_EQ(tiered_index->GetSVSIndex()->getThreadPoolCapacity(), expected_capcity)
ASSERT_EQ(tiered_index->GetSVSIndex()->getPoolSize(), expected_capcity)
<< msg << ": thread pool capacity mismatch";
size_t num_reserved_threads = tiered_index->GetSVSIndex()->getNumThreads();
size_t num_reserved_threads = tiered_index->GetSVSIndex()->getParallelism();
if (num_reserved_threads < expected_num_threads) {
std::cout << msg << ": WARNING: last reserved threads (" << num_reserved_threads
<< ") is less than expected (" << expected_num_threads << ")." << std::endl;
Expand Down
2 changes: 1 addition & 1 deletion tests/benchmark/bm_vecsim_svs.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class BM_VecSimSVS : public BM_VecSimGeneral {
tiered_params.primaryIndexParams->algoParams.svsParams.num_threads;
size_t num_threads =
params_threadpool_size ? params_threadpool_size : mock_thread_pool.thread_pool_size;
tiered_index->GetSVSIndex()->setNumThreads(num_threads);
tiered_index->GetSVSIndex()->setParallelism(num_threads);
test_utils::verifyNumThreads(tiered_index, num_threads, num_threads,
std::string("CreateTieredSVSIndex"));

Expand Down
6 changes: 3 additions & 3 deletions tests/unit/test_svs_fp16.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2242,8 +2242,8 @@ class FP16SVSTieredIndexTest : public FP16SVSTest<index_type_t> {
}
void verifyNumThreads(TieredSVSIndex<data_t> *tiered_index, size_t expected_num_threads,
size_t expected_capcity) {
ASSERT_EQ(tiered_index->GetSVSIndex()->getNumThreads(), expected_num_threads);
ASSERT_EQ(tiered_index->GetSVSIndex()->getThreadPoolCapacity(), expected_capcity);
ASSERT_EQ(tiered_index->GetSVSIndex()->getParallelism(), expected_num_threads);
ASSERT_EQ(tiered_index->GetSVSIndex()->getPoolSize(), expected_capcity);
}

TieredSVSIndex<data_t> *CreateTieredSVSIndex(const TieredIndexParams &tiered_params,
Expand All @@ -2258,7 +2258,7 @@ class FP16SVSTieredIndexTest : public FP16SVSTest<index_type_t> {

// Set number of available threads to 1 unless specified otherwise,
// so we can insert one vector at a time directly to svs.
tiered_index->GetSVSIndex()->setNumThreads(num_available_threads);
tiered_index->GetSVSIndex()->setParallelism(num_available_threads);
size_t params_threadpool_size =
tiered_params.primaryIndexParams->algoParams.svsParams.num_threads;
size_t expected_capacity =
Expand Down
12 changes: 6 additions & 6 deletions tests/unit/test_svs_tiered.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ class SVSTieredIndexTest : public ::testing::Test {

void verifyNumThreads(TieredSVSIndex<data_t> *tiered_index, size_t expected_num_threads,
size_t expected_capcity) {
ASSERT_EQ(tiered_index->GetSVSIndex()->getNumThreads(), expected_num_threads);
ASSERT_EQ(tiered_index->GetSVSIndex()->getThreadPoolCapacity(), expected_capcity);
ASSERT_EQ(tiered_index->GetSVSIndex()->getParallelism(), expected_num_threads);
ASSERT_EQ(tiered_index->GetSVSIndex()->getPoolSize(), expected_capcity);
}
TieredSVSIndex<data_t> *CreateTieredSVSIndex(const TieredIndexParams &tiered_params,
tieredIndexMock &mock_thread_pool,
Expand All @@ -104,11 +104,11 @@ class SVSTieredIndexTest : public ::testing::Test {
// Set the created tiered index in the index external context (it will take ownership over
// the index, and we'll need to release the ctx at the end of the test.
mock_thread_pool.ctx->index_strong_ref.reset(tiered_index);
// Set numThreads to 1 by default to allow direct calls to SVS addVector() API,
// Set parallelism to 1 by default to allow direct calls to SVS addVector() API,
// which requires exactly 1 thread. When using tiered index addVector API,
// the thread count is managed internally according to the operation and threadpool
// capacity, so testing parallelism remains intact.
tiered_index->GetSVSIndex()->setNumThreads(num_available_threads);
// the parallelism is managed internally according to the operation and pool
// size, so testing parallelism remains intact.
tiered_index->GetSVSIndex()->setParallelism(num_available_threads);
size_t params_threadpool_size =
tiered_params.primaryIndexParams->algoParams.svsParams.num_threads;
size_t expected_capacity =
Expand Down
Loading