DenseIntMapConcurrentBenchmark.java (new file)
@@ -0,0 +1,196 @@
/*
 * Copyright DataStax, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 */
package io.github.jbellis.jvector.bench;

import io.github.jbellis.jvector.util.DenseIntMap;
import io.github.jbellis.jvector.util.IntMap;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Group;
import org.openjdk.jmh.annotations.GroupThreads;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntFunction;

/**
 * Measures the throughput of concurrent {@link DenseIntMap} operations — the map that sits on
 * the hot path of {@code ConcurrentNeighborMap} and was identified as the top lock-contention
 * hotspot in a HerdDB indexing profile.
 * <p>
 * Parameters:
 * <ul>
 * <li>{@code impl} — {@code legacy} (previous RW-lock + single {@code AtomicReferenceArray})
 *     vs. {@code segmented} (current lock-free spine-of-segments). Both implementations run
 *     in the same JVM under identical conditions, so results are directly comparable.</li>
 * <li>{@code initialCapacity} — {@code 1024} (default) vs. {@code totalKeys} (pre-sized hint).
 *     The hinted case isolates the "no resize required" scenario that best-case deployments
 *     (with a known shard size) can hit.</li>
 * <li>{@code totalKeys} — size of the working set.</li>
 * </ul>
 * Thread counts are expressed via {@code @Threads} on each benchmark method: 1 and 8.
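 * <p>
 * A typical invocation (the jar name here is illustrative; any JMH runner works):
 * <pre>{@code
 * java --enable-preview -jar benchmarks.jar DenseIntMapConcurrentBenchmark \
 *     -p impl=legacy,segmented -p initialCapacity=1024,1000000
 * }</pre>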
 */
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
@Fork(value = 1, jvmArgsAppend = {"--enable-preview"})
@Warmup(iterations = 3, time = 2)
@Measurement(iterations = 5, time = 3)
@State(Scope.Benchmark)
public class DenseIntMapConcurrentBenchmark {

    public enum Impl {
        legacy(LegacyDenseIntMap::new),
        segmented(DenseIntMap::new);

        final IntFunction<IntMap<Integer>> factory;

        Impl(IntFunction<IntMap<Integer>> factory) {
            this.factory = factory;
        }
    }

    @Param
    public Impl impl;

    @Param({"1024", "1000000"})
    public int initialCapacity;

    @Param({"1000000"})
    public int totalKeys;

    /** Map shared by the benchmarks that run against pre-populated data (get, CAS-update, mixed). */
    private IntMap<Integer> prepopulated;

    /** Monotonic counter used by insert benchmarks so threads never collide on keys. */
    private final AtomicInteger insertCursor = new AtomicInteger();

    /** The map written to by insert benchmarks. Replaced in each trial's setup, and again whenever
     *  doInsert() exhausts the key space; volatile so the replacement is visible to all threads. */
    private volatile IntMap<Integer> insertMap;

    @Setup
    public void setup() {
        this.prepopulated = impl.factory.apply(initialCapacity);
        for (int i = 0; i < totalKeys; i++) {
            prepopulated.compareAndPut(i, null, i);
        }
        this.insertMap = impl.factory.apply(initialCapacity);
        this.insertCursor.set(0);
    }

    /**
     * Models the {@code addNode} insertion pressure during graph build: many threads inserting
     * disjoint dense keys. Under the legacy design this path contended on the read lock whenever
     * any writer happened to be resizing the backing array.
     */
    @Benchmark
    @Threads(8)
    public boolean insertDense8(Blackhole bh) {
        return doInsert(bh);
    }

    @Benchmark
    @Threads(1)
    public boolean insertDense1(Blackhole bh) {
        return doInsert(bh);
    }

    private boolean doInsert(Blackhole bh) {
        int key = insertCursor.getAndIncrement();
        if (key >= totalKeys) {
            // Bounded workload: replace the map once we've filled it to avoid unbounded memory growth.
            // The swap is racy by design -- a few late inserts may land in the map being retired,
            // which is harmless for throughput measurement.
            synchronized (this) {
                if (insertCursor.get() >= totalKeys) {
                    insertMap = impl.factory.apply(initialCapacity);
                    insertCursor.set(0);
                }
            }
            key = insertCursor.getAndIncrement();
        }
        boolean ok = insertMap.compareAndPut(key, null, key);
        bh.consume(ok);
        return ok;
    }

    /**
     * Models the steady-state {@code insertEdge}/{@code insertDiverse} CAS-update pattern on an
     * already-built base layer: each thread reads then CAS-updates a random pre-populated key.
     */
    @Benchmark
    @Threads(8)
    public boolean casUpdate8(Blackhole bh) {
        return doCasUpdate(bh);
    }

    @Benchmark
    @Threads(1)
    public boolean casUpdate1(Blackhole bh) {
        return doCasUpdate(bh);
    }

    private boolean doCasUpdate(Blackhole bh) {
        int key = ThreadLocalRandom.current().nextInt(totalKeys);
        Integer current = prepopulated.get(key);
        boolean ok = prepopulated.compareAndPut(key, current, key + 1);
        bh.consume(ok);
        return ok;
    }

    /**
     * Pure {@code get()} throughput under heavy read load — a sanity check that the lock-free
     * read path remains as fast as before (and ideally faster, since there is no RW-lock
     * machinery to traverse).
     */
    @Benchmark
    @Threads(8)
    public Integer getHot8() {
        int key = ThreadLocalRandom.current().nextInt(totalKeys);
        return prepopulated.get(key);
    }

    @Benchmark
    @Threads(1)
    public Integer getHot1() {
        int key = ThreadLocalRandom.current().nextInt(totalKeys);
        return prepopulated.get(key);
    }

    /**
     * Mixed read/write workload approximating the graph-build steady state: 7 readers for each
     * writer doing a CAS update. Uses JMH groups so both run against the same shared map.
     */
    @Benchmark
    @Group("mixed")
    @GroupThreads(7)
    public Integer mixedRead() {
        int key = ThreadLocalRandom.current().nextInt(totalKeys);
        return prepopulated.get(key);
    }

    @Benchmark
    @Group("mixed")
    @GroupThreads(1)
    public boolean mixedWrite() {
        int key = ThreadLocalRandom.current().nextInt(totalKeys);
        Integer current = prepopulated.get(key);
        return prepopulated.compareAndPut(key, current, key + 1);
    }
}
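The "segmented" implementation under test is not shown in this diff. In rough sketch form, the idea is a spine of fixed-size segments: element reads and CAS-updates never take a lock, and growth copies only segment references rather than every element. The class below is illustrative only; the name, segment size, and growth policy are assumptions, not jvector's actual DenseIntMap.

import java.util.concurrent.atomic.AtomicReferenceArray;

/** Illustrative sketch: reads and CAS-updates of existing keys are lock-free;
 *  a lock is taken only to create a segment or grow the spine. */
class SegmentedIntMapSketch<T> {
    private static final int SHIFT = 10;              // 1024 slots per segment (size is an assumption)
    private static final int SEGMENT_SIZE = 1 << SHIFT;
    private static final int MASK = SEGMENT_SIZE - 1;

    // Spine of lazily created segments; replaced with a larger copy when it must grow.
    private volatile AtomicReferenceArray<AtomicReferenceArray<T>> spine = new AtomicReferenceArray<>(1);

    public T get(int key) {
        var s = spine;                                // one volatile read, no lock
        int si = key >>> SHIFT;
        if (si >= s.length()) {
            return null;
        }
        var segment = s.get(si);
        return segment == null ? null : segment.get(key & MASK);
    }

    public boolean compareAndPut(int key, T expected, T value) {
        return segmentFor(key).compareAndSet(key & MASK, expected, value);
    }

    private AtomicReferenceArray<T> segmentFor(int key) {
        int si = key >>> SHIFT;
        var s = spine;
        if (si < s.length()) {
            var segment = s.get(si);
            if (segment != null) {
                return segment;                       // fast path: segment already exists
            }
        }
        synchronized (this) {                         // slow path: first touch of this segment
            if (si >= spine.length()) {
                var bigger = new AtomicReferenceArray<AtomicReferenceArray<T>>(
                        Math.max(si + 1, spine.length() * 2));
                for (int i = 0; i < spine.length(); i++) {
                    bigger.set(i, spine.get(i));      // copies segment references only, not elements
                }
                spine = bigger;
            }
            var segment = spine.get(si);
            if (segment == null) {
                segment = new AtomicReferenceArray<>(SEGMENT_SIZE);
                spine.set(si, segment);
            }
            return segment;
        }
    }
}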
LegacyDenseIntMap.java (new file)
@@ -0,0 +1,130 @@
/*
 * Copyright DataStax, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 */
package io.github.jbellis.jvector.bench;

import io.github.jbellis.jvector.util.ArrayUtil;
import io.github.jbellis.jvector.util.IntMap;
import io.github.jbellis.jvector.util.RamUsageEstimator;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * Verbatim copy of the previous {@code DenseIntMap} implementation (volatile
 * {@link AtomicReferenceArray} + {@link ReentrantReadWriteLock} with Lucene-style
 * {@code ArrayUtil.oversize} growth). Kept in the benchmarks module so
 * {@link DenseIntMapConcurrentBenchmark} can measure the new segmented impl against
 * the old one under identical conditions, without needing a separate checkout.
 */
public class LegacyDenseIntMap<T> implements IntMap<T> {
    private final ReadWriteLock rwl = new ReentrantReadWriteLock();
    private volatile AtomicReferenceArray<T> objects;
    private final AtomicInteger size;

    public LegacyDenseIntMap(int initialCapacity) {
        objects = new AtomicReferenceArray<>(initialCapacity);
        size = new AtomicInteger();
    }

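    // Element writes take the *read* lock: many writers may proceed concurrently, and they are
    // excluded only while ensureCapacity holds the write lock for a resize.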
    @Override
    public boolean compareAndPut(int key, T existing, T value) {
        if (value == null) {
            throw new IllegalArgumentException("compareAndPut() value cannot be null -- use remove() instead");
        }

        ensureCapacity(key);
        rwl.readLock().lock();
        try {
            var success = objects.compareAndSet(key, existing, value);
            var isInsert = success && existing == null;
            if (isInsert) {
                size.incrementAndGet();
            }
            return success;
        } finally {
            rwl.readLock().unlock();
        }
    }

    @Override
    public int size() {
        return size.get();
    }

    @Override
    public T get(int key) {
        if (key >= objects.length()) {
            return null;
        }
        return objects.get(key);
    }

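    // This is the contention point the benchmarks target: growth holds the write lock for a full
    // element-by-element copy, during which every compareAndPut/remove stalls on the read lock.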
    private void ensureCapacity(int node) {
        if (node < objects.length()) {
            return;
        }

        rwl.writeLock().lock();
        try {
            var oldArray = objects;
            if (node >= oldArray.length()) {
                int newSize = ArrayUtil.oversize(node + 1, RamUsageEstimator.NUM_BYTES_OBJECT_REF);
                var newArray = new AtomicReferenceArray<T>(newSize);
                for (int i = 0; i < oldArray.length(); i++) {
                    newArray.set(i, oldArray.get(i));
                }
                objects = newArray;
            }
        } finally {
            rwl.writeLock().unlock();
        }
    }

    @Override
    public T remove(int key) {
        if (key >= objects.length()) {
            return null;
        }
        var old = objects.get(key);
        if (old == null) {
            return null;
        }

        rwl.readLock().lock();
        try {
            if (objects.compareAndSet(key, old, null)) {
                size.decrementAndGet();
                return old;
            } else {
                return null;
            }
        } finally {
            rwl.readLock().unlock();
        }
    }

    @Override
    public boolean containsKey(int key) {
        return get(key) != null;
    }

    @Override
    public void forEach(IntBiConsumer<T> consumer) {
        var ref = objects;
        for (int i = 0; i < ref.length(); i++) {
            var value = get(i);
            if (value != null) {
                consumer.consume(i, value);
            }
        }
    }
}
ConcurrentNeighborMap.java
@@ -42,7 +42,11 @@ public class ConcurrentNeighborMap {
     public final int maxOverflowDegree;
 
     public ConcurrentNeighborMap(DiversityProvider diversityProvider, int maxDegree, int maxOverflowDegree) {
-        this(new DenseIntMap<>(1024), diversityProvider, maxDegree, maxOverflowDegree);
+        this(diversityProvider, maxDegree, maxOverflowDegree, 1024);
+    }
+
+    public ConcurrentNeighborMap(DiversityProvider diversityProvider, int maxDegree, int maxOverflowDegree, int initialCapacity) {
+        this(new DenseIntMap<>(initialCapacity), diversityProvider, maxDegree, maxOverflowDegree);
     }
 
     public <T> ConcurrentNeighborMap(IntMap<Neighbors> neighbors, DiversityProvider diversityProvider, int maxDegree, int maxOverflowDegree) {
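With the added overload, a caller that knows the eventual graph size can pre-size the neighbor map so the backing DenseIntMap never resizes during build. A hypothetical call site (variable names are illustrative):

    // expectedNodeCount comes from whatever sizing information the caller has up front
    var neighbors = new ConcurrentNeighborMap(diversityProvider, maxDegree, maxOverflowDegree, expectedNodeCount);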