From eeab33f222292612f372329a7f68986e5ca335cc Mon Sep 17 00:00:00 2001 From: meiravgri <109056284+meiravgri@users.noreply.github.com> Date: Wed, 25 Mar 2026 13:07:24 +0200 Subject: [PATCH] Fix incoming edges ghost memory leak (MOD-13761) (#920) * Add incoming edges ghost memory benchmarks (MOD-13761) Add three benchmarks to measure performance and memory impact of the incoming edges shrink_to_fit fix: 1. DeleteZeroVectorsAsync - async deletion path (production default) 2. DeleteZeroVectorsInPlace - in-place deletion path (worst-case latency) 3. InsertZeroVectorsTimed - insertion path (heuristic pruning cost) Stress scenario: 40K random + 50K zero vectors with COSINE metric, which forces hub nodes with large incoming edge vectors. Each benchmark measures ghost memory (wasted capacity) before and after shrink_to_fit, with detailed stats (percentiles, top-10, mean). Run with: make benchmark BM_FILTER=bm-index-internals-incoming-edges * results before * shrinking logic * fix unconditionally shrink * add bm-index-internals-incoming-edges * use 1 thread * use ratio = 2, remove min * remove results before * rename to bm-hnsw-internals-incoming-edges better output --- .github/workflows/benchmark.yml | 1 + src/VecSim/algorithms/hnsw/graph_data.h | 15 +- .../hnsw/hnsw_tiered_tests_friends.h | 2 + tests/benchmark/benchmarks.sh | 5 + .../index_internals/bm_incoming_edges.h | 363 ++++++++++++++++++ ...bm_index_internals_incoming_edges_fp32.cpp | 34 ++ 6 files changed, 419 insertions(+), 1 deletion(-) create mode 100644 tests/benchmark/index_internals/bm_incoming_edges.h create mode 100644 tests/benchmark/run_files/bm_index_internals_incoming_edges_fp32.cpp diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index be5fe5ff0..84c66a24c 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -45,6 +45,7 @@ on: - bm-svs-train-fp16 - bm-basics-svs-fp32-single - bm-spaces + - bm-hnsw-internals-incoming-edges description: 
'Benchmarks set to run' default: benchmarks-all architecture: diff --git a/src/VecSim/algorithms/hnsw/graph_data.h b/src/VecSim/algorithms/hnsw/graph_data.h index 28df1167b..b38f74986 100644 --- a/src/VecSim/algorithms/hnsw/graph_data.h +++ b/src/VecSim/algorithms/hnsw/graph_data.h @@ -6,6 +6,10 @@ #include #include "VecSim/utils/vec_utils.h" +// Amortized shrink threshold for incoming edges vectors. +// Shrink is triggered when: capacity > INCOMING_EDGES_SHRINK_RATIO * size +constexpr size_t INCOMING_EDGES_SHRINK_RATIO = 2; + template using candidatesList = vecsim_stl::vector>; @@ -73,7 +77,16 @@ struct ElementLevelData { this->incomingUnidirectionalEdges->push_back(node_id); } bool removeIncomingUnidirectionalEdgeIfExists(idType node_id) { - return this->incomingUnidirectionalEdges->remove(node_id); + bool result = this->incomingUnidirectionalEdges->remove(node_id); + + if (result) { + auto &vec = *this->incomingUnidirectionalEdges; + if (vec.capacity() > INCOMING_EDGES_SHRINK_RATIO * vec.size()) { + vec.shrink_to_fit(); + } + } + + return result; } void swapNodeIdInIncomingEdges(idType id_before, idType id_after) { auto it = std::find(this->incomingUnidirectionalEdges->begin(), diff --git a/src/VecSim/algorithms/hnsw/hnsw_tiered_tests_friends.h b/src/VecSim/algorithms/hnsw/hnsw_tiered_tests_friends.h index 772d72724..21f99f8f5 100644 --- a/src/VecSim/algorithms/hnsw/hnsw_tiered_tests_friends.h +++ b/src/VecSim/algorithms/hnsw/hnsw_tiered_tests_friends.h @@ -74,3 +74,5 @@ friend class CommonTypeMetricTieredTests_TestDataSizeTieredHNSW_Test; INDEX_TEST_FRIEND_CLASS(BM_VecSimBasics) INDEX_TEST_FRIEND_CLASS(BM_VecSimCommon) + +friend class BM_IncomingEdgesBase; diff --git a/tests/benchmark/benchmarks.sh b/tests/benchmark/benchmarks.sh index bc8db7535..e1c2f4094 100755 --- a/tests/benchmark/benchmarks.sh +++ b/tests/benchmark/benchmarks.sh @@ -9,6 +9,7 @@ if [ -z "$BM_TYPE" ] || [ "$BM_TYPE" = "benchmarks-all" ]; then done done echo updated_index_single_fp32 + echo 
index_internals_incoming_edges_fp32 echo svs_training_fp32 echo svs_training_fp16 echo basics_svs_single_fp32 @@ -89,6 +90,10 @@ elif [ "$BM_TYPE" = "bm-batch-iter-uint8-multi" ] ; then elif [ "$BM_TYPE" = "bm-updated-fp32-single" ] ; then echo updated_index_single_fp32 +# hnsw internals benchmarks +elif [ "$BM_TYPE" = "bm-hnsw-internals-incoming-edges" ] ; then + echo index_internals_incoming_edges_fp32 + # SVS benchmarks elif [ "$BM_TYPE" = "bm-svs-train-fp32" ] ; then echo svs_training_fp32 diff --git a/tests/benchmark/index_internals/bm_incoming_edges.h b/tests/benchmark/index_internals/bm_incoming_edges.h new file mode 100644 index 000000000..ee0b31404 --- /dev/null +++ b/tests/benchmark/index_internals/bm_incoming_edges.h @@ -0,0 +1,363 @@ +/* + * Copyright (c) 2006-Present, Redis Ltd. + * All rights reserved. + * + * Licensed under your choice of the Redis Source Available License 2.0 + * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the + * GNU Affero General Public License v3 (AGPLv3). 
+ */ + +#pragma once + +#include + +#include "VecSim/vec_sim.h" +#include "VecSim/vec_sim_common.h" +#include "VecSim/algorithms/hnsw/hnsw.h" +#include "VecSim/algorithms/hnsw/hnsw_tiered.h" +#include "VecSim/index_factories/tiered_factory.h" +#include "utils/mock_thread_pool.h" + +#include +#include +#include +#include +#include + +// ============================================================================= +// Constants for the stress scenario (MOD-13761 reproduction) +// ============================================================================= +static constexpr size_t BM_DIM = 128; +static constexpr size_t BM_N_BASELINE = 40000; // Random baseline vectors +static constexpr size_t BM_N_ZERO = 50000; // Zero vectors (stress case) +static constexpr size_t BM_M = 16; // HNSW M parameter +static constexpr size_t BM_EF_C = 200; // HNSW efConstruction +static constexpr size_t BM_INITIAL_CAP = BM_N_BASELINE + BM_N_ZERO; +static constexpr unsigned int BM_NUM_THREADS = 1; // Single bg thread for deterministic timing + +// ============================================================================= +// Base fixture class for incoming edges benchmarks (MOD-13761) +// ============================================================================= +// Holds the tiered HNSW index, mock thread pool, and shared helpers for +// measuring ghost memory, shrinking incoming edges, and inserting vectors. +// The benchmark bodies are member functions of this class (see the run file). +class BM_IncomingEdgesBase : public benchmark::Fixture { +protected: + tieredIndexMock *mock_tp_ = nullptr; + TieredHNSWIndex *tiered_index_ = nullptr; + HNSWIndex *hnsw_ = nullptr; + VecSimWriteMode original_write_mode_; + + // --- Index lifecycle --- + + // Creates the tiered HNSW index with the stress scenario parameters. + // swapJobThreshold=0 so swap jobs accumulate and we control when they run. + // flatBufferLimit=SIZE_MAX so vectors go directly to HNSW via addVector. 
+ void create_tiered_index() { + mock_tp_ = new tieredIndexMock(BM_NUM_THREADS); + HNSWParams hnsw_params = { + .type = VecSimType_FLOAT32, + .dim = BM_DIM, + .metric = VecSimMetric_Cosine, + .initialCapacity = BM_INITIAL_CAP, + .M = BM_M, + .efConstruction = BM_EF_C, + }; + VecSimParams vecsim_params = {.algo = VecSimAlgo_HNSWLIB, + .algoParams = {.hnswParams = HNSWParams{hnsw_params}}}; + TieredIndexParams tiered_params = { + .jobQueue = &mock_tp_->jobQ, + .jobQueueCtx = mock_tp_->ctx, + .submitCb = tieredIndexMock::submit_callback, + .flatBufferLimit = SIZE_MAX, + .primaryIndexParams = &vecsim_params, + .specificParams = {TieredHNSWParams{.swapJobThreshold = 0}}, + }; + tiered_index_ = reinterpret_cast *>( + TieredFactory::NewIndex(&tiered_params)); + mock_tp_->ctx->index_strong_ref.reset(tiered_index_); + mock_tp_->init_threads(); + hnsw_ = tiered_index_->getHNSWIndex(); + } + + // --- Vector insertion helpers --- + + // Inserts BM_N_BASELINE random vectors (labels 0..BM_N_BASELINE-1). + // Called once in SetUp. Waits for background jobs to complete. + void insert_baseline_vectors() { + std::mt19937 rng(42); // Fixed seed for reproducibility + std::uniform_real_distribution dist(-1.0f, 1.0f); + + std::vector vec(BM_DIM); + for (size_t i = 0; i < BM_N_BASELINE; i++) { + for (size_t d = 0; d < BM_DIM; d++) { + vec[d] = dist(rng); + } + VecSimIndex_AddVector(tiered_index_, vec.data(), i); + } + mock_tp_->thread_pool_wait(); + } + + // Inserts BM_N_ZERO zero vectors (labels BM_N_BASELINE..BM_N_BASELINE+BM_N_ZERO-1). + // Zero vectors with COSINE metric create dense hub nodes that stress the + // incoming edges bookkeeping — reproducing the ghost memory growth from MOD-13761. + // Not called from SetUp: each benchmark method inserts (and re-inserts) them as needed. 
+ void insert_zero_vectors() { + std::vector vec(BM_DIM, 0.0f); + for (size_t i = 0; i < BM_N_ZERO; i++) { + VecSimIndex_AddVector(tiered_index_, vec.data(), BM_N_BASELINE + i); + } + mock_tp_->thread_pool_wait(); + } + + // --- Measurement helpers --- + + // Measures ghost memory across all incoming edges vectors in the HNSW graph. + // Ghost memory = sum of (capacity - size) * sizeof(idType) for every + // incomingUnidirectionalEdges vector across all nodes and all levels. + // Sets benchmark state counters only when report_counters is true, and always + // prints a summary (totals plus top-10 sizes and capacities) to stdout. + void measure_ghost_memory(benchmark::State &state, int iteration = -1, + bool report_counters = false) { + size_t total_used_bytes = 0; + size_t total_alloc_bytes = 0; + + // Collect per-vector stats for distribution analysis + std::vector all_sizes; + std::vector all_caps; + + size_t num_elements = hnsw_->indexSize(); + size_t non_empty_count = 0; + + for (size_t id = 0; id < num_elements; id++) { + auto *graph_data = hnsw_->getGraphDataByInternalId(id); + for (size_t level = 0; level <= graph_data->toplevel; level++) { + auto &level_data = hnsw_->getElementLevelData(graph_data, level); + auto &incoming = level_data.getIncomingEdges(); + size_t sz = incoming.size(); + size_t cap = incoming.capacity(); + + total_used_bytes += sz * sizeof(idType); + total_alloc_bytes += cap * sizeof(idType); + + all_sizes.push_back(sz); + all_caps.push_back(cap); + if (sz > 0) + non_empty_count++; + } + } + + size_t wasted_bytes = total_alloc_bytes - total_used_bytes; + size_t total_vectors = all_sizes.size(); + + // Sort for top-10 + std::vector sorted_sizes(all_sizes); + std::vector sorted_caps(all_caps); + std::sort(sorted_sizes.begin(), sorted_sizes.end()); + std::sort(sorted_caps.begin(), sorted_caps.end()); + + // --- Report metrics via benchmark counters (only for the "before shrink" call) --- + if (report_counters) { + state.counters["index_memory"] = hnsw_->getAllocationSize(); 
+ state.counters["wasted_bytes"] = static_cast(wasted_bytes); + } + + // --- Print diagnostic summary to stdout --- + std::cout << "\n=== Incoming Edges Stats" + << (iteration >= 0 ? " (iter=" + std::to_string(iteration) + ")" : "") + << " ===" << std::endl; + std::cout << " Nodes: " << num_elements << " Level entries: " << total_vectors + << " Non-empty: " << non_empty_count << std::endl; + std::cout << " Wasted bytes: " << wasted_bytes << " (used=" << total_used_bytes + << ", alloc=" << total_alloc_bytes << ")" << std::endl; + + // Print top-10 by size (descending) + std::cout << " Top-10 by size: ["; + size_t top_n = std::min(10, sorted_sizes.size()); + for (size_t i = 0; i < top_n; i++) { + if (i > 0) + std::cout << ", "; + std::cout << sorted_sizes[sorted_sizes.size() - 1 - i]; + } + std::cout << "]" << std::endl; + + // Print top-10 by capacity (descending) + std::cout << " Top-10 by cap: ["; + top_n = std::min(10, sorted_caps.size()); + for (size_t i = 0; i < top_n; i++) { + if (i > 0) + std::cout << ", "; + std::cout << sorted_caps[sorted_caps.size() - 1 - i]; + } + std::cout << "]" << std::endl; + } + + // Shrinks all incoming edges vectors to reclaim ghost memory. + // Used to reset state between benchmark iterations so each iteration + // starts from a clean baseline. + void shrink_all_incoming_edges() { + size_t num_elements = hnsw_->indexSize(); + for (size_t id = 0; id < num_elements; id++) { + auto *graph_data = hnsw_->getGraphDataByInternalId(id); + for (size_t level = 0; level <= graph_data->toplevel; level++) { + auto &level_data = hnsw_->getElementLevelData(graph_data, level); + level_data.incomingUnidirectionalEdges->shrink_to_fit(); + } + } + } + +public: + // Common SetUp: save write mode, suppress logs, create index, insert baseline vectors. + // Each benchmark method is responsible for inserting/deleting zero vectors as needed. 
+ void SetUp(benchmark::State &state) override { + original_write_mode_ = VecSimIndexInterface::asyncWriteMode; + VecSim_SetLogCallbackFunction(nullptr); // Suppress verbose resize/capacity logs + VecSim_SetWriteMode(VecSim_WriteAsync); + create_tiered_index(); + insert_baseline_vectors(); + } + + // Common TearDown: restore write mode, clean up mock thread pool. + // The mock thread pool destructor handles joining threads and + // releasing the index via index_strong_ref. + void TearDown(benchmark::State &state) override { + VecSim_SetWriteMode(original_write_mode_); + delete mock_tp_; + mock_tp_ = nullptr; + tiered_index_ = nullptr; + hnsw_ = nullptr; + } + + // --- Benchmark methods (called from run files via BENCHMARK_DEFINE_F) --- + + // Async deletion path (production default). + // In production, deleteVector() on a TieredHNSWIndex does: + // 1. Main thread: markDelete() + create repair/swap jobs + // 2. Background threads: executeRepairJob() → repairNodeConnections() + // 3. executeReadySwapJobs() → removeAndSwap() + // This benchmark measures the full async deletion path including ghost memory. 
+ void DeleteZeroVectorsAsync(benchmark::State &state) { + // Insert zero vectors before the first iteration + insert_zero_vectors(); + + int iteration = 0; + for (auto _ : state) { + // TIMED: delete all 50K zero vectors through the tiered async path + for (size_t i = 0; i < BM_N_ZERO; i++) { + VecSimIndex_DeleteVector(tiered_index_, BM_N_BASELINE + i); + } + // Wait for all background repair jobs to complete + mock_tp_->thread_pool_wait(); + // Execute all accumulated swap jobs (removes marked-deleted nodes) + tiered_index_->executeReadySwapJobs(); + + state.PauseTiming(); + + // Measure ghost memory after deletion, before shrink (the "problem" state) + std::cout << "\n--- Async iteration " << iteration + << ": After deletion (before shrink) ---"; + measure_ghost_memory(state, iteration, true); + + // Shrink all incoming edges to reclaim ghost memory + shrink_all_incoming_edges(); + + // Measure ghost memory after shrink (should be near-zero) + std::cout << "\n--- Async iteration " << iteration << ": After shrink (baseline) ---"; + measure_ghost_memory(state, iteration); + + // Re-insert zero vectors for the next iteration + insert_zero_vectors(); + + iteration++; + state.ResumeTiming(); + } + } + + // Insertion path benchmark. + // Measures the cost of inserting 50K zero vectors into the index. + // During insertion, the HNSW heuristic prunes neighbors, which calls + // removeIncomingUnidirectionalEdgeIfExists(). After the fix, shrink_to_fit + // fires here too, so this benchmark captures the insertion latency impact. 
+ void InsertZeroVectorsTimed(benchmark::State &state) { + int iteration = 0; + for (auto _ : state) { + // TIMED: insert 50K zero vectors (triggers heuristic pruning) + insert_zero_vectors(); + + state.PauseTiming(); + + // Measure state after insertion (90K nodes) + std::cout << "\n--- Insert iteration " << iteration << ": After insertion ---"; + measure_ghost_memory(state, iteration); + + // Delete zero vectors + for (size_t i = 0; i < BM_N_ZERO; i++) { + VecSimIndex_DeleteVector(tiered_index_, BM_N_BASELINE + i); + } + mock_tp_->thread_pool_wait(); + tiered_index_->executeReadySwapJobs(); + + // Measure ghost memory after deletion, before shrink + std::cout << "\n--- Insert iteration " << iteration + << ": After deletion (before shrink) ---"; + measure_ghost_memory(state, iteration, true); + + // Shrink to reclaim ghost memory + shrink_all_incoming_edges(); + + // Measure after shrink (should be near-zero) + std::cout << "\n--- Insert iteration " << iteration << ": After shrink (baseline) ---"; + measure_ghost_memory(state, iteration); + + iteration++; + state.ResumeTiming(); + } + } + + // In-place deletion path (synchronous, worst-case latency). + // Used during RDB loading, AOF rewrite, and certain overwrite scenarios. + // Unlike async, all repair work happens on the calling thread — no bg threads. + // This gives the worst-case latency impact of the shrink_to_fit fix. 
+ void DeleteZeroVectorsInPlace(benchmark::State &state) { + // Insert zero vectors using async mode before switching to in-place + insert_zero_vectors(); + + // Switch to in-place mode for the timed deletion phase + VecSim_SetWriteMode(VecSim_WriteInPlace); + + int iteration = 0; + for (auto _ : state) { + // TIMED: delete all 50K zero vectors through the in-place path + // All repair + swap happens synchronously on this thread + for (size_t i = 0; i < BM_N_ZERO; i++) { + VecSimIndex_DeleteVector(tiered_index_, BM_N_BASELINE + i); + } + // No thread_pool_wait() or executeReadySwapJobs() needed — fully synchronous + + state.PauseTiming(); + + // Measure ghost memory after deletion, before shrink (the "problem" state) + std::cout << "\n--- InPlace iteration " << iteration + << ": After deletion (before shrink) ---"; + measure_ghost_memory(state, iteration, true); + + // Shrink all incoming edges to reclaim ghost memory + shrink_all_incoming_edges(); + + // Measure ghost memory after shrink (should be near-zero) + std::cout << "\n--- InPlace iteration " << iteration << ": After shrink (baseline) ---"; + measure_ghost_memory(state, iteration); + + // Re-insert zero vectors using async mode for next iteration + VecSim_SetWriteMode(VecSim_WriteAsync); + insert_zero_vectors(); + VecSim_SetWriteMode(VecSim_WriteInPlace); + + iteration++; + state.ResumeTiming(); + } + + // Restore async mode for clean teardown + VecSim_SetWriteMode(VecSim_WriteAsync); + } +}; diff --git a/tests/benchmark/run_files/bm_index_internals_incoming_edges_fp32.cpp b/tests/benchmark/run_files/bm_index_internals_incoming_edges_fp32.cpp new file mode 100644 index 000000000..0e3c058a7 --- /dev/null +++ b/tests/benchmark/run_files/bm_index_internals_incoming_edges_fp32.cpp @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2006-Present, Redis Ltd. + * All rights reserved. 
+ * + * Licensed under your choice of the Redis Source Available License 2.0 + * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the + * GNU Affero General Public License v3 (AGPLv3). + */ + +// Run file for incoming edges ghost memory benchmarks (fp32). +// Defines and registers the benchmarks implemented as member functions of +// BM_IncomingEdgesBase in index_internals/bm_incoming_edges.h. + +#include "benchmark/index_internals/bm_incoming_edges.h" + +BENCHMARK_DEFINE_F(BM_IncomingEdgesBase, DeleteZeroVectorsAsync) +(benchmark::State &st) { DeleteZeroVectorsAsync(st); } +BENCHMARK_REGISTER_F(BM_IncomingEdgesBase, DeleteZeroVectorsAsync) + ->Iterations(3) + ->Unit(benchmark::kMillisecond); + +BENCHMARK_DEFINE_F(BM_IncomingEdgesBase, InsertZeroVectorsTimed) +(benchmark::State &st) { InsertZeroVectorsTimed(st); } +BENCHMARK_REGISTER_F(BM_IncomingEdgesBase, InsertZeroVectorsTimed) + ->Iterations(3) + ->Unit(benchmark::kMillisecond); + +BENCHMARK_DEFINE_F(BM_IncomingEdgesBase, DeleteZeroVectorsInPlace) +(benchmark::State &st) { DeleteZeroVectorsInPlace(st); } +BENCHMARK_REGISTER_F(BM_IncomingEdgesBase, DeleteZeroVectorsInPlace) + ->Iterations(3) + ->Unit(benchmark::kMillisecond); + +BENCHMARK_MAIN();