From c78a85165516b4b0db3699716b8f0bc1ef331bfe Mon Sep 17 00:00:00 2001 From: meiravgri Date: Tue, 31 Mar 2026 16:29:00 +0000 Subject: [PATCH] **Step 1: Rename SVS thread pool internal methods to match shared pool terminology** MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Rename `setNumThreads`/`getNumThreads` → `setParallelism`/`getParallelism` and `getThreadPoolCapacity` → `getPoolSize` across VectorSimilarity. Public info API fields (`numThreads`, `lastReservedThreads`, `NUM_THREADS`, `LAST_RESERVED_NUM_THREADS`) remain unchanged. No behavioral changes. --- src/VecSim/algorithms/svs/svs.h | 36 +++++++++++++------------- src/VecSim/algorithms/svs/svs_tiered.h | 18 ++++++------- tests/benchmark/bm_utils.h | 4 +-- tests/benchmark/bm_vecsim_svs.h | 2 +- tests/unit/test_svs_fp16.cpp | 6 ++--- tests/unit/test_svs_tiered.cpp | 12 ++++----- 6 files changed, 39 insertions(+), 39 deletions(-) diff --git a/src/VecSim/algorithms/svs/svs.h b/src/VecSim/algorithms/svs/svs.h index 8b03514f0..9d5ee1c3c 100644 --- a/src/VecSim/algorithms/svs/svs.h +++ b/src/VecSim/algorithms/svs/svs.h @@ -41,9 +41,9 @@ struct SVSIndexBase virtual int deleteVectors(const labelType *labels, size_t n) = 0; virtual bool isLabelExists(labelType label) const = 0; virtual size_t indexStorageSize() const = 0; - virtual size_t getNumThreads() const = 0; - virtual void setNumThreads(size_t numThreads) = 0; - virtual size_t getThreadPoolCapacity() const = 0; + virtual size_t getParallelism() const = 0; + virtual void setParallelism(size_t parallelism) = 0; + virtual size_t getPoolSize() const = 0; virtual bool isCompressed() const = 0; size_t getNumMarkedDeleted() const { return num_marked_deleted; } @@ -66,9 +66,9 @@ struct SVSIndexBase }; /** Thread Management Strategy: - * - addVector(): Requires numThreads == 1 - * - addVectors(): Allows any numThreads value, but prohibits n=1 with numThreads>1 - * - Callers are responsible for setting appropriate thread counts + * - addVector(): Requires parallelism == 1 + * - addVectors(): Allows any parallelism value, but prohibits n=1 with parallelism>1 + * - Callers are responsible for setting appropriate parallelism **/ template @@ -251,12 +251,12 @@ class SVSIndex : public VecSimIndexAbstract, fl this->impl_ = std::move(svs_handler->impl); } - // Assuming numThreads was updated to reflect the number of available threads before this + // Assuming parallelism was updated to reflect the number of available threads before this // function was called. - // This function assumes that the caller has already set numThreads to the appropriate value + // This function assumes that the caller has already set parallelism to the appropriate value // for the operation. - // Important NOTE: For single vector operations (n=1), numThreads should be 1. - // For bulk operations (n>1), numThreads should reflect the number of available threads. + // Important NOTE: For single vector operations (n=1), parallelism should be 1. + // For bulk operations (n>1), parallelism should reflect the number of available threads. int addVectorsImpl(const void *vectors_data, const labelType *labels, size_t n) { if (n == 0) { return 0; @@ -412,8 +412,8 @@ class SVSIndex : public VecSimIndexAbstract, fl .maxCandidatePoolSize = this->buildParams.max_candidate_pool_size, .pruneTo = this->buildParams.prune_to, .useSearchHistory = this->buildParams.use_full_search_history, - .numThreads = this->getThreadPoolCapacity(), - .lastReservedThreads = this->getNumThreads(), + .numThreads = this->getPoolSize(), + .lastReservedThreads = this->getParallelism(), .numberOfMarkedDeletedNodes = this->num_marked_deleted, .searchWindowSize = this->search_window_size, .searchBufferCapacity = this->search_buffer_capacity, @@ -517,16 +517,16 @@ class SVSIndex : public VecSimIndexAbstract, fl int addVector(const void *vector_data, labelType label) override { // Enforce single-threaded execution for single vector operations to ensure optimal - // performance and consistent behavior. Callers must set numThreads=1 before calling this + // performance and consistent behavior. Callers must set parallelism=1 before calling this // method. - assert(getNumThreads() == 1 && "Can't use more than one thread to insert a single vector"); + assert(getParallelism() == 1 && "Can't use more than one thread to insert a single vector"); return addVectorsImpl(vector_data, &label, 1); } int addVectors(const void *vectors_data, const labelType *labels, size_t n) override { // Prevent misuse: single vector operations should use addVector(), not addVectors() with // n=1 This ensures proper thread management and API contract enforcement. - assert(!(n == 1 && getNumThreads() > 1) && + assert(!(n == 1 && getParallelism() > 1) && "Can't use more than one thread to insert a single vector"); return addVectorsImpl(vectors_data, labels, n); } @@ -541,10 +541,10 @@ class SVSIndex : public VecSimIndexAbstract, fl return impl_ ? impl_->has_id(label) : false; } - size_t getNumThreads() const override { return threadpool_.size(); } - void setNumThreads(size_t numThreads) override { threadpool_.resize(numThreads); } + size_t getParallelism() const override { return threadpool_.size(); } + void setParallelism(size_t parallelism) override { threadpool_.resize(parallelism); } - size_t getThreadPoolCapacity() const override { return threadpool_.capacity(); } + size_t getPoolSize() const override { return threadpool_.capacity(); } bool isCompressed() const override { return storage_traits_t::is_compressed(); } diff --git a/src/VecSim/algorithms/svs/svs_tiered.h b/src/VecSim/algorithms/svs/svs_tiered.h index 2eac66d24..8ca67bf5a 100644 --- a/src/VecSim/algorithms/svs/svs_tiered.h +++ b/src/VecSim/algorithms/svs/svs_tiered.h @@ -587,7 +587,7 @@ class TieredSVSIndex : public VecSimTieredIndex { // No need to run GC on an empty index. return; } - svs_index->setNumThreads(std::min(availableThreads, index->backendIndex->indexSize())); + svs_index->setParallelism(std::min(availableThreads, index->backendIndex->indexSize())); // VecSimIndexAbstract::runGC() is protected static_cast(index->backendIndex)->runGC(); } @@ -601,7 +601,7 @@ class TieredSVSIndex : public VecSimTieredIndex { return; } - auto total_threads = this->GetSVSIndex()->getThreadPoolCapacity(); + auto total_threads = this->GetSVSIndex()->getPoolSize(); auto jobs = SVSMultiThreadJob::createJobs( this->allocator, SVS_BATCH_UPDATE_JOB, updateSVSIndexWrapper, this, total_threads, std::chrono::microseconds(updateJobWaitTime), &uncompletedJobs); @@ -614,7 +614,7 @@ class TieredSVSIndex : public VecSimTieredIndex { return; } - auto total_threads = this->GetSVSIndex()->getThreadPoolCapacity(); + auto total_threads = this->GetSVSIndex()->getPoolSize(); auto jobs = SVSMultiThreadJob::createJobs( this->allocator, SVS_GC_JOB, SVSIndexGCWrapper, this, total_threads, std::chrono::microseconds(updateJobWaitTime), &uncompletedJobs); @@ -683,7 +683,7 @@ class TieredSVSIndex : public VecSimTieredIndex { assert(labels_to_move.size() == vectors_to_move.size() / this->frontendIndex->getDim()); if (this->backendIndex->indexSize() == 0) { // If backend index is empty, we need to initialize it first. - svs_index->setNumThreads(std::min(availableThreads, labels_to_move.size())); + svs_index->setParallelism(std::min(availableThreads, labels_to_move.size())); auto impl = svs_index->createImpl(vectors_to_move.data(), labels_to_move.data(), labels_to_move.size()); @@ -696,7 +696,7 @@ class TieredSVSIndex : public VecSimTieredIndex { main_shared_lock.unlock(); std::lock_guard lock(this->mainIndexGuard); // Upgrade to unique lock to add vectors - svs_index->setNumThreads(std::min(availableThreads, labels_to_move.size())); + svs_index->setParallelism(std::min(availableThreads, labels_to_move.size())); svs_index->addVectors(vectors_to_move.data(), labels_to_move.data(), labels_to_move.size()); } @@ -821,9 +821,9 @@ class TieredSVSIndex : public VecSimTieredIndex { std::scoped_lock lock(this->updateJobMutex, this->mainIndexGuard); // Set available thread count to 1 for single vector write-in-place operation. // This maintains the contract that single vector operations use exactly one thread. - // TODO: Replace this setNumThreads(1) call with an assertion once we establish - // a contract that write-in-place mode guarantees numThreads == 1. - svs_index->setNumThreads(1); + // TODO: Replace this setParallelism(1) call with an assertion once we establish + // a contract that write-in-place mode guarantees parallelism == 1. + svs_index->setParallelism(1); return this->backendIndex->addVector(storage_blob.get(), label); } } @@ -1077,7 +1077,7 @@ class TieredSVSIndex : public VecSimTieredIndex { return; } // Force single thread for write-in-place mode. - this->GetSVSIndex()->setNumThreads(1); + this->GetSVSIndex()->setParallelism(1); // VecSimIndexAbstract::runGC() is protected static_cast(this->backendIndex)->runGC(); return; diff --git a/tests/benchmark/bm_utils.h b/tests/benchmark/bm_utils.h index 88f3e2e5d..e952794ad 100644 --- a/tests/benchmark/bm_utils.h +++ b/tests/benchmark/bm_utils.h @@ -30,9 +30,9 @@ CreateTieredSVSParams(VecSimParams &svs_params, tieredIndexMock &mock_thread_poo template static void verifyNumThreads(TieredSVSIndex *tiered_index, size_t expected_num_threads, size_t expected_capcity, std::string msg = "") { - ASSERT_EQ(tiered_index->GetSVSIndex()->getThreadPoolCapacity(), expected_capcity) + ASSERT_EQ(tiered_index->GetSVSIndex()->getPoolSize(), expected_capcity) << msg << ": thread pool capacity mismatch"; - size_t num_reserved_threads = tiered_index->GetSVSIndex()->getNumThreads(); + size_t num_reserved_threads = tiered_index->GetSVSIndex()->getParallelism(); if (num_reserved_threads < expected_num_threads) { std::cout << msg << ": WARNING: last reserved threads (" << num_reserved_threads << ") is less than expected (" << expected_num_threads << ")." << std::endl; diff --git a/tests/benchmark/bm_vecsim_svs.h b/tests/benchmark/bm_vecsim_svs.h index fe6c92493..c4b167421 100644 --- a/tests/benchmark/bm_vecsim_svs.h +++ b/tests/benchmark/bm_vecsim_svs.h @@ -115,7 +115,7 @@ class BM_VecSimSVS : public BM_VecSimGeneral { tiered_params.primaryIndexParams->algoParams.svsParams.num_threads; size_t num_threads = params_threadpool_size ? params_threadpool_size : mock_thread_pool.thread_pool_size; - tiered_index->GetSVSIndex()->setNumThreads(num_threads); + tiered_index->GetSVSIndex()->setParallelism(num_threads); test_utils::verifyNumThreads(tiered_index, num_threads, num_threads, std::string("CreateTieredSVSIndex")); diff --git a/tests/unit/test_svs_fp16.cpp b/tests/unit/test_svs_fp16.cpp index 59c11c0a1..82937ef32 100644 --- a/tests/unit/test_svs_fp16.cpp +++ b/tests/unit/test_svs_fp16.cpp @@ -2242,8 +2242,8 @@ class FP16SVSTieredIndexTest : public FP16SVSTest { } void verifyNumThreads(TieredSVSIndex *tiered_index, size_t expected_num_threads, size_t expected_capcity) { - ASSERT_EQ(tiered_index->GetSVSIndex()->getNumThreads(), expected_num_threads); - ASSERT_EQ(tiered_index->GetSVSIndex()->getThreadPoolCapacity(), expected_capcity); + ASSERT_EQ(tiered_index->GetSVSIndex()->getParallelism(), expected_num_threads); + ASSERT_EQ(tiered_index->GetSVSIndex()->getPoolSize(), expected_capcity); } TieredSVSIndex *CreateTieredSVSIndex(const TieredIndexParams &tiered_params, @@ -2258,7 +2258,7 @@ class FP16SVSTieredIndexTest : public FP16SVSTest { // Set number of available threads to 1 unless specified otherwise, // so we can insert one vector at a time directly to svs. - tiered_index->GetSVSIndex()->setNumThreads(num_available_threads); + tiered_index->GetSVSIndex()->setParallelism(num_available_threads); size_t params_threadpool_size = tiered_params.primaryIndexParams->algoParams.svsParams.num_threads; size_t expected_capacity = diff --git a/tests/unit/test_svs_tiered.cpp b/tests/unit/test_svs_tiered.cpp index 3d9c0bd0d..528df8794 100644 --- a/tests/unit/test_svs_tiered.cpp +++ b/tests/unit/test_svs_tiered.cpp @@ -92,8 +92,8 @@ class SVSTieredIndexTest : public ::testing::Test { void verifyNumThreads(TieredSVSIndex *tiered_index, size_t expected_num_threads, size_t expected_capcity) { - ASSERT_EQ(tiered_index->GetSVSIndex()->getNumThreads(), expected_num_threads); - ASSERT_EQ(tiered_index->GetSVSIndex()->getThreadPoolCapacity(), expected_capcity); + ASSERT_EQ(tiered_index->GetSVSIndex()->getParallelism(), expected_num_threads); + ASSERT_EQ(tiered_index->GetSVSIndex()->getPoolSize(), expected_capcity); } TieredSVSIndex *CreateTieredSVSIndex(const TieredIndexParams &tiered_params, tieredIndexMock &mock_thread_pool, @@ -104,11 +104,11 @@ class SVSTieredIndexTest : public ::testing::Test { // Set the created tiered index in the index external context (it will take ownership over // the index, and we'll need to release the ctx at the end of the test. mock_thread_pool.ctx->index_strong_ref.reset(tiered_index); - // Set numThreads to 1 by default to allow direct calls to SVS addVector() API, + // Set parallelism to 1 by default to allow direct calls to SVS addVector() API, // which requires exactly 1 thread. When using tiered index addVector API, - // the thread count is managed internally according to the operation and threadpool - // capacity, so testing parallelism remains intact. - tiered_index->GetSVSIndex()->setNumThreads(num_available_threads); + // the parallelism is managed internally according to the operation and pool + // size, so testing parallelism remains intact. + tiered_index->GetSVSIndex()->setParallelism(num_available_threads); size_t params_threadpool_size = tiered_params.primaryIndexParams->algoParams.svsParams.num_threads; size_t expected_capacity =