Skip to content

[client][fix] Deduplicate insertIfNotExists lookups with same key in same batch to prevent deadlock#2907

Open
Prajwal-banakar wants to merge 1 commit intoapache:mainfrom
Prajwal-banakar:Unstable-test-Flink119TableSourceITCase
Open

[client][fix] Deduplicate insertIfNotExists lookups with same key in same batch to prevent deadlock#2907
Prajwal-banakar wants to merge 1 commit intoapache:mainfrom
Prajwal-banakar:Unstable-test-Flink119TableSourceITCase

Conversation

@Prajwal-banakar
Copy link
Contributor

Purpose

Linked issue: close #2874

When lookup.insert-if-not-exists=true and lookup.async=true, multiple lookups for the same key can land in the same LookupBatch within a single drain cycle. The server acquires a row-level latch per key during insert —
two entries for the same key in one RPC batch cause a deadlock in the server-side handler, leaving LookupQuery futures never completed and hanging the AsyncWaitOperator indefinitely, resulting in a 60s timeout.

This did not affect async=false because sync lookups execute sequentially via .get(), so the same key never appears twice in one drain cycle.

Brief change log

  • In LookupSender.sendLookupRequest, when insertIfNotExists=true, duplicate lookups for the same key bytes within the same batch are detected using ByteBuffer key comparison. The duplicate's future is chained to the first lookup's future instead of being added to the RPC batch, ensuring only one server-side entry is sent per unique key per drain cycle.

Tests

  • Flink119TableSourceITCase#testLookupInsertIfNotExists (existing test, previously flaky with async=true, now passes consistently with -Dsurefire.rerunFailingTestsCount=5)

API and Format

No API or storage format changes.

Documentation

No new feature introduced. Bug fix only.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[test] Unstable test Flink119TableSourceITCase.testLookupInsertIfNotExists

1 participant