[ISSUE #10423] Optimize LiteConsumerLagCalculator.getLagCountTopK by deferring getStoreTimestamp to topK results only #10424
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## develop #10424 +/- ##
=============================================
- Coverage 48.05% 47.94% -0.11%
+ Complexity 13303 13276 -27
=============================================
Files 1377 1377
Lines 100611 100629 +18
Branches 12991 12994 +3
=============================================
- Hits 48347 48246 -101
- Misses 46348 46437 +89
- Partials 5916 5946 +30
oss-taishan-ai left a comment
Review by github-manager-bot
Summary
Optimizes LiteConsumerLagCalculator.getLagCountTopK by deferring expensive getStoreTimestamp calls to the topK results only, creating LiteLagInfo objects lazily, replacing split() with indexOf(), and replacing AtomicLong with long[].
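For readers who have not opened the diff, the pattern summarized above can be sketched roughly as follows. This is a minimal illustration, not the PR's code: `TopKLagSketch`, `LagEntry`, and the `timestampLookup` callback are hypothetical stand-ins for `LiteConsumerLagCalculator`, `LiteLagInfo`, and `getStoreTimestamp`, and the input map shape is assumed for the example.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.function.ToLongFunction;

final class TopKLagSketch {

    /** Hypothetical stand-in for LiteLagInfo; only the fields this sketch needs. */
    static final class LagEntry {
        final String topic;
        final long lagCount;
        final long consumerOffset;
        long earliestUnconsumedTimestamp = -1L; // filled in after heap filtering

        LagEntry(String topic, long lagCount, long consumerOffset) {
            this.topic = topic;
            this.lagCount = lagCount;
            this.consumerOffset = consumerOffset;
        }
    }

    /**
     * offsets maps a topic to {consumerOffset, maxOffset} (assumed shape).
     * timestampLookup plays the role of the expensive store lookup.
     */
    static List<LagEntry> topKByLag(Map<String, long[]> offsets, int topK,
                                    ToLongFunction<LagEntry> timestampLookup) {
        Comparator<LagEntry> byLag = Comparator.comparingLong(e -> e.lagCount);
        PriorityQueue<LagEntry> minHeap = new PriorityQueue<>(byLag);

        for (Map.Entry<String, long[]> e : offsets.entrySet()) {
            long consumerOffset = e.getValue()[0];
            long diff = e.getValue()[1] - consumerOffset;
            // Allocate an entry only if it qualifies for the heap.
            if (minHeap.size() < topK
                || (minHeap.peek() != null && diff > minHeap.peek().lagCount)) {
                if (minHeap.size() >= topK) {
                    minHeap.poll(); // evict the current smallest lag
                }
                minHeap.offer(new LagEntry(e.getKey(), diff, consumerOffset));
            }
        }

        List<LagEntry> top = new ArrayList<>(minHeap);
        top.sort(byLag.reversed());
        // The expensive lookup runs at most topK times instead of once per entry.
        for (LagEntry entry : top) {
            entry.earliestUnconsumedTimestamp = timestampLookup.applyAsLong(entry);
        }
        return top;
    }
}
```

With N offset entries, the lookup callback is invoked at most K times, which is the N → K reduction the review refers to.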
Findings
- [Correctness] ✅ Core optimizations are correct:
  - AtomicLong → long[]: Correct since offsetTableForEachByGroup is single-threaded
  - split() → indexOf() + substring(): Equivalent behavior, avoids array allocation
  - isLiteTopicQueue(topicAtGroup) instead of isLiteTopicQueue(topicGroup[0]): Correct because isLiteTopicQueue only checks the startsWith prefix
  - Lazy LiteLagInfo creation: Only instantiates when qualifying for heap insertion
- [Correctness] ⚠️ Post-heap lmqName reconstruction (lines 240-247):

      String parentTopic = LiteMetadataUtil.getLiteBindTopic(group, brokerController);
      for (LiteLagInfo lagInfo : topList) {
          String lmqName = LiteUtil.toLmqName(parentTopic, lagInfo.getLiteTopic());
          lagInfo.setEarliestUnconsumedTimestamp(getStoreTimestamp(lmqName, consumerOffset));
      }

  This reconstructs lmqName from group config instead of using the original lmqName from the offset table. This works in practice because the only caller (LiteManagerProcessor.getLiteGroupInfo) validates the group and liteBindTopic before calling getLagCountTopK. However, the method signature allows group to be null, and if getLiteBindTopic returns null, toLmqName returns null, causing getStoreTimestamp(null, ...) to fail. Consider adding a null guard:

      if (parentTopic == null) {
          // Fall back to original behavior or skip timestamp lookup
          return Pair.of(topList, totalLagCount[0]);
      }
- [Readability] ⚠️ Operator precedence (line 222):

      if (minHeap.size() < topK || minHeap.peek() != null && diff > minHeap.peek().getLagCount())

  In Java, && binds more tightly than ||, so this is parsed as size < topK || (peek != null && diff > peek.getLagCount()), which is correct. Adding explicit parentheses would improve readability and prevent future maintenance errors:

      if (minHeap.size() < topK || (minHeap.peek() != null && diff > minHeap.peek().getLagCount()))
- [Performance] ✅ Excellent improvements:
  - N → K getStoreTimestamp calls (major I/O reduction for groups with thousands of lite topics)
  - N → K LiteLagInfo allocations
  - No String.split() array allocations per entry
  - No AtomicLong CAS overhead
- [Tests] ✅ Test testGetLagCountTopK_NormalCase covers the new behavior, including timestamp deferral, and mocks SubscriptionGroupManager for the liteBindTopic lookup.
- [CI] ✅ All 10 checks pass (CodeQL, bazel-compile, maven-compile × 4, coverage, license, misspell).
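The key-parsing finding can be illustrated with a small sketch. It assumes the offset-table key has the form topic@group and uses a hypothetical prefix constant; the real separator and lite-topic prefix live in the RocketMQ code and are not shown in this review. `KeyParseSketch` and its methods are illustrative names only.

```java
final class KeyParseSketch {
    static final char SEPARATOR = '@';          // assumed key separator
    static final String LITE_PREFIX = "%LMQ%";  // hypothetical prefix for illustration

    /**
     * Prefix check on the whole key. Because the prefix contains no separator,
     * this gives the same answer as checking only the topic part.
     */
    static boolean isLiteKey(String topicAtGroup) {
        return topicAtGroup.startsWith(LITE_PREFIX);
    }

    /** Topic part via indexOf + substring; no intermediate String[] is allocated. */
    static String topicOf(String topicAtGroup) {
        int idx = topicAtGroup.indexOf(SEPARATOR);
        return idx < 0 ? null : topicAtGroup.substring(0, idx);
    }

    /** Group part via indexOf + substring. */
    static String groupOf(String topicAtGroup) {
        int idx = topicAtGroup.indexOf(SEPARATOR);
        return idx < 0 ? null : topicAtGroup.substring(idx + 1);
    }
}
```

Checking the prefix first means non-lite keys are rejected before any substring is created, which matches the ordering described in the changelog.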
Suggestions
- Add a null guard for parentTopic in the post-heap loop to handle edge cases defensively
- Add explicit parentheses in the heap condition for readability
- Consider adding a comment explaining why lmqName reconstruction is safe (i.e., the caller guarantees a valid group with liteBindTopic)
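A rough sketch of the first suggestion, under stated assumptions: `NullGuardSketch`, `LagInfo`, and the `toLmqName` helper below are hypothetical stand-ins (the name format is made up), and the store lookup is passed in as a callback rather than calling the real getStoreTimestamp.

```java
import java.util.List;
import java.util.function.ToLongBiFunction;

final class NullGuardSketch {

    /** Hypothetical stand-in for LiteLagInfo. */
    static final class LagInfo {
        final String liteTopic;
        final long consumerOffset;
        long timestamp = -1L; // -1 means "not looked up"

        LagInfo(String liteTopic, long consumerOffset) {
            this.liteTopic = liteTopic;
            this.consumerOffset = consumerOffset;
        }
    }

    /** Hypothetical stand-in for LiteUtil.toLmqName; the format here is invented. */
    static String toLmqName(String parentTopic, String liteTopic) {
        return parentTopic + "$" + liteTopic;
    }

    static List<LagInfo> fillTimestamps(String parentTopic, List<LagInfo> topList,
                                        ToLongBiFunction<String, Long> storeTimestamp) {
        if (parentTopic == null) {
            // No bound topic: skip the lookup instead of passing a null name down.
            return topList;
        }
        for (LagInfo info : topList) {
            String lmqName = toLmqName(parentTopic, info.liteTopic);
            info.timestamp = storeTimestamp.applyAsLong(lmqName, info.consumerOffset);
        }
        return topList;
    }
}
```

Whether to skip the lookup or fall back to another name source is a design choice for the PR author; the sketch only shows the skip variant.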
Automated review by github-manager-bot
What is the purpose of the change
LiteConsumerLagCalculator.getLagCountTopK currently calls getStoreTimestamp() (disk I/O) for every offset entry, creates a LiteLagInfo for each, and uses String.split() for key parsing. This PR optimizes these hotspots.
Brief changelog
- Defer getStoreTimestamp to topK results only: keep consumerOffset in LiteLagInfo during traversal, then compute timestamps for the final topK entries after heap filtering (N → K calls).
- Create LiteLagInfo lazily: instantiate it only when the entry qualifies for heap insertion.
- Replace split() with indexOf(): parse the topicAtGroup key via indexOf + substring, avoiding array allocation; check the lite prefix before string extraction.
- Replace AtomicLong with long[]: single-threaded traversal needs no atomic overhead.
Verifying this change
Existing unit tests cover the changed methods.
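As a side note on the AtomicLong → long[] item, the following sketch shows the two accumulator styles next to each other. `AccumulatorSketch` and the map of lag values are made up for the example; the point is only that a one-element array gives a lambda a mutable counter without atomic operations, which is safe only when the traversal runs on a single thread.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

final class AccumulatorSketch {

    /** Accumulates through AtomicLong; every add is an atomic update. */
    static long sumWithAtomic(Map<String, Long> lags) {
        AtomicLong total = new AtomicLong();
        lags.forEach((topic, lag) -> total.addAndGet(lag));
        return total.get();
    }

    /** Accumulates through a one-element array; plain writes, single-threaded use only. */
    static long sumWithArray(Map<String, Long> lags) {
        long[] total = {0L};
        lags.forEach((topic, lag) -> total[0] += lag);
        return total[0];
    }
}
```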