
OAK-12170 Offload Elastic async response processing from I/O thread to avoid inflated query time metrics#2828

Open
fabriziofortino wants to merge 1 commit into apache:trunk from fabriziofortino:OAK-12174

Conversation

fabriziofortino (Contributor) commented Apr 1, 2026

  • Offload Elasticsearch async response processing from the HTTP I/O thread to ForkJoinPool.commonPool() by replacing whenComplete with whenCompleteAsync
  • Capture responseTimeMs at callback entry and pass it through to onSuccess/onFailure so that ELASTIC_QUERY_TOTAL_TIME reflects the actual query round-trip time

Problem

The CompletableFuture returned by the Elasticsearch async client completes on the elasticsearch-rest-client I/O thread (Apache HttpAsyncClient I/O dispatcher). With whenComplete, the handleResponse callback — including onSuccess and its hit processing loop — runs inline on that same thread.

During hit processing, onSuccess iterates over search hits and calls SearchHitListener.on(hit) for each result. In ElasticResultRowAsyncIterator.on(), two operations can block the I/O thread:

  1. rowInclusionPredicate.test(path) — evaluates ACLs, which may require remote calls to the underlying persistence layer (e.g. MongoDB, segment store)
  2. queue.offer(resultRow, enqueueTimeoutMs, TimeUnit.MILLISECONDS) — blocks when the bounded result queue is full and the consumer is slow to drain it
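The second blocking case can be sketched in isolation. This is a minimal, self-contained illustration, not Oak's actual code: the queue capacity, the `enqueueTimeoutMs` value, and the row strings are all invented for the demo.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Sketch of the bounded hand-off described above: a full result queue with no
// consumer draining it makes the timed offer block for the full timeout.
public class BoundedOfferDemo {

    static boolean offerTimesOut() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1); // bounded result queue
        long enqueueTimeoutMs = 100;                               // hypothetical timeout
        queue.offer("row-1");                                      // queue is now full
        try {
            // No consumer drains the queue, so the calling thread blocks until
            // the timeout elapses and offer returns false. When that calling
            // thread is the ES I/O dispatcher, the stall delays socket reads
            // for every other in-flight query.
            boolean accepted = queue.offer("row-2", enqueueTimeoutMs, TimeUnit.MILLISECONDS);
            return !accepted;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("offer blocked and gave up: " + offerTimesOut());
    }
}
```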

Since the I/O dispatcher threads are shared across all HTTP connections, blocking one thread delays response reading and CompletableFuture completion for other concurrent Elasticsearch queries.
This causes ELASTIC_QUERY_TOTAL_TIME to include not just the actual query round-trip (ES server processing + network), but also the I/O thread scheduling delay caused by other queries' blocked callbacks.

Evidence: Adding LOG.info("handleResponse running on thread: {}", Thread.currentThread().getName()) to handleResponse confirmed the callback runs on elasticsearch-rest-client-1-thread-N, the I/O dispatcher threads.

Fix

whenComplete → whenCompleteAsync: Response processing now runs on ForkJoinPool.commonPool() instead of the I/O dispatcher thread. This frees the I/O thread immediately after the future completes, preventing cross-query interference.

responseTimeMs parameter: System.currentTimeMillis() is captured at the entry of the whenCompleteAsync lambda and passed through handleResponse to onSuccess/onFailure. This ensures ELASTIC_QUERY_TOTAL_TIME measures the actual query round-trip (ES server time + network + response deserialization + minimal ForkJoinPool scheduling overhead) rather than including arbitrary I/O thread contention delays.
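The two changes can be sketched together. This is a simplified stand-in, assuming nothing about Oak's real classes: `supplyAsync` fakes the Elasticsearch async client call, and the callback only records `responseTimeMs` instead of dispatching to onSuccess/onFailure.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Sketch of the fix: whenCompleteAsync moves the callback off the completing
// thread onto ForkJoinPool.commonPool(), and responseTimeMs is captured at
// callback entry, before any hit processing runs.
public class AsyncOffloadDemo {

    static long measuredResponseTimeMs() {
        long searchStartTime = System.currentTimeMillis(); // written before the async call
        CompletableFuture<Long> result = new CompletableFuture<>();
        CompletableFuture
            .supplyAsync(() -> "search-response")          // stands in for the ES async request
            .whenCompleteAsync((response, throwable) -> {
                // Captured at callback entry, so the metric reflects only the
                // round-trip plus minimal ForkJoinPool scheduling overhead,
                // not time spent processing hits further down the callback.
                long responseTimeMs = System.currentTimeMillis() - searchStartTime;
                result.complete(responseTimeMs);
            });
        try {
            return result.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println("responseTimeMs = " + measuredResponseTimeMs());
    }
}
```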

Risk assessment

  • Thread safety: The semaphore already ensures at most one in-flight request per scanner. searchStartTime is written before the async call and read in the callback; the CompletableFuture chain guarantees happens-before. No new thread-safety concerns.
  • ForkJoinPool blocking: queue.offer and rowInclusionPredicate.test can block, but queue.offer is bounded by enqueueTimeoutMs and the common pool can compensate with additional threads. This is far better than blocking the I/O dispatcher threads which also prevents socket reads for other connections.
  • Cancellation: ongoingRequest.cancel(true) behaves the same — it cancels the whenCompleteAsync stage. The isClosed check in handleResponse guards against processing after close, same as before.
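The cancellation point can be checked with a small sketch. The never-completed base future below stands in for an in-flight Elasticsearch request; the class and variable names are illustrative, not Oak's.

```java
import java.util.concurrent.CompletableFuture;

// Sketch of the cancellation behaviour noted above: cancelling the
// whenCompleteAsync stage while the underlying request is still in flight.
public class CancelDemo {

    static boolean cancelStillWorks() {
        CompletableFuture<String> inFlight = new CompletableFuture<>(); // never completes
        CompletableFuture<String> ongoingRequest = inFlight
            .whenCompleteAsync((response, throwable) -> {
                // Stands in for handleResponse; never runs here because the
                // stage is cancelled before the base future completes.
            });
        boolean cancelled = ongoingRequest.cancel(true); // mirrors ongoingRequest.cancel(true)
        return cancelled && ongoingRequest.isCancelled();
    }

    public static void main(String[] args) {
        System.out.println("cancelled: " + cancelStillWorks());
    }
}
```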
