Add batched SegmentZKMetadata iteration API and migrate controller usages#17706
Add batched SegmentZKMetadata iteration API and migrate controller usages#17706xiangfu0 wants to merge 1 commit intoapache:masterfrom
Conversation
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #17706 +/- ##
==========================================
Coverage 63.23% 63.24%
Complexity 1502 1502
==========================================
Files 3179 3181 +2
Lines 190710 190875 +165
Branches 29153 29171 +18
==========================================
+ Hits 120597 120720 +123
Misses 60746 60746
- Partials 9367 9409 +42
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
17da5ce to
2005659
Compare
| _pinotHelixResourceManager.getSegmentsZKMetadata(tableNameWithType); | ||
|
|
||
| for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) { | ||
| _pinotHelixResourceManager.forEachSegmentsZKMetadata(tableNameWithType, segmentZKMetadata -> { |
There was a problem hiding this comment.
Won't this actually be slower due to more ZK point calls rather than earlier getChildren() calls ?
I think if use cases require all data, then earlier getChildren() calls is probably a better choice.
There was a problem hiding this comment.
For propertyStore, underlying it's also wrapping a for loop on top of zkclient to call each path to fetch the znRecord then construct the segment metadata.
So there is no difference in terms of the zk overhead.
GetChildren for all segment name is always just one zk call to fetch all.
2005659 to
32a77d3
Compare
There was a problem hiding this comment.
Pull request overview
This PR introduces a batched/streaming API for iterating over SegmentZKMetadata to reduce memory pressure when processing large numbers of segments. The changes add a new forEachSegmentZKMetadata method in ZKMetadataProvider that retrieves segment metadata in configurable batches (default 1000) and applies a consumer function to each segment, avoiding the need to load all segment metadata into memory at once.
Changes:
- Added batched segment metadata iteration API (
forEachSegmentZKMetadata) in ZKMetadataProvider with configurable batch size - Migrated RetentionManager, OfflineSegmentValidationManager, RealtimeSegmentValidationManager, and other controller components from list-based to consumer-based iteration
- Added configuration support for retention manager batch size with dynamic updates via cluster config changes
Reviewed changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java | Added core forEachSegmentZKMetadata method with batched ZK reads |
| pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java | Added wrapper methods for batched segment metadata iteration and migrated existing usages |
| pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java | Added forwarding methods for batched segment metadata iteration |
| pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/BaseTaskGenerator.java | Refactored to use batched iteration with fallback for existing test mocks |
| pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java | Migrated all segment metadata processing to batched iteration with configurable batch size |
| pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentValidationManager.java | Refactored to use single-pass consumer-based iteration with array-based accumulators |
| pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java | Migrated to batched iteration while maintaining existing functionality |
| pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java | Simplified to use consumer-based iteration for ZK metadata retrieval |
| pinot-controller/src/main/java/org/apache/pinot/controller/util/TableTierReader.java | Migrated to consumer-based iteration for tier details retrieval |
| pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java | Added configuration for segmentsZKMetadataBatchSize with default value of 1000 |
| pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java | Added mock helper for both old and new API methods to support test cases |
| pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java | Contains debug code that should be removed |
pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
Show resolved
Hide resolved
pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
Outdated
Show resolved
Hide resolved
pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
Show resolved
Hide resolved
...lugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java
Outdated
Show resolved
Hide resolved
...src/main/java/org/apache/pinot/controller/helix/core/minion/generator/BaseTaskGenerator.java
Outdated
Show resolved
Hide resolved
473c632 to
031b0b7
Compare
Jackie-Jiang
left a comment
There was a problem hiding this comment.
In a lot of places, the code has to cache all SegmentZKMetadata for a table. I don't think the solution in this PR is good enough to reduce memory footprint. We should consider caching them so that we only maintain one copy within memory.
Within each controller, we should make it ZK listener based, and cache the SegmentZKMetadata for the tables managed by the controller
|
|
||
| List<SegmentZKMetadata> segmentsZKMetadata = _pinotHelixResourceManager.getSegmentsZKMetadata(realtimeTableName); | ||
| List<SegmentZKMetadata> segmentsZKMetadata = new ArrayList<>(); | ||
| _pinotHelixResourceManager.forEachSegmentsZKMetadata(realtimeTableName, segmentsZKMetadata::add); |
There was a problem hiding this comment.
This is not reducing memory footprint
| List<SegmentZKMetadata> selectedSegmentZKMetadataList = new ArrayList<>(); | ||
| for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) { | ||
| if (segmentsForTable.contains(segmentZKMetadata.getSegmentName())) { | ||
| boolean[] callbackInvoked = new boolean[1]; |
There was a problem hiding this comment.
Why do we need to track this?
| callbackInvoked[0] = true; | ||
| if (segmentsForTable.contains(segmentZKMetadata.getSegmentName()) | ||
| && (segmentMetadataFilter == null || segmentMetadataFilter.test(segmentZKMetadata))) { | ||
| selectedSegmentZKMetadataList.add(segmentZKMetadata); |
There was a problem hiding this comment.
This is not reducing memory footprint
031b0b7 to
8307d49
Compare
8307d49 to
751c271
Compare
Summary