diff --git a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java index e0d83784b2..30a7c2dee4 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java @@ -44,6 +44,7 @@ import javax.annotation.Nullable; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -200,10 +201,43 @@ private void sendLookupRequest( int destination, List> lookups, boolean insertIfNotExists) { // table id -> (bucket -> lookups) Map> lookupByTableId = new HashMap<>(); + + // For insertIfNotExists batches, two lookups for the same key bytes in the same bucket + // within the same drain cycle would be sent to the server together. The server processes + // them sequentially under a row-level latch — the second lookup for the same key blocks + // on the latch held by the first, causing a deadlock within the single RPC handler. + // Fix: deduplicate by key bytes per bucket. The second lookup for an already-batched key + // chains its future to the first lookup's future so it still gets the correct result, + // but only one RPC entry is sent to the server. + Map> inflightInsertKeys = + insertIfNotExists ? new HashMap<>() : null; + for (AbstractLookupQuery abstractLookupQuery : lookups) { LookupQuery lookup = (LookupQuery) abstractLookupQuery; TableBucket tb = lookup.tableBucket(); long tableId = tb.getTableId(); + + if (insertIfNotExists) { + ByteBuffer keyBuf = ByteBuffer.wrap(lookup.key()); + LookupQuery existing = + inflightInsertKeys.computeIfAbsent(tb, k -> new HashMap<>()).get(keyBuf); + if (existing != null) { + // Chain the duplicate's future to the first one's future. + // ByteBuffer.wrap shares the same key bytes — result is identical for both. + existing.future() + .whenComplete( + (result, err) -> { + if (err != null) { + lookup.future().completeExceptionally(err); + } else { + lookup.future().complete(result); + } + }); + continue; + } + inflightInsertKeys.get(tb).put(keyBuf, lookup); + } + lookupByTableId .computeIfAbsent(tableId, k -> new HashMap<>()) .computeIfAbsent(tb, k -> new LookupBatch(tb))