Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@

import javax.annotation.Nullable;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -200,10 +201,43 @@ private void sendLookupRequest(
int destination, List<AbstractLookupQuery<?>> lookups, boolean insertIfNotExists) {
// table id -> (bucket -> lookups)
Map<Long, Map<TableBucket, LookupBatch>> lookupByTableId = new HashMap<>();

// For insertIfNotExists batches, two lookups for the same key bytes in the same bucket
// within the same drain cycle would be sent to the server together. The server processes
// them sequentially under a row-level latch — the second lookup for the same key blocks
// on the latch held by the first, causing a deadlock within the single RPC handler.
// Fix: deduplicate by key bytes per bucket. The second lookup for an already-batched key
// chains its future to the first lookup's future so it still gets the correct result,
// but only one RPC entry is sent to the server.
Map<TableBucket, Map<ByteBuffer, LookupQuery>> inflightInsertKeys =
insertIfNotExists ? new HashMap<>() : null;

for (AbstractLookupQuery<?> abstractLookupQuery : lookups) {
LookupQuery lookup = (LookupQuery) abstractLookupQuery;
TableBucket tb = lookup.tableBucket();
long tableId = tb.getTableId();

if (insertIfNotExists) {
ByteBuffer keyBuf = ByteBuffer.wrap(lookup.key());
LookupQuery existing =
inflightInsertKeys.computeIfAbsent(tb, k -> new HashMap<>()).get(keyBuf);
if (existing != null) {
// Chain the duplicate's future to the first one's future.
// ByteBuffer equality is content-based, so a match here means both lookups carry
// the same key bytes; the duplicate simply reuses the first lookup's outcome.
existing.future()
.whenComplete(
(result, err) -> {
if (err != null) {
lookup.future().completeExceptionally(err);
} else {
lookup.future().complete(result);
}
});
continue;
}
inflightInsertKeys.get(tb).put(keyBuf, lookup);
}

lookupByTableId
.computeIfAbsent(tableId, k -> new HashMap<>())
.computeIfAbsent(tb, k -> new LookupBatch(tb))
Expand Down
Loading