OAK-12170 Offload Elastic async response processing from I/O thread to avoid inflated query time metrics #2828
Open

fabriziofortino wants to merge 1 commit into apache:trunk from
This PR makes two changes:

- Offload response processing to `ForkJoinPool.commonPool()` by replacing `whenComplete` with `whenCompleteAsync`
- Capture `responseTimeMs` at callback entry and pass it through to `onSuccess`/`onFailure` so that `ELASTIC_QUERY_TOTAL_TIME` reflects the actual query round-trip time

## Problem
The `CompletableFuture` returned by the Elasticsearch async client completes on the `elasticsearch-rest-client` I/O thread (Apache HttpAsyncClient I/O dispatcher). With `whenComplete`, the `handleResponse` callback, including `onSuccess` and its hit processing loop, runs inline on that same thread.

During hit processing, `onSuccess` iterates over the search hits and calls `SearchHitListener.on(hit)` for each result. In `ElasticResultRowAsyncIterator.on()`, two operations can block the I/O thread:

- `rowInclusionPredicate.test(path)`: evaluates ACLs, which may require remote calls to the underlying persistence layer (e.g. MongoDB, segment store)
- `queue.offer(resultRow, enqueueTimeoutMs, TimeUnit.MILLISECONDS)`: blocks when the bounded result queue is full and the consumer is slow to drain it

Since the I/O dispatcher threads are shared across all HTTP connections, blocking one thread delays response reading and `CompletableFuture` completion for other concurrent Elasticsearch queries.

This causes `ELASTIC_QUERY_TOTAL_TIME` to include not just the actual query round-trip (ES server processing + network), but also the I/O thread scheduling delay caused by other queries' blocked callbacks.

Evidence: adding `LOG.info("handleResponse running on thread: {}", Thread.currentThread().getName())` to `handleResponse` confirmed the callback runs on `elasticsearch-rest-client-1-thread-N`, the I/O dispatcher threads.

Relevant documentation:
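The inline-callback behavior can be reproduced outside Oak with a minimal, self-contained sketch. The class and thread names below (`WhenCompleteThreadDemo`, `fake-io-dispatcher`) are illustrative stand-ins for the real client, not code from this PR:

```java
import java.util.concurrent.CompletableFuture;

public class WhenCompleteThreadDemo {

    // Returns the name of the thread that executed the whenComplete callback.
    static String callbackThreadName() {
        CompletableFuture<String> future = new CompletableFuture<>();
        String[] callbackThread = new String[1];
        // The callback is registered before the future completes, so it runs
        // inline on whichever thread later calls complete() -- in the real
        // client, an elasticsearch-rest-client I/O dispatcher thread.
        future.whenComplete((result, error) ->
                callbackThread[0] = Thread.currentThread().getName());
        Thread ioThread = new Thread(() -> future.complete("hits"), "fake-io-dispatcher");
        ioThread.start();
        try {
            ioThread.join(); // join() makes callbackThread[0] safely visible here
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return callbackThread[0];
    }

    public static void main(String[] args) {
        System.out.println("whenComplete ran on: " + callbackThreadName());
    }
}
```

Anything blocking inside that callback therefore stalls the completing thread itself, which is exactly the cross-query interference described above.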
## Fix
- `whenComplete` → `whenCompleteAsync`: response processing now runs on `ForkJoinPool.commonPool()` instead of the I/O dispatcher thread. This frees the I/O thread immediately after the future completes, preventing cross-query interference.
- `responseTimeMs` parameter: `System.currentTimeMillis()` is captured at the entry of the `whenCompleteAsync` lambda and passed through `handleResponse` to `onSuccess`/`onFailure`. This ensures `ELASTIC_QUERY_TOTAL_TIME` measures the actual query round-trip (ES server time + network + response deserialization + minimal ForkJoinPool scheduling overhead) rather than including arbitrary I/O thread contention delays.
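A minimal sketch of the same pattern, assuming hypothetical names (`WhenCompleteAsyncDemo`, `fake-io-dispatcher`) rather than the actual Oak classes: `whenCompleteAsync` hands the callback to `ForkJoinPool.commonPool()`, and the round-trip time is captured at callback entry before any hit processing:

```java
import java.util.concurrent.CompletableFuture;

public class WhenCompleteAsyncDemo {

    // Returns the name of the thread that executed the async callback.
    static String asyncCallbackThreadName() {
        long searchStartTime = System.currentTimeMillis();
        CompletableFuture<String> searchFuture = new CompletableFuture<>();
        String[] callbackThread = new String[1];
        // whenCompleteAsync (no executor argument) dispatches the callback to
        // ForkJoinPool.commonPool(), so the completing thread is released
        // immediately after complete() returns.
        CompletableFuture<Void> stage = searchFuture.whenCompleteAsync((response, error) -> {
            // Capture the round-trip time at callback entry, before any hit
            // processing that could block on ACL checks or the result queue.
            long responseTimeMs = System.currentTimeMillis() - searchStartTime;
            System.out.println("round-trip ms: " + responseTimeMs);
            callbackThread[0] = Thread.currentThread().getName();
        });
        new Thread(() -> searchFuture.complete("hits"), "fake-io-dispatcher").start();
        stage.join(); // join() makes callbackThread[0] safely visible here
        return callbackThread[0];
    }

    public static void main(String[] args) {
        System.out.println("processing ran on: " + asyncCallbackThreadName());
    }
}
```

The printed thread name is a common-pool worker rather than the completing `fake-io-dispatcher` thread, mirroring the behavior change this PR makes.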
## Risk assessment

- Thread safety: `searchStartTime` is written before the async call and read in the callback; the `CompletableFuture` chain guarantees happens-before. No new thread-safety concerns.
- `ForkJoinPool` blocking: `queue.offer` and `rowInclusionPredicate.test` can still block, but `queue.offer` is bounded by `enqueueTimeoutMs` and the common pool can compensate with additional threads. This is far better than blocking the I/O dispatcher threads, which also prevents socket reads for other connections.
- Cancellation: `ongoingRequest.cancel(true)` behaves the same: it cancels the `whenCompleteAsync` stage. The `isClosed` check in `handleResponse` guards against processing after close, same as before.
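The bounded-wait property that makes `queue.offer` acceptable on a pool thread can be shown with a standalone `java.util.concurrent` sketch; `ENQUEUE_TIMEOUT_MS` and the class name are illustrative assumptions, not Oak's actual configuration:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedOfferDemo {

    // Hypothetical timeout; in Oak the real value comes from enqueueTimeoutMs.
    static final long ENQUEUE_TIMEOUT_MS = 50;

    // Returns whether a second row could be enqueued into a full bounded queue.
    static boolean offerIntoFullQueue() {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.offer("row-1"); // fills the single-slot queue
        try {
            // Blocks for at most ENQUEUE_TIMEOUT_MS, then gives up and returns
            // false: the wait is bounded, unlike an unconditional put(), so a
            // slow consumer cannot stall the worker thread indefinitely.
            return queue.offer("row-2", ENQUEUE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("second offer succeeded: " + offerIntoFullQueue());
    }
}
```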